modify remote

This commit is contained in:
zhouyuyan
2017-09-05 13:52:53 +08:00
parent 18d18738be
commit 43a6bb8c5d
3 changed files with 22 additions and 6 deletions

View File

@@ -74,6 +74,9 @@ func (b *Broker) Start() {
if b.config.TlsPort != "" {
go b.StartTLSListening()
}
if len(b.config.Cluster.Routes) > 0 {
b.ConnectToRouters()
}
}
func (b *Broker) StartWebsocketListening() {
@@ -141,6 +144,7 @@ func (b *Broker) StartListening(typ int) {
return
}
var idx uint64 = 0
tmpDelay := 10 * ACCEPT_MIN_SLEEP
for {
conn, err := l.Accept()
@@ -159,8 +163,19 @@ func (b *Broker) StartListening(typ int) {
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
atomic.AddUint64(&b.cid, 1)
go b.handleConnection(typ, conn, b.cid)
if typ == CLIENT {
atomic.AddUint64(&b.cid, 1)
go b.handleConnection(typ, conn, b.cid)
} else {
go b.handleConnection(typ, conn, idx)
if idx == 1 {
idx = 0
} else {
idx = idx + 1
}
}
}
}
@@ -227,7 +242,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
old, exist = b.clients.Load(cid)
b.clients.Store(cid, c)
case ROUTER:
msgPool = MSGPool[MessagePoolNum].GetPool()
msgPool = MSGPool[(MessagePoolNum + idx)].GetPool()
old, exist = b.routes.Load(cid)
b.routes.Store(cid, c)
}
@@ -278,8 +293,9 @@ func (b *Broker) connectRouter(url, remoteID string) {
c.SendConnect()
c.SendInfo()
// s.createRemote(conn, route)
msgPool := MSGPool[(MessagePoolNum + 1)].GetPool()
c.readLoop(msgPool)
// msgPool := MSGPool[(MessagePoolNum + 1)].GetPool()
c.StartPing()
// c.readLoop(msgPool)
}
}

View File

@@ -401,7 +401,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
func (c *client) unsubscribe(sub *subscription) {
if c.typ == CLIENT {
delete(c.subs, string(sub.topic))
delete(c.subs, sub.topic)
}
if c.broker != nil {

BIN
hmq

Binary file not shown.