This commit is contained in:
zhouyuyan
2018-03-02 13:29:18 +08:00
parent 6f26df7a9a
commit 026cfbe123

View File

@@ -42,10 +42,10 @@ type Broker struct {
remotes sync.Map
nodes map[string]interface{}
clusterPool chan *Message
messagePool []chan *Message
sl *Sublist
rl *RetainList
queues map[string]int
// messagePool []chan *Message
}
func newMessagePool() []chan *Message {
@@ -89,31 +89,19 @@ func NewBroker(config *Config) (*Broker, error) {
return b, nil
}
// func (b *Broker) StartDispatcher() {
// for _, mpool := range b.messagePool {
// go func(ch chan *Message) {
// for {
// msg, ok := <-ch
// if !ok {
// log.Error("read message from client channel error")
// return
// }
// b.wpool.Submit(func() {
// ProcessMessage(msg)
// })
// }
// }(mpool)
// }
// }
func (b *Broker) SubmitWork(msg *Message) {
if b.wpool == nil {
b.wpool = pool.New(b.config.Worker)
}
b.wpool.Submit(func() {
ProcessMessage(msg)
})
if msg.client.typ == CLUSTER {
b.clusterPool <- msg
} else {
b.wpool.Submit(func() {
ProcessMessage(msg)
})
}
}
func (b *Broker) Start() {
@@ -121,7 +109,6 @@ func (b *Broker) Start() {
log.Error("broker is null")
return
}
// go b.StartDispatcher()
//listen clinet over tcp
if b.config.Port != "" {