This commit is contained in:
joy.zhou
2018-05-10 13:13:36 +08:00
committed by GitHub
parent 684584b208
commit 84e7fe2490
2 changed files with 40 additions and 37 deletions

View File

@@ -3,6 +3,7 @@
package broker package broker
import ( import (
"context"
"errors" "errors"
"net" "net"
"reflect" "reflect"
@@ -31,17 +32,18 @@ const (
) )
type client struct { type client struct {
typ int typ int
mu sync.Mutex mu sync.Mutex
broker *Broker broker *Broker
conn net.Conn conn net.Conn
info info info info
route route route route
status int status int
closed chan int smu sync.RWMutex
smu sync.RWMutex subs map[string]*subscription
subs map[string]*subscription rsubs map[string]*subInfo
rsubs map[string]*subInfo ctx context.Context
cancelFunc context.CancelFunc
} }
type subInfo struct { type subInfo struct {
@@ -79,12 +81,11 @@ func (c *client) init() {
c.smu.Lock() c.smu.Lock()
defer c.smu.Unlock() defer c.smu.Unlock()
c.status = Connected c.status = Connected
c.closed = make(chan int, 1)
c.rsubs = make(map[string]*subInfo) c.rsubs = make(map[string]*subInfo)
c.subs = make(map[string]*subscription, 10) c.subs = make(map[string]*subscription, 10)
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0] c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
} }
func (c *client) keepAlive(ch chan int) { func (c *client) keepAlive(ch chan int) {
@@ -111,10 +112,8 @@ func (c *client) keepAlive(ch chan int) {
timer.Stop() timer.Stop()
return return
case _, ok := <-c.closed: case <-c.ctx.Done():
if !ok { return
return
}
} }
} }
} }
@@ -130,23 +129,28 @@ func (c *client) readLoop() {
go c.keepAlive(ch) go c.keepAlive(ch)
for { for {
packet, err := packets.ReadPacket(nc) select {
if err != nil { case <-c.ctx.Done():
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return
break default:
} packet, err := packets.ReadPacket(nc)
// keepalive channel if err != nil {
ch <- 1 log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
return
}
// keepalive channel
ch <- 1
msg := &Message{ msg := &Message{
client: c, client: c,
packet: packet, packet: packet,
}
b.SubmitWork(msg)
} }
b.SubmitWork(msg)
} }
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
} }
func ProcessMessage(msg *Message) { func ProcessMessage(msg *Message) {
@@ -500,6 +504,8 @@ func (c *client) Close() {
return return
} }
c.cancelFunc()
c.status = Disconnected c.status = Disconnected
//wait for message complete //wait for message complete
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@@ -512,8 +518,6 @@ func (c *client) Close() {
c.smu.Unlock() c.smu.Unlock()
close(c.closed)
b := c.broker b := c.broker
subs := c.subs subs := c.subs
if b != nil { if b != nil {

View File

@@ -4,10 +4,11 @@ package broker
import ( import (
"fmt" "fmt"
"time"
simplejson "github.com/bitly/go-simplejson" simplejson "github.com/bitly/go-simplejson"
"github.com/eclipse/paho.mqtt.golang/packets" "github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap" "go.uber.org/zap"
"time"
) )
func (c *client) SendInfo() { func (c *client) SendInfo() {
@@ -35,10 +36,8 @@ func (c *client) StartPing() {
log.Error("ping error: ", zap.Error(err)) log.Error("ping error: ", zap.Error(err))
c.Close() c.Close()
} }
case _, ok := <-c.closed: case <-c.ctx.Done():
if !ok { return
return
}
} }
} }
} }