diff --git a/Dcokerfile b/Dcokerfile index d824655..f9ffa7a 100644 --- a/Dcokerfile +++ b/Dcokerfile @@ -5,7 +5,7 @@ COPY tls /tls COPY conf /conf EXPOSE 1883 -EXPOSE 1888 +EXPOSE 8883 EXPOSE 1993 CMD ["/hmq"] \ No newline at end of file diff --git a/README.md b/README.md index 0b848bc..2135588 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ $ go run main.go ### Features and Future -* Supports QOS 0 +* Supports QOS 0 and 1 * Cluster Support diff --git a/broker/auth.go b/broker/auth.go index 4fb82f9..dbd64ab 100644 --- a/broker/auth.go +++ b/broker/auth.go @@ -14,7 +14,7 @@ const ( ) func (c *client) CheckTopicAuth(typ int, topic string) bool { - if !c.broker.config.Acl { + if c.typ != CLIENT || !c.broker.config.Acl { return true } if strings.HasPrefix(topic, "$queue/") { diff --git a/broker/client.go b/broker/client.go index 175e2f7..53a93f6 100644 --- a/broker/client.go +++ b/broker/client.go @@ -151,11 +151,28 @@ func (c *client) ProcessConnAck(packet *packets.ConnackPacket) { func (c *client) ProcessPublish(packet *packets.PublishPacket) { topic := packet.TopicName - if c.typ != CLIENT || !c.CheckTopicAuth(PUB, topic) { + if !c.CheckTopicAuth(PUB, topic) { + log.Error("Pub Topics Auth failed, ", topic) return } - c.ProcessPublishMessage(packet) + switch packet.Qos { + case QosAtMostOnce: + c.ProcessPublishMessage(packet) + case QosAtLeastOnce: + puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) + puback.MessageID = packet.MessageID + if err := c.WriterPacket(puback); err != nil { + log.Error("send puback error, ", err) + return + } + c.ProcessPublishMessage(packet) + case QosExactlyOnce: + return + default: + log.Error("publish with unknown qos") + return + } if packet.Retain { if b := c.broker; b != nil { err := b.rl.Insert(topic, packet) @@ -248,13 +265,12 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { for i, topic := range topics { t := topic //check topic auth for client - if c.typ == CLIENT { - if !c.CheckTopicAuth(SUB, topic) { - log.Error("CheckSubAuth failed") - retcodes = append(retcodes, QosFailure) - continue - } + if !c.CheckTopicAuth(SUB, topic) { + log.Error("Sub topic Auth failed: ", topic) + retcodes = append(retcodes, QosFailure) + continue } + if _, exist := c.subs[topic]; !exist { queue := false if strings.HasPrefix(topic, "$queue/") {