This commit is contained in:
zhouyuyan
2017-09-08 09:59:01 +08:00
parent 7073e9b4ba
commit 7f45bd6bc9

View File

@@ -2,6 +2,7 @@ package broker
import (
"hmq/packets"
"math/rand"
"net"
"strings"
"sync"
@@ -221,40 +222,32 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
}
}
subinfo, exist := b.queues[topic]
qnum := 0
var qsub *subscription
if exist {
idx := subinfo.index
for _, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
continue
}
idx := GenerateRangeNum(0, len(r.qsubs))
for {
sub := r.qsubs[idx]
if sub.client.typ == ROUTER {
if typ == ROUTER {
idx = GenerateRangeNum(0, len(r.qsubs))
continue
}
if idx <= qnum {
qsub = sub
break
}
if sub.client.typ == CLIENT {
qnum = qnum + 1
} else {
qnum = qnum + sub.client.rsubs[topic].num
}
}
subinfo.index = (idx + 1) % subinfo.count
}
if qsub != nil {
err := qsub.client.WriterPacket(packet)
if err != nil {
log.Error("process will message for qsub error, ", err)
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process will message for qsub error, ", err)
}
break
}
}
}
func GenerateRangeNum(min, max int) int {
// rand.Seed(time.Now().Unix())
randNum := rand.Intn(max-min) + min
return randNum
}
func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
b := c.broker
if b == nil {
@@ -280,12 +273,6 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
if queue {
if len(t) > 7 {
t = t[7:]
if _, exists := b.queues[topic]; !exists {
b.queues[topic] = &Queue{
count: 0,
index: 0,
}
}
} else {
retcodes = append(retcodes, QosFailure)
continue
@@ -301,7 +288,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
case CLIENT:
if _, exist := c.subs[topic]; !exist {
c.subs[topic] = sub
b.queues[topic].count++
} else {
//if exist ,check whether qos change
c.subs[topic].qos = qoss[i]
@@ -312,11 +299,10 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
if subinfo, exist := c.rsubs[topic]; !exist {
sinfo := &subInfo{sub: sub, num: 1}
c.rsubs[topic] = sinfo
b.queues[topic].count++
} else {
subinfo.num = subinfo.num + 1
retcodes = append(retcodes, qoss[i])
b.queues[topic].count++
continue
}
}