diff --git a/broker/broker.go b/broker/broker.go index de1aa73..53518ff 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -617,7 +617,9 @@ func (b *Broker) removeClient(c *client) { func (b *Broker) PublishMessage(packet *packets.PublishPacket) { var subs []interface{} var qoss []byte + b.mu.Lock() err := b.topicsMgr.Subscribers([]byte(packet.TopicName), packet.Qos, &subs, &qoss) + b.mu.Unlock() if err != nil { log.Error("search sub client error, ", zap.Error(err)) return diff --git a/broker/client.go b/broker/client.go index 27ab9be..65b7ffa 100644 --- a/broker/client.go +++ b/broker/client.go @@ -213,7 +213,9 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { } } + c.mu.Lock() err := c.topicsMgr.Subscribers([]byte(packet.TopicName), packet.Qos, &c.subs, &c.qoss) + c.mu.Unlock() if err != nil { log.Error("Error retrieving subscribers list: ", zap.String("ClientID", c.info.clientID)) return