muit channel

This commit is contained in:
zhouyuyan
2018-02-01 16:52:24 +08:00
parent 9dac0e0f1e
commit 5b28aba228

View File

@@ -27,6 +27,11 @@ var (
brokerLog = logger.Get().Named("Broker")
)
const (
MessagePoolNum = 1024
MessageNum = 1024
)
type Message struct {
client *client
packet packets.ControlPacket
@@ -45,12 +50,21 @@ type Broker struct {
remotes sync.Map
nodes map[string]interface{}
clusterChannel chan *Message
clientChannel chan *Message
messagePool []chan *Message
sl *Sublist
rl *RetainList
queues map[string]int
}
func newMessagePool() []chan *Message {
mp := make([]chan *Message, 0)
for i := 0; i < MessagePoolNum; i++ {
tempCh := make(chan *Message, MessageNum)
mp = append(mp, tempCh)
}
return mp
}
func NewBroker(config *Config) (*Broker, error) {
b := &Broker{
id: GenUniqueId(),
@@ -61,7 +75,7 @@ func NewBroker(config *Config) (*Broker, error) {
nodes: make(map[string]interface{}),
queues: make(map[string]int),
clusterChannel: make(chan *Message),
clientChannel: make(chan *Message),
messagePool: newMessagePool(),
}
if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
@@ -84,15 +98,19 @@ func NewBroker(config *Config) (*Broker, error) {
}
func (b *Broker) StartDispatcher() {
for {
msg, ok := <-b.clientChannel
if !ok {
brokerLog.Error("read message from client channel error")
return
}
b.wpool.Submit(func() {
ProcessMessage(msg)
})
for i := 0; i < MessagePoolNum; i++ {
go func(idx int) {
for {
msg, ok := <-b.messagePool[idx]
if !ok {
brokerLog.Error("read message from client channel error")
return
}
b.wpool.Submit(func() {
ProcessMessage(msg)
})
}
}(i)
}
}
@@ -167,9 +185,10 @@ func (b *Broker) StartWebsocketListening() {
func (b *Broker) wsHandler(ws *websocket.Conn) {
// io.Copy(ws, ws)
atomic.AddUint64(&b.cid, 1)
ws.PayloadType = websocket.BinaryFrame
b.handleConnection(CLIENT, ws, b.cid)
idx := atomic.AddUint64(&b.cid, 1)
b.handleConnection(CLIENT, ws, idx)
}
func (b *Broker) StartClientListening(Tls bool) {
@@ -207,8 +226,8 @@ func (b *Broker) StartClientListening(Tls bool) {
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
atomic.AddUint64(&b.cid, 1)
go b.handleConnection(CLIENT, conn, b.cid)
idx := atomic.AddUint64(&b.cid, 1)
go b.handleConnection(CLIENT, conn, idx)
}
}
@@ -252,7 +271,6 @@ func (b *Broker) StartClusterListening() {
return
}
var idx uint64 = 0
tmpDelay := 10 * ACCEPT_MIN_SLEEP
for {
conn, err := l.Accept()
@@ -271,7 +289,7 @@ func (b *Broker) StartClusterListening() {
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
idx := atomic.AddUint64(&b.cid, 1)
go b.handleConnection(ROUTER, conn, idx)
}
}
@@ -356,7 +374,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
b.routes.Store(cid, c)
}
c.readLoop(b.clientChannel)
mpool := b.messagePool[idx%MessagePoolNum]
c.readLoop(mpool)
}
func (b *Broker) ConnectToDiscovery() {
@@ -479,7 +499,10 @@ func (b *Broker) connectRouter(id, addr string) {
c.SendConnect()
go c.readLoop(b.clientChannel)
idx := atomic.AddUint64(&b.cid, 1)
mpool := b.messagePool[idx%MessagePoolNum]
go c.readLoop(mpool)
go c.StartPing()
}