This commit is contained in:
zhouyuyan
2018-02-01 16:57:56 +08:00
parent 9dac0e0f1e
commit 8337e14452
+30 -30
View File
@@ -33,35 +33,35 @@ type Message struct {
} }
type Broker struct { type Broker struct {
id string id string
cid uint64 cid uint64
mu sync.Mutex mu sync.Mutex
config *Config config *Config
tlsConfig *tls.Config tlsConfig *tls.Config
AclConfig *acl.ACLConfig AclConfig *acl.ACLConfig
wpool *pool.WorkerPool wpool *pool.WorkerPool
clients sync.Map clients sync.Map
routes sync.Map routes sync.Map
remotes sync.Map remotes sync.Map
nodes map[string]interface{} nodes map[string]interface{}
clusterChannel chan *Message clusterPool chan *Message
clientChannel chan *Message messagePool chan *Message
sl *Sublist sl *Sublist
rl *RetainList rl *RetainList
queues map[string]int queues map[string]int
} }
func NewBroker(config *Config) (*Broker, error) { func NewBroker(config *Config) (*Broker, error) {
b := &Broker{ b := &Broker{
id: GenUniqueId(), id: GenUniqueId(),
config: config, config: config,
wpool: pool.New(config.Worker), wpool: pool.New(config.Worker),
sl: NewSublist(), sl: NewSublist(),
rl: NewRetainList(), rl: NewRetainList(),
nodes: make(map[string]interface{}), nodes: make(map[string]interface{}),
queues: make(map[string]int), queues: make(map[string]int),
clusterChannel: make(chan *Message), clusterPool: make(chan *Message),
clientChannel: make(chan *Message), messagePool: make(chan *Message),
} }
if b.config.TlsPort != "" { if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo) tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
@@ -85,7 +85,7 @@ func NewBroker(config *Config) (*Broker, error) {
func (b *Broker) StartDispatcher() { func (b *Broker) StartDispatcher() {
for { for {
msg, ok := <-b.clientChannel msg, ok := <-b.messagePool
if !ok { if !ok {
brokerLog.Error("read message from client channel error") brokerLog.Error("read message from client channel error")
return return
@@ -356,7 +356,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
b.routes.Store(cid, c) b.routes.Store(cid, c)
} }
c.readLoop(b.clientChannel) c.readLoop(b.messagePool)
} }
func (b *Broker) ConnectToDiscovery() { func (b *Broker) ConnectToDiscovery() {
@@ -403,13 +403,13 @@ func (b *Broker) ConnectToDiscovery() {
c.SendConnect() c.SendConnect()
c.SendInfo() c.SendInfo()
go c.readLoop(b.clusterChannel) go c.readLoop(b.clusterPool)
go c.StartPing() go c.StartPing()
} }
func (b *Broker) processClusterInfo() { func (b *Broker) processClusterInfo() {
for { for {
msg, ok := <-b.clusterChannel msg, ok := <-b.clusterPool
if !ok { if !ok {
brokerLog.Error("read message from cluster channel error") brokerLog.Error("read message from cluster channel error")
return return
@@ -479,7 +479,7 @@ func (b *Broker) connectRouter(id, addr string) {
c.SendConnect() c.SendConnect()
go c.readLoop(b.clientChannel) go c.readLoop(b.messagePool)
go c.StartPing() go c.StartPing()
} }