From a75adfdba4342a310a61c35e8b9dc34cb015e90c Mon Sep 17 00:00:00 2001 From: joyz Date: Tue, 25 Dec 2018 17:34:22 +0800 Subject: [PATCH] modify --- README.md | 8 ++++++++ broker/broker.go | 12 ++++++++---- broker/client.go | 1 - lib/topics/memtopics.go | 10 +++++----- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 57b2e33..0ec8673 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,14 @@ Client -> | Rule1 | --nomatch--> | Rule2 | --nomatch--> | Rule3 | --> allow | deny allow | deny allow | deny ~~~ +### Online/Offline Notification +```bash + topic: + $SYS/broker/connection/clients/ + payload: + {"clientID":"client001","online":true/false,"timestamp":"2018-10-25T09:32:32Z"} +``` + ## Performance * High throughput diff --git a/broker/broker.go b/broker/broker.go index 923dae8..de1aa73 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -617,14 +617,18 @@ func (b *Broker) removeClient(c *client) { func (b *Broker) PublishMessage(packet *packets.PublishPacket) { var subs []interface{} var qoss []byte - b.topicsMgr.Subscribers([]byte(packet.TopicName), packet.Qos, &subs, &qoss) + err := b.topicsMgr.Subscribers([]byte(packet.TopicName), packet.Qos, &subs, &qoss) + if err != nil { + log.Error("search sub client error, ", zap.Error(err)) + return + } for _, sub := range subs { s, ok := sub.(*subscription) if ok { err := s.client.WriterPacket(packet) if err != nil { - log.Error("process message for psub error, ", zap.Error(err)) + log.Error("write message error, ", zap.Error(err)) } } } @@ -643,10 +647,10 @@ func (b *Broker) BroadcastUnSubscribe(subs map[string]*subscription) { } func (b *Broker) OnlineOfflineNotification(clientID string, online bool) { - packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) + packet.TopicName = "$SYS/broker/connection/clients/" + clientID packet.Qos = 0 - packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":"%v","timestamp":"%s"}`, clientID, online, time.Now().Format(time.RFC3339))) + packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, clientID, online, time.Now().UTC().Format(time.RFC3339))) b.PublishMessage(packet) } diff --git a/broker/client.go b/broker/client.go index 6efb129..204b0c7 100644 --- a/broker/client.go +++ b/broker/client.go @@ -88,7 +88,6 @@ func (c *client) init() { c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] c.ctx, c.cancelFunc = context.WithCancel(context.Background()) c.subMap = make(map[string]*subscription) - c.topicsMgr = c.broker.topicsMgr } diff --git a/lib/topics/memtopics.go b/lib/topics/memtopics.go index 851c8f1..2627ef9 100644 --- a/lib/topics/memtopics.go +++ b/lib/topics/memtopics.go @@ -446,12 +446,12 @@ func nextTopicLevel(topic []byte) ([]byte, []byte, error) { s = stateSWC - case '$': - if i == 0 { - return nil, nil, fmt.Errorf("memtopics/nextTopicLevel: Cannot publish to $ topics") - } + // case '$': + // if i == 0 { + // return nil, nil, fmt.Errorf("memtopics/nextTopicLevel: Cannot publish to $ topics") + // } - s = stateSYS + // s = stateSYS default: if s == stateMWC || s == stateSWC {