7 Commits

Author SHA1 Message Date
Yuyan Zhou
aad11371fd add go modules 2019-04-24 14:52:44 +08:00
Yuyan Zhou
c7c59d5a81 fix pool for message order 2019-04-22 10:57:46 +08:00
Yuyan Zhou
edc46c1ee6 remove publish message check 2019-04-22 10:21:14 +08:00
joyz
6193be74fa Merge branch 'master' of https://github.com/fhmq/hmq 2019-01-22 22:11:59 +08:00
joyz
90beada459 some modify 2019-01-22 22:11:54 +08:00
Marc Magnin
6c7fe6a0f7 simple fix (#35) 2019-01-07 19:56:00 +08:00
joyz
2b56664d85 remove no use 2018-12-27 21:22:32 +08:00
10 changed files with 272 additions and 276 deletions

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"runtime/debug"
"sync"
"sync/atomic"
@@ -103,7 +104,7 @@ func NewBroker(config *Config) (*Broker, error) {
return b, nil
}
func (b *Broker) SubmitWork(msg *Message) {
func (b *Broker) SubmitWork(clientId string, msg *Message) {
if b.wpool == nil {
b.wpool = pool.New(b.config.Worker)
}
@@ -111,7 +112,7 @@ func (b *Broker) SubmitWork(msg *Message) {
if msg.client.typ == CLUSTER {
b.clusterPool <- msg
} else {
b.wpool.Submit(func() {
b.wpool.Submit(clientId, func() {
ProcessMessage(msg)
})
}
@@ -153,6 +154,16 @@ func (b *Broker) Start() {
//system monitor
go StateMonitor()
if b.config.Debug {
startPProf()
}
}
func startPProf() {
go func() {
http.ListenAndServe(":10060", nil)
}()
}
func StateMonitor() {
@@ -311,6 +322,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
log.Error("received msg that was not Connect")
return
}
log.Info("reconnect connect from ", zap.String("clientID", msg.ClientIdentifier))
connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
connack.ReturnCode = packets.Accepted
connack.SessionPresent = msg.CleanSession
@@ -617,7 +631,9 @@ func (b *Broker) removeClient(c *client) {
func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
var subs []interface{}
var qoss []byte
b.mu.Lock()
err := b.topicsMgr.Subscribers([]byte(packet.TopicName), packet.Qos, &subs, &qoss)
b.mu.Unlock()
if err != nil {
log.Error("search sub client error, ", zap.Error(err))
return

View File

@@ -51,11 +51,6 @@ type client struct {
rmsgs []*packets.PublishPacket
}
type subInfo struct {
sub *subscription
num int
}
type subscription struct {
client *client
topic string
@@ -116,7 +111,7 @@ func (c *client) readLoop() {
if err != nil {
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
b.SubmitWork(c.info.clientID, msg)
return
}
@@ -124,7 +119,7 @@ func (c *client) readLoop() {
client: c,
packet: packet,
}
b.SubmitWork(msg)
b.SubmitWork(c.info.clientID, msg)
}
}
@@ -202,9 +197,6 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
}
func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if c.status == Disconnected {
return
}
b := c.broker
if b == nil {
@@ -218,7 +210,9 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
}
}
c.mu.Lock()
err := c.topicsMgr.Subscribers([]byte(packet.TopicName), packet.Qos, &c.subs, &c.qoss)
c.mu.Unlock()
if err != nil {
log.Error("Error retrieving subscribers list: ", zap.String("ClientID", c.info.clientID))
return
@@ -367,7 +361,7 @@ func (c *client) Close() {
c.status = Disconnected
//wait for message complete
time.Sleep(1 * time.Second)
// time.Sleep(1 * time.Second)
// c.status = Disconnected
if c.conn != nil {

View File

@@ -1,3 +1,5 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import "github.com/eclipse/paho.mqtt.golang/packets"

15
go.mod Normal file
View File

@@ -0,0 +1,15 @@
module github.com/fhmq/hmq
go 1.12
require (
github.com/bitly/go-simplejson v0.5.0
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/fsnotify/fsnotify v1.4.7
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/shirou/gopsutil v2.18.12+incompatible
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1
golang.org/x/net v0.0.0-20190424024845-afe8014c977f
)

22
go.sum Normal file
View File

@@ -0,0 +1,22 @@
github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y=
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e h1:uO75wNGioszjmIzcY/tvdDYKRLVvzggtAmmJkn9j4GQ=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM=
github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190424024845-afe8014c977f h1:uALRiwYevCJtciRa4mKKFkrs5jY4F2OTf1D2sfi1swY=
golang.org/x/net v0.0.0-20190424024845-afe8014c977f/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@@ -1,17 +1,3 @@
// Copyright (c) 2014 The SurgeMQ Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sessions
import (

View File

@@ -1,95 +0,0 @@
package sessions
import (
"time"
log "github.com/cihub/seelog"
"github.com/go-redis/redis"
jsoniter "github.com/json-iterator/go"
)
var redisClient *redis.Client
var _ SessionsProvider = (*redisProvider)(nil)
const (
sessionName = "session"
)
type redisProvider struct {
}
func init() {
Register("redis", NewRedisProvider())
}
func InitRedisConn(url string) {
redisClient = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
err := redisClient.Ping().Err()
for err != nil {
log.Error("connect redis error: ", err, " 3s try again...")
time.Sleep(3 * time.Second)
err = redisClient.Ping().Err()
}
}
func NewRedisProvider() *redisProvider {
return &redisProvider{}
}
func (r *redisProvider) New(id string) (*Session, error) {
val, _ := jsoniter.Marshal(&Session{id: id})
err := redisClient.HSet(sessionName, id, val).Err()
if err != nil {
return nil, err
}
result, err := redisClient.HGet(sessionName, id).Bytes()
if err != nil {
return nil, err
}
sess := Session{}
err = jsoniter.Unmarshal(result, &sess)
if err != nil {
return nil, err
}
return &sess, nil
}
func (r *redisProvider) Get(id string) (*Session, error) {
result, err := redisClient.HGet(sessionName, id).Bytes()
if err != nil {
return nil, err
}
sess := Session{}
err = jsoniter.Unmarshal(result, &sess)
if err != nil {
return nil, err
}
return &sess, nil
}
func (r *redisProvider) Del(id string) {
redisClient.HDel(sessionName, id)
}
func (r *redisProvider) Save(id string) error {
return nil
}
func (r *redisProvider) Count() int {
return int(redisClient.HLen(sessionName).Val())
}
func (r *redisProvider) Close() error {
return redisClient.Del(sessionName).Err()
}

10
main.go
View File

@@ -7,7 +7,7 @@ copyright notice and this permission notice appear in all copies.
package main
import (
"fmt"
"log"
"os"
"os/signal"
"runtime"
@@ -19,19 +19,17 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
config, err := broker.ConfigureConfig(os.Args[1:])
if err != nil {
fmt.Println("configure broker config error: ", err)
return
log.Fatal("configure broker config error: ", err)
}
b, err := broker.NewBroker(config)
if err != nil {
fmt.Println("New Broker error: ", err)
return
log.Fatal("New Broker error: ", err)
}
b.Start()
s := waitForSignal()
fmt.Println("signal received, broker closed.", s)
log.Println("signal received, broker closed.", s)
}
func waitForSignal() os.Signal {

58
pool/fixpool.go Normal file
View File

@@ -0,0 +1,58 @@
package pool
import (
"github.com/segmentio/fasthash/fnv1a"
)
type WorkerPool struct {
maxWorkers int
taskQueue []chan func()
stoppedChan chan struct{}
}
func New(maxWorkers int) *WorkerPool {
// There must be at least one worker.
if maxWorkers < 1 {
maxWorkers = 1
}
// taskQueue is unbuffered since items are always removed immediately.
pool := &WorkerPool{
taskQueue: make([]chan func(), maxWorkers),
maxWorkers: maxWorkers,
stoppedChan: make(chan struct{}),
}
// Start the task dispatcher.
pool.dispatch()
return pool
}
func (p *WorkerPool) Submit(uid string, task func()) {
idx := fnv1a.HashString64(uid) % uint64(p.maxWorkers)
if task != nil {
p.taskQueue[idx] <- task
}
}
func (p *WorkerPool) dispatch() {
for i := 0; i < p.maxWorkers; i++ {
p.taskQueue[i] = make(chan func())
go startWorker(p.taskQueue[i])
}
}
func startWorker(taskChan chan func()) {
go func() {
var task func()
var ok bool
for {
task, ok = <-taskChan
if !ok {
break
}
// Execute the task.
task()
}
}()
}

View File

@@ -1,166 +1,166 @@
package pool
import "time"
// import "time"
const (
// This value is the size of the queue that workers register their
// availability to the dispatcher. There may be hundreds of workers, but
// only a small channel is needed to register some of the workers.
readyQueueSize = 16
// const (
// // This value is the size of the queue that workers register their
// // availability to the dispatcher. There may be hundreds of workers, but
// // only a small channel is needed to register some of the workers.
// readyQueueSize = 16
// If worker pool receives no new work for this period of time, then stop
// a worker goroutine.
idleTimeoutSec = 5
)
// // If worker pool receives no new work for this period of time, then stop
// // a worker goroutine.
// idleTimeoutSec = 5
// )
type WorkerPool struct {
maxWorkers int
timeout time.Duration
taskQueue chan func()
readyWorkers chan chan func()
stoppedChan chan struct{}
}
// type WorkerPool struct {
// maxWorkers int
// timeout time.Duration
// taskQueue chan func()
// readyWorkers chan chan func()
// stoppedChan chan struct{}
// }
func New(maxWorkers int) *WorkerPool {
// There must be at least one worker.
if maxWorkers < 1 {
maxWorkers = 1
}
// func New(maxWorkers int) *WorkerPool {
// // There must be at least one worker.
// if maxWorkers < 1 {
// maxWorkers = 1
// }
// taskQueue is unbuffered since items are always removed immediately.
pool := &WorkerPool{
taskQueue: make(chan func()),
maxWorkers: maxWorkers,
readyWorkers: make(chan chan func(), readyQueueSize),
timeout: time.Second * idleTimeoutSec,
stoppedChan: make(chan struct{}),
}
// // taskQueue is unbuffered since items are always removed immediately.
// pool := &WorkerPool{
// taskQueue: make(chan func()),
// maxWorkers: maxWorkers,
// readyWorkers: make(chan chan func(), readyQueueSize),
// timeout: time.Second * idleTimeoutSec,
// stoppedChan: make(chan struct{}),
// }
// Start the task dispatcher.
go pool.dispatch()
// // Start the task dispatcher.
// go pool.dispatch()
return pool
}
// return pool
// }
func (p *WorkerPool) Stop() {
if p.Stopped() {
return
}
close(p.taskQueue)
<-p.stoppedChan
}
// func (p *WorkerPool) Stop() {
// if p.Stopped() {
// return
// }
// close(p.taskQueue)
// <-p.stoppedChan
// }
func (p *WorkerPool) Stopped() bool {
select {
case <-p.stoppedChan:
return true
default:
}
return false
}
// func (p *WorkerPool) Stopped() bool {
// select {
// case <-p.stoppedChan:
// return true
// default:
// }
// return false
// }
func (p *WorkerPool) Submit(task func()) {
if task != nil {
p.taskQueue <- task
}
}
// func (p *WorkerPool) Submit(task func()) {
// if task != nil {
// p.taskQueue <- task
// }
// }
func (p *WorkerPool) SubmitWait(task func()) {
if task == nil {
return
}
doneChan := make(chan struct{})
p.taskQueue <- func() {
task()
close(doneChan)
}
<-doneChan
}
// func (p *WorkerPool) SubmitWait(task func()) {
// if task == nil {
// return
// }
// doneChan := make(chan struct{})
// p.taskQueue <- func() {
// task()
// close(doneChan)
// }
// <-doneChan
// }
func (p *WorkerPool) dispatch() {
defer close(p.stoppedChan)
timeout := time.NewTimer(p.timeout)
var workerCount int
var task func()
var ok bool
var workerTaskChan chan func()
startReady := make(chan chan func())
Loop:
for {
timeout.Reset(p.timeout)
select {
case task, ok = <-p.taskQueue:
if !ok {
break Loop
}
// Got a task to do.
select {
case workerTaskChan = <-p.readyWorkers:
// A worker is ready, so give task to worker.
workerTaskChan <- task
default:
// No workers ready.
// Create a new worker, if not at max.
if workerCount < p.maxWorkers {
workerCount++
go func(t func()) {
startWorker(startReady, p.readyWorkers)
// Submit the task when the new worker.
taskChan := <-startReady
taskChan <- t
}(task)
} else {
// Start a goroutine to submit the task when an existing
// worker is ready.
go func(t func()) {
taskChan := <-p.readyWorkers
taskChan <- t
}(task)
}
}
case <-timeout.C:
// Timed out waiting for work to arrive. Kill a ready worker.
if workerCount > 0 {
select {
case workerTaskChan = <-p.readyWorkers:
// A worker is ready, so kill.
close(workerTaskChan)
workerCount--
default:
// No work, but no ready workers. All workers are busy.
}
}
}
}
// func (p *WorkerPool) dispatch() {
// defer close(p.stoppedChan)
// timeout := time.NewTimer(p.timeout)
// var workerCount int
// var task func()
// var ok bool
// var workerTaskChan chan func()
// startReady := make(chan chan func())
// Loop:
// for {
// timeout.Reset(p.timeout)
// select {
// case task, ok = <-p.taskQueue:
// if !ok {
// break Loop
// }
// // Got a task to do.
// select {
// case workerTaskChan = <-p.readyWorkers:
// // A worker is ready, so give task to worker.
// workerTaskChan <- task
// default:
// // No workers ready.
// // Create a new worker, if not at max.
// if workerCount < p.maxWorkers {
// workerCount++
// go func(t func()) {
// startWorker(startReady, p.readyWorkers)
// // Submit the task when the new worker.
// taskChan := <-startReady
// taskChan <- t
// }(task)
// } else {
// // Start a goroutine to submit the task when an existing
// // worker is ready.
// go func(t func()) {
// taskChan := <-p.readyWorkers
// taskChan <- t
// }(task)
// }
// }
// case <-timeout.C:
// // Timed out waiting for work to arrive. Kill a ready worker.
// if workerCount > 0 {
// select {
// case workerTaskChan = <-p.readyWorkers:
// // A worker is ready, so kill.
// close(workerTaskChan)
// workerCount--
// default:
// // No work, but no ready workers. All workers are busy.
// }
// }
// }
// }
// Stop all remaining workers as they become ready.
for workerCount > 0 {
workerTaskChan = <-p.readyWorkers
close(workerTaskChan)
workerCount--
}
}
// // Stop all remaining workers as they become ready.
// for workerCount > 0 {
// workerTaskChan = <-p.readyWorkers
// close(workerTaskChan)
// workerCount--
// }
// }
func startWorker(startReady, readyWorkers chan chan func()) {
go func() {
taskChan := make(chan func())
var task func()
var ok bool
// Register availability on starReady channel.
startReady <- taskChan
for {
// Read task from dispatcher.
task, ok = <-taskChan
if !ok {
// Dispatcher has told worker to stop.
break
}
// func startWorker(startReady, readyWorkers chan chan func()) {
// go func() {
// taskChan := make(chan func())
// var task func()
// var ok bool
// // Register availability on starReady channel.
// startReady <- taskChan
// for {
// // Read task from dispatcher.
// task, ok = <-taskChan
// if !ok {
// // Dispatcher has told worker to stop.
// break
// }
// Execute the task.
task()
// // Execute the task.
// task()
// Register availability on readyWorkers channel.
readyWorkers <- taskChan
}
}()
}
// // Register availability on readyWorkers channel.
// readyWorkers <- taskChan
// }
// }()
// }