This commit is contained in:
joyz
2018-12-25 17:34:22 +08:00
parent fcfba55567
commit a75adfdba4
4 changed files with 21 additions and 10 deletions
+8
View File
@@ -145,6 +145,14 @@ Client -> | Rule1 | --nomatch--> | Rule2 | --nomatch--> | Rule3 | -->
allow | deny allow | deny allow | deny allow | deny allow | deny allow | deny
~~~ ~~~
### Online/Offline Notification
```bash
topic:
$SYS/broker/connection/clients/<clientID>
payload:
{"clientID":"client001","online":true/false,"timestamp":"2018-10-25T09:32:32Z"}
```
## Performance ## Performance
* High throughput * High throughput
+8 -4
View File
@@ -617,14 +617,18 @@ func (b *Broker) removeClient(c *client) {
func (b *Broker) PublishMessage(packet *packets.PublishPacket) { func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
var subs []interface{} var subs []interface{}
var qoss []byte var qoss []byte
b.topicsMgr.Subscribers([]byte(packet.TopicName), packet.Qos, &subs, &qoss) err := b.topicsMgr.Subscribers([]byte(packet.TopicName), packet.Qos, &subs, &qoss)
if err != nil {
log.Error("search sub client error, ", zap.Error(err))
return
}
for _, sub := range subs { for _, sub := range subs {
s, ok := sub.(*subscription) s, ok := sub.(*subscription)
if ok { if ok {
err := s.client.WriterPacket(packet) err := s.client.WriterPacket(packet)
if err != nil { if err != nil {
log.Error("process message for psub error, ", zap.Error(err)) log.Error("write message error, ", zap.Error(err))
} }
} }
} }
@@ -643,10 +647,10 @@ func (b *Broker) BroadcastUnSubscribe(subs map[string]*subscription) {
} }
func (b *Broker) OnlineOfflineNotification(clientID string, online bool) { func (b *Broker) OnlineOfflineNotification(clientID string, online bool) {
packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) packet := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
packet.TopicName = "$SYS/broker/connection/clients/" + clientID
packet.Qos = 0 packet.Qos = 0
packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":"%v","timestamp":"%s"}`, clientID, online, time.Now().Format(time.RFC3339))) packet.Payload = []byte(fmt.Sprintf(`{"clientID":"%s","online":%v,"timestamp":"%s"}`, clientID, online, time.Now().UTC().Format(time.RFC3339)))
b.PublishMessage(packet) b.PublishMessage(packet)
} }
-1
View File
@@ -88,7 +88,6 @@ func (c *client) init() {
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
c.ctx, c.cancelFunc = context.WithCancel(context.Background()) c.ctx, c.cancelFunc = context.WithCancel(context.Background())
c.subMap = make(map[string]*subscription) c.subMap = make(map[string]*subscription)
c.topicsMgr = c.broker.topicsMgr c.topicsMgr = c.broker.topicsMgr
} }
+5 -5
View File
@@ -446,12 +446,12 @@ func nextTopicLevel(topic []byte) ([]byte, []byte, error) {
s = stateSWC s = stateSWC
case '$': // case '$':
if i == 0 { // if i == 0 {
return nil, nil, fmt.Errorf("memtopics/nextTopicLevel: Cannot publish to $ topics") // return nil, nil, fmt.Errorf("memtopics/nextTopicLevel: Cannot publish to $ topics")
} // }
s = stateSYS // s = stateSYS
default: default:
if s == stateMWC || s == stateSWC { if s == stateMWC || s == stateSWC {