2 Commits

Author SHA1 Message Date
Yuyan Zhou
aad11371fd add go modules 2019-04-24 14:52:44 +08:00
Yuyan Zhou
c7c59d5a81 fix pool for message order 2019-04-22 10:57:46 +08:00
7 changed files with 251 additions and 248 deletions

View File

@@ -104,7 +104,7 @@ func NewBroker(config *Config) (*Broker, error) {
return b, nil return b, nil
} }
func (b *Broker) SubmitWork(msg *Message) { func (b *Broker) SubmitWork(clientId string, msg *Message) {
if b.wpool == nil { if b.wpool == nil {
b.wpool = pool.New(b.config.Worker) b.wpool = pool.New(b.config.Worker)
} }
@@ -112,7 +112,7 @@ func (b *Broker) SubmitWork(msg *Message) {
if msg.client.typ == CLUSTER { if msg.client.typ == CLUSTER {
b.clusterPool <- msg b.clusterPool <- msg
} else { } else {
b.wpool.Submit(func() { b.wpool.Submit(clientId, func() {
ProcessMessage(msg) ProcessMessage(msg)
}) })
} }
@@ -322,6 +322,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
log.Error("received msg that was not Connect") log.Error("received msg that was not Connect")
return return
} }
log.Info("reconnect connect from ", zap.String("clientID", msg.ClientIdentifier))
connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket) connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
connack.ReturnCode = packets.Accepted connack.ReturnCode = packets.Accepted
connack.SessionPresent = msg.CleanSession connack.SessionPresent = msg.CleanSession

View File

