This commit is contained in:
chowyu08
2017-08-25 21:29:03 +08:00
parent 5c6e136c2b
commit 80e118728d
8 changed files with 342 additions and 46 deletions

View File

@@ -59,9 +59,8 @@ func (c *client) init() {
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
}
func (c *client) readLoop(idx int) {
func (c *client) readLoop(msgPool *MessagePool) {
nc := c.conn
msgPool := MSGPool[idx%MessagePoolNum].GetPool()
if nc == nil || msgPool == nil {
return
}
@@ -144,6 +143,7 @@ func (c *client) ProcessPublish(buf []byte) {
msg, err := DecodePublishMessage(buf)
if err != nil {
log.Error("Decode Publish Message error: ", err)
c.Close()
return
}
c.ProcessPublishMessage(buf, msg)
@@ -158,12 +158,14 @@ func (c *client) ProcessPublish(buf []byte) {
}
}
func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage) {
b := c.broker
if b == nil {
return
}
typ := c.typ
topic := string(msg.Topic())
r := b.sl.Match(topic)
@@ -173,11 +175,11 @@ func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage)
}
for _, sub := range r.psubs {
// if sub.client.typ == ROUTER {
// if typ == ROUTER {
// continue
// }
// }
if sub.client.typ == ROUTER {
if typ == ROUTER {
continue
}
}
if sub != nil {
err := sub.client.writeBuffer(buf)
if err != nil {
@@ -187,11 +189,11 @@ func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage)
}
for i, sub := range r.qsubs {
// if sub.client.typ == ROUTER {
// if typ == ROUTER {
// continue
// }
// }
if sub.client.typ == ROUTER {
if typ == ROUTER {
continue
}
}
// s.qmu.Lock()
if cnt, exist := b.queues[string(sub.topic)]; exist && i == cnt {
if sub != nil {
@@ -224,8 +226,8 @@ func (c *client) ProcessPubComp(buf []byte) {
}
func (c *client) ProcessSubscribe(buf []byte) {
srv := c.broker
if srv == nil {
b := c.broker
if b == nil {
return
}
msg, err := DecodeSubscribeMessage(buf)
@@ -256,11 +258,11 @@ func (c *client) ProcessSubscribe(buf []byte) {
if len(t) > 7 {
t = t[7:]
queue = true
// srv.qmu.Lock()
if _, exists := srv.queues[topic]; !exists {
srv.queues[topic] = 0
// b.qmu.Lock()
if _, exists := b.queues[topic]; !exists {
b.queues[topic] = 0
}
// srv.qmu.Unlock()
// b.qmu.Unlock()
} else {
retcodes = append(retcodes, message.QosFailure)
continue
@@ -273,11 +275,11 @@ func (c *client) ProcessSubscribe(buf []byte) {
queue: queue,
}
// c.mu.Lock()
c.mu.Lock()
c.subs[topic] = sub
// c.mu.Unlock()
c.mu.Unlock()
err := srv.sl.Insert(sub)
err := b.sl.Insert(sub)
if err != nil {
log.Error("Insert subscription error: ", err)
retcodes = append(retcodes, message.QosFailure)
@@ -305,15 +307,13 @@ func (c *client) ProcessSubscribe(buf []byte) {
return
}
//broadcast subscribe message
// if typ == CLIENT {
// srv.startGoRoutine(func() {
// srv.BroadcastSubscribeMessage(buf)
// })
// }
if c.typ == CLIENT {
go b.BroadcastSubOrUnsubMessage(buf)
}
//process retain message
for _, t := range topics {
bufs := srv.rl.Match(t)
bufs := b.rl.Match(t)
for _, buf := range bufs {
log.Info("process retain message: ", string(buf))
if buf != nil && string(buf) != "" {
@@ -324,8 +324,8 @@ func (c *client) ProcessSubscribe(buf []byte) {
}
func (c *client) ProcessUnSubscribe(buf []byte) {
srv := c.broker
if srv == nil {
b := c.broker
if b == nil {
return
}
@@ -356,9 +356,9 @@ func (c *client) ProcessUnSubscribe(buf []byte) {
return
}
// //process ubsubscribe message
// if typ == CLIENT {
// c.srv.BroadcastUnSubscribeMessage(msg)
// }
if c.typ == CLIENT {
b.BroadcastSubOrUnsubMessage(buf)
}
}
func (c *client) unsubscribe(sub *subscription) {
@@ -389,15 +389,19 @@ func (c *client) ProcessPing(buf []byte) {
}
func (c *client) Close() {
srv := c.broker
b := c.broker
subs := c.subs
if srv != nil {
if b != nil {
b.removeClient(c)
for _, sub := range subs {
err := srv.sl.Remove(sub)
err := b.sl.Remove(sub)
if err != nil {
log.Error("closed client but remove sublist error, ", err)
}
}
if c.info.willMsg != nil {
b.ProcessPublishMessage(c.info.willMsg)
}
}
if c.conn != nil {
c.conn.Close()