This commit is contained in:
joy.zhou
2019-08-02 15:54:21 +08:00
parent f1b12c9fa8
commit e2a8180f12

View File

@@ -372,12 +372,10 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
ol.Close()
}
} else {
b.QueryConnect(cid)
go b.QueryConnect(cid)
}
b.clients.Store(cid, c)
b.OnlineOfflineNotification(cid, true)
//TODO notify othernode to close connect
}
c.readLoop()
@@ -472,33 +470,44 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
}
}
b.ProcessRemote(packet, qsub)
go b.ProcessRemote(packet, qsub)
}
func (b *Broker) ProcessRemote(packet *packets.PublishPacket, loaclShareSub []*subscription) {
shareRemoteID := ""
remoteSubInfo := b.QuerySubscribe(packet.TopicName, packet.Qos)
totalShare := len(loaclShareSub)
for _, v := range remoteSubInfo {
totalShare = totalShare + v.shareSubCount
}
target := r.Intn(totalShare)
if target < len(loaclShareSub) {
shareRemoteID = b.id
} else {
target = target - len(loaclShareSub)
for k, v := range remoteSubInfo {
if target < v.shareSubCount {
shareRemoteID = k
return
remoteSubInfo := b.QuerySubscribe(packet.TopicName, packet.Qos)
//calc which node process share message
shareRemoteID := ""
{
totalShare := len(loaclShareSub)
for _, v := range remoteSubInfo {
totalShare = totalShare + v.shareSubCount
}
target := r.Intn(totalShare)
if target < len(loaclShareSub) {
shareRemoteID = b.id
//send local
if shareRemoteID == b.id {
sub := loaclShareSub[target]
publish(sub, packet)
}
} else {
target = target - len(loaclShareSub)
for k, v := range remoteSubInfo {
if target < v.shareSubCount {
shareRemoteID = k
return
}
target = target - v.shareSubCount
}
target = target - v.shareSubCount
}
}
//send remote
//send remote message
for id, sub := range remoteSubInfo {
rpcCli := b.rpcClient[id]
if sub.subCount > 0 {
@@ -506,10 +515,4 @@ func (b *Broker) ProcessRemote(packet *packets.PublishPacket, loaclShareSub []*s
}
}
//send local
if shareRemoteID == b.id {
sub := loaclShareSub[target]
publish(sub, packet)
}
}