From 5b28aba228cae23374ef7cd46661042a259d25a9 Mon Sep 17 00:00:00 2001 From: zhouyuyan Date: Thu, 1 Feb 2018 16:52:24 +0800 Subject: [PATCH] muit channel --- broker/broker.go | 61 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index f239edd..05c7460 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -27,6 +27,11 @@ var ( brokerLog = logger.Get().Named("Broker") ) +const ( + MessagePoolNum = 1024 + MessageNum = 1024 +) + type Message struct { client *client packet packets.ControlPacket @@ -45,12 +50,21 @@ type Broker struct { remotes sync.Map nodes map[string]interface{} clusterChannel chan *Message - clientChannel chan *Message + messagePool []chan *Message sl *Sublist rl *RetainList queues map[string]int } +func newMessagePool() []chan *Message { + mp := make([]chan *Message, 0) + for i := 0; i < MessagePoolNum; i++ { + tempCh := make(chan *Message, MessageNum) + mp = append(mp, tempCh) + } + return mp +} + func NewBroker(config *Config) (*Broker, error) { b := &Broker{ id: GenUniqueId(), @@ -61,7 +75,7 @@ func NewBroker(config *Config) (*Broker, error) { nodes: make(map[string]interface{}), queues: make(map[string]int), clusterChannel: make(chan *Message), - clientChannel: make(chan *Message), + messagePool: newMessagePool(), } if b.config.TlsPort != "" { tlsconfig, err := NewTLSConfig(b.config.TlsInfo) @@ -84,15 +98,19 @@ func NewBroker(config *Config) (*Broker, error) { } func (b *Broker) StartDispatcher() { - for { - msg, ok := <-b.clientChannel - if !ok { - brokerLog.Error("read message from client channel error") - return - } - b.wpool.Submit(func() { - ProcessMessage(msg) - }) + for i := 0; i < MessagePoolNum; i++ { + go func(idx int) { + for { + msg, ok := <-b.messagePool[idx] + if !ok { + brokerLog.Error("read message from client channel error") + return + } + b.wpool.Submit(func() { + ProcessMessage(msg) + }) + } + }(i) } } @@ -167,9 +185,10 @@ func (b *Broker) StartWebsocketListening() { func (b *Broker) wsHandler(ws *websocket.Conn) { // io.Copy(ws, ws) - atomic.AddUint64(&b.cid, 1) ws.PayloadType = websocket.BinaryFrame - b.handleConnection(CLIENT, ws, b.cid) + + idx := atomic.AddUint64(&b.cid, 1) + b.handleConnection(CLIENT, ws, idx) } func (b *Broker) StartClientListening(Tls bool) { @@ -207,8 +226,8 @@ func (b *Broker) StartClientListening(Tls bool) { continue } tmpDelay = ACCEPT_MIN_SLEEP - atomic.AddUint64(&b.cid, 1) - go b.handleConnection(CLIENT, conn, b.cid) + idx := atomic.AddUint64(&b.cid, 1) + go b.handleConnection(CLIENT, conn, idx) } } @@ -252,7 +271,6 @@ func (b *Broker) StartClusterListening() { return } - var idx uint64 = 0 tmpDelay := 10 * ACCEPT_MIN_SLEEP for { conn, err := l.Accept() @@ -271,7 +289,7 @@ func (b *Broker) StartClusterListening() { continue } tmpDelay = ACCEPT_MIN_SLEEP - + idx := atomic.AddUint64(&b.cid, 1) go b.handleConnection(ROUTER, conn, idx) } } @@ -356,7 +374,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { b.routes.Store(cid, c) } - c.readLoop(b.clientChannel) + mpool := b.messagePool[idx%MessagePoolNum] + + c.readLoop(mpool) } func (b *Broker) ConnectToDiscovery() { @@ -479,7 +499,10 @@ func (b *Broker) connectRouter(id, addr string) { c.SendConnect() - go c.readLoop(b.clientChannel) + idx := atomic.AddUint64(&b.cid, 1) + mpool := b.messagePool[idx%MessagePoolNum] + + go c.readLoop(mpool) go c.StartPing() }