This commit is contained in:
joy.zhou
2019-07-11 14:52:53 +08:00
parent 9067eb88b2
commit 3eea49cd42
2 changed files with 43 additions and 5 deletions

View File

@@ -61,7 +61,8 @@ Common Options:
"keyFile": "tls/server/key.pem"
},
"acl":true,
"aclConf":"conf/acl.conf"
"aclConf":"conf/acl.conf",
"plugins": ["authhttp","kafka"]
}
~~~
@@ -83,6 +84,17 @@ Common Options:
* Flexible ACL
* AuthHTTP Support
* Kafka Bridge Support
### QUEUE SUBSCRIBE
~~~
| Prefix | Examples |
| ------------- |---------------------------------|
| $queue/ | mosquitto_sub -t $queue/topic |
~~~
### Cluster
```bash
1, start router for hmq (https://github.com/fhmq/router.git)

View File

@@ -5,6 +5,7 @@ package broker
import (
"context"
"errors"
"math/rand"
"net"
"reflect"
"strings"
@@ -78,6 +79,7 @@ type route struct {
var (
DisconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
r = rand.New(rand.NewSource(time.Now().UnixNano()))
)
func (c *client) init() {
@@ -237,7 +239,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
return
}
for _, sub := range c.subs {
var qsub []int
for i, sub := range c.subs {
s, ok := sub.(*subscription)
if ok {
if s.client.typ == ROUTER {
@@ -245,14 +248,28 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
continue
}
}
err := s.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
if s.queue {
qsub = append(qsub, i)
} else {
err := s.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
}
}
if len(qsub) > 0 {
idx := r.Intn(len(qsub))
sub := c.subs[qsub[idx]].(*subscription)
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for qsub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
}
func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
@@ -290,10 +307,19 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
})
}
queue := strings.HasPrefix(topic, "$queue/")
if queue {
// topic = strings.TrimPrefix(topic, "$queue/")
if _, exists := b.queues[topic]; !exists {
b.queues[topic] = 0
}
}
sub := &subscription{
topic: t,
qos: qoss[i],
client: c,
queue: queue,
}
rqos, err := c.topicsMgr.Subscribe([]byte(topic), qoss[i], sub)