From ae1af54c6ecc064535a9b2a84e9c5a8205025c3c Mon Sep 17 00:00:00 2001 From: zhouyuyan Date: Thu, 7 Sep 2017 14:32:01 +0800 Subject: [PATCH] cluster sub --- broker/broker.go | 2 +- broker/client.go | 71 ++++++++++++++++++------------------------------ 2 files changed, 28 insertions(+), 45 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index e1df7a4..7623170 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -403,7 +403,7 @@ func (b *Broker) removeClient(c *client) { // log.Info("delete client ,", clientId) } -func (b *Broker) ProcessPublishMessage(packet *packets.PublishPacket) { +func (b *Broker) PublishMessage(packet *packets.PublishPacket) { topic := packet.TopicName r := b.sl.Match(topic) // log.Info("psubs num: ", len(r.psubs)) diff --git a/broker/client.go b/broker/client.go index 484a7ee..c504dad 100644 --- a/broker/client.go +++ b/broker/client.go @@ -1,7 +1,6 @@ package broker import ( - "errors" "hmq/packets" "net" "strings" @@ -30,7 +29,12 @@ type client struct { info info route *route subs map[string]*subscription - rsubs map[string][]*subscription + rsubs map[string]*subInfo +} + +type subInfo struct { + sub *subscription + num int } type subscription struct { @@ -56,13 +60,13 @@ type route struct { } var ( - disconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) + DisconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) ) func (c *client) init() { typ := c.typ if typ == ROUTER { - c.rsubs = make(map[string][]*subscription) + c.rsubs = make(map[string]*subInfo) } else if typ == CLIENT { c.subs = make(map[string]*subscription, 10) } @@ -82,20 +86,14 @@ func (c *client) readLoop(msgPool *MessagePool) { nowTime = uint16(time.Now().Unix()) if 0 != c.info.keepalive && nowTime-lastIn > c.info.keepalive*3/2 { log.Errorf("Client %s has exceeded timeout, disconnecting.\n", c.info.clientID) - msg := &Message{ - client: c, - packet: disconnectdPacket, - } + msg := &Message{client: c, packet: DisconnectdPacket} msgPool.queue <- msg return } packet, err := packets.ReadPacket(nc) if err != nil { log.Error("read packet error: ", err) - msg := &Message{ - client: c, - packet: disconnectdPacket, - } + msg := &Message{client: c, packet: DisconnectdPacket} msgPool.queue <- msg return } @@ -332,10 +330,14 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { continue } case ROUTER: - if _, exist := c.rsubs[topic]; !exist { - c.rsubs[topic] = make([]*subscription, 10) + if subinfo, exist := c.rsubs[topic]; !exist { + sinfo := &subInfo{sub: sub, num: 1} + c.rsubs[topic] = sinfo + } else { + subinfo.num = subinfo.num + 1 + retcodes = append(retcodes, qoss[i]) + continue } - c.rsubs[topic] = append(c.rsubs[topic], sub) } err := b.sl.Insert(sub) if err != nil { @@ -384,12 +386,15 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) { case CLIENT: sub, ok = c.subs[t] case ROUTER: - _, ok := c.rsubs[t] - if ok && len(c.rsubs[t]) > 0 { - sub = c.rsubs[t][0] - c.rsubs[t] = c.rsubs[t][1:] - } else { - return + subinfo, ok := c.rsubs[t] + if ok { + subinfo.num = subinfo.num - 1 + c.rsubs[t] = subinfo + if subinfo.num < 1 { + delete(c.rsubs, t) + } else { + c.rsubs[t] = subinfo + } } } if ok { @@ -447,7 +452,7 @@ func (c *client) Close() { b.BroadcastUnSubscribe(subs) } if c.info.willMsg != nil { - b.ProcessPublishMessage(c.info.willMsg) + b.PublishMessage(c.info.willMsg) } } if c.conn != nil { @@ -462,25 +467,3 @@ func (c *client) WriterPacket(packet packets.ControlPacket) error { c.mu.Unlock() return err } - -func WriteBuffer(conn net.Conn, buf []byte) error { - if conn == nil { - return errors.New("conn is nul") - } - _, err := conn.Write(buf) - return err -} -func (c *client) writeBuffer(buf []byte) error { - c.mu.Lock() - err := WriteBuffer(c.conn, buf) - c.mu.Unlock() - return err -} - -// func (c *client) writeMessage(msg message.Message) error { -// buf, err := EncodeMessage(msg) -// if err != nil { -// return err -// } -// return c.writeBuffer(buf) -// }