mirror of
https://github.com/fhmq/hmq.git
synced 2026-04-24 10:38:34 +00:00
b1
This commit is contained in:
@@ -298,7 +298,52 @@ func (c *client) ProcessSubscribe(buf []byte) {
|
||||
}
|
||||
|
||||
func (c *client) ProcessUnSubscribe(buf []byte) {
|
||||
srv := c.broker
|
||||
if srv == nil {
|
||||
return
|
||||
}
|
||||
|
||||
unsub, err := DecodeUnsubscribeMessage(buf)
|
||||
if err != nil {
|
||||
log.Error("Decode UnSubscribe Message error: ", err)
|
||||
c.Close()
|
||||
return
|
||||
}
|
||||
topics := unsub.Topics()
|
||||
|
||||
for _, t := range topics {
|
||||
var sub *subscription
|
||||
ok := false
|
||||
|
||||
if sub, ok = c.subs[string(t)]; ok {
|
||||
go c.unsubscribe(sub)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
resp := message.NewUnsubackMessage()
|
||||
resp.SetPacketId(unsub.PacketId())
|
||||
|
||||
err1 := c.writeMessage(resp)
|
||||
if err1 != nil {
|
||||
log.Error("send ubsuback error, ", err1)
|
||||
return
|
||||
}
|
||||
// //process ubsubscribe message
|
||||
// if typ == CLIENT {
|
||||
// c.srv.BroadcastUnSubscribeMessage(msg)
|
||||
// }
|
||||
}
|
||||
|
||||
func (c *client) unsubscribe(sub *subscription) {
|
||||
|
||||
c.mu.Lock()
|
||||
delete(c.subs, string(sub.topic))
|
||||
c.mu.Unlock()
|
||||
|
||||
if c.broker != nil {
|
||||
c.broker.sl.Remove(sub)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) ProcessPing(buf []byte) {
|
||||
|
||||
Reference in New Issue
Block a user