mirror of
https://github.com/fhmq/hmq.git
synced 2026-04-24 10:38:34 +00:00
modify keep alive
This commit is contained in:
@@ -85,6 +85,26 @@ func (c *client) init() {
|
||||
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
|
||||
}
|
||||
|
||||
func (c *client) keepAlive(ch chan int) {
|
||||
defer close(ch)
|
||||
keepalive := time.Duration(c.info.keepalive * 3 / 2)
|
||||
timeTicker := time.NewTimer(keepalive * time.Second)
|
||||
msgPool := c.mp
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
timeTicker.Reset(keepalive * time.Second)
|
||||
case <-timeTicker.C:
|
||||
log.Errorf("Client %s has exceeded timeout, disconnecting.\n", c.info.clientID)
|
||||
msg := &Message{client: c, packet: DisconnectdPacket}
|
||||
msgPool.queue <- msg
|
||||
timeTicker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) readLoop() {
|
||||
nc := c.conn
|
||||
msgPool := c.mp
|
||||
@@ -92,21 +112,17 @@ func (c *client) readLoop() {
|
||||
return
|
||||
}
|
||||
|
||||
lastIn := uint16(time.Now().Unix())
|
||||
var nowTime uint16
|
||||
ch := make(chan int, 1000)
|
||||
go c.keepAlive(ch)
|
||||
|
||||
for {
|
||||
nowTime = uint16(time.Now().Unix())
|
||||
if 0 != c.info.keepalive && nowTime-lastIn > c.info.keepalive*3/2 {
|
||||
log.Errorf("Client %s has exceeded timeout, disconnecting.\n", c.info.clientID)
|
||||
break
|
||||
}
|
||||
packet, err := packets.ReadPacket(nc)
|
||||
if err != nil {
|
||||
log.Error("read packet error: ", err, " clientID = ", c.info.clientID)
|
||||
break
|
||||
}
|
||||
// log.Info("recv buf: ", packet)
|
||||
lastIn = uint16(time.Now().Unix())
|
||||
|
||||
msg := &Message{
|
||||
client: c,
|
||||
packet: packet,
|
||||
|
||||
Reference in New Issue
Block a user