From ca7ebfb6e3575dcae7c49493d71697ad72d204fc Mon Sep 17 00:00:00 2001 From: zhouyuyan Date: Fri, 1 Sep 2017 22:17:22 +0800 Subject: [PATCH] cluster sub --- Dcokerfile | 1 + broker/client.go | 99 +++++++++++++++++++++++++++++------------------- 2 files changed, 60 insertions(+), 40 deletions(-) diff --git a/Dcokerfile b/Dcokerfile index f9ffa7a..42591e3 100644 --- a/Dcokerfile +++ b/Dcokerfile @@ -5,6 +5,7 @@ COPY tls /tls COPY conf /conf EXPOSE 1883 +EXPOSE 1888 EXPOSE 8883 EXPOSE 1993 diff --git a/broker/client.go b/broker/client.go index 53a93f6..990b62f 100644 --- a/broker/client.go +++ b/broker/client.go @@ -29,6 +29,7 @@ type client struct { info info route *route subs map[string]*subscription + rsubs map[string][]*subscription } type subscription struct { @@ -54,7 +55,12 @@ type route struct { } func (c *client) init() { - c.subs = make(map[string]*subscription, 10) + typ := c.typ + if typ == ROUTER { + c.rsubs = make(map[string][]*subscription) + } else if typ == CLIENT { + c.subs = make(map[string]*subscription, 10) + } c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0] c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] } @@ -271,47 +277,50 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { continue } - if _, exist := c.subs[topic]; !exist { - queue := false - if strings.HasPrefix(topic, "$queue/") { - if len(t) > 7 { - t = t[7:] - queue = true - // b.qmu.Lock() - if _, exists := b.queues[topic]; !exists { - b.queues[topic] = 0 - } - // b.qmu.Unlock() - } else { - retcodes = append(retcodes, QosFailure) - continue + queue := strings.HasPrefix(topic, "$queue/") + if queue { + if len(t) > 7 { + t = t[7:] + // b.qmu.Lock() + if _, exists := b.queues[topic]; !exists { + b.queues[topic] = 0 } - } - sub := &subscription{ - topic: t, - qos: qoss[i], - client: c, - queue: queue, - } - - c.mu.Lock() - c.subs[topic] = sub - c.mu.Unlock() - - err := b.sl.Insert(sub) - if err != nil { - log.Error("Insert subscription error: ", err) + // b.qmu.Unlock() + } else { retcodes = append(retcodes, QosFailure) + continue } - retcodes = append(retcodes, qoss[i]) + } + sub := &subscription{ + topic: t, + qos: qoss[i], + client: c, + queue: queue, + } + switch c.typ { + case CLIENT: + if _, exist := c.subs[topic]; !exist { + c.subs[topic] = sub + } else { + //if exist ,check whether qos change + c.subs[topic].qos = qoss[i] + retcodes = append(retcodes, qoss[i]) + continue + } + case ROUTER: + if _, exist := c.rsubs[topic]; !exist { + c.rsubs[topic] = make([]*subscription, 10) + } + c.rsubs[topic] = append(c.rsubs[topic], sub) + } + err := b.sl.Insert(sub) + if err != nil { + log.Error("Insert subscription error: ", err) + retcodes = append(retcodes, QosFailure) } else { - //if exist ,check whether qos change - c.subs[topic].qos = qoss[i] retcodes = append(retcodes, qoss[i]) } - } - suback.ReturnCodes = retcodes err := c.WriterPacket(suback) @@ -341,13 +350,23 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) { if b == nil { return } + typ := c.typ topics := packet.Topics for _, t := range topics { var sub *subscription ok := false - - if sub, ok = c.subs[t]; ok { + switch typ { + case CLIENT: + sub, ok = c.subs[t] + case ROUTER: + _, ok := c.rsubs[t] + if ok && len(c.rsubs[t]) > 0 { + sub = c.rsubs[t][0] + c.rsubs[t] = c.rsubs[t][1:] + } + } + if ok { go c.unsubscribe(sub) } @@ -369,9 +388,9 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) { func (c *client) unsubscribe(sub *subscription) { - c.mu.Lock() - delete(c.subs, string(sub.topic)) - c.mu.Unlock() + if c.typ == CLIENT { + delete(c.subs, string(sub.topic)) + } if c.broker != nil { c.broker.sl.Remove(sub)