This commit is contained in:
zhouyuyan
2017-09-08 10:44:01 +08:00
parent 7f45bd6bc9
commit 258912b33a
2 changed files with 51 additions and 24 deletions

View File

@@ -26,11 +26,7 @@ type Broker struct {
remotes sync.Map
sl *Sublist
rl *RetainList
queues map[string]*Queue
}
type Queue struct {
count int
index int
queues map[string]int
}
func NewBroker(config *Config) (*Broker, error) {
@@ -39,7 +35,7 @@ func NewBroker(config *Config) (*Broker, error) {
config: config,
sl: NewSublist(),
rl: NewRetainList(),
queues: make(map[string]*Queue),
queues: make(map[string]int),
}
if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)

View File

@@ -2,7 +2,6 @@ package broker
import (
"hmq/packets"
"math/rand"
"net"
"strings"
"sync"
@@ -222,30 +221,59 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
}
}
idx := GenerateRangeNum(0, len(r.qsubs))
for {
sub := r.qsubs[idx]
if sub.client.typ == ROUTER {
if typ == ROUTER {
idx = GenerateRangeNum(0, len(r.qsubs))
continue
pre := 0
now := -1
t := "$queue/" + topic
cnt, exist := b.queues[t]
if exist {
log.Info("queue index : ", cnt)
for _, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if c.typ == ROUTER {
continue
}
}
}
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process will message for qsub error, ", err)
if c.typ == CLIENT {
now = now + 1
} else {
now = now + sub.client.rsubs[t].num
}
break
if cnt >= pre && cnt <= now {
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("send publish error, ", err)
}
}
break
}
pre = now
}
}
length := getQueueSubscribeNum(r.qsubs)
if length > 0 {
b.queues[t] = (b.queues[t] + 1) % length
}
}
func GenerateRangeNum(min, max int) int {
// rand.Seed(time.Now().Unix())
randNum := rand.Intn(max-min) + min
return randNum
func getQueueSubscribeNum(qsubs []*subscription) int {
topic := "$queue/"
if len(qsubs) < 1 {
return 0
} else {
topic = topic + qsubs[0].topic
}
num := 0
for _, sub := range qsubs {
if sub.client.typ == CLIENT {
num = num + 1
} else {
num = num + sub.client.rsubs[topic].num
}
}
return num
}
func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
@@ -273,6 +301,9 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
if queue {
if len(t) > 7 {
t = t[7:]
if _, exists := b.queues[topic]; !exists {
b.queues[topic] = 0
}
} else {
retcodes = append(retcodes, QosFailure)
continue