From ad7f4bc3f07acc11eb626317e17bf99577330117 Mon Sep 17 00:00:00 2001 From: turtletramp Date: Mon, 18 Jan 2021 07:45:38 +0100 Subject: [PATCH] bug-2 adding RWMutex to inflight map and update the map access to use the mutex (#108) --- broker/client.go | 21 +++++++++++++++------ broker/comm.go | 22 ++++++++++++++++++---- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/broker/client.go b/broker/client.go index 405265b..2a54ec2 100644 --- a/broker/client.go +++ b/broker/client.go @@ -3,7 +3,6 @@ package broker import ( "context" "errors" - "github.com/eapache/queue" "math/rand" "net" "reflect" @@ -12,6 +11,8 @@ import ( "sync" "time" + "github.com/eapache/queue" + "github.com/fhmq/hmq/broker/lib/sessions" "github.com/fhmq/hmq/broker/lib/topics" "github.com/fhmq/hmq/plugins/bridge" @@ -71,6 +72,7 @@ type client struct { awaitingRel map[uint16]int64 maxAwaitingRel int inflight map[uint16]*inflightElem + inflightMu sync.RWMutex mqueue *queue.Queue retryTimer *time.Timer retryTimerLock sync.Mutex @@ -205,18 +207,23 @@ func ProcessMessage(msg *Message) { c.ProcessPublish(packet) case *packets.PubackPacket: packet := ca.(*packets.PubackPacket) + c.inflightMu.Lock() if _, found := c.inflight[packet.MessageID]; found { delete(c.inflight, packet.MessageID) } else { log.Error("Duplicated PUBACK PacketId", zap.Uint16("MessageID", packet.MessageID)) } + c.inflightMu.Unlock() case *packets.PubrecPacket: packet := ca.(*packets.PubrecPacket) - if _, found := c.inflight[packet.MessageID]; found { - if c.inflight[packet.MessageID].status == Publish { - c.inflight[packet.MessageID].status = Pubrel - c.inflight[packet.MessageID].timestamp = time.Now().Unix() - } else if c.inflight[packet.MessageID].status == Pubrel { + c.inflightMu.RLock() + ielem, found := c.inflight[packet.MessageID] + c.inflightMu.RUnlock() + if found { + if ielem.status == Publish { + ielem.status = Pubrel + ielem.timestamp = time.Now().Unix() + } else if ielem.status == Pubrel { log.Error("Duplicated PUBREC PacketId", zap.Uint16("MessageID", packet.MessageID)) } } else { @@ -240,7 +247,9 @@ func ProcessMessage(msg *Message) { } case *packets.PubcompPacket: packet := ca.(*packets.PubcompPacket) + c.inflightMu.Lock() delete(c.inflight, packet.MessageID) + c.inflightMu.Unlock() case *packets.SubscribePacket: packet := ca.(*packets.SubscribePacket) c.ProcessSubscribe(packet) diff --git a/broker/comm.go b/broker/comm.go index 52c05f1..9ae2070 100644 --- a/broker/comm.go +++ b/broker/comm.go @@ -159,7 +159,9 @@ func publish(sub *subscription, packet *packets.PublishPacket) { log.Error("process message for psub error, ", zap.Error(err)) } case QosAtLeastOnce, QosExactlyOnce: + sub.client.inflightMu.Lock() sub.client.inflight[packet.MessageID] = &inflightElem{status: Publish, packet: packet, timestamp: time.Now().Unix()} + sub.client.inflightMu.Unlock() err := sub.client.WriterPacket(packet) if err != nil { log.Error("process message for psub error, ", zap.Error(err)) @@ -202,21 +204,33 @@ func (c *client) resetRetryTimer() { func (c *client) retryDelivery() { c.resetRetryTimer() - if c.conn == nil || len(c.inflight) == 0 { //Reset timer when client offline OR inflight is empty + c.inflightMu.RLock() + ilen := len(c.inflight) + if c.conn == nil || ilen == 0 { //Reset timer when client offline OR inflight is empty + c.inflightMu.RUnlock() return } - now := time.Now().Unix() + + // copy the to be retried elements out of the map to only hold the lock for a short time and use the new slice later to iterate + // through them + toRetryEle := make([]*inflightElem, 0, ilen) for _, infEle := range c.inflight { + toRetryEle = append(toRetryEle, infEle) + } + c.inflightMu.RUnlock() + now := time.Now().Unix() + + for _, infEle := range toRetryEle { age := now - infEle.timestamp if age >= retryInterval { if infEle.status == Publish { c.WriterPacket(infEle.packet) - c.inflight[infEle.packet.MessageID].timestamp = now + infEle.timestamp = now } else if infEle.status == Pubrel { pubrel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket) pubrel.MessageID = infEle.packet.MessageID c.WriterPacket(pubrel) - c.inflight[infEle.packet.MessageID].timestamp = now + infEle.timestamp = now } } else { if age < 0 {