diff --git a/.gitignore b/.gitignore index d6f02e2..b9777a7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ hmq log log/* +*.test \ No newline at end of file diff --git a/broker/auth.go b/broker/auth.go index 76ea9c8..02bcd4a 100644 --- a/broker/auth.go +++ b/broker/auth.go @@ -6,8 +6,8 @@ import ( "strings" "github.com/fhmq/hmq/lib/acl" + "go.uber.org/zap" - log "github.com/cihub/seelog" "github.com/fsnotify/fsnotify" ) @@ -43,10 +43,10 @@ func (b *Broker) handleFsEvent(event fsnotify.Event) error { case b.config.AclConf: if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create { - log.Info("text:handling acl config change event:", event) + log.Info("text:handling acl config change event:", zap.String("filename", event.Name)) aclconfig, err := acl.AclConfigLoad(event.Name) if err != nil { - log.Error("aclconfig change failed, load acl conf error: ", err) + log.Error("aclconfig change failed, load acl conf error: ", zap.Error(err)) return err } b.AclConfig = aclconfig @@ -59,14 +59,14 @@ func (b *Broker) StartAclWatcher() { go func() { wch, e := fsnotify.NewWatcher() if e != nil { - log.Error("start monitor acl config file error,", e) + log.Error("start monitor acl config file error,", zap.Error(e)) return } defer wch.Close() for _, i := range watchList { if err := wch.Add(i); err != nil { - log.Error("start monitor acl config file error,", err) + log.Error("start monitor acl config file error,", zap.Error(err)) return } } @@ -76,7 +76,7 @@ func (b *Broker) StartAclWatcher() { case evt := <-wch.Events: b.handleFsEvent(evt) case err := <-wch.Errors: - log.Error("error:", err.Error()) + log.Error("error:", zap.Error(err)) } } }() diff --git a/broker/broker.go b/broker/broker.go index 7ccf81e..4b09bf9 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -15,10 +15,15 @@ import ( "github.com/eclipse/paho.mqtt.golang/packets" "github.com/shirou/gopsutil/mem" + "go.uber.org/zap" "golang.org/x/net/websocket" - log "github.com/cihub/seelog" + "github.com/fhmq/hmq/logger" +) + +var ( + log = logger.Get().Named("Broker") ) type Broker struct { @@ -49,7 +54,7 @@ func NewBroker(config *Config) (*Broker, error) { if b.config.TlsPort != "" { tlsconfig, err := NewTLSConfig(b.config.TlsInfo) if err != nil { - log.Error("new tlsConfig error: ", err) + log.Error("new tlsConfig error", zap.Error(err)) return nil, err } b.tlsConfig = tlsconfig @@ -57,7 +62,7 @@ func NewBroker(config *Config) (*Broker, error) { if b.config.Acl { aclconfig, err := acl.AclConfigLoad(b.config.AclConf) if err != nil { - log.Error("Load acl conf error: ", err) + log.Error("Load acl conf error", zap.Error(err)) return nil, err } b.AclConfig = aclconfig @@ -98,7 +103,7 @@ func (b *Broker) Start() { b.ConnectToDiscovery() } - //system montior + //system monitor go StateMonitor() } @@ -119,7 +124,7 @@ func StateMonitor() { func (b *Broker) StartWebsocketListening() { path := b.config.WsPath hp := ":" + b.config.WsPort - log.Info("Start Webscoker Listening on ", hp, path) + log.Info("Start Websocket Listening on ", zap.String("hp", hp), zap.String("path", path)) http.Handle(path, websocket.Handler(b.wsHandler)) var err error if b.config.WsTLS { @@ -147,14 +152,14 @@ func (b *Broker) StartClientListening(Tls bool) { if Tls { hp = b.config.TlsHost + ":" + b.config.TlsPort l, err = tls.Listen("tcp", hp, b.tlsConfig) - log.Info("Start TLS Listening client on ", hp) + log.Info("Start TLS Listening client on ", zap.String("hp", hp)) } else { hp := b.config.Host + ":" + b.config.Port l, err = net.Listen("tcp", hp) - log.Info("Start Listening client on ", hp) + log.Info("Start Listening client on ", zap.String("hp", hp)) } if err != nil { - log.Error("Error listening on ", err) + log.Error("Error listening on ", zap.Error(err)) return } tmpDelay := 10 * ACCEPT_MIN_SLEEP @@ -163,14 +168,14 @@ func (b *Broker) StartClientListening(Tls bool) { if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { log.Error("Temporary Client Accept Error(%v), sleeping %dms", - ne, tmpDelay/time.Millisecond) + zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) time.Sleep(tmpDelay) tmpDelay *= 2 if tmpDelay > ACCEPT_MAX_SLEEP { tmpDelay = ACCEPT_MAX_SLEEP } } else { - log.Error("Accept error: %v", err) + log.Error("Accept error: %v", zap.Error(err)) } continue } @@ -189,7 +194,7 @@ func (b *Broker) Handshake(conn net.Conn) bool { // Force handshake if err := nc.Handshake(); err != nil { - log.Error("TLS handshake error, ", err) + log.Error("TLS handshake error, ", zap.Error(err)) return false } nc.SetReadDeadline(time.Time{}) @@ -212,11 +217,11 @@ func TlsTimeout(conn *tls.Conn) { func (b *Broker) StartClusterListening() { var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port - log.Info("Start Listening cluster on ", hp) + log.Info("Start Listening cluster on ", zap.String("hp", hp)) l, e := net.Listen("tcp", hp) if e != nil { - log.Error("Error listening on ", e) + log.Error("Error listening on ", zap.Error(e)) return } @@ -227,14 +232,14 @@ func (b *Broker) StartClusterListening() { if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { log.Error("Temporary Client Accept Error(%v), sleeping %dms", - ne, tmpDelay/time.Millisecond) + zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) time.Sleep(tmpDelay) tmpDelay *= 2 if tmpDelay > ACCEPT_MAX_SLEEP { tmpDelay = ACCEPT_MAX_SLEEP } } else { - log.Error("Accept error: %v", err) + log.Error("Accept error: %v", zap.Error(err)) } continue } @@ -248,7 +253,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { //process connect packet packet, err := packets.ReadPacket(conn) if err != nil { - log.Error("read connect packet error: ", err) + log.Error("read connect packet error: ", zap.Error(err)) return } if packet == nil { @@ -265,7 +270,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { connack.SessionPresent = msg.CleanSession err = connack.Write(conn) if err != nil { - log.Error("send connack error, ", err, " clientID = ", msg.ClientIdentifier) + log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) return } @@ -308,7 +313,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { c.mp = msgPool old, exist = b.clients.Load(cid) if exist { - log.Warn("client exist, close old...", " clientID = ", c.info.clientID) + log.Warn("client exist, close old...", zap.String("clientID", c.info.clientID)) ol, ok := old.(*client) if ok { msg := &Message{client: c, packet: DisconnectdPacket} @@ -341,7 +346,7 @@ func (b *Broker) ConnectToDiscovery() { for { conn, err = net.Dial("tcp", b.config.Router) if err != nil { - log.Error("Error trying to connect to route: ", err) + log.Error("Error trying to connect to route: ", zap.Error(err)) log.Debug("Connect to route timeout ,retry...") if 0 == tempDelay { @@ -358,8 +363,7 @@ func (b *Broker) ConnectToDiscovery() { } break } - - log.Debug("connect to router success :", b.config.Router) + log.Debug("connect to router success :", zap.String("Router", b.config.Router)) cid := b.id info := info{ @@ -398,7 +402,7 @@ func (b *Broker) connectRouter(id, addr string) { conn, err = net.Dial("tcp", addr) if err != nil { - log.Error("Error trying to connect to route: ", err) + log.Error("Error trying to connect to route: ", zap.Error(err)) if retryTimes > 50 { return @@ -506,7 +510,7 @@ func (b *Broker) SendLocalSubsToRouter(c *client) { if len(subInfo.Topics) > 0 { err := c.WriterPacket(subInfo) if err != nil { - log.Error("Send localsubs To Router error :", err) + log.Error("Send localsubs To Router error :", zap.Error(err)) } } } @@ -563,7 +567,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - log.Error("process message for psub error, ", err) + log.Error("process message for psub error, ", zap.Error(err)) } } } diff --git a/broker/client.go b/broker/client.go index 958293d..556afbe 100644 --- a/broker/client.go +++ b/broker/client.go @@ -9,8 +9,7 @@ import ( "time" "github.com/eclipse/paho.mqtt.golang/packets" - - log "github.com/cihub/seelog" + "go.uber.org/zap" ) const ( @@ -102,7 +101,7 @@ func (c *client) keepAlive(ch chan int) { timer.Reset(keepalive) continue } - log.Error("Client exceeded timeout, disconnecting. clientID = ", c.info.clientID, " keepalive = ", c.info.keepalive) + log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) msg := &Message{client: c, packet: DisconnectdPacket} msgPool.queue <- msg timer.Stop() @@ -128,7 +127,7 @@ func (c *client) readLoop() { for { packet, err := packets.ReadPacket(nc) if err != nil { - log.Error("read packet error: ", err, " clientID = ", c.info.clientID) + log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) break } @@ -151,7 +150,7 @@ func ProcessMessage(msg *Message) { if ca == nil { return } - log.Debug("Recv message from client, ID = ", c.info.clientID) + log.Debug("Recv message from client, ID = ", zap.String("ClientID", c.info.clientID)) switch ca.(type) { case *packets.ConnackPacket: @@ -177,7 +176,7 @@ func ProcessMessage(msg *Message) { case *packets.DisconnectPacket: c.Close() default: - log.Info("Recv Unknow message.......", " clientID = ", c.info.clientID) + log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID)) } } @@ -193,7 +192,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { } if !c.CheckTopicAuth(PUB, topic) { - log.Error("Pub Topics Auth failed, ", topic, " clientID = ", c.info.clientID) + log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) return } @@ -204,21 +203,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) puback.MessageID = packet.MessageID if err := c.WriterPacket(puback); err != nil { - log.Error("send puback error, ", err, " clientID = ", c.info.clientID) + log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } c.ProcessPublishMessage(packet) case QosExactlyOnce: return default: - log.Error("publish with unknown qos", " clientID = ", c.info.clientID) + log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID)) return } if packet.Retain { if b := c.broker; b != nil { err := b.rl.Insert(topic, packet) if err != nil { - log.Error("Insert Retain Message error: ", err, " clientID = ", c.info.clientID) + log.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } } @@ -252,7 +251,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - log.Error("process message for psub error, ", err, " clientID = ", c.info.clientID) + log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } } @@ -278,7 +277,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - log.Error("send publish error, ", err, " clientID = ", c.info.clientID) + log.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } @@ -332,7 +331,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { t := topic //check topic auth for client if !c.CheckTopicAuth(SUB, topic) { - log.Error("Sub topic Auth failed: ", topic, " clientID = ", c.info.clientID) + log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) retcodes = append(retcodes, QosFailure) continue } @@ -379,7 +378,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { } err := b.sl.Insert(sub) if err != nil { - log.Error("Insert subscription error: ", err, " clientID = ", c.info.clientID) + log.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) retcodes = append(retcodes, QosFailure) } else { retcodes = append(retcodes, qoss[i]) @@ -389,7 +388,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { err := c.WriterPacket(suback) if err != nil { - log.Error("send suback error, ", err, " clientID = ", c.info.clientID) + log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } //broadcast subscribe message @@ -401,7 +400,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { for _, t := range topics { packets := b.rl.Match(t) for _, packet := range packets { - log.Info("process retain message: ", packet, " clientID = ", c.info.clientID) + log.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID)) if packet != nil { c.WriterPacket(packet) } @@ -448,7 +447,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) { err := c.WriterPacket(unsuback) if err != nil { - log.Error("send unsuback error, ", err, " clientID = ", c.info.clientID) + log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } // //process ubsubscribe message @@ -477,7 +476,7 @@ func (c *client) ProcessPing() { resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket) err := c.WriterPacket(resp) if err != nil { - log.Error("send PingResponse error, ", err, " clientID = ", c.info.clientID) + log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } } @@ -508,7 +507,7 @@ func (c *client) Close() { for _, sub := range subs { err := b.sl.Remove(sub) if err != nil { - log.Error("closed client but remove sublist error, ", err, " clientID = ", c.info.clientID) + log.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } if c.typ == CLIENT { diff --git a/broker/config.go b/broker/config.go index 6417d51..f4feb5e 100644 --- a/broker/config.go +++ b/broker/config.go @@ -11,7 +11,7 @@ import ( "fmt" "io/ioutil" - log "github.com/cihub/seelog" + "go.uber.org/zap" ) type Config struct { @@ -95,7 +95,7 @@ func LoadConfig(filename string) (*Config, error) { content, err := ioutil.ReadFile(filename) if err != nil { - log.Error("Read config file error: ", err) + log.Error("Read config file error: ", zap.Error(err)) return nil, err } // log.Info(string(content)) @@ -103,7 +103,7 @@ func LoadConfig(filename string) (*Config, error) { var config Config err = json.Unmarshal(content, &config) if err != nil { - log.Error("Unmarshal config file error: ", err) + log.Error("Unmarshal config file error: ", zap.Error(err)) return nil, err } @@ -151,11 +151,11 @@ func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error) { cert, err := tls.LoadX509KeyPair(tlsInfo.CertFile, tlsInfo.KeyFile) if err != nil { - return nil, fmt.Errorf("error parsing X509 certificate/key pair: %v", err) + return nil, fmt.Errorf("error parsing X509 certificate/key pair: %v", zap.Error(err)) } cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) if err != nil { - return nil, fmt.Errorf("error parsing certificate: %v", err) + return nil, fmt.Errorf("error parsing certificate: %v", zap.Error(err)) } // Create TLSConfig diff --git a/broker/info.go b/broker/info.go index c45a949..45c77a7 100644 --- a/broker/info.go +++ b/broker/info.go @@ -7,9 +7,9 @@ import ( "time" "github.com/eclipse/paho.mqtt.golang/packets" + "go.uber.org/zap" simplejson "github.com/bitly/go-simplejson" - log "github.com/cihub/seelog" ) func (c *client) SendInfo() { @@ -21,7 +21,7 @@ func (c *client) SendInfo() { infoMsg := NewInfo(c.broker.id, url, false) err := c.WriterPacket(infoMsg) if err != nil { - log.Error("send info message error, ", err) + log.Error("send info message error, ", zap.Error(err)) return } } @@ -34,7 +34,7 @@ func (c *client) StartPing() { case <-timeTicker.C: err := c.WriterPacket(ping) if err != nil { - log.Error("ping error: ", err) + log.Error("ping error: ", zap.Error(err)) c.Close() } case _, ok := <-c.closed: @@ -57,7 +57,7 @@ func (c *client) SendConnect() { m.Keepalive = uint16(60) err := c.WriterPacket(m) if err != nil { - log.Error("send connect message error, ", err) + log.Error("send connect message error, ", zap.Error(err)) return } log.Info("send connect success") @@ -81,17 +81,17 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) { return } - log.Info("recv remoteInfo: ", string(packet.Payload)) + log.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload))) - js, e := simplejson.NewJson(packet.Payload) - if e != nil { - log.Warn("parse info message err", e) + js, err := simplejson.NewJson(packet.Payload) + if err != nil { + log.Warn("parse info message err", zap.Error(err)) return } routes, err := js.Get("data").Map() if routes == nil { - log.Error("receive info message error, ", err) + log.Error("receive info message error, ", zap.Error(err)) return } diff --git a/broker/sublist.go b/broker/sublist.go index 05d45f2..3ee0ff7 100644 --- a/broker/sublist.go +++ b/broker/sublist.go @@ -6,7 +6,7 @@ import ( "errors" "sync" - log "github.com/cihub/seelog" + "go.uber.org/zap" ) // A result structure better optimized for queue subs. @@ -211,7 +211,7 @@ func (s *Sublist) Match(topic string) *SublistResult { tokens, err := PublishTopicCheckAndSpilt(topic) if err != nil { - log.Error("\tserver/sublist.go: ", err) + log.Error("\tserver/sublist.go: ", zap.Error(err)) return nil } diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..34ee617 --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,51 @@ +/* Copyright (c) 2018, joy.zhou + */ + +package logger + +import ( + "go.uber.org/zap" +) + +var ( + // env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env + env string + instance *zap.Logger + logCfg zap.Config +) + +// NewDevLogger return a logger for dev builds +func NewDevLogger() (*zap.Logger, error) { + logCfg := zap.NewDevelopmentConfig() + return logCfg.Build() +} + +// NewProdLogger return a logger for production builds +func NewProdLogger() (*zap.Logger, error) { + logCfg := zap.NewProductionConfig() + logCfg.DisableStacktrace = true + logCfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel) + return logCfg.Build() +} + +func init() { + var err error + var log *zap.Logger + if env == "prod" { + log, err = NewProdLogger() + } else { + log, err = NewDevLogger() + } + if err != nil { + panic("Unable to create a logger.") + } + defer log.Sync() + + log.Debug("Logger initialization succeeded") + instance = log.Named("hmq") +} + +// Get return a *zap.Logger instance +func Get() *zap.Logger { + return instance +} diff --git a/logger/logger_test.go b/logger/logger_test.go new file mode 100644 index 0000000..c1291d4 --- /dev/null +++ b/logger/logger_test.go @@ -0,0 +1,33 @@ +/* Copyright (c) 2018, joy.zhou + */ +package logger + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "go.uber.org/zap" +) + +func TestGet(t *testing.T) { + var l *zap.Logger + logger := Get() + + assert.NotNil(t, logger) + assert.IsType(t, l, logger) +} + +func TestNewDevLogger(t *testing.T) { + logger, err := NewDevLogger() + + assert.Nil(t, err) + assert.True(t, logger.Core().Enabled(zap.DebugLevel)) +} + +func TestNewProdLogger(t *testing.T) { + logger, err := NewProdLogger() + + assert.Nil(t, err) + assert.False(t, logger.Core().Enabled(zap.DebugLevel)) +} diff --git a/main.go b/main.go index 5e8d5c0..691505b 100644 --- a/main.go +++ b/main.go @@ -12,45 +12,31 @@ import ( "runtime" "github.com/fhmq/hmq/broker" - - log "github.com/cihub/seelog" + "github.com/fhmq/hmq/logger" + "go.uber.org/zap" ) -func init() { - testConfig := ` - - - - - - - -` - - logger, err := log.LoggerFromConfigAsBytes([]byte(testConfig)) - if err != nil { - panic(err) - } - log.ReplaceLogger(logger) -} +var ( + log = logger.Get().Named("Main") +) func main() { runtime.GOMAXPROCS(runtime.NumCPU()) - config, er := broker.ConfigureConfig() - if er != nil { - log.Error("configure broker config error: ", er) + config, err := broker.ConfigureConfig() + if err != nil { + log.Error("configure broker config error: ", zap.Error(err)) return } b, err := broker.NewBroker(config) if err != nil { - log.Error("New Broker error: ", er) + log.Error("New Broker error: ", zap.Error(err)) return } b.Start() s := waitForSignal() - log.Infof("signal got: %v ,broker closed.", s) + log.Info("signal received, broker closed.", zap.Any("signal", s)) } func waitForSignal() os.Signal {