2 Commits

Author SHA1 Message Date
Yuyan Zhou
aad11371fd add go modules 2019-04-24 14:52:44 +08:00
Yuyan Zhou
c7c59d5a81 fix pool for message order 2019-04-22 10:57:46 +08:00
7 changed files with 251 additions and 248 deletions

View File

@@ -104,7 +104,7 @@ func NewBroker(config *Config) (*Broker, error) {
return b, nil
}
func (b *Broker) SubmitWork(msg *Message) {
func (b *Broker) SubmitWork(clientId string, msg *Message) {
if b.wpool == nil {
b.wpool = pool.New(b.config.Worker)
}
@@ -112,7 +112,7 @@ func (b *Broker) SubmitWork(msg *Message) {
if msg.client.typ == CLUSTER {
b.clusterPool <- msg
} else {
b.wpool.Submit(func() {
b.wpool.Submit(clientId, func() {
ProcessMessage(msg)
})
}
@@ -322,6 +322,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
log.Error("received msg that was not Connect")
return
}
log.Info("reconnect connect from ", zap.String("clientID", msg.ClientIdentifier))
connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
connack.ReturnCode = packets.Accepted
connack.SessionPresent = msg.CleanSession

View File

@@ -111,7 +111,7 @@ func (c *client) readLoop() {
if err != nil {
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
b.SubmitWork(c.info.clientID, msg)
return
}
@@ -119,7 +119,7 @@ func (c *client) readLoop() {
client: c,
packet: packet,
}
b.SubmitWork(msg)
b.SubmitWork(c.info.clientID, msg)
}
}
@@ -361,7 +361,7 @@ func (c *client) Close() {
c.status = Disconnected
//wait for message complete
time.Sleep(1 * time.Second)
// time.Sleep(1 * time.Second)
// c.status = Disconnected
if c.conn != nil {

15
go.mod Normal file
View File

@@ -0,0 +1,15 @@
module github.com/fhmq/hmq
go 1.12
require (
github.com/bitly/go-simplejson v0.5.0
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/fsnotify/fsnotify v1.4.7
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/shirou/gopsutil v2.18.12+incompatible
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1
golang.org/x/net v0.0.0-20190424024845-afe8014c977f
)

22
go.sum Normal file
View File

@@ -0,0 +1,22 @@
github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y=
github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e h1:uO75wNGioszjmIzcY/tvdDYKRLVvzggtAmmJkn9j4GQ=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM=
github.com/shirou/gopsutil v2.18.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190424024845-afe8014c977f h1:uALRiwYevCJtciRa4mKKFkrs5jY4F2OTf1D2sfi1swY=
golang.org/x/net v0.0.0-20190424024845-afe8014c977f/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@@ -1,95 +0,0 @@
package sessions
import (
"time"
log "github.com/cihub/seelog"
"github.com/go-redis/redis"
jsoniter "github.com/json-iterator/go"
)
var redisClient *redis.Client
var _ SessionsProvider = (*redisProvider)(nil)
const (
sessionName = "session"
)
// redisProvider implements SessionsProvider on top of a single redis hash
// (keyed by sessionName). It carries no state of its own; all operations go
// through the package-level redisClient, so InitRedisConn must run first.
type redisProvider struct {
}
// init registers this provider under the name "redis" so callers can select
// it through the sessions provider registry at program start.
func init() {
Register("redis", NewRedisProvider())
}
// InitRedisConn connects the package-level redisClient to the redis server
// at url and blocks, retrying every 3 seconds, until a PING succeeds.
//
// Fix: the url parameter was previously ignored and the address was
// hard-coded to 127.0.0.1:6379. It is now honored; an empty url still falls
// back to the old default for backward compatibility.
func InitRedisConn(url string) {
	addr := url
	if addr == "" {
		addr = "127.0.0.1:6379" // historical default
	}
	redisClient = redis.NewClient(&redis.Options{
		Addr:     addr,
		Password: "", // no password set
		DB:       0,  // use default DB
	})
	// Retry forever: the broker cannot proceed without a session store.
	err := redisClient.Ping().Err()
	for err != nil {
		log.Error("connect redis error: ", err, " 3s try again...")
		time.Sleep(3 * time.Second)
		err = redisClient.Ping().Err()
	}
}
// NewRedisProvider returns a fresh, stateless redis-backed session provider.
func NewRedisProvider() *redisProvider {
	return new(redisProvider)
}
// New creates a session for id, stores it in the redis hash, then reads the
// stored value back and returns the decoded copy so the caller sees exactly
// what redis holds.
//
// Fix: the jsoniter.Marshal error was previously discarded with `_`; it is
// now propagated instead of risking an HSet of an empty/garbage payload.
// NOTE(review): Session's id field is unexported, so the marshaled payload
// may not round-trip the id — confirm against the Session type's json tags.
func (r *redisProvider) New(id string) (*Session, error) {
	val, err := jsoniter.Marshal(&Session{id: id})
	if err != nil {
		return nil, err
	}
	if err := redisClient.HSet(sessionName, id, val).Err(); err != nil {
		return nil, err
	}
	result, err := redisClient.HGet(sessionName, id).Bytes()
	if err != nil {
		return nil, err
	}
	sess := Session{}
	if err := jsoniter.Unmarshal(result, &sess); err != nil {
		return nil, err
	}
	return &sess, nil
}
// Get fetches the session stored under id from the redis hash and decodes
// it. Redis and decode errors are returned unchanged to the caller.
func (r *redisProvider) Get(id string) (*Session, error) {
	raw, err := redisClient.HGet(sessionName, id).Bytes()
	if err != nil {
		return nil, err
	}
	var sess Session
	if err = jsoniter.Unmarshal(raw, &sess); err != nil {
		return nil, err
	}
	return &sess, nil
}
// Del removes the session stored under id from the redis hash.
// NOTE(review): the error returned by HDel is silently discarded — confirm
// best-effort deletion is intentional, or surface the error to callers.
func (r *redisProvider) Del(id string) {
redisClient.HDel(sessionName, id)
}
// Save is a no-op for this provider: New writes the session to redis
// eagerly, so there is nothing to flush here. Always returns nil.
func (r *redisProvider) Save(id string) error {
return nil
}
// Count reports the number of sessions currently held in the redis hash.
func (r *redisProvider) Count() int {
	n := redisClient.HLen(sessionName).Val()
	return int(n)
}
// Close deletes the entire session hash from redis, discarding every stored
// session, and returns any error from the DEL command.
func (r *redisProvider) Close() error {
	err := redisClient.Del(sessionName).Err()
	return err
}

58
pool/fixpool.go Normal file
View File

@@ -0,0 +1,58 @@
package pool
import (
"github.com/segmentio/fasthash/fnv1a"
)
// WorkerPool runs tasks on a fixed set of worker goroutines. Each worker
// owns one unbuffered channel in taskQueue; Submit hashes a client id onto
// a channel so tasks sharing an id run on the same worker, in submission
// order (the property the broker relies on for per-client message order).
type WorkerPool struct {
maxWorkers int
taskQueue []chan func()
// NOTE(review): stoppedChan is allocated by New but never closed or read
// in this file — dead field unless used elsewhere; confirm.
stoppedChan chan struct{}
}
// New builds a WorkerPool with maxWorkers dedicated workers (at least one)
// and starts them before returning, so the pool is immediately usable.
func New(maxWorkers int) *WorkerPool {
	// Clamp to a sane minimum: the pool always needs at least one worker.
	if maxWorkers < 1 {
		maxWorkers = 1
	}

	p := &WorkerPool{
		maxWorkers:  maxWorkers,
		taskQueue:   make([]chan func(), maxWorkers),
		stoppedChan: make(chan struct{}),
	}

	// Allocate each worker's channel and launch its goroutine.
	p.dispatch()
	return p
}
// Submit enqueues task on the worker selected by hashing uid. All tasks for
// the same uid land on the same worker and therefore execute in submission
// order. A nil task is ignored. Blocks until the worker accepts the task
// (the per-worker channel is unbuffered).
//
// Fix: the nil-task guard now runs before the hash is computed, instead of
// hashing uid and then dropping the task.
func (p *WorkerPool) Submit(uid string, task func()) {
	if task == nil {
		return
	}
	idx := fnv1a.HashString64(uid) % uint64(p.maxWorkers)
	p.taskQueue[idx] <- task
}
// dispatch creates one unbuffered task channel per worker slot and starts a
// worker goroutine for each.
//
// Fix: the previous `go startWorker(...)` spawned a goroutine whose only job
// was to spawn another goroutine — startWorker already launches its own
// goroutine internally, so the extra `go` was redundant and is removed.
func (p *WorkerPool) dispatch() {
	for i := 0; i < p.maxWorkers; i++ {
		p.taskQueue[i] = make(chan func())
		startWorker(p.taskQueue[i])
	}
}
// startWorker launches a goroutine that executes every task received on
// taskChan, one at a time and in arrival order, until the channel is closed.
// It returns immediately; the worker runs in the background.
func startWorker(taskChan chan func()) {
	go func() {
		// range drains the channel and exits cleanly when it is closed.
		for task := range taskChan {
			task()
		}
	}()
}

View File

@@ -1,166 +1,166 @@
package pool
import "time"
// import "time"
const (
// This value is the size of the queue that workers register their
// availability to the dispatcher. There may be hundreds of workers, but
// only a small channel is needed to register some of the workers.
readyQueueSize = 16
// const (
// // This value is the size of the queue that workers register their
// // availability to the dispatcher. There may be hundreds of workers, but
// // only a small channel is needed to register some of the workers.
// readyQueueSize = 16
// If worker pool receives no new work for this period of time, then stop
// a worker goroutine.
idleTimeoutSec = 5
)
// // If worker pool receives no new work for this period of time, then stop
// // a worker goroutine.
// idleTimeoutSec = 5
// )
type WorkerPool struct {
maxWorkers int
timeout time.Duration
taskQueue chan func()
readyWorkers chan chan func()
stoppedChan chan struct{}
}
// type WorkerPool struct {
// maxWorkers int
// timeout time.Duration
// taskQueue chan func()
// readyWorkers chan chan func()
// stoppedChan chan struct{}
// }
func New(maxWorkers int) *WorkerPool {
// There must be at least one worker.
if maxWorkers < 1 {
maxWorkers = 1
}
// func New(maxWorkers int) *WorkerPool {
// // There must be at least one worker.
// if maxWorkers < 1 {
// maxWorkers = 1
// }
// taskQueue is unbuffered since items are always removed immediately.
pool := &WorkerPool{
taskQueue: make(chan func()),
maxWorkers: maxWorkers,
readyWorkers: make(chan chan func(), readyQueueSize),
timeout: time.Second * idleTimeoutSec,
stoppedChan: make(chan struct{}),
}
// // taskQueue is unbuffered since items are always removed immediately.
// pool := &WorkerPool{
// taskQueue: make(chan func()),
// maxWorkers: maxWorkers,
// readyWorkers: make(chan chan func(), readyQueueSize),
// timeout: time.Second * idleTimeoutSec,
// stoppedChan: make(chan struct{}),
// }
// Start the task dispatcher.
go pool.dispatch()
// // Start the task dispatcher.
// go pool.dispatch()
return pool
}
// return pool
// }
func (p *WorkerPool) Stop() {
if p.Stopped() {
return
}
close(p.taskQueue)
<-p.stoppedChan
}
// func (p *WorkerPool) Stop() {
// if p.Stopped() {
// return
// }
// close(p.taskQueue)
// <-p.stoppedChan
// }
func (p *WorkerPool) Stopped() bool {
select {
case <-p.stoppedChan:
return true
default:
}
return false
}
// func (p *WorkerPool) Stopped() bool {
// select {
// case <-p.stoppedChan:
// return true
// default:
// }
// return false
// }
func (p *WorkerPool) Submit(task func()) {
if task != nil {
p.taskQueue <- task
}
}
// func (p *WorkerPool) Submit(task func()) {
// if task != nil {
// p.taskQueue <- task
// }
// }
func (p *WorkerPool) SubmitWait(task func()) {
if task == nil {
return
}
doneChan := make(chan struct{})
p.taskQueue <- func() {
task()
close(doneChan)
}
<-doneChan
}
// func (p *WorkerPool) SubmitWait(task func()) {
// if task == nil {
// return
// }
// doneChan := make(chan struct{})
// p.taskQueue <- func() {
// task()
// close(doneChan)
// }
// <-doneChan
// }
func (p *WorkerPool) dispatch() {
defer close(p.stoppedChan)
timeout := time.NewTimer(p.timeout)
var workerCount int
var task func()
var ok bool
var workerTaskChan chan func()
startReady := make(chan chan func())
Loop:
for {
timeout.Reset(p.timeout)
select {
case task, ok = <-p.taskQueue:
if !ok {
break Loop
}
// Got a task to do.
select {
case workerTaskChan = <-p.readyWorkers:
// A worker is ready, so give task to worker.
workerTaskChan <- task
default:
// No workers ready.
// Create a new worker, if not at max.
if workerCount < p.maxWorkers {
workerCount++
go func(t func()) {
startWorker(startReady, p.readyWorkers)
// Submit the task when the new worker.
taskChan := <-startReady
taskChan <- t
}(task)
} else {
// Start a goroutine to submit the task when an existing
// worker is ready.
go func(t func()) {
taskChan := <-p.readyWorkers
taskChan <- t
}(task)
}
}
case <-timeout.C:
// Timed out waiting for work to arrive. Kill a ready worker.
if workerCount > 0 {
select {
case workerTaskChan = <-p.readyWorkers:
// A worker is ready, so kill.
close(workerTaskChan)
workerCount--
default:
// No work, but no ready workers. All workers are busy.
}
}
}
}
// func (p *WorkerPool) dispatch() {
// defer close(p.stoppedChan)
// timeout := time.NewTimer(p.timeout)
// var workerCount int
// var task func()
// var ok bool
// var workerTaskChan chan func()
// startReady := make(chan chan func())
// Loop:
// for {
// timeout.Reset(p.timeout)
// select {
// case task, ok = <-p.taskQueue:
// if !ok {
// break Loop
// }
// // Got a task to do.
// select {
// case workerTaskChan = <-p.readyWorkers:
// // A worker is ready, so give task to worker.
// workerTaskChan <- task
// default:
// // No workers ready.
// // Create a new worker, if not at max.
// if workerCount < p.maxWorkers {
// workerCount++
// go func(t func()) {
// startWorker(startReady, p.readyWorkers)
// // Submit the task when the new worker.
// taskChan := <-startReady
// taskChan <- t
// }(task)
// } else {
// // Start a goroutine to submit the task when an existing
// // worker is ready.
// go func(t func()) {
// taskChan := <-p.readyWorkers
// taskChan <- t
// }(task)
// }
// }
// case <-timeout.C:
// // Timed out waiting for work to arrive. Kill a ready worker.
// if workerCount > 0 {
// select {
// case workerTaskChan = <-p.readyWorkers:
// // A worker is ready, so kill.
// close(workerTaskChan)
// workerCount--
// default:
// // No work, but no ready workers. All workers are busy.
// }
// }
// }
// }
// Stop all remaining workers as they become ready.
for workerCount > 0 {
workerTaskChan = <-p.readyWorkers
close(workerTaskChan)
workerCount--
}
}
// // Stop all remaining workers as they become ready.
// for workerCount > 0 {
// workerTaskChan = <-p.readyWorkers
// close(workerTaskChan)
// workerCount--
// }
// }
func startWorker(startReady, readyWorkers chan chan func()) {
go func() {
taskChan := make(chan func())
var task func()
var ok bool
// Register availability on starReady channel.
startReady <- taskChan
for {
// Read task from dispatcher.
task, ok = <-taskChan
if !ok {
// Dispatcher has told worker to stop.
break
}
// func startWorker(startReady, readyWorkers chan chan func()) {
// go func() {
// taskChan := make(chan func())
// var task func()
// var ok bool
// // Register availability on starReady channel.
// startReady <- taskChan
// for {
// // Read task from dispatcher.
// task, ok = <-taskChan
// if !ok {
// // Dispatcher has told worker to stop.
// break
// }
// Execute the task.
task()
// // Execute the task.
// task()
// Register availability on readyWorkers channel.
readyWorkers <- taskChan
}
}()
}
// // Register availability on readyWorkers channel.
// readyWorkers <- taskChan
// }
// }()
// }