This commit is contained in:
zhouyuyan
2017-09-07 15:31:38 +08:00
parent ae1af54c6e
commit 4300a32f6b
2 changed files with 45 additions and 69 deletions

View File

@@ -407,7 +407,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
topic := packet.TopicName
r := b.sl.Match(topic)
// log.Info("psubs num: ", len(r.psubs))
if len(r.qsubs) == 0 && len(r.psubs) == 0 {
if len(r.psubs) == 0 {
return
}
@@ -419,21 +419,6 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
}
}
}
for i, sub := range r.qsubs {
// s.qmu.Lock()
if cnt, exist := b.queues[string(sub.topic)]; exist && i == cnt {
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process will message for qsub error, ", err)
}
}
b.queues[topic] = (b.queues[topic] + 1) % len(r.qsubs)
break
}
// s.qmu.Unlock()
}
}
func (b *Broker) BroadcastUnSubscribe(subs map[string]*subscription) {

View File

@@ -117,32 +117,20 @@ func ProcessMessage(msg *Message) {
switch ca.(type) {
case *packets.ConnackPacket:
// log.Info("Recv conack message..........")
packet := ca.(*packets.ConnackPacket)
c.ProcessConnAck(packet)
case *packets.ConnectPacket:
// log.Info("Recv connect message..........")
packet := ca.(*packets.ConnectPacket)
c.ProcessConnect(packet)
case *packets.PublishPacket:
// log.Info("Recv publish message..........")
packet := ca.(*packets.PublishPacket)
c.ProcessPublish(packet)
case *packets.PubackPacket:
//log.Info("Recv publish ack message..........")
packet := ca.(*packets.PubackPacket)
c.ProcessPubAck(packet)
case *packets.PubrecPacket:
//log.Info("Recv publish rec message..........")
packet := ca.(*packets.PubrecPacket)
c.ProcessPubREC(packet)
case *packets.PubrelPacket:
//log.Info("Recv publish rel message..........")
packet := ca.(*packets.PubrelPacket)
c.ProcessPubREL(packet)
case *packets.PubcompPacket:
//log.Info("Recv publish ack message..........")
packet := ca.(*packets.PubcompPacket)
c.ProcessPubComp(packet)
case *packets.SubscribePacket:
// log.Info("Recv subscribe message.....")
packet := ca.(*packets.SubscribePacket)
@@ -168,14 +156,6 @@ func ProcessMessage(msg *Message) {
}
}
func (c *client) ProcessConnect(packet *packets.ConnectPacket) {
}
func (c *client) ProcessConnAck(packet *packets.ConnackPacket) {
}
func (c *client) ProcessPublish(packet *packets.PublishPacket) {
topic := packet.TopicName
@@ -241,40 +221,51 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
}
}
for i, sub := range r.qsubs {
qnum := 0
for _, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
continue
}
}
// s.qmu.Lock()
if cnt, exist := b.queues[string(sub.topic)]; exist && i == cnt {
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process will message for qsub error, ", err)
if sub.client.typ == CLIENT {
qnum = qnum + 1
} else {
qnum = qnum + sub.client.rsubs[topic].num
}
}
var qsub *subscription
min := 0
max := 0
if cnt, exist := b.queues[topic]; exist {
for i, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
continue
}
}
b.queues[topic] = (b.queues[topic] + 1) % len(r.qsubs)
break
if sub.client.typ == CLIENT {
max = max + 1
} else {
max = max + sub.client.rsubs[topic].num
}
if cnt >= min && cnt <= max {
qsub = r.qsubs[i]
break
}
min = max
}
// s.qmu.Unlock()
}
}
b.queues[topic] = (b.queues[topic] + 1) % qnum
func (c *client) ProcessPubAck(packet *packets.PubackPacket) {
}
func (c *client) ProcessPubREC(packet *packets.PubrecPacket) {
}
func (c *client) ProcessPubREL(packet *packets.PubrelPacket) {
}
func (c *client) ProcessPubComp(packet *packets.PubcompPacket) {
if qsub != nil {
err := qsub.client.WriterPacket(packet)
if err != nil {
log.Error("process will message for qsub error, ", err)
}
}
}
@@ -303,11 +294,6 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
if queue {
if len(t) > 7 {
t = t[7:]
// b.qmu.Lock()
if _, exists := b.queues[topic]; !exists {
b.queues[topic] = 0
}
// b.qmu.Unlock()
} else {
retcodes = append(retcodes, QosFailure)
continue
@@ -389,12 +375,15 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
subinfo, ok := c.rsubs[t]
if ok {
subinfo.num = subinfo.num - 1
c.rsubs[t] = subinfo
if subinfo.num < 1 {
sub = subinfo.sub
delete(c.rsubs, t)
} else {
c.rsubs[t] = subinfo
sub = nil
}
} else {
return
}
}
if ok {
@@ -421,11 +410,13 @@ func (c *client) unsubscribe(sub *subscription) {
if c.typ == CLIENT {
delete(c.subs, sub.topic)
}
b := c.broker
if b != nil && sub != nil {
b.sl.Remove(sub)
}
if c.broker != nil {
c.broker.sl.Remove(sub)
}
}
func (c *client) ProcessPing() {