cluster sub

This commit is contained in:
zhouyuyan
2017-09-01 22:17:22 +08:00
parent b98ae9ec6f
commit ca7ebfb6e3
2 changed files with 60 additions and 40 deletions

View File

@@ -5,6 +5,7 @@ COPY tls /tls
COPY conf /conf
EXPOSE 1883
EXPOSE 1888
EXPOSE 8883
EXPOSE 1993

View File

@@ -29,6 +29,7 @@ type client struct {
info info
route *route
subs map[string]*subscription
rsubs map[string][]*subscription
}
type subscription struct {
@@ -54,7 +55,12 @@ type route struct {
}
func (c *client) init() {
c.subs = make(map[string]*subscription, 10)
typ := c.typ
if typ == ROUTER {
c.rsubs = make(map[string][]*subscription)
} else if typ == CLIENT {
c.subs = make(map[string]*subscription, 10)
}
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
}
@@ -271,47 +277,50 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
continue
}
if _, exist := c.subs[topic]; !exist {
queue := false
if strings.HasPrefix(topic, "$queue/") {
if len(t) > 7 {
t = t[7:]
queue = true
// b.qmu.Lock()
if _, exists := b.queues[topic]; !exists {
b.queues[topic] = 0
}
// b.qmu.Unlock()
} else {
retcodes = append(retcodes, QosFailure)
continue
queue := strings.HasPrefix(topic, "$queue/")
if queue {
if len(t) > 7 {
t = t[7:]
// b.qmu.Lock()
if _, exists := b.queues[topic]; !exists {
b.queues[topic] = 0
}
}
sub := &subscription{
topic: t,
qos: qoss[i],
client: c,
queue: queue,
}
c.mu.Lock()
c.subs[topic] = sub
c.mu.Unlock()
err := b.sl.Insert(sub)
if err != nil {
log.Error("Insert subscription error: ", err)
// b.qmu.Unlock()
} else {
retcodes = append(retcodes, QosFailure)
continue
}
retcodes = append(retcodes, qoss[i])
}
sub := &subscription{
topic: t,
qos: qoss[i],
client: c,
queue: queue,
}
switch c.typ {
case CLIENT:
if _, exist := c.subs[topic]; !exist {
c.subs[topic] = sub
} else {
//if exist ,check whether qos change
c.subs[topic].qos = qoss[i]
retcodes = append(retcodes, qoss[i])
continue
}
case ROUTER:
if _, exist := c.rsubs[topic]; !exist {
c.rsubs[topic] = make([]*subscription, 10)
}
c.rsubs[topic] = append(c.rsubs[topic], sub)
}
err := b.sl.Insert(sub)
if err != nil {
log.Error("Insert subscription error: ", err)
retcodes = append(retcodes, QosFailure)
} else {
//if exist ,check whether qos change
c.subs[topic].qos = qoss[i]
retcodes = append(retcodes, qoss[i])
}
}
suback.ReturnCodes = retcodes
err := c.WriterPacket(suback)
@@ -341,13 +350,23 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
if b == nil {
return
}
typ := c.typ
topics := packet.Topics
for _, t := range topics {
var sub *subscription
ok := false
if sub, ok = c.subs[t]; ok {
switch typ {
case CLIENT:
sub, ok = c.subs[t]
case ROUTER:
_, ok := c.rsubs[t]
if ok && len(c.rsubs[t]) > 0 {
sub = c.rsubs[t][0]
c.rsubs[t] = c.rsubs[t][1:]
}
}
if ok {
go c.unsubscribe(sub)
}
@@ -369,9 +388,9 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
func (c *client) unsubscribe(sub *subscription) {
c.mu.Lock()
delete(c.subs, string(sub.topic))
c.mu.Unlock()
if c.typ == CLIENT {
delete(c.subs, string(sub.topic))
}
if c.broker != nil {
c.broker.sl.Remove(sub)