add cluster

This commit is contained in:
zhouyuyan
2018-01-16 16:50:10 +08:00
parent 123bb7210f
commit 1d6f6a4a71
4 changed files with 103 additions and 69 deletions

View File

@@ -34,6 +34,7 @@ type client struct {
info info
route *route
status int
closed chan int
smu sync.RWMutex
mp *MessagePool
subs map[string]*subscription
@@ -75,12 +76,10 @@ func (c *client) init() {
c.smu.Lock()
defer c.smu.Unlock()
c.status = Connected
typ := c.typ
if typ == ROUTER {
c.rsubs = make(map[string]*subInfo)
} else if typ == CLIENT {
c.subs = make(map[string]*subscription, 10)
}
c.closed = make(chan int, 1)
c.rsubs = make(map[string]*subInfo)
c.subs = make(map[string]*subscription, 10)
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
}
@@ -101,6 +100,10 @@ func (c *client) keepAlive(ch chan int) {
msgPool.queue <- msg
timer.Stop()
return
case _, ok := <-c.closed:
if !ok {
return
}
}
}
}
@@ -121,8 +124,9 @@ func (c *client) readLoop() {
log.Error("read packet error: ", err, " clientID = ", c.info.clientID)
break
}
// log.Info("recv buf: ", packet)
ch <- 1
msg := &Message{
client: c,
packet: packet,
@@ -176,6 +180,11 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
}
topic := packet.TopicName
if topic == BrokerInfoTopic && c.typ != CLIENT {
c.ProcessInfo(packet)
return
}
if !c.CheckTopicAuth(PUB, topic) {
log.Error("Pub Topics Auth failed, ", topic, " clientID = ", c.info.clientID)
return
@@ -228,8 +237,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
}
for _, sub := range r.psubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
if sub.client.typ == REMOTE {
if typ == REMOTE {
continue
}
}
@@ -248,8 +257,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if exist {
// log.Info("queue index : ", cnt)
for _, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if c.typ == ROUTER {
if sub.client.typ == REMOTE {
if c.typ == REMOTE {
continue
}
}
@@ -350,7 +359,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
retcodes = append(retcodes, qoss[i])
continue
}
case ROUTER:
case REMOTE:
if subinfo, exist := c.rsubs[topic]; !exist {
sinfo := &subInfo{sub: sub, num: 1}
c.rsubs[topic] = sinfo
@@ -405,29 +414,25 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
topics := packet.Topics
for _, t := range topics {
var sub *subscription
ok := false
switch typ {
case CLIENT:
sub, ok = c.subs[t]
case ROUTER:
sub, ok := c.subs[t]
if ok {
c.unsubscribe(sub)
}
case REMOTE:
subinfo, ok := c.rsubs[t]
if ok {
subinfo.num = subinfo.num - 1
if subinfo.num < 1 {
sub = subinfo.sub
delete(c.rsubs, t)
c.unsubscribe(subinfo.sub)
} else {
c.rsubs[t] = subinfo
sub = nil
}
} else {
return
}
}
if ok {
go c.unsubscribe(sub)
}
}
@@ -471,21 +476,24 @@ func (c *client) ProcessPing() {
}
func (c *client) Close() {
c.smu.Lock()
if c.status == Disconnected {
c.smu.Unlock()
return
}
//wait for message complete
time.Sleep(1 * time.Second)
c.smu.Lock()
c.status = Disconnected
c.smu.Unlock()
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.smu.Unlock()
close(c.closed)
b := c.broker
subs := c.subs
if b != nil {
@@ -505,12 +513,19 @@ func (c *client) Close() {
//do reconnect
if c.typ == REMOTE {
b.connectRouter(c.route.remoteUrl, "")
localUrl := c.info.localIP + ":" + c.broker.config.Cluster.Port
if c.route.remoteUrl != localUrl {
b.connectRouter(c.route.remoteUrl, "")
}
}
}
}
func (c *client) WriterPacket(packet packets.ControlPacket) error {
if packet == nil {
return nil
}
c.mu.Lock()
err := packet.Write(c.conn)
c.mu.Unlock()