mirror of
https://github.com/fhmq/hmq.git
synced 2026-05-06 07:35:32 +00:00
add online/offline notification
This commit is contained in:
@@ -4,6 +4,7 @@ package broker
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
@@ -368,6 +369,8 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
b.clients.Store(cid, c)
|
b.clients.Store(cid, c)
|
||||||
|
|
||||||
|
b.OnlineOfflineNotification(cid, true)
|
||||||
case ROUTER:
|
case ROUTER:
|
||||||
old, exist = b.routes.Load(cid)
|
old, exist = b.routes.Load(cid)
|
||||||
if exist {
|
if exist {
|
||||||
@@ -638,3 +641,12 @@ func (b *Broker) BroadcastUnSubscribe(subs map[string]*subscription) {
|
|||||||
b.BroadcastSubOrUnsubMessage(unsub)
|
b.BroadcastSubOrUnsubMessage(unsub)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Broker) OnlineOfflineNotification(clientID string, online bool) {
|
||||||
|
|
||||||
|
packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
|
||||||
|
packet.Qos = 0
|
||||||
|
packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":"%v","timestamp":"%s"}`, clientID, online, time.Now().Format(time.RFC3339)))
|
||||||
|
|
||||||
|
b.PublishMessage(packet)
|
||||||
|
}
|
||||||
|
|||||||
+4
-1
@@ -87,6 +87,7 @@ func (c *client) init() {
|
|||||||
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
|
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
|
||||||
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
|
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
|
||||||
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
|
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
|
||||||
|
c.subMap = make(map[string]*subscription)
|
||||||
|
|
||||||
c.topicsMgr = c.broker.topicsMgr
|
c.topicsMgr = c.broker.topicsMgr
|
||||||
}
|
}
|
||||||
@@ -237,7 +238,6 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := s.client.WriterPacket(packet)
|
err := s.client.WriterPacket(packet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||||
@@ -383,7 +383,10 @@ func (c *client) Close() {
|
|||||||
|
|
||||||
if c.typ == CLIENT {
|
if c.typ == CLIENT {
|
||||||
b.BroadcastUnSubscribe(subs)
|
b.BroadcastUnSubscribe(subs)
|
||||||
|
//offline notification
|
||||||
|
b.OnlineOfflineNotification(c.info.clientID, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.info.willMsg != nil {
|
if c.info.willMsg != nil {
|
||||||
b.PublishMessage(c.info.willMsg)
|
b.PublishMessage(c.info.willMsg)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user