From 5ed4728575a119b426fc1d4540a3d115538de5ee Mon Sep 17 00:00:00 2001 From: "joy.zhou" Date: Wed, 4 Apr 2018 13:49:52 +0800 Subject: [PATCH] Wpool (#23) * pool * pool * wpool --- broker/broker.go | 54 ++++++++++++++++++++++-------------------------- broker/client.go | 25 ++++++++++++++-------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 4a2f6d1..11006f5 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -4,19 +4,19 @@ package broker import ( "crypto/tls" - "github.com/eclipse/paho.mqtt.golang/packets" - "github.com/fhmq/hmq/lib/acl" - "github.com/fhmq/hmq/pool" - "github.com/segmentio/fasthash/fnv1a" - "github.com/shirou/gopsutil/mem" - "go.uber.org/zap" - "golang.org/x/net/websocket" "net" "net/http" "runtime/debug" "sync" "sync/atomic" "time" + + "github.com/eclipse/paho.mqtt.golang/packets" + "github.com/fhmq/hmq/lib/acl" + "github.com/fhmq/hmq/pool" + "github.com/shirou/gopsutil/mem" + "go.uber.org/zap" + "golang.org/x/net/websocket" ) const ( @@ -42,10 +42,10 @@ type Broker struct { remotes sync.Map nodes map[string]interface{} clusterPool chan *Message - messagePool []chan *Message sl *Sublist rl *RetainList queues map[string]int + // messagePool []chan *Message } func newMessagePool() []chan *Message { @@ -67,7 +67,7 @@ func NewBroker(config *Config) (*Broker, error) { nodes: make(map[string]interface{}), queues: make(map[string]int), clusterPool: make(chan *Message), - messagePool: newMessagePool(), + // messagePool: newMessagePool(), } if b.config.TlsPort != "" { tlsconfig, err := NewTLSConfig(b.config.TlsInfo) @@ -89,20 +89,17 @@ func NewBroker(config *Config) (*Broker, error) { return b, nil } -func (b *Broker) StartDispatcher() { - for _, mpool := range b.messagePool { - go func(ch chan *Message) { - for { - msg, ok := <-ch - if !ok { - log.Error("read message from client channel error") - return - } - b.wpool.Submit(func() { - ProcessMessage(msg) - }) - } - }(mpool) +func (b *Broker) SubmitWork(msg *Message) { + if b.wpool == nil { + b.wpool = pool.New(b.config.Worker) + } + + if msg.client.typ == CLUSTER { + b.clusterPool <- msg + } else { + b.wpool.Submit(func() { + ProcessMessage(msg) + }) } } @@ -112,7 +109,6 @@ func (b *Broker) Start() { log.Error("broker is null") return } - go b.StartDispatcher() //listen clinet over tcp if b.config.Port != "" { @@ -365,9 +361,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) { b.routes.Store(cid, c) } - mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum] + // mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum] - c.readLoop(mpool) + c.readLoop() } func (b *Broker) ConnectToDiscovery() { @@ -414,7 +410,7 @@ func (b *Broker) ConnectToDiscovery() { c.SendConnect() c.SendInfo() - go c.readLoop(b.clusterPool) + go c.readLoop() go c.StartPing() } @@ -490,8 +486,8 @@ func (b *Broker) connectRouter(id, addr string) { c.SendConnect() - mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum] - go c.readLoop(mpool) + // mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum] + go c.readLoop() go c.StartPing() } diff --git a/broker/client.go b/broker/client.go index 33636f3..4a68cf9 100644 --- a/broker/client.go +++ b/broker/client.go @@ -3,13 +3,14 @@ package broker import ( - "github.com/eclipse/paho.mqtt.golang/packets" - "go.uber.org/zap" "net" "reflect" "strings" "sync" "time" + + "github.com/eclipse/paho.mqtt.golang/packets" + "go.uber.org/zap" ) const ( @@ -85,8 +86,11 @@ func (c *client) init() { c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] } -func (c *client) keepAlive(ch chan int, mpool chan *Message) { +func (c *client) keepAlive(ch chan int) { defer close(ch) + + b := c.broker + keepalive := time.Duration(c.info.keepalive*3/2) * time.Second timer := time.NewTimer(keepalive) @@ -100,8 +104,10 @@ func (c *client) keepAlive(ch chan int, mpool chan *Message) { continue } log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) + msg := &Message{client: c, packet: DisconnectdPacket} - mpool <- msg + b.SubmitWork(msg) + timer.Stop() return case _, ok := <-c.closed: @@ -112,14 +118,15 @@ func (c *client) keepAlive(ch chan int, mpool chan *Message) { } } -func (c *client) readLoop(mpool chan *Message) { +func (c *client) readLoop() { nc := c.conn - if nc == nil || mpool == nil { + b := c.broker + if nc == nil || b == nil { return } ch := make(chan int, 1000) - go c.keepAlive(ch, mpool) + go c.keepAlive(ch) for { packet, err := packets.ReadPacket(nc) @@ -134,11 +141,11 @@ func (c *client) readLoop(mpool chan *Message) { client: c, packet: packet, } - mpool <- msg + b.SubmitWork(msg) } msg := &Message{client: c, packet: DisconnectdPacket} - mpool <- msg + b.SubmitWork(msg) } func ProcessMessage(msg *Message) {