@@ -111,7 +111,7 @@ func (c *client) readLoop() {
if err != nil { if err != nil {
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{client: c, packet: DisconnectdPacket} msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg) b.SubmitWork(c.info.clientID, msg)
return return
} }
@@ -119,7 +119,7 @@ func (c *client) readLoop() {
client: c, client: c,
packet: packet, packet: packet,
} }
b.SubmitWork(msg) b.SubmitWork(c.info.clientID, msg)
} }
} }
@@ -361,7 +361,7 @@ func (c *client) Close() {
c.status = Disconnected c.status = Disconnected
//wait for message complete //wait for message complete
time.Sleep(1 * time.Second) // time.Sleep(1 * time.Second)
// c.status = Disconnected // c.status = Disconnected
if c.conn != nil { if c.conn != nil {

15
go.mod Normal file
View File

@@ -0,0 +1,15 @@
module github.com/fhmq/hmq
go 1.12
require (
github.com/bitly/go-simplejson v0.5.0
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/fsnotify/fsnotify v1.4.7
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/shirou/gopsutil v2.18.12+incompatible
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1
golang.org/x/net v0.0.0-20190424024845-afe8014c977f
)

22
go.sum Normal file
View File

@@ -0,0 +1,22 @@
github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y=
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e h1:uO75wNGioszjmIzcY/tvdDYKRLVvzggtAmmJkn9j4GQ=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM=
github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190424024845-afe8014c977f h1:uALRiwYevCJtciRa4mKKFkrs5jY4F2OTf1D2sfi1swY=
golang.org/x/net v0.0.0-20190424024845-afe8014c977f/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@@ -1,95 +0,0 @@
package sessions
import (
"time"
log "github.com/cihub/seelog"
"github.com/go-redis/redis"
jsoniter "github.com/json-iterator/go"
)
var redisClient *redis.Client
var _ SessionsProvider = (*redisProvider)(nil)
const (
sessionName = "session"
)
type redisProvider struct {
}
func init() {
Register("redis", NewRedisProvider())
}
func InitRedisConn(url string) {
redisClient = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
err := redisClient.Ping().Err()
for err != nil {
log.Error("connect redis error: ", err, " 3s try again...")
time.Sleep(3 * time.Second)
err = redisClient.Ping().Err()
}
}
func NewRedisProvider() *redisProvider {
return &redisProvider{}
}
func (r *redisProvider) New(id string) (*Session, error) {
val, _ := jsoniter.Marshal(&Session{id: id})
err := redisClient.HSet(sessionName, id, val).Err()
if err != nil {
return nil, err
}
result, err := redisClient.HGet(sessionName, id).Bytes()
if err != nil {
return nil, err
}
sess := Session{}
err = jsoniter.Unmarshal(result, &sess)
if err != nil {
return nil, err
}
return &sess, nil
}
func (r *redisProvider) Get(id string) (*Session, error) {
result, err := redisClient.HGet(sessionName, id).Bytes()
if err != nil {
return nil, err
}
sess := Session{}
err = jsoniter.Unmarshal(result, &sess)
if err != nil {
return nil, err
}
return &sess, nil
}
func (r *redisProvider) Del(id string) {
redisClient.HDel(sessionName, id)
}
func (r *redisProvider) Save(id string) error {
return nil
}
func (r *redisProvider) Count() int {
return int(redisClient.HLen(sessionName).Val())
}
func (r *redisProvider) Close() error {
return redisClient.Del(sessionName).Err()
}

58
pool/fixpool.go Normal file
View File

@@ -0,0 +1,58 @@
package pool
import (
"github.com/segmentio/fasthash/fnv1a"
)
type WorkerPool struct {
maxWorkers int
taskQueue []chan func()
stoppedChan chan struct{}
}
func New(maxWorkers int) *WorkerPool {
// There must be at least one worker.
if maxWorkers < 1 {
maxWorkers = 1
}
// taskQueue is unbuffered since items are always removed immediately.
pool := &WorkerPool{
taskQueue: make([]chan func(), maxWorkers),
maxWorkers: maxWorkers,
stoppedChan: make(chan struct{}),
}
// Start the task dispatcher.
pool.dispatch()
return pool
}
func (p *WorkerPool) Submit(uid string, task func()) {
idx := fnv1a.HashString64(uid) % uint64(p.maxWorkers)
if task != nil {
p.taskQueue[idx] <- task
}
}
func (p *WorkerPool) dispatch() {
for i := 0; i < p.maxWorkers; i++ {
p.taskQueue[i] = make(chan func())
go startWorker(p.taskQueue[i])
}
}
func startWorker(taskChan chan func()) {
go func() {
var task func()
var ok bool
for {
task, ok = <-taskChan
if !ok {
break
}
// Execute the task.
task()
}
}()
}

View File

@@ -1,166 +1,166 @@
package pool package pool
import "time" // import "time"
const ( // const (
// This value is the size of the queue that workers register their // // This value is the size of the queue that workers register their
// availability to the dispatcher. There may be hundreds of workers, but // // availability to the dispatcher. There may be hundreds of workers, but
// only a small channel is needed to register some of the workers. // // only a small channel is needed to register some of the workers.
readyQueueSize = 16 // readyQueueSize = 16
// If worker pool receives no new work for this period of time, then stop // // If worker pool receives no new work for this period of time, then stop
// a worker goroutine. // // a worker goroutine.
idleTimeoutSec = 5 // idleTimeoutSec = 5
) // )
type WorkerPool struct { // type WorkerPool struct {
maxWorkers int // maxWorkers int
timeout time.Duration // timeout time.Duration
taskQueue chan func() // taskQueue chan func()
readyWorkers chan chan func() // readyWorkers chan chan func()
stoppedChan chan struct{} // stoppedChan chan struct{}
} // }
func New(maxWorkers int) *WorkerPool { // func New(maxWorkers int) *WorkerPool {
// There must be at least one worker. // // There must be at least one worker.
if maxWorkers < 1 { // if maxWorkers < 1 {
maxWorkers = 1 // maxWorkers = 1
} // }
// taskQueue is unbuffered since items are always removed immediately. // // taskQueue is unbuffered since items are always removed immediately.
pool := &WorkerPool{ // pool := &WorkerPool{
taskQueue: make(chan func()), // taskQueue: make(chan func()),
maxWorkers: maxWorkers, // maxWorkers: maxWorkers,
readyWorkers: make(chan chan func(), readyQueueSize), // readyWorkers: make(chan chan func(), readyQueueSize),
timeout: time.Second * idleTimeoutSec, // timeout: time.Second * idleTimeoutSec,
stoppedChan: make(chan struct{}), // stoppedChan: make(chan struct{}),
} // }
// Start the task dispatcher. // // Start the task dispatcher.
go pool.dispatch() // go pool.dispatch()
return pool // return pool
} // }
func (p *WorkerPool) Stop() { // func (p *WorkerPool) Stop() {
if p.Stopped() { // if p.Stopped() {
return // return
} // }
close(p.taskQueue) // close(p.taskQueue)
<-p.stoppedChan // <-p.stoppedChan
} // }
func (p *WorkerPool) Stopped() bool { // func (p *WorkerPool) Stopped() bool {
select { // select {
case <-p.stoppedChan: // case <-p.stoppedChan:
return true // return true
default: // default:
} // }
return false // return false
} // }
func (p *WorkerPool) Submit(task func()) { // func (p *WorkerPool) Submit(task func()) {
if task != nil { // if task != nil {
p.taskQueue <- task // p.taskQueue <- task
} // }
} // }
func (p *WorkerPool) SubmitWait(task func()) { // func (p *WorkerPool) SubmitWait(task func()) {
if task == nil { // if task == nil {
return // return
} // }
doneChan := make(chan struct{}) // doneChan := make(chan struct{})
p.taskQueue <- func() { // p.taskQueue <- func() {
task() // task()
close(doneChan) // close(doneChan)
} // }
<-doneChan // <-doneChan
} // }
func (p *WorkerPool) dispatch() { // func (p *WorkerPool) dispatch() {
defer close(p.stoppedChan) // defer close(p.stoppedChan)
timeout := time.NewTimer(p.timeout) // timeout := time.NewTimer(p.timeout)
var workerCount int // var workerCount int
var task func() // var task func()
var ok bool // var ok bool
var workerTaskChan chan func() // var workerTaskChan chan func()
startReady := make(chan chan func()) // startReady := make(chan chan func())
Loop: // Loop:
for { // for {
timeout.Reset(p.timeout) // timeout.Reset(p.timeout)
select { // select {
case task, ok = <-p.taskQueue: // case task, ok = <-p.taskQueue:
if !ok { // if !ok {
break Loop // break Loop
} // }
// Got a task to do. // // Got a task to do.
select { // select {
case workerTaskChan = <-p.readyWorkers: // case workerTaskChan = <-p.readyWorkers:
// A worker is ready, so give task to worker. // // A worker is ready, so give task to worker.
workerTaskChan <- task // workerTaskChan <- task
default: // default:
// No workers ready. // // No workers ready.
// Create a new worker, if not at max. // // Create a new worker, if not at max.
if workerCount < p.maxWorkers { // if workerCount < p.maxWorkers {
workerCount++ // workerCount++
go func(t func()) { // go func(t func()) {
startWorker(startReady, p.readyWorkers) // startWorker(startReady, p.readyWorkers)
// Submit the task when the new worker. // // Submit the task when the new worker.
taskChan := <-startReady // taskChan := <-startReady
taskChan <- t // taskChan <- t
}(task) // }(task)
} else { // } else {
// Start a goroutine to submit the task when an existing // // Start a goroutine to submit the task when an existing
// worker is ready. // // worker is ready.
go func(t func()) { // go func(t func()) {
taskChan := <-p.readyWorkers // taskChan := <-p.readyWorkers
taskChan <- t // taskChan <- t
}(task) // }(task)
} // }
} // }
case <-timeout.C: // case <-timeout.C:
// Timed out waiting for work to arrive. Kill a ready worker. // // Timed out waiting for work to arrive. Kill a ready worker.
if workerCount > 0 { // if workerCount > 0 {
select { // select {
case workerTaskChan = <-p.readyWorkers: // case workerTaskChan = <-p.readyWorkers:
// A worker is ready, so kill. // // A worker is ready, so kill.
close(workerTaskChan) // close(workerTaskChan)
workerCount-- // workerCount--
default: // default:
// No work, but no ready workers. All workers are busy. // // No work, but no ready workers. All workers are busy.
} // }
} // }
} // }
} // }
// Stop all remaining workers as they become ready. // // Stop all remaining workers as they become ready.
for workerCount > 0 { // for workerCount > 0 {
workerTaskChan = <-p.readyWorkers // workerTaskChan = <-p.readyWorkers
close(workerTaskChan) // close(workerTaskChan)
workerCount-- // workerCount--
} // }
} // }
func startWorker(startReady, readyWorkers chan chan func()) { // func startWorker(startReady, readyWorkers chan chan func()) {
go func() { // go func() {
taskChan := make(chan func()) // taskChan := make(chan func())
var task func() // var task func()
var ok bool // var ok bool
// Register availability on starReady channel. // // Register availability on starReady channel.
startReady <- taskChan // startReady <- taskChan
for { // for {
// Read task from dispatcher. // // Read task from dispatcher.
task, ok = <-taskChan // task, ok = <-taskChan
if !ok { // if !ok {
// Dispatcher has told worker to stop. // // Dispatcher has told worker to stop.
break // break
} // }
// Execute the task. // // Execute the task.
task() // task()
// Register availability on readyWorkers channel. // // Register availability on readyWorkers channel.
readyWorkers <- taskChan // readyWorkers <- taskChan
} // }
}() // }()
} // }