fhmq/hmq#5 added zap logger (#11)

This commit is contained in:
Marc Magnin
2018-01-26 06:51:36 +01:00
committed by joy.zhou
parent 1058256235
commit ef252550dc
10 changed files with 163 additions and 89 deletions

View File

@@ -9,8 +9,7 @@ import (
"time"
"github.com/eclipse/paho.mqtt.golang/packets"
log "github.com/cihub/seelog"
"go.uber.org/zap"
)
const (
@@ -102,7 +101,7 @@ func (c *client) keepAlive(ch chan int) {
timer.Reset(keepalive)
continue
}
log.Error("Client exceeded timeout, disconnecting. clientID = ", c.info.clientID, " keepalive = ", c.info.keepalive)
log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
msg := &Message{client: c, packet: DisconnectdPacket}
msgPool.queue <- msg
timer.Stop()
@@ -128,7 +127,7 @@ func (c *client) readLoop() {
for {
packet, err := packets.ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", err, " clientID = ", c.info.clientID)
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
break
}
@@ -151,7 +150,7 @@ func ProcessMessage(msg *Message) {
if ca == nil {
return
}
log.Debug("Recv message from client, ID = ", c.info.clientID)
log.Debug("Recv message from client, ID = ", zap.String("ClientID", c.info.clientID))
switch ca.(type) {
case *packets.ConnackPacket:
@@ -177,7 +176,7 @@ func ProcessMessage(msg *Message) {
case *packets.DisconnectPacket:
c.Close()
default:
log.Info("Recv Unknow message.......", " clientID = ", c.info.clientID)
log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID))
}
}
@@ -193,7 +192,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
}
if !c.CheckTopicAuth(PUB, topic) {
log.Error("Pub Topics Auth failed, ", topic, " clientID = ", c.info.clientID)
log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
return
}
@@ -204,21 +203,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
puback.MessageID = packet.MessageID
if err := c.WriterPacket(puback); err != nil {
log.Error("send puback error, ", err, " clientID = ", c.info.clientID)
log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
c.ProcessPublishMessage(packet)
case QosExactlyOnce:
return
default:
log.Error("publish with unknown qos", " clientID = ", c.info.clientID)
log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
return
}
if packet.Retain {
if b := c.broker; b != nil {
err := b.rl.Insert(topic, packet)
if err != nil {
log.Error("Insert Retain Message error: ", err, " clientID = ", c.info.clientID)
log.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
}
@@ -252,7 +251,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", err, " clientID = ", c.info.clientID)
log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
}
@@ -278,7 +277,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("send publish error, ", err, " clientID = ", c.info.clientID)
log.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
@@ -332,7 +331,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
t := topic
//check topic auth for client
if !c.CheckTopicAuth(SUB, topic) {
log.Error("Sub topic Auth failed: ", topic, " clientID = ", c.info.clientID)
log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure)
continue
}
@@ -379,7 +378,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
}
err := b.sl.Insert(sub)
if err != nil {
log.Error("Insert subscription error: ", err, " clientID = ", c.info.clientID)
log.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure)
} else {
retcodes = append(retcodes, qoss[i])
@@ -389,7 +388,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
err := c.WriterPacket(suback)
if err != nil {
log.Error("send suback error, ", err, " clientID = ", c.info.clientID)
log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
//broadcast subscribe message
@@ -401,7 +400,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
for _, t := range topics {
packets := b.rl.Match(t)
for _, packet := range packets {
log.Info("process retain message: ", packet, " clientID = ", c.info.clientID)
log.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID))
if packet != nil {
c.WriterPacket(packet)
}
@@ -448,7 +447,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
err := c.WriterPacket(unsuback)
if err != nil {
log.Error("send unsuback error, ", err, " clientID = ", c.info.clientID)
log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
// //process ubsubscribe message
@@ -477,7 +476,7 @@ func (c *client) ProcessPing() {
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
err := c.WriterPacket(resp)
if err != nil {
log.Error("send PingResponse error, ", err, " clientID = ", c.info.clientID)
log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
}
@@ -508,7 +507,7 @@ func (c *client) Close() {
for _, sub := range subs {
err := b.sl.Remove(sub)
if err != nil {
log.Error("closed client but remove sublist error, ", err, " clientID = ", c.info.clientID)
log.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
if c.typ == CLIENT {