diff --git a/broker/auth.go b/broker/auth.go index d3e7dfc..a4b5d36 100644 --- a/broker/auth.go +++ b/broker/auth.go @@ -3,13 +3,10 @@ package broker import ( - "strings" - "github.com/fhmq/hmq/lib/acl" - - "go.uber.org/zap" - "github.com/fsnotify/fsnotify" + "go.uber.org/zap" + "strings" ) const ( @@ -44,10 +41,10 @@ func (b *Broker) handleFsEvent(event fsnotify.Event) error { case b.config.AclConf: if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create { - brokerLog.Info("text:handling acl config change event:", zap.String("filename", event.Name)) + log.Info("text:handling acl config change event:", zap.String("filename", event.Name)) aclconfig, err := acl.AclConfigLoad(event.Name) if err != nil { - brokerLog.Error("aclconfig change failed, load acl conf error: ", zap.Error(err)) + log.Error("aclconfig change failed, load acl conf error: ", zap.Error(err)) return err } b.AclConfig = aclconfig @@ -60,24 +57,24 @@ func (b *Broker) StartAclWatcher() { go func() { wch, e := fsnotify.NewWatcher() if e != nil { - brokerLog.Error("start monitor acl config file error,", zap.Error(e)) + log.Error("start monitor acl config file error,", zap.Error(e)) return } defer wch.Close() for _, i := range watchList { if err := wch.Add(i); err != nil { - brokerLog.Error("start monitor acl config file error,", zap.Error(err)) + log.Error("start monitor acl config file error,", zap.Error(err)) return } } - brokerLog.Info("watching acl config file change...") + log.Info("watching acl config file change...") for { select { case evt := <-wch.Events: b.handleFsEvent(evt) case err := <-wch.Errors: - brokerLog.Error("error:", zap.Error(err)) + log.Error("error:", zap.Error(err)) } } }() diff --git a/broker/broker.go b/broker/broker.go index de8d230..4a2f6d1 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -4,25 +4,24 @@ package broker import ( "crypto/tls" + "github.com/eclipse/paho.mqtt.golang/packets" + "github.com/fhmq/hmq/lib/acl" + "github.com/fhmq/hmq/pool" + "github.com/segmentio/fasthash/fnv1a" + "github.com/shirou/gopsutil/mem" + "go.uber.org/zap" + "golang.org/x/net/websocket" "net" "net/http" "runtime/debug" "sync" "sync/atomic" "time" - - "github.com/fhmq/hmq/lib/acl" - "github.com/fhmq/hmq/pool" - - "github.com/eclipse/paho.mqtt.golang/packets" - "github.com/shirou/gopsutil/mem" - "go.uber.org/zap" - - "golang.org/x/net/websocket" ) -var ( - brokerLog *zap.Logger +const ( + MessagePoolNum = 1024 + MessagePoolMessageNum = 1024 ) type Message struct { @@ -43,12 +42,21 @@ type Broker struct { remotes sync.Map nodes map[string]interface{} clusterPool chan *Message - messagePool chan *Message + messagePool []chan *Message sl *Sublist rl *RetainList queues map[string]int } +func newMessagePool() []chan *Message { + pool := make([]chan *Message, 0) + for i := 0; i < MessagePoolNum; i++ { + ch := make(chan *Message, MessagePoolMessageNum) + pool = append(pool, ch) + } + return pool +} + func NewBroker(config *Config) (*Broker, error) { b := &Broker{ id: GenUniqueId(), @@ -59,12 +67,12 @@ func NewBroker(config *Config) (*Broker, error) { nodes: make(map[string]interface{}), queues: make(map[string]int), clusterPool: make(chan *Message), - messagePool: make(chan *Message), + messagePool: newMessagePool(), } if b.config.TlsPort != "" { tlsconfig, err := NewTLSConfig(b.config.TlsInfo) if err != nil { - brokerLog.Error("new tlsConfig error", zap.Error(err)) + log.Error("new tlsConfig error", zap.Error(err)) return nil, err } b.tlsConfig = tlsconfig @@ -72,7 +80,7 @@ func NewBroker(config *Config) (*Broker, error) { if b.config.Acl { aclconfig, err := acl.AclConfigLoad(b.config.AclConf) if err != nil { - brokerLog.Error("Load acl conf error", zap.Error(err)) + log.Error("Load acl conf error", zap.Error(err)) return nil, err } b.AclConfig = aclconfig @@ -82,22 +90,26 @@ func NewBroker(config *Config) (*Broker, error) { } func (b *Broker) StartDispatcher() { - for { - msg, ok := <-b.messagePool - if !ok { - brokerLog.Error("read message from client channel error") - return - } - b.wpool.Submit(func() { - ProcessMessage(msg) - }) + for _, mpool := range b.messagePool { + go func(ch chan *Message) { + for { + msg, ok := <-ch + if !ok { + log.Error("read message from client channel error") + return + } + b.wpool.Submit(func() { + ProcessMessage(msg) + }) + } + }(mpool) } } func (b *Broker) Start() { if b == nil { - brokerLog.Error("broker is null") + log.Error("broker is null") return } go b.StartDispatcher() @@ -149,7 +161,7 @@ func StateMonitor() { func (b *Broker) StartWebsocketListening() { path := b.config.WsPath hp := ":" + b.config.WsPort - brokerLog.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path)) + log.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path)) http.Handle(path, websocket.Handler(b.wsHandler)) var err error if b.config.WsTLS { @@ -158,7 +170,7 @@ func (b *Broker) StartWebsocketListening() { err = http.ListenAndServe(hp, nil) } if err != nil { - brokerLog.Error("ListenAndServe:" + err.Error()) + log.Error("ListenAndServe:" + err.Error()) return } } @@ -167,7 +179,7 @@ func (b *Broker) wsHandler(ws *websocket.Conn) { // io.Copy(ws, ws) atomic.AddUint64(&b.cid, 1) ws.PayloadType = websocket.BinaryFrame - b.handleConnection(CLIENT, ws, b.cid) + b.handleConnection(CLIENT, ws) } func (b *Broker) StartClientListening(Tls bool) { @@ -177,14 +189,14 @@ func (b *Broker) StartClientListening(Tls bool) { if Tls { hp = b.config.TlsHost + ":" + b.config.TlsPort l, err = tls.Listen("tcp", hp, b.tlsConfig) - brokerLog.Info("Start TLS Listening client on ", zap.String("hp", hp)) + log.Info("Start TLS Listening client on ", zap.String("hp", hp)) } else { hp := b.config.Host + ":" + b.config.Port l, err = net.Listen("tcp", hp) - brokerLog.Info("Start Listening client on ", zap.String("hp", hp)) + log.Info("Start Listening client on ", zap.String("hp", hp)) } if err != nil { - brokerLog.Error("Error listening on ", zap.Error(err)) + log.Error("Error listening on ", zap.Error(err)) return } tmpDelay := 10 * ACCEPT_MIN_SLEEP @@ -192,7 +204,7 @@ func (b *Broker) StartClientListening(Tls bool) { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", + log.Error("Temporary Client Accept Error(%v), sleeping %dms", zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -200,13 +212,13 @@ func (b *Broker) StartClientListening(Tls bool) { tmpDelay = ACCEPT_MAX_SLEEP } } else { - brokerLog.Error("Accept error: %v", zap.Error(err)) + log.Error("Accept error: %v", zap.Error(err)) } continue } tmpDelay = ACCEPT_MIN_SLEEP atomic.AddUint64(&b.cid, 1) - go b.handleConnection(CLIENT, conn, b.cid) + go b.handleConnection(CLIENT, conn) } } @@ -219,7 +231,7 @@ func (b *Broker) Handshake(conn net.Conn) bool { // Force handshake if err := nc.Handshake(); err != nil { - brokerLog.Error("TLS handshake error, ", zap.Error(err)) + log.Error("TLS handshake error, ", zap.Error(err)) return false } nc.SetReadDeadline(time.Time{}) @@ -235,28 +247,27 @@ func TlsTimeout(conn *tls.Conn) { } cs := nc.ConnectionState() if !cs.HandshakeComplete { - brokerLog.Error("TLS handshake timeout") + log.Error("TLS handshake timeout") nc.Close() } } func (b *Broker) StartClusterListening() { var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port - brokerLog.Info("Start Listening cluster on ", zap.String("hp", hp)) + log.Info("Start Listening cluster on ", zap.String("hp", hp)) l, e := net.Listen("tcp", hp) if e != nil { - brokerLog.Error("Error listening on ", zap.Error(e)) + log.Error("Error listening on ", zap.Error(e)) return } - var idx uint64 = 0 tmpDelay := 10 * ACCEPT_MIN_SLEEP for { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", + log.Error("Temporary Client Accept Error(%v), sleeping %dms", zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -264,30 +275,30 @@ func (b *Broker) StartClusterListening() { tmpDelay = ACCEPT_MAX_SLEEP } } else { - brokerLog.Error("Accept error: %v", zap.Error(err)) + log.Error("Accept error: %v", zap.Error(err)) } continue } tmpDelay = ACCEPT_MIN_SLEEP - go b.handleConnection(ROUTER, conn, idx) + go b.handleConnection(ROUTER, conn) } } -func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { +func (b *Broker) handleConnection(typ int, conn net.Conn) { //process connect packet packet, err := packets.ReadPacket(conn) if err != nil { - brokerLog.Error("read connect packet error: ", zap.Error(err)) + log.Error("read connect packet error: ", zap.Error(err)) return } if packet == nil { - brokerLog.Error("received nil packet") + log.Error("received nil packet") return } msg, ok := packet.(*packets.ConnectPacket) if !ok { - brokerLog.Error("received msg that was not Connect") + log.Error("received msg that was not Connect") return } connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket) @@ -295,7 +306,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { connack.SessionPresent = msg.CleanSession err = connack.Write(conn) if err != nil { - brokerLog.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) + log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) return } @@ -335,7 +346,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { case CLIENT: old, exist = b.clients.Load(cid) if exist { - brokerLog.Warn("client exist, close old...", zap.String("clientID", c.info.clientID)) + log.Warn("client exist, close old...", zap.String("clientID", c.info.clientID)) ol, ok := old.(*client) if ok { ol.Close() @@ -345,7 +356,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { case ROUTER: old, exist = b.routes.Load(cid) if exist { - brokerLog.Warn("router exist, close old...") + log.Warn("router exist, close old...") ol, ok := old.(*client) if ok { ol.Close() @@ -354,7 +365,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { b.routes.Store(cid, c) } - c.readLoop(b.messagePool) + mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum] + + c.readLoop(mpool) } func (b *Broker) ConnectToDiscovery() { @@ -364,8 +377,8 @@ func (b *Broker) ConnectToDiscovery() { for { conn, err = net.Dial("tcp", b.config.Router) if err != nil { - brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) - brokerLog.Debug("Connect to route timeout ,retry...") + log.Error("Error trying to connect to route: ", zap.Error(err)) + log.Debug("Connect to route timeout ,retry...") if 0 == tempDelay { tempDelay = 1 * time.Second @@ -381,7 +394,7 @@ func (b *Broker) ConnectToDiscovery() { } break } - brokerLog.Debug("connect to router success :", zap.String("Router", b.config.Router)) + log.Debug("connect to router success :", zap.String("Router", b.config.Router)) cid := b.id info := info{ @@ -409,7 +422,7 @@ func (b *Broker) processClusterInfo() { for { msg, ok := <-b.clusterPool if !ok { - brokerLog.Error("read message from cluster channel error") + log.Error("read message from cluster channel error") return } ProcessMessage(msg) @@ -431,13 +444,13 @@ func (b *Broker) connectRouter(id, addr string) { conn, err = net.Dial("tcp", addr) if err != nil { - brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) + log.Error("Error trying to connect to route: ", zap.Error(err)) if retryTimes > 50 { return } - brokerLog.Debug("Connect to route timeout ,retry...") + log.Debug("Connect to route timeout ,retry...") if 0 == timeDelay { timeDelay = 1 * time.Second @@ -477,7 +490,8 @@ func (b *Broker) connectRouter(id, addr string) { c.SendConnect() - go c.readLoop(b.messagePool) + mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum] + go c.readLoop(mpool) go c.StartPing() } @@ -536,7 +550,7 @@ func (b *Broker) SendLocalSubsToRouter(c *client) { if len(subInfo.Topics) > 0 { err := c.WriterPacket(subInfo) if err != nil { - brokerLog.Error("Send localsubs To Router error :", zap.Error(err)) + log.Error("Send localsubs To Router error :", zap.Error(err)) } } } @@ -553,7 +567,7 @@ func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacke return true }) - // brokerLog.Info("BroadcastInfoMessage success ") + // log.Info("BroadcastInfoMessage success ") } func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) { @@ -565,7 +579,7 @@ func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) { } return true }) - // brokerLog.Info("BroadcastSubscribeMessage remotes: ", s.remotes) + // log.Info("BroadcastSubscribeMessage remotes: ", s.remotes) } func (b *Broker) removeClient(c *client) { @@ -579,7 +593,7 @@ func (b *Broker) removeClient(c *client) { case REMOTE: b.remotes.Delete(clientId) } - // brokerLog.Info("delete client ,", clientId) + // log.Info("delete client ,", clientId) } func (b *Broker) PublishMessage(packet *packets.PublishPacket) { @@ -593,7 +607,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - brokerLog.Error("process message for psub error, ", zap.Error(err)) + log.Error("process message for psub error, ", zap.Error(err)) } } } diff --git a/broker/client.go b/broker/client.go index 4fdc5ab..33636f3 100644 --- a/broker/client.go +++ b/broker/client.go @@ -3,14 +3,13 @@ package broker import ( + "github.com/eclipse/paho.mqtt.golang/packets" + "go.uber.org/zap" "net" "reflect" "strings" "sync" "time" - - "github.com/eclipse/paho.mqtt.golang/packets" - "go.uber.org/zap" ) const ( @@ -100,7 +99,7 @@ func (c *client) keepAlive(ch chan int, mpool chan *Message) { timer.Reset(keepalive) continue } - brokerLog.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) + log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) msg := &Message{client: c, packet: DisconnectdPacket} mpool <- msg timer.Stop() @@ -125,7 +124,7 @@ func (c *client) readLoop(mpool chan *Message) { for { packet, err := packets.ReadPacket(nc) if err != nil { - brokerLog.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) break } // keepalive channel @@ -149,7 +148,7 @@ func ProcessMessage(msg *Message) { return } - brokerLog.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID)) + log.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID)) switch ca.(type) { case *packets.ConnackPacket: case *packets.ConnectPacket: @@ -174,7 +173,7 @@ func ProcessMessage(msg *Message) { case *packets.DisconnectPacket: c.Close() default: - brokerLog.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID)) + log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID)) } } @@ -190,7 +189,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { } if !c.CheckTopicAuth(PUB, topic) { - brokerLog.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) + log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) return } @@ -201,21 +200,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) puback.MessageID = packet.MessageID if err := c.WriterPacket(puback); err != nil { - brokerLog.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } c.ProcessPublishMessage(packet) case QosExactlyOnce: return default: - brokerLog.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID)) + log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID)) return } if packet.Retain { if b := c.broker; b != nil { err := b.rl.Insert(topic, packet) if err != nil { - brokerLog.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } } @@ -235,7 +234,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { topic := packet.TopicName r := b.sl.Match(topic) - // brokerLog.Info("psubs num: ", len(r.psubs)) + // log.Info("psubs num: ", len(r.psubs)) if len(r.qsubs) == 0 && len(r.psubs) == 0 { return } @@ -249,7 +248,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - brokerLog.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } } @@ -259,7 +258,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { t := "$queue/" + topic cnt, exist := b.queues[t] if exist { - // brokerLog.Info("queue index : ", cnt) + // log.Info("queue index : ", cnt) for _, sub := range r.qsubs { if sub.client.typ == ROUTER { if typ != CLIENT { @@ -275,7 +274,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - brokerLog.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } @@ -329,7 +328,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { t := topic //check topic auth for client if !c.CheckTopicAuth(SUB, topic) { - brokerLog.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) + log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) retcodes = append(retcodes, QosFailure) continue } @@ -376,7 +375,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { } err := b.sl.Insert(sub) if err != nil { - brokerLog.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) retcodes = append(retcodes, QosFailure) } else { retcodes = append(retcodes, qoss[i]) @@ -386,7 +385,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { err := c.WriterPacket(suback) if err != nil { - brokerLog.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } //broadcast subscribe message @@ -398,7 +397,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { for _, t := range topics { packets := b.rl.Match(t) for _, packet := range packets { - brokerLog.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID)) + log.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID)) if packet != nil { c.WriterPacket(packet) } @@ -445,7 +444,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) { err := c.WriterPacket(unsuback) if err != nil { - brokerLog.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } // //process ubsubscribe message @@ -474,7 +473,7 @@ func (c *client) ProcessPing() { resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket) err := c.WriterPacket(resp) if err != nil { - brokerLog.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } } @@ -505,7 +504,7 @@ func (c *client) Close() { for _, sub := range subs { err := b.sl.Remove(sub) if err != nil { - brokerLog.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + log.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } if c.typ == CLIENT { diff --git a/broker/config.go b/broker/config.go index eea8b84..e354988 100644 --- a/broker/config.go +++ b/broker/config.go @@ -9,11 +9,10 @@ import ( "errors" "flag" "fmt" - "io/ioutil" - "os" - "github.com/fhmq/hmq/logger" "go.uber.org/zap" + "io/ioutil" + "os" ) type Config struct { @@ -52,6 +51,10 @@ var DefaultConfig *Config = &Config{ Acl: false, } +var ( + log *zap.Logger +) + func showHelp() { fmt.Printf("%s\n", usageStr) os.Exit(0) @@ -105,7 +108,7 @@ func ConfigureConfig(args []string) (*Config, error) { }) logger.InitLogger(config.Debug) - brokerLog = logger.Get().Named("Broker") + log = logger.Get().Named("Broker") if configFile != "" { tmpConfig, e := LoadConfig(configFile) @@ -128,15 +131,15 @@ func LoadConfig(filename string) (*Config, error) { content, err := ioutil.ReadFile(filename) if err != nil { - brokerLog.Error("Read config file error: ", zap.Error(err)) + log.Error("Read config file error: ", zap.Error(err)) return nil, err } - // brokerLog.Info(string(content)) + // log.Info(string(content)) var config Config err = json.Unmarshal(content, &config) if err != nil { - brokerLog.Error("Unmarshal config file error: ", zap.Error(err)) + log.Error("Unmarshal config file error: ", zap.Error(err)) return nil, err } @@ -168,7 +171,7 @@ func (config *Config) check() error { if config.TlsPort != "" { if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { - brokerLog.Error("tls config error, no cert or key file.") + log.Error("tls config error, no cert or key file.") return errors.New("tls config error, no cert or key file.") } if config.TlsHost == "" { diff --git a/broker/info.go b/broker/info.go index e167767..1e6e715 100644 --- a/broker/info.go +++ b/broker/info.go @@ -4,12 +4,10 @@ package broker import ( "fmt" - "time" - + simplejson "github.com/bitly/go-simplejson" "github.com/eclipse/paho.mqtt.golang/packets" "go.uber.org/zap" - - simplejson "github.com/bitly/go-simplejson" + "time" ) func (c *client) SendInfo() { @@ -21,7 +19,7 @@ func (c *client) SendInfo() { infoMsg := NewInfo(c.broker.id, url, false) err := c.WriterPacket(infoMsg) if err != nil { - brokerLog.Error("send info message error, ", zap.Error(err)) + log.Error("send info message error, ", zap.Error(err)) return } } @@ -34,7 +32,7 @@ func (c *client) StartPing() { case <-timeTicker.C: err := c.WriterPacket(ping) if err != nil { - brokerLog.Error("ping error: ", zap.Error(err)) + log.Error("ping error: ", zap.Error(err)) c.Close() } case _, ok := <-c.closed: @@ -57,10 +55,10 @@ func (c *client) SendConnect() { m.Keepalive = uint16(60) err := c.WriterPacket(m) if err != nil { - brokerLog.Error("send connect message error, ", zap.Error(err)) + log.Error("send connect message error, ", zap.Error(err)) return } - brokerLog.Info("send connect success") + log.Info("send connect success") } func NewInfo(sid, url string, isforword bool) *packets.PublishPacket { @@ -69,7 +67,7 @@ func NewInfo(sid, url string, isforword bool) *packets.PublishPacket { pub.TopicName = BrokerInfoTopic pub.Retain = false info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url) - // brokerLog.Info("new info", string(info)) + // log.Info("new info", string(info)) pub.Payload = []byte(info) return pub } @@ -81,17 +79,17 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) { return } - brokerLog.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload))) + log.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload))) js, err := simplejson.NewJson(packet.Payload) if err != nil { - brokerLog.Warn("parse info message err", zap.Error(err)) + log.Warn("parse info message err", zap.Error(err)) return } routes, err := js.Get("data").Map() if routes == nil { - brokerLog.Error("receive info message error, ", zap.Error(err)) + log.Error("receive info message error, ", zap.Error(err)) return } diff --git a/broker/retain.go b/broker/retain.go index a13f8a8..1c5cf12 100644 --- a/broker/retain.go +++ b/broker/retain.go @@ -1,9 +1,8 @@ package broker import ( - "sync" - "github.com/eclipse/paho.mqtt.golang/packets" + "sync" ) type RetainList struct { @@ -39,7 +38,7 @@ func (r *RetainList) Insert(topic string, buf *packets.PublishPacket) error { if err != nil { return err } - // brokerLog.Info("insert tokens:", tokens) + // log.Info("insert tokens:", tokens) r.Lock() l := r.root @@ -72,7 +71,7 @@ func (r *RetainList) Match(topic string) []*packets.PublishPacket { l := r.root matchRLevel(l, tokens, results) r.Unlock() - // brokerLog.Info("results: ", results) + // log.Info("results: ", results) return results.msg } @@ -82,7 +81,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) { if l == nil { return } - // brokerLog.Info("l info :", l.nodes) + // log.Info("l info :", l.nodes) if t == "#" { for _, n := range l.nodes { n.GetAll(results) @@ -111,7 +110,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) { } func (r *rnode) GetAll(results *RetainResult) { - // brokerLog.Info("node 's message: ", string(r.msg)) + // log.Info("node 's message: ", string(r.msg)) if r.msg != nil { results.msg = append(results.msg, r.msg) } diff --git a/broker/sublist.go b/broker/sublist.go index 19f9cce..e1d8ecc 100644 --- a/broker/sublist.go +++ b/broker/sublist.go @@ -4,9 +4,8 @@ package broker import ( "errors" - "sync" - "go.uber.org/zap" + "sync" ) // A result structure better optimized for queue subs. @@ -211,7 +210,7 @@ func (s *Sublist) Match(topic string) *SublistResult { tokens, err := PublishTopicCheckAndSpilt(topic) if err != nil { - brokerLog.Error("\tserver/sublist.go: ", zap.Error(err)) + log.Error("\tserver/sublist.go: ", zap.Error(err)) return nil } diff --git a/main.go b/main.go index 0062ff6..f58d31d 100644 --- a/main.go +++ b/main.go @@ -8,11 +8,10 @@ package main import ( "fmt" + "github.com/fhmq/hmq/broker" "os" "os/signal" "runtime" - - "github.com/fhmq/hmq/broker" ) func main() {