mirror of
https://github.com/fhmq/hmq.git
synced 2026-05-06 07:35:32 +00:00
Feature qos1&qos2 (#99)
* client publish qos2 * server dispatch qos1&qos2 * Use at most one timer for each client * Use at most one timer for each client
This commit is contained in:
+73
-3
@@ -152,8 +152,78 @@ func publish(sub *subscription, packet *packets.PublishPacket) {
|
||||
// log.Error("process message for psub error, ", zap.Error(err))
|
||||
// }
|
||||
|
||||
err := sub.client.WriterPacket(packet)
|
||||
if err != nil {
|
||||
log.Error("process message for psub error, ", zap.Error(err))
|
||||
switch packet.Qos {
|
||||
case QosAtMostOnce:
|
||||
err := sub.client.WriterPacket(packet)
|
||||
if err != nil {
|
||||
log.Error("process message for psub error, ", zap.Error(err))
|
||||
}
|
||||
case QosAtLeastOnce, QosExactlyOnce:
|
||||
sub.client.inflight[packet.MessageID] = &inflightElem{status: Publish, packet: packet, timestamp: time.Now().Unix()}
|
||||
err := sub.client.WriterPacket(packet)
|
||||
if err != nil {
|
||||
log.Error("process message for psub error, ", zap.Error(err))
|
||||
}
|
||||
sub.client.ensureRetryTimer()
|
||||
default:
|
||||
log.Error("publish with unknown qos", zap.String("ClientID", sub.client.info.clientID))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// timer for retry delivery
|
||||
func (c *client) ensureRetryTimer(interval ...int64) {
|
||||
if c.retryTimer != nil {
|
||||
return
|
||||
}
|
||||
if len(interval) > 1 {
|
||||
return
|
||||
}
|
||||
timerInterval := retryInterval
|
||||
if len(interval) == 1 {
|
||||
timerInterval = interval[0]
|
||||
}
|
||||
c.retryTimerLock.Lock()
|
||||
c.retryTimer = time.AfterFunc(time.Duration(timerInterval)*time.Second, c.retryDelivery)
|
||||
c.retryTimerLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (c *client) resetRetryTimer() {
|
||||
if c.retryTimer == nil {
|
||||
return
|
||||
}
|
||||
// reset timer
|
||||
c.retryTimerLock.Lock()
|
||||
c.retryTimer = nil
|
||||
c.retryTimerLock.Unlock()
|
||||
|
||||
}
|
||||
|
||||
func (c *client) retryDelivery() {
|
||||
c.resetRetryTimer()
|
||||
if c.conn == nil || len(c.inflight) == 0 { //Reset timer when client offline OR inflight is empty
|
||||
return
|
||||
}
|
||||
now := time.Now().Unix()
|
||||
for _, infEle := range c.inflight {
|
||||
age := now - infEle.timestamp
|
||||
if age >= retryInterval {
|
||||
if infEle.status == Publish {
|
||||
c.WriterPacket(infEle.packet)
|
||||
c.inflight[infEle.packet.MessageID].timestamp = now
|
||||
} else if infEle.status == Pubrel {
|
||||
pubrel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket)
|
||||
pubrel.MessageID = infEle.packet.MessageID
|
||||
c.WriterPacket(pubrel)
|
||||
c.inflight[infEle.packet.MessageID].timestamp = now
|
||||
}
|
||||
} else {
|
||||
if age < 0 {
|
||||
age = 0
|
||||
}
|
||||
c.ensureRetryTimer(retryInterval - age)
|
||||
}
|
||||
}
|
||||
c.ensureRetryTimer()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user