Added in detailed conn client logs

Created new data types to store the last time a message was received from a device along with publishing the last will topic, keepalive time, and credentials over the /connections topic. These were mirrored also over the REST API for synchronous stateful services starting up from a crashed k8s pod or other usecases. Start by subscribing to /connections/+ and then GET /api/v1/connections to get the open connections, if a device connects while you are setting up your state, your messageHandler will handle the setup since it has the same information. This information has been published over  for devices you don't have any control over and for relay purposes. You can take all of the device information and now create a faux client emulating your downstream device, this may sound strange; but I have a usecase for it, a lot of cheap chinese IoT's were not designed for mass production and we have to fix their messages in the cloud before relaying them to our other legacy servers
This commit is contained in:
Scott Joseph Spitler II
2023-12-11 02:30:31 -05:00
parent 3aea177ea8
commit 4f98faeefc
4 changed files with 94 additions and 8 deletions

4
.gitignore vendored
View File

@@ -7,3 +7,7 @@ log/*
.vscode/settings.json
.pre-commit-config.yaml
hmq.exe
*.sw*
*.swo
*.swp
*.swn

View File

@@ -7,6 +7,7 @@ import (
"net/http"
"sync"
"time"
encJson "encoding/json"
"github.com/fhmq/hmq/broker/lib/sessions"
"github.com/fhmq/hmq/broker/lib/topics"
@@ -404,8 +405,19 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
}
}
b.clients.Store(cid, c)
pubInfo := Info{
ClientID: info.clientID,
Username: info.username,
Password: info.password,
Keepalive: info.keepalive,
WillMsg: &PubPacket{
TopicName: info.willMsg.TopicName,
Payload: info.willMsg.Payload,
},
}
b.OnlineOfflineNotification(cid, true)
b.OnlineOfflineNotification(pubInfo, true, c.lastMsgTime)
{
b.Publish(&bridge.Elements{
ClientID: msg.ClientIdentifier,
@@ -695,11 +707,33 @@ func (b *Broker) BroadcastUnSubscribe(topicsToUnSubscribeFrom []string) {
b.BroadcastSubOrUnsubMessage(unsub)
}
func (b *Broker) OnlineOfflineNotification(clientID string, online bool) {
type OnlineOfflineMsg struct {
ClientID string `json:"clientID"`
Online bool `json:"online"`
Timestamp string `json:"timestamp"`
ClientInfo Info `json:"info"`
LastMsgTime int64 `json:"lastMsg"`
}
func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64) {
packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
packet.TopicName = "$SYS/broker/connection/clients/" + clientID
packet.TopicName = "$SYS/broker/connection/clients/" + info.ClientID
packet.Qos = 0
packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, clientID, online, time.Now().UTC().Format(time.RFC3339)))
msg := OnlineOfflineMsg{
ClientID: info.ClientID,
Online: online,
Timestamp: time.Now().UTC().Format(time.RFC3339),
ClientInfo: info,
LastMsgTime: lastMsg,
}
if b, err := encJson.Marshal(msg); err != nil {
//This is a TERRIBLE situation, falling back to legacy format to not break API Contract
packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, info.ClientID, online, time.Now().UTC().Format(time.RFC3339)))
} else {
packet.Payload = b
}
b.PublishMessage(packet)
}

View File

@@ -79,6 +79,7 @@ type client struct {
mqueue *queue.Queue
retryTimer *time.Timer
retryTimerLock sync.Mutex
lastMsgTime int64
}
type InflightStatus uint8
@@ -111,6 +112,19 @@ type info struct {
remoteIP string
}
type PubPacket struct {
TopicName string `json:"topicName"`
Payload []byte `json:"payload"`
}
type Info struct {
ClientID string `json:"clientId"`
Username string `json:"username"`
Password []byte `json:"password"`
Keepalive uint16 `json:"keepalive"`
WillMsg *PubPacket `json:"willMsg"`
}
type route struct {
remoteID string
remoteUrl string
@@ -122,6 +136,7 @@ var (
)
func (c *client) init() {
c.lastMsgTime = time.Now().Unix() //mark the connection packet time as last time messaged
c.status = Connected
c.info.localIP, _, _ = net.SplitHostPort(c.conn.LocalAddr().String())
remoteAddr := c.conn.RemoteAddr()
@@ -185,6 +200,8 @@ func (c *client) readLoop() {
if _, isDisconnect := packet.(*packets.DisconnectPacket); isDisconnect {
c.info.willMsg = nil
c.cancelFunc()
} else {
c.lastMsgTime = time.Now().Unix()
}
msg := &Message{
@@ -842,8 +859,18 @@ func (c *client) Close() {
if c.typ == CLIENT {
b.BroadcastUnSubscribe(unSubTopics)
pubInfo := Info{
ClientID: c.info.clientID,
Username: c.info.username,
Password: c.info.password,
Keepalive: c.info.keepalive,
WillMsg: &PubPacket{
TopicName: c.info.willMsg.TopicName,
Payload: c.info.willMsg.Payload,
},
}
//offline notification
b.OnlineOfflineNotification(c.info.clientID, false)
b.OnlineOfflineNotification(pubInfo, false, c.lastMsgTime)
}
if c.info.willMsg != nil {

View File

@@ -8,9 +8,14 @@ const (
CONNECTIONS = "api/v1/connections"
)
type ConnClient struct {
Info `json:"info"`
LastMsgTime int64 `json:"lastMsg"`
}
type resp struct {
Code int `json:"code,omitempty"`
Clients []string `json:"clients,omitempty"`
Clients []ConnClient `json:"clients,omitempty"`
}
func InitHTTPMoniter(b *Broker) {
@@ -29,9 +34,25 @@ func InitHTTPMoniter(b *Broker) {
c.JSON(200, &r)
})
router.GET(CONNECTIONS, func(c *gin.Context) {
conns := make([]string, 0)
conns := make([]ConnClient, 0)
b.clients.Range(func (k, v interface{}) bool {
conns = append(conns, v.(*client).info.clientID)
cl, _ := v.(*client)
msg := ConnClient{
Info: Info{
ClientID: cl.info.clientID,
Username: cl.info.username,
Password: cl.info.password,
Keepalive: cl.info.keepalive,
WillMsg: &PubPacket{
TopicName: cl.info.willMsg.TopicName,
Payload: cl.info.willMsg.Payload,
},
},
LastMsgTime: cl.lastMsgTime,
}
conns = append(conns, msg)
return true
})
r := resp{Clients: conns}