From 3eea49cd42fc9cf35b196304a3ddcf95a42ee9d4 Mon Sep 17 00:00:00 2001 From: "joy.zhou" Date: Thu, 11 Jul 2019 14:52:53 +0800 Subject: [PATCH] add acl --- README.md | 14 +++++++++++++- broker/client.go | 34 ++++++++++++++++++++++++++++++---- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index ce5003c..eb1c0bb 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,8 @@ Common Options: "keyFile": "tls/server/key.pem" }, "acl":true, - "aclConf":"conf/acl.conf" + "aclConf":"conf/acl.conf", + "plugins": ["authhttp","kafka"] } ~~~ @@ -83,6 +84,17 @@ Common Options: * Flexible ACL +* AuthHTTP Support + +* Kafka Bridge Support + +### QUEUE SUBSCRIBE +~~~ +| Prefix | Examples | +| ------------- |---------------------------------| +| $queue/ | mosquitto_sub -t ‘$queue/topic’ | +~~~ + ### Cluster ```bash 1, start router for hmq (https://github.com/fhmq/router.git) diff --git a/broker/client.go b/broker/client.go index 35f36bf..376ecfd 100644 --- a/broker/client.go +++ b/broker/client.go @@ -5,6 +5,7 @@ package broker import ( "context" "errors" + "math/rand" "net" "reflect" "strings" @@ -78,6 +79,7 @@ type route struct { var ( DisconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) + r = rand.New(rand.NewSource(time.Now().UnixNano())) ) func (c *client) init() { @@ -237,7 +239,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { return } - for _, sub := range c.subs { + var qsub []int + for i, sub := range c.subs { s, ok := sub.(*subscription) if ok { if s.client.typ == ROUTER { @@ -245,14 +248,28 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { continue } } - err := s.client.WriterPacket(packet) - if err != nil { - log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + if s.queue { + qsub = append(qsub, i) + } else { + err := s.client.WriterPacket(packet) + if err != nil { + log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + } } + } } + if len(qsub) > 0 { + idx := r.Intn(len(qsub)) + sub := c.subs[qsub[idx]].(*subscription) + err := sub.client.WriterPacket(packet) + if err != nil { + log.Error("process message for qsub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + } + } + } func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { @@ -290,10 +307,19 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { }) } + queue := strings.HasPrefix(topic, "$queue/") + if queue { + // topic = strings.TrimPrefix(topic, "$queue/") + if _, exists := b.queues[topic]; !exists { + b.queues[topic] = 0 + } + } + sub := &subscription{ topic: t, qos: qoss[i], client: c, + queue: queue, } rqos, err := c.topicsMgr.Subscribe([]byte(topic), qoss[i], sub)