mirror of
https://github.com/fhmq/hmq.git
synced 2026-04-26 11:38:33 +00:00
qos 1
This commit is contained in:
@@ -151,11 +151,28 @@ func (c *client) ProcessConnAck(packet *packets.ConnackPacket) {
|
||||
func (c *client) ProcessPublish(packet *packets.PublishPacket) {
|
||||
topic := packet.TopicName
|
||||
|
||||
if c.typ != CLIENT || !c.CheckTopicAuth(PUB, topic) {
|
||||
if !c.CheckTopicAuth(PUB, topic) {
|
||||
log.Error("Pub Topics Auth failed, ", topic)
|
||||
return
|
||||
}
|
||||
c.ProcessPublishMessage(packet)
|
||||
|
||||
switch packet.Qos {
|
||||
case QosAtMostOnce:
|
||||
c.ProcessPublishMessage(packet)
|
||||
case QosAtLeastOnce:
|
||||
puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
|
||||
puback.MessageID = packet.MessageID
|
||||
if err := c.WriterPacket(puback); err != nil {
|
||||
log.Error("send puback error, ", err)
|
||||
return
|
||||
}
|
||||
c.ProcessPublishMessage(packet)
|
||||
case QosExactlyOnce:
|
||||
return
|
||||
default:
|
||||
log.Error("publish with unknown qos")
|
||||
return
|
||||
}
|
||||
if packet.Retain {
|
||||
if b := c.broker; b != nil {
|
||||
err := b.rl.Insert(topic, packet)
|
||||
@@ -248,13 +265,12 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
|
||||
for i, topic := range topics {
|
||||
t := topic
|
||||
//check topic auth for client
|
||||
if c.typ == CLIENT {
|
||||
if !c.CheckTopicAuth(SUB, topic) {
|
||||
log.Error("CheckSubAuth failed")
|
||||
retcodes = append(retcodes, QosFailure)
|
||||
continue
|
||||
}
|
||||
if !c.CheckTopicAuth(SUB, topic) {
|
||||
log.Error("Sub topic Auth failed: ", topic)
|
||||
retcodes = append(retcodes, QosFailure)
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exist := c.subs[topic]; !exist {
|
||||
queue := false
|
||||
if strings.HasPrefix(topic, "$queue/") {
|
||||
|
||||
Reference in New Issue
Block a user