This commit is contained in:
chowyu08
2017-09-07 22:31:50 +08:00
parent 8c98346546
commit 7073e9b4ba
2 changed files with 23 additions and 15 deletions

View File

@@ -26,7 +26,7 @@ type Broker struct {
remotes sync.Map
sl *Sublist
rl *RetainList
queues map[string]Queue
queues map[string]*Queue
}
type Queue struct {
count int
@@ -39,7 +39,7 @@ func NewBroker(config *Config) (*Broker, error) {
config: config,
sl: NewSublist(),
rl: NewRetainList(),
queues: make(map[string]Queue),
queues: make(map[string]*Queue),
}
if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)

View File

@@ -221,21 +221,29 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
}
}
idx, exist := b.queues[topic]
subinfo, exist := b.queues[topic]
qnum := 0
var qsub *subscription
if exist {
}
for _, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
continue
idx := subinfo.index
for _, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
continue
}
}
if idx <= qnum {
qsub = sub
break
}
if sub.client.typ == CLIENT {
qnum = qnum + 1
} else {
qnum = qnum + sub.client.rsubs[topic].num
}
}
if sub.client.typ == CLIENT {
qnum = qnum + 1
} else {
qnum = qnum + sub.client.rsubs[topic].num
}
subinfo.index = (idx + 1) % subinfo.count
}
if qsub != nil {
@@ -273,7 +281,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
if len(t) > 7 {
t = t[7:]
if _, exists := b.queues[topic]; !exists {
b.queues[topic] = Queue{
b.queues[topic] = &Queue{
count: 0,
index: 0,
}