cluster sub

This commit is contained in:
zhouyuyan
2017-09-07 14:32:01 +08:00
parent 34378164e0
commit ae1af54c6e
2 changed files with 28 additions and 45 deletions

View File

@@ -403,7 +403,7 @@ func (b *Broker) removeClient(c *client) {
// log.Info("delete client ,", clientId)
}
func (b *Broker) ProcessPublishMessage(packet *packets.PublishPacket) {
func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
topic := packet.TopicName
r := b.sl.Match(topic)
// log.Info("psubs num: ", len(r.psubs))

View File

@@ -1,7 +1,6 @@
package broker
import (
"errors"
"hmq/packets"
"net"
"strings"
@@ -30,7 +29,12 @@ type client struct {
info info
route *route
subs map[string]*subscription
rsubs map[string][]*subscription
rsubs map[string]*subInfo
}
type subInfo struct {
sub *subscription
num int
}
type subscription struct {
@@ -56,13 +60,13 @@ type route struct {
}
var (
disconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
DisconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
)
func (c *client) init() {
typ := c.typ
if typ == ROUTER {
c.rsubs = make(map[string][]*subscription)
c.rsubs = make(map[string]*subInfo)
} else if typ == CLIENT {
c.subs = make(map[string]*subscription, 10)
}
@@ -82,20 +86,14 @@ func (c *client) readLoop(msgPool *MessagePool) {
nowTime = uint16(time.Now().Unix())
if 0 != c.info.keepalive && nowTime-lastIn > c.info.keepalive*3/2 {
log.Errorf("Client %s has exceeded timeout, disconnecting.\n", c.info.clientID)
msg := &Message{
client: c,
packet: disconnectdPacket,
}
msg := &Message{client: c, packet: DisconnectdPacket}
msgPool.queue <- msg
return
}
packet, err := packets.ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", err)
msg := &Message{
client: c,
packet: disconnectdPacket,
}
msg := &Message{client: c, packet: DisconnectdPacket}
msgPool.queue <- msg
return
}
@@ -332,10 +330,14 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
continue
}
case ROUTER:
if _, exist := c.rsubs[topic]; !exist {
c.rsubs[topic] = make([]*subscription, 10)
if subinfo, exist := c.rsubs[topic]; !exist {
sinfo := &subInfo{sub: sub, num: 1}
c.rsubs[topic] = sinfo
} else {
subinfo.num = subinfo.num + 1
retcodes = append(retcodes, qoss[i])
continue
}
c.rsubs[topic] = append(c.rsubs[topic], sub)
}
err := b.sl.Insert(sub)
if err != nil {
@@ -384,12 +386,15 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
case CLIENT:
sub, ok = c.subs[t]
case ROUTER:
_, ok := c.rsubs[t]
if ok && len(c.rsubs[t]) > 0 {
sub = c.rsubs[t][0]
c.rsubs[t] = c.rsubs[t][1:]
} else {
return
subinfo, ok := c.rsubs[t]
if ok {
subinfo.num = subinfo.num - 1
c.rsubs[t] = subinfo
if subinfo.num < 1 {
delete(c.rsubs, t)
} else {
c.rsubs[t] = subinfo
}
}
}
if ok {
@@ -447,7 +452,7 @@ func (c *client) Close() {
b.BroadcastUnSubscribe(subs)
}
if c.info.willMsg != nil {
b.ProcessPublishMessage(c.info.willMsg)
b.PublishMessage(c.info.willMsg)
}
}
if c.conn != nil {
@@ -462,25 +467,3 @@ func (c *client) WriterPacket(packet packets.ControlPacket) error {
c.mu.Unlock()
return err
}
func WriteBuffer(conn net.Conn, buf []byte) error {
if conn == nil {
return errors.New("conn is nul")
}
_, err := conn.Write(buf)
return err
}
func (c *client) writeBuffer(buf []byte) error {
c.mu.Lock()
err := WriteBuffer(c.conn, buf)
c.mu.Unlock()
return err
}
// func (c *client) writeMessage(msg message.Message) error {
// buf, err := EncodeMessage(msg)
// if err != nil {
// return err
// }
// return c.writeBuffer(buf)
// }