mirror of
https://github.com/fhmq/hmq.git
synced 2026-04-24 10:38:34 +00:00
fixbug
This commit is contained in:
@@ -55,6 +55,10 @@ type route struct {
|
||||
remoteUrl string
|
||||
}
|
||||
|
||||
var (
|
||||
disconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
|
||||
)
|
||||
|
||||
func (c *client) init() {
|
||||
typ := c.typ
|
||||
if typ == ROUTER {
|
||||
@@ -72,25 +76,35 @@ func (c *client) readLoop(msgPool *MessagePool) {
|
||||
return
|
||||
}
|
||||
|
||||
msg := &Message{}
|
||||
lastIn := uint16(time.Now().Unix())
|
||||
var nowTime uint16
|
||||
for {
|
||||
nowTime = uint16(time.Now().Unix())
|
||||
if 0 != c.info.keepalive && nowTime-lastIn > c.info.keepalive*3/2 {
|
||||
log.Errorf("Client %s has exceeded timeout, disconnecting.\n", c.info.clientID)
|
||||
c.Close()
|
||||
msg := &Message{
|
||||
client: c,
|
||||
packet: disconnectdPacket,
|
||||
}
|
||||
msgPool.queue <- msg
|
||||
return
|
||||
}
|
||||
packet, err := packets.ReadPacket(nc)
|
||||
if err != nil {
|
||||
log.Error("read packet error: ", err)
|
||||
c.Close()
|
||||
msg := &Message{
|
||||
client: c,
|
||||
packet: disconnectdPacket,
|
||||
}
|
||||
msgPool.queue <- msg
|
||||
return
|
||||
}
|
||||
// log.Info("recv buf: ", packet)
|
||||
lastIn = uint16(time.Now().Unix())
|
||||
msg.client = c
|
||||
msg.packet = packet
|
||||
msg := &Message{
|
||||
client: c,
|
||||
packet: packet,
|
||||
}
|
||||
msgPool.queue <- msg
|
||||
}
|
||||
msgPool.Reduce()
|
||||
@@ -99,7 +113,7 @@ func (c *client) readLoop(msgPool *MessagePool) {
|
||||
func ProcessMessage(msg *Message) {
|
||||
c := msg.client
|
||||
ca := msg.packet
|
||||
if c == nil || ca == nil {
|
||||
if ca == nil {
|
||||
return
|
||||
}
|
||||
switch ca.(type) {
|
||||
|
||||
Reference in New Issue
Block a user