This commit is contained in:
zhouyuyan
2017-09-07 17:03:33 +08:00
parent 4300a32f6b
commit 8c98346546
3 changed files with 19 additions and 28 deletions

View File

@@ -26,7 +26,11 @@ type Broker struct {
remotes sync.Map
sl *Sublist
rl *RetainList
queues map[string]int
queues map[string]Queue
}
type Queue struct {
count int
index int
}
func NewBroker(config *Config) (*Broker, error) {
@@ -35,7 +39,7 @@ func NewBroker(config *Config) (*Broker, error) {
config: config,
sl: NewSublist(),
rl: NewRetainList(),
queues: make(map[string]int),
queues: make(map[string]Queue),
}
if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)

View File

@@ -221,7 +221,10 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
}
}
qnum := 0
idx, exist := b.queues[topic]
if exist {
}
for _, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
@@ -234,31 +237,6 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
qnum = qnum + sub.client.rsubs[topic].num
}
}
var qsub *subscription
min := 0
max := 0
if cnt, exist := b.queues[topic]; exist {
for i, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
continue
}
}
if sub.client.typ == CLIENT {
max = max + 1
} else {
max = max + sub.client.rsubs[topic].num
}
if cnt >= min && cnt <= max {
qsub = r.qsubs[i]
break
}
min = max
}
}
b.queues[topic] = (b.queues[topic] + 1) % qnum
if qsub != nil {
err := qsub.client.WriterPacket(packet)
@@ -294,6 +272,12 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
if queue {
if len(t) > 7 {
t = t[7:]
if _, exists := b.queues[topic]; !exists {
b.queues[topic] = Queue{
count: 0,
index: 0,
}
}
} else {
retcodes = append(retcodes, QosFailure)
continue
@@ -309,6 +293,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
case CLIENT:
if _, exist := c.subs[topic]; !exist {
c.subs[topic] = sub
b.queues[topic].count++
} else {
//if exist ,check whether qos change
c.subs[topic].qos = qoss[i]
@@ -319,9 +304,11 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
if subinfo, exist := c.rsubs[topic]; !exist {
sinfo := &subInfo{sub: sub, num: 1}
c.rsubs[topic] = sinfo
b.queues[topic].count++
} else {
subinfo.num = subinfo.num + 1
retcodes = append(retcodes, qoss[i])
b.queues[topic].count++
continue
}
}

BIN
hmq

Binary file not shown.