From 7f45bd6bc9f3578c0ade2c8b1ab27bde99cb4059 Mon Sep 17 00:00:00 2001 From: zhouyuyan Date: Fri, 8 Sep 2017 09:59:01 +0800 Subject: [PATCH] ls --- broker/client.go | 58 ++++++++++++++++++------------------------------ 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/broker/client.go b/broker/client.go index 9f106fb..2916e5e 100644 --- a/broker/client.go +++ b/broker/client.go @@ -2,6 +2,7 @@ package broker import ( "hmq/packets" + "math/rand" "net" "strings" "sync" @@ -221,40 +222,32 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { } } - subinfo, exist := b.queues[topic] - qnum := 0 - var qsub *subscription - if exist { - idx := subinfo.index - for _, sub := range r.qsubs { - if sub.client.typ == ROUTER { - if typ == ROUTER { - continue - } + idx := GenerateRangeNum(0, len(r.qsubs)) + for { + sub := r.qsubs[idx] + if sub.client.typ == ROUTER { + if typ == ROUTER { + idx = GenerateRangeNum(0, len(r.qsubs)) + continue } - if idx <= qnum { - qsub = sub - break - } - if sub.client.typ == CLIENT { - qnum = qnum + 1 - } else { - qnum = qnum + sub.client.rsubs[topic].num - } - } - subinfo.index = (idx + 1) % subinfo.count - } - - if qsub != nil { - err := qsub.client.WriterPacket(packet) - if err != nil { - log.Error("process will message for qsub error, ", err) + if sub != nil { + err := sub.client.WriterPacket(packet) + if err != nil { + log.Error("process will message for qsub error, ", err) + } + break } } } +func GenerateRangeNum(min, max int) int { + // rand.Seed(time.Now().Unix()) + randNum := rand.Intn(max-min) + min + return randNum +} + func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { b := c.broker if b == nil { @@ -280,12 +273,6 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { if queue { if len(t) > 7 { t = t[7:] - if _, exists := b.queues[topic]; !exists { - b.queues[topic] = &Queue{ - count: 0, - index: 0, - } - } } else { retcodes = append(retcodes, QosFailure) continue @@ -301,7 +288,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { case CLIENT: if _, exist := c.subs[topic]; !exist { c.subs[topic] = sub - b.queues[topic].count++ + } else { //if exist ,check whether qos change c.subs[topic].qos = qoss[i] @@ -312,11 +299,10 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { if subinfo, exist := c.rsubs[topic]; !exist { sinfo := &subInfo{sub: sub, num: 1} c.rsubs[topic] = sinfo - b.queues[topic].count++ + } else { subinfo.num = subinfo.num + 1 retcodes = append(retcodes, qoss[i]) - b.queues[topic].count++ continue } }