From fcfba55567cdab11020461e379281c3f535ab9cb Mon Sep 17 00:00:00 2001 From: joyz Date: Tue, 25 Dec 2018 10:49:30 +0800 Subject: [PATCH] add online/offline notification --- broker/broker.go | 12 ++++++++++++ broker/client.go | 5 ++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/broker/broker.go b/broker/broker.go index fd543e2..923dae8 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -4,6 +4,7 @@ package broker import ( "crypto/tls" + "fmt" "net" "net/http" "runtime/debug" @@ -368,6 +369,8 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) { } } b.clients.Store(cid, c) + + b.OnlineOfflineNotification(cid, true) case ROUTER: old, exist = b.routes.Load(cid) if exist { @@ -638,3 +641,12 @@ func (b *Broker) BroadcastUnSubscribe(subs map[string]*subscription) { b.BroadcastSubOrUnsubMessage(unsub) } } + +func (b *Broker) OnlineOfflineNotification(clientID string, online bool) { + + packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) + packet.Qos = 0 + packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":"%v","timestamp":"%s"}`, clientID, online, time.Now().Format(time.RFC3339))) + + b.PublishMessage(packet) +} diff --git a/broker/client.go b/broker/client.go index 237e505..6efb129 100644 --- a/broker/client.go +++ b/broker/client.go @@ -87,6 +87,7 @@ func (c *client) init() { c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0] c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] c.ctx, c.cancelFunc = context.WithCancel(context.Background()) + c.subMap = make(map[string]*subscription) c.topicsMgr = c.broker.topicsMgr } @@ -237,7 +238,6 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { continue } } - err := s.client.WriterPacket(packet) if err != nil { log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) @@ -383,7 +383,10 @@ func (c *client) Close() { if c.typ == CLIENT { b.BroadcastUnSubscribe(subs) + //offline notification + b.OnlineOfflineNotification(c.info.clientID, false) } + if c.info.willMsg != nil { b.PublishMessage(c.info.willMsg) }