diff --git a/broker/client.go b/broker/client.go index a06d7f8..405265b 100644 --- a/broker/client.go +++ b/broker/client.go @@ -3,6 +3,7 @@ package broker import ( "context" "errors" + "github.com/eapache/queue" "math/rand" "net" "reflect" @@ -41,29 +42,52 @@ const ( Disconnected = 2 ) +const ( + awaitRelTimeout int64 = 20 + retryInterval int64 = 20 +) + var ( groupCompile = regexp.MustCompile(_GroupTopicRegexp) ) type client struct { - typ int - mu sync.Mutex - broker *Broker - conn net.Conn - info info - route route - status int - ctx context.Context - cancelFunc context.CancelFunc - session *sessions.Session - subMap map[string]*subscription - topicsMgr *topics.Manager - subs []interface{} - qoss []byte - rmsgs []*packets.PublishPacket - routeSubMap map[string]uint64 + typ int + mu sync.Mutex + broker *Broker + conn net.Conn + info info + route route + status int + ctx context.Context + cancelFunc context.CancelFunc + session *sessions.Session + subMap map[string]*subscription + topicsMgr *topics.Manager + subs []interface{} + qoss []byte + rmsgs []*packets.PublishPacket + routeSubMap map[string]uint64 + awaitingRel map[uint16]int64 + maxAwaitingRel int + inflight map[uint16]*inflightElem + mqueue *queue.Queue + retryTimer *time.Timer + retryTimerLock sync.Mutex } +type InflightStatus uint8 + +const ( + Publish InflightStatus = 0 + Pubrel InflightStatus = 1 +) + +type inflightElem struct { + status InflightStatus + packet *packets.PublishPacket + timestamp int64 +} type subscription struct { client *client topic string @@ -108,6 +132,9 @@ func (c *client) init() { c.subMap = make(map[string]*subscription) c.topicsMgr = c.broker.topicsMgr c.routeSubMap = make(map[string]uint64) + c.awaitingRel = make(map[uint16]int64) + c.inflight = make(map[uint16]*inflightElem) + c.mqueue = queue.New() } func (c *client) readLoop() { @@ -177,9 +204,43 @@ func ProcessMessage(msg *Message) { packet := ca.(*packets.PublishPacket) c.ProcessPublish(packet) case *packets.PubackPacket: + packet := ca.(*packets.PubackPacket) + if _, found := c.inflight[packet.MessageID]; found { + delete(c.inflight, packet.MessageID) + } else { + log.Error("Duplicated PUBACK PacketId", zap.Uint16("MessageID", packet.MessageID)) + } case *packets.PubrecPacket: + packet := ca.(*packets.PubrecPacket) + if _, found := c.inflight[packet.MessageID]; found { + if c.inflight[packet.MessageID].status == Publish { + c.inflight[packet.MessageID].status = Pubrel + c.inflight[packet.MessageID].timestamp = time.Now().Unix() + } else if c.inflight[packet.MessageID].status == Pubrel { + log.Error("Duplicated PUBREC PacketId", zap.Uint16("MessageID", packet.MessageID)) + } + } else { + log.Error("The PUBREC PacketId is not found.", zap.Uint16("MessageID", packet.MessageID)) + } + + pubrel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket) + pubrel.MessageID = packet.MessageID + if err := c.WriterPacket(pubrel); err != nil { + log.Error("send pubrel error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + return + } case *packets.PubrelPacket: + packet := ca.(*packets.PubrelPacket) + c.pubRel(packet.MessageID) + pubcomp := packets.NewControlPacket(packets.Pubcomp).(*packets.PubcompPacket) + pubcomp.MessageID = packet.MessageID + if err := c.WriterPacket(pubcomp); err != nil { + log.Error("send pubcomp error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + return + } case *packets.PubcompPacket: + packet := ca.(*packets.PubcompPacket) + delete(c.inflight, packet.MessageID) case *packets.SubscribePacket: packet := ca.(*packets.SubscribePacket) c.ProcessSubscribe(packet) @@ -279,6 +340,17 @@ func (c *client) processClientPublish(packet *packets.PublishPacket) { } c.ProcessPublishMessage(packet) case QosExactlyOnce: + if err := c.registerPublishPacketId(packet.MessageID); err != nil { + return + } else { + pubrec := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket) + pubrec.MessageID = packet.MessageID + if err := c.WriterPacket(pubrec); err != nil { + log.Error("send pubrec error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + return + } + c.ProcessPublishMessage(packet) + } return default: log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID)) @@ -684,3 +756,50 @@ func (c *client) WriterPacket(packet packets.ControlPacket) error { c.mu.Unlock() return err } + +func (c *client) registerPublishPacketId(packetId uint16) error { + if c.isAwaitingFull() { + log.Error("Dropped qos2 packet for too many awaiting_rel", zap.Uint16("id", packetId)) + return errors.New("DROPPED_QOS2_PACKET_FOR_TOO_MANY_AWAITING_REL") + } + + if _, found := c.awaitingRel[packetId]; found { + return errors.New("RC_PACKET_IDENTIFIER_IN_USE") + } + c.awaitingRel[packetId] = time.Now().Unix() + time.AfterFunc(time.Duration(awaitRelTimeout)*time.Second, c.expireAwaitingRel) + return nil +} + +func (c *client) isAwaitingFull() bool { + if c.maxAwaitingRel == 0 { + return false + } + if len(c.awaitingRel) < c.maxAwaitingRel { + return false + } + return true +} + +func (c *client) expireAwaitingRel() { + if len(c.awaitingRel) == 0 { + return + } + now := time.Now().Unix() + for packetId, Timestamp := range c.awaitingRel { + if now-Timestamp >= awaitRelTimeout { + log.Error("Dropped qos2 packet for await_rel_timeout", zap.Uint16("id", packetId)) + delete(c.awaitingRel, packetId) + } + } +} + +func (c *client) pubRel(packetId uint16) error { + if _, found := c.awaitingRel[packetId]; found { + delete(c.awaitingRel, packetId) + } else { + log.Error("The PUBREL PacketId is not found", zap.Uint16("id", packetId)) + return errors.New("RC_PACKET_IDENTIFIER_NOT_FOUND") + } + return nil +} diff --git a/broker/comm.go b/broker/comm.go index 17182b3..52c05f1 100644 --- a/broker/comm.go +++ b/broker/comm.go @@ -152,8 +152,78 @@ func publish(sub *subscription, packet *packets.PublishPacket) { // log.Error("process message for psub error, ", zap.Error(err)) // } - err := sub.client.WriterPacket(packet) - if err != nil { - log.Error("process message for psub error, ", zap.Error(err)) + switch packet.Qos { + case QosAtMostOnce: + err := sub.client.WriterPacket(packet) + if err != nil { + log.Error("process message for psub error, ", zap.Error(err)) + } + case QosAtLeastOnce, QosExactlyOnce: + sub.client.inflight[packet.MessageID] = &inflightElem{status: Publish, packet: packet, timestamp: time.Now().Unix()} + err := sub.client.WriterPacket(packet) + if err != nil { + log.Error("process message for psub error, ", zap.Error(err)) + } + sub.client.ensureRetryTimer() + default: + log.Error("publish with unknown qos", zap.String("ClientID", sub.client.info.clientID)) + return } } + +// timer for retry delivery +func (c *client) ensureRetryTimer(interval ...int64) { + if c.retryTimer != nil { + return + } + if len(interval) > 1 { + return + } + timerInterval := retryInterval + if len(interval) == 1 { + timerInterval = interval[0] + } + c.retryTimerLock.Lock() + c.retryTimer = time.AfterFunc(time.Duration(timerInterval)*time.Second, c.retryDelivery) + c.retryTimerLock.Unlock() + return +} + +func (c *client) resetRetryTimer() { + if c.retryTimer == nil { + return + } + // reset timer + c.retryTimerLock.Lock() + c.retryTimer = nil + c.retryTimerLock.Unlock() + +} + +func (c *client) retryDelivery() { + c.resetRetryTimer() + if c.conn == nil || len(c.inflight) == 0 { //Reset timer when client offline OR inflight is empty + return + } + now := time.Now().Unix() + for _, infEle := range c.inflight { + age := now - infEle.timestamp + if age >= retryInterval { + if infEle.status == Publish { + c.WriterPacket(infEle.packet) + c.inflight[infEle.packet.MessageID].timestamp = now + } else if infEle.status == Pubrel { + pubrel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket) + pubrel.MessageID = infEle.packet.MessageID + c.WriterPacket(pubrel) + c.inflight[infEle.packet.MessageID].timestamp = now + } + } else { + if age < 0 { + age = 0 + } + c.ensureRetryTimer(retryInterval - age) + } + } + c.ensureRetryTimer() +} diff --git a/go.mod b/go.mod index 9a7f990..644cf5e 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Shopify/sarama v1.23.0 github.com/bitly/go-simplejson v0.5.0 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/eapache/queue v1.1.0 github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/gin-gonic/gin v1.4.0 github.com/golang/protobuf v1.3.2 // indirect