diff --git a/README.md b/README.md index 8894fc8..66ea299 100644 --- a/README.md +++ b/README.md @@ -16,43 +16,26 @@ $ go run main.go ## Usage of hmq: ~~~ -Usage of ./hmq: - -w int - worker num to process message, perfer (client num)/10. (default 1024) - -worker int - worker num to process message, perfer (client num)/10. (default 1024) - -h string - Network host to listen on. (default "0.0.0.0") - -host string - Network host to listen on. (default "0.0.0.0") - -p string - Port to listen on. (default "1883") - -port string - Port to listen on. (default "1883") - -c string - config file for hmq - -config string - config file for hmq - -cluster string - Cluster ip from which members can connect. - -cluster_listen string - Cluster ip from which members can connect. - -cluster_port string - Cluster port from which members can connect. - -cp string - Cluster port from which members can connect. - -r string - Router who maintenance cluster info - -router string - Router who maintenance cluster info - -ws_path string - path for ws to listen on - -ws_port string - port for ws to listen on - -wspath string - path for ws to listen on - -wsport string - port for ws to listen on +Usage: hmq [options] + +Broker Options: + -w, --worker Worker num to process message, perfer (client num)/10. (default 1024) + -p, --port Use port for clients (default: 1883) + --host Network host to listen on. 
(default "0.0.0.0") + -ws, --wsport Use port for websocket monitoring + -wsp,--wspath Use path for websocket monitoring + -c, --config Configuration file + +Logging Options: + -d, --debug Enable debugging output (default false) + -D Debug and trace + +Cluster Options: + -r, --router Router who maintenance cluster info + -cp, --clusterport Cluster listen port for others + +Common Options: + -h, --help Show this message ~~~ ### hmq.config @@ -105,6 +88,9 @@ Usage of ./hmq: ### Cluster ```bash 1, start router for hmq (https://github.com/fhmq/router.git) + $ go get github.com/fhmq/router + $ cd $GOPATH/github.com/fhmq/router + $ go run main.go 2, config router in hmq.config ("router": "127.0.0.1:9888") ``` diff --git a/broker/auth.go b/broker/auth.go index 02bcd4a..d3e7dfc 100644 --- a/broker/auth.go +++ b/broker/auth.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/fhmq/hmq/lib/acl" + "go.uber.org/zap" "github.com/fsnotify/fsnotify" @@ -43,10 +44,10 @@ func (b *Broker) handleFsEvent(event fsnotify.Event) error { case b.config.AclConf: if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create { - log.Info("text:handling acl config change event:", zap.String("filename", event.Name)) + brokerLog.Info("text:handling acl config change event:", zap.String("filename", event.Name)) aclconfig, err := acl.AclConfigLoad(event.Name) if err != nil { - log.Error("aclconfig change failed, load acl conf error: ", zap.Error(err)) + brokerLog.Error("aclconfig change failed, load acl conf error: ", zap.Error(err)) return err } b.AclConfig = aclconfig @@ -59,24 +60,24 @@ func (b *Broker) StartAclWatcher() { go func() { wch, e := fsnotify.NewWatcher() if e != nil { - log.Error("start monitor acl config file error,", zap.Error(e)) + brokerLog.Error("start monitor acl config file error,", zap.Error(e)) return } defer wch.Close() for _, i := range watchList { if err := wch.Add(i); err != nil { - log.Error("start monitor acl config file error,", 
zap.Error(err)) + brokerLog.Error("start monitor acl config file error,", zap.Error(err)) return } } - log.Info("watching acl config file change...") + brokerLog.Info("watching acl config file change...") for { select { case evt := <-wch.Events: b.handleFsEvent(evt) case err := <-wch.Errors: - log.Error("error:", zap.Error(err)) + brokerLog.Error("error:", zap.Error(err)) } } }() diff --git a/broker/broker.go b/broker/broker.go index 4b09bf9..de8d230 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -12,49 +12,59 @@ import ( "time" "github.com/fhmq/hmq/lib/acl" + "github.com/fhmq/hmq/pool" "github.com/eclipse/paho.mqtt.golang/packets" "github.com/shirou/gopsutil/mem" "go.uber.org/zap" "golang.org/x/net/websocket" - - "github.com/fhmq/hmq/logger" ) var ( - log = logger.Get().Named("Broker") + brokerLog *zap.Logger ) +type Message struct { + client *client + packet packets.ControlPacket +} + type Broker struct { - id string - cid uint64 - mu sync.Mutex - config *Config - tlsConfig *tls.Config - AclConfig *acl.ACLConfig - clients sync.Map - routes sync.Map - remotes sync.Map - nodes map[string]interface{} - sl *Sublist - rl *RetainList - queues map[string]int + id string + cid uint64 + mu sync.Mutex + config *Config + tlsConfig *tls.Config + AclConfig *acl.ACLConfig + wpool *pool.WorkerPool + clients sync.Map + routes sync.Map + remotes sync.Map + nodes map[string]interface{} + clusterPool chan *Message + messagePool chan *Message + sl *Sublist + rl *RetainList + queues map[string]int } func NewBroker(config *Config) (*Broker, error) { b := &Broker{ - id: GenUniqueId(), - config: config, - sl: NewSublist(), - rl: NewRetainList(), - nodes: make(map[string]interface{}), - queues: make(map[string]int), + id: GenUniqueId(), + config: config, + wpool: pool.New(config.Worker), + sl: NewSublist(), + rl: NewRetainList(), + nodes: make(map[string]interface{}), + queues: make(map[string]int), + clusterPool: make(chan *Message), + messagePool: make(chan *Message), } if 
b.config.TlsPort != "" { tlsconfig, err := NewTLSConfig(b.config.TlsInfo) if err != nil { - log.Error("new tlsConfig error", zap.Error(err)) + brokerLog.Error("new tlsConfig error", zap.Error(err)) return nil, err } b.tlsConfig = tlsconfig @@ -62,7 +72,7 @@ func NewBroker(config *Config) (*Broker, error) { if b.config.Acl { aclconfig, err := acl.AclConfigLoad(b.config.AclConf) if err != nil { - log.Error("Load acl conf error", zap.Error(err)) + brokerLog.Error("Load acl conf error", zap.Error(err)) return nil, err } b.AclConfig = aclconfig @@ -71,12 +81,26 @@ func NewBroker(config *Config) (*Broker, error) { return b, nil } +func (b *Broker) StartDispatcher() { + for { + msg, ok := <-b.messagePool + if !ok { + brokerLog.Error("read message from client channel error") + return + } + b.wpool.Submit(func() { + ProcessMessage(msg) + }) + } + +} + func (b *Broker) Start() { if b == nil { - log.Error("broker is null") + brokerLog.Error("broker is null") return } - StartDispatcher() + go b.StartDispatcher() //listen clinet over tcp if b.config.Port != "" { @@ -100,6 +124,7 @@ func (b *Broker) Start() { //connect on other node in cluster if b.config.Router != "" { + go b.processClusterInfo() b.ConnectToDiscovery() } @@ -124,7 +149,7 @@ func StateMonitor() { func (b *Broker) StartWebsocketListening() { path := b.config.WsPath hp := ":" + b.config.WsPort - log.Info("Start Websocket Listening on ", zap.String("hp", hp), zap.String("path", path)) + brokerLog.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path)) http.Handle(path, websocket.Handler(b.wsHandler)) var err error if b.config.WsTLS { @@ -133,7 +158,7 @@ func (b *Broker) StartWebsocketListening() { err = http.ListenAndServe(hp, nil) } if err != nil { - log.Error("ListenAndServe: " + err.Error()) + brokerLog.Error("ListenAndServe:" + err.Error()) return } } @@ -152,14 +177,14 @@ func (b *Broker) StartClientListening(Tls bool) { if Tls { hp = b.config.TlsHost + ":" + b.config.TlsPort l, 
err = tls.Listen("tcp", hp, b.tlsConfig) - log.Info("Start TLS Listening client on ", zap.String("hp", hp)) + brokerLog.Info("Start TLS Listening client on ", zap.String("hp", hp)) } else { hp := b.config.Host + ":" + b.config.Port l, err = net.Listen("tcp", hp) - log.Info("Start Listening client on ", zap.String("hp", hp)) + brokerLog.Info("Start Listening client on ", zap.String("hp", hp)) } if err != nil { - log.Error("Error listening on ", zap.Error(err)) + brokerLog.Error("Error listening on ", zap.Error(err)) return } tmpDelay := 10 * ACCEPT_MIN_SLEEP @@ -167,7 +192,7 @@ func (b *Broker) StartClientListening(Tls bool) { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - log.Error("Temporary Client Accept Error(%v), sleeping %dms", + brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -175,7 +200,7 @@ func (b *Broker) StartClientListening(Tls bool) { tmpDelay = ACCEPT_MAX_SLEEP } } else { - log.Error("Accept error: %v", zap.Error(err)) + brokerLog.Error("Accept error: %v", zap.Error(err)) } continue } @@ -194,7 +219,7 @@ func (b *Broker) Handshake(conn net.Conn) bool { // Force handshake if err := nc.Handshake(); err != nil { - log.Error("TLS handshake error, ", zap.Error(err)) + brokerLog.Error("TLS handshake error, ", zap.Error(err)) return false } nc.SetReadDeadline(time.Time{}) @@ -210,18 +235,18 @@ func TlsTimeout(conn *tls.Conn) { } cs := nc.ConnectionState() if !cs.HandshakeComplete { - log.Error("TLS handshake timeout") + brokerLog.Error("TLS handshake timeout") nc.Close() } } func (b *Broker) StartClusterListening() { var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port - log.Info("Start Listening cluster on ", zap.String("hp", hp)) + brokerLog.Info("Start Listening cluster on ", zap.String("hp", hp)) l, e := net.Listen("tcp", hp) if e != nil { - log.Error("Error listening 
on ", zap.Error(e)) + brokerLog.Error("Error listening on ", zap.Error(e)) return } @@ -231,7 +256,7 @@ func (b *Broker) StartClusterListening() { conn, err := l.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { - log.Error("Temporary Client Accept Error(%v), sleeping %dms", + brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) time.Sleep(tmpDelay) tmpDelay *= 2 @@ -239,7 +264,7 @@ func (b *Broker) StartClusterListening() { tmpDelay = ACCEPT_MAX_SLEEP } } else { - log.Error("Accept error: %v", zap.Error(err)) + brokerLog.Error("Accept error: %v", zap.Error(err)) } continue } @@ -253,16 +278,16 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { //process connect packet packet, err := packets.ReadPacket(conn) if err != nil { - log.Error("read connect packet error: ", zap.Error(err)) + brokerLog.Error("read connect packet error: ", zap.Error(err)) return } if packet == nil { - log.Error("received nil packet") + brokerLog.Error("received nil packet") return } msg, ok := packet.(*packets.ConnectPacket) if !ok { - log.Error("received msg that was not Connect") + brokerLog.Error("received msg that was not Connect") return } connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket) @@ -270,7 +295,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { connack.SessionPresent = msg.CleanSession err = connack.Write(conn) if err != nil { - log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) + brokerLog.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) return } @@ -303,40 +328,33 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { cid := c.info.clientID - var msgPool *MessagePool var exist bool var old interface{} switch typ { case CLIENT: - msgPool = MSGPool[idx%MessagePoolNum].GetPool() - c.mp = 
msgPool old, exist = b.clients.Load(cid) if exist { - log.Warn("client exist, close old...", zap.String("clientID", c.info.clientID)) + brokerLog.Warn("client exist, close old...", zap.String("clientID", c.info.clientID)) ol, ok := old.(*client) if ok { - msg := &Message{client: c, packet: DisconnectdPacket} - ol.mp.queue <- msg + ol.Close() } } b.clients.Store(cid, c) case ROUTER: - msgPool = MSGPool[(MessagePoolNum + idx)].GetPool() - c.mp = msgPool old, exist = b.routes.Load(cid) if exist { - log.Warn("router exist, close old...") + brokerLog.Warn("router exist, close old...") ol, ok := old.(*client) if ok { - msg := &Message{client: c, packet: DisconnectdPacket} - ol.mp.queue <- msg + ol.Close() } } b.routes.Store(cid, c) } - c.readLoop() + c.readLoop(b.messagePool) } func (b *Broker) ConnectToDiscovery() { @@ -346,8 +364,8 @@ func (b *Broker) ConnectToDiscovery() { for { conn, err = net.Dial("tcp", b.config.Router) if err != nil { - log.Error("Error trying to connect to route: ", zap.Error(err)) - log.Debug("Connect to route timeout ,retry...") + brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) + brokerLog.Debug("Connect to route timeout ,retry...") if 0 == tempDelay { tempDelay = 1 * time.Second @@ -363,7 +381,7 @@ func (b *Broker) ConnectToDiscovery() { } break } - log.Debug("connect to router success :", zap.String("Router", b.config.Router)) + brokerLog.Debug("connect to router success :", zap.String("Router", b.config.Router)) cid := b.id info := info{ @@ -383,11 +401,22 @@ func (b *Broker) ConnectToDiscovery() { c.SendConnect() c.SendInfo() - c.mp = &MSGPool[(MessagePoolNum + 2)] - go c.readLoop() + go c.readLoop(b.clusterPool) go c.StartPing() } +func (b *Broker) processClusterInfo() { + for { + msg, ok := <-b.clusterPool + if !ok { + brokerLog.Error("read message from cluster channel error") + return + } + ProcessMessage(msg) + } + +} + func (b *Broker) connectRouter(id, addr string) { var conn net.Conn var err error @@ -402,13 
+431,13 @@ func (b *Broker) connectRouter(id, addr string) { conn, err = net.Dial("tcp", addr) if err != nil { - log.Error("Error trying to connect to route: ", zap.Error(err)) + brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) if retryTimes > 50 { return } - log.Debug("Connect to route timeout ,retry...") + brokerLog.Debug("Connect to route timeout ,retry...") if 0 == timeDelay { timeDelay = 1 * time.Second @@ -446,12 +475,9 @@ func (b *Broker) connectRouter(id, addr string) { c.init() b.remotes.Store(cid, c) - c.mp = MSGPool[(MessagePoolNum + 1)].GetPool() - c.SendConnect() - // c.SendInfo() - go c.readLoop() + go c.readLoop(b.messagePool) go c.StartPing() } @@ -510,7 +536,7 @@ func (b *Broker) SendLocalSubsToRouter(c *client) { if len(subInfo.Topics) > 0 { err := c.WriterPacket(subInfo) if err != nil { - log.Error("Send localsubs To Router error :", zap.Error(err)) + brokerLog.Error("Send localsubs To Router error :", zap.Error(err)) } } } @@ -527,7 +553,7 @@ func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacke return true }) - // log.Info("BroadcastInfoMessage success ") + // brokerLog.Info("BroadcastInfoMessage success ") } func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) { @@ -539,7 +565,7 @@ func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) { } return true }) - // log.Info("BroadcastSubscribeMessage remotes: ", s.remotes) + // brokerLog.Info("BroadcastSubscribeMessage remotes: ", s.remotes) } func (b *Broker) removeClient(c *client) { @@ -553,7 +579,7 @@ func (b *Broker) removeClient(c *client) { case REMOTE: b.remotes.Delete(clientId) } - // log.Info("delete client ,", clientId) + // brokerLog.Info("delete client ,", clientId) } func (b *Broker) PublishMessage(packet *packets.PublishPacket) { @@ -567,7 +593,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - 
log.Error("process message for psub error, ", zap.Error(err)) + brokerLog.Error("process message for psub error, ", zap.Error(err)) } } } diff --git a/broker/client.go b/broker/client.go index 09dede6..941c2b2 100644 --- a/broker/client.go +++ b/broker/client.go @@ -4,6 +4,7 @@ package broker import ( "net" + "reflect" "strings" "sync" "time" @@ -38,7 +39,6 @@ type client struct { status int closed chan int smu sync.RWMutex - mp *MessagePool subs map[string]*subscription rsubs map[string]*subInfo } @@ -86,11 +86,10 @@ func (c *client) init() { c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] } -func (c *client) keepAlive(ch chan int) { +func (c *client) keepAlive(ch chan int, mpool chan *Message) { defer close(ch) keepalive := time.Duration(c.info.keepalive*3/2) * time.Second timer := time.NewTimer(keepalive) - msgPool := c.mp for { select { @@ -101,9 +100,9 @@ func (c *client) keepAlive(ch chan int) { timer.Reset(keepalive) continue } - log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) + brokerLog.Error("Client exceeded timeout, disconnecting. 
", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) msg := &Message{client: c, packet: DisconnectdPacket} - msgPool.queue <- msg + mpool <- msg timer.Stop() return case _, ok := <-c.closed: @@ -114,34 +113,33 @@ func (c *client) keepAlive(ch chan int) { } } -func (c *client) readLoop() { +func (c *client) readLoop(mpool chan *Message) { nc := c.conn - msgPool := c.mp - if nc == nil || msgPool == nil { + if nc == nil || mpool == nil { return } ch := make(chan int, 1000) - go c.keepAlive(ch) + go c.keepAlive(ch, mpool) for { packet, err := packets.ReadPacket(nc) if err != nil { - log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) break } - + // keepalive channel ch <- 1 msg := &Message{ client: c, packet: packet, } - msgPool.queue <- msg + mpool <- msg } + msg := &Message{client: c, packet: DisconnectdPacket} - msgPool.queue <- msg - msgPool.Reduce() + mpool <- msg } func ProcessMessage(msg *Message) { @@ -150,10 +148,10 @@ func ProcessMessage(msg *Message) { if ca == nil { return } - log.Debug("Recv message from client,", zap.String("ClientID", c.info.clientID)) + + brokerLog.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID)) switch ca.(type) { case *packets.ConnackPacket: - case *packets.ConnectPacket: case *packets.PublishPacket: packet := ca.(*packets.PublishPacket) @@ -176,7 +174,7 @@ func ProcessMessage(msg *Message) { case *packets.DisconnectPacket: c.Close() default: - log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID)) + brokerLog.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID)) } } @@ -192,7 +190,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { } if !c.CheckTopicAuth(PUB, topic) { - log.Error("Pub Topics Auth failed, ", 
zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) return } @@ -203,21 +201,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) puback.MessageID = packet.MessageID if err := c.WriterPacket(puback); err != nil { - log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } c.ProcessPublishMessage(packet) case QosExactlyOnce: return default: - log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID)) + brokerLog.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID)) return } if packet.Retain { if b := c.broker; b != nil { err := b.rl.Insert(topic, packet) if err != nil { - log.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } } @@ -237,7 +235,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { topic := packet.TopicName r := b.sl.Match(topic) - // log.Info("psubs num: ", len(r.psubs)) + // brokerLog.Info("psubs num: ", len(r.psubs)) if len(r.qsubs) == 0 && len(r.psubs) == 0 { return } @@ -251,7 +249,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } } @@ -261,7 +259,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { t := "$queue/" + topic cnt, exist := 
b.queues[t] if exist { - // log.Info("queue index : ", cnt) + // brokerLog.Info("queue index : ", cnt) for _, sub := range r.qsubs { if sub.client.typ == ROUTER { if typ != CLIENT { @@ -277,7 +275,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { if sub != nil { err := sub.client.WriterPacket(packet) if err != nil { - log.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } @@ -331,7 +329,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { t := topic //check topic auth for client if !c.CheckTopicAuth(SUB, topic) { - log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) retcodes = append(retcodes, QosFailure) continue } @@ -378,7 +376,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { } err := b.sl.Insert(sub) if err != nil { - log.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) retcodes = append(retcodes, QosFailure) } else { retcodes = append(retcodes, qoss[i]) @@ -388,7 +386,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { err := c.WriterPacket(suback) if err != nil { - log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } //broadcast subscribe message @@ -400,7 +398,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { for _, t := range topics { packets := b.rl.Match(t) for _, packet := range packets { - log.Info("process retain message: ", zap.Any("packet", 
packet), zap.String("ClientID", c.info.clientID)) + brokerLog.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID)) if packet != nil { c.WriterPacket(packet) } @@ -447,7 +445,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) { err := c.WriterPacket(unsuback) if err != nil { - log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } // //process ubsubscribe message @@ -476,7 +474,7 @@ func (c *client) ProcessPing() { resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket) err := c.WriterPacket(resp) if err != nil { - log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return } } @@ -507,7 +505,7 @@ func (c *client) Close() { for _, sub := range subs { err := b.sl.Remove(sub) if err != nil { - log.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) + brokerLog.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) } } if c.typ == CLIENT { diff --git a/broker/config.go b/broker/config.go index f4feb5e..eea8b84 100644 --- a/broker/config.go +++ b/broker/config.go @@ -10,7 +10,9 @@ import ( "flag" "fmt" "io/ioutil" + "os" + "github.com/fhmq/hmq/logger" "go.uber.org/zap" ) @@ -28,6 +30,7 @@ type Config struct { TlsInfo TLSInfo `json:"tlsInfo"` Acl bool `json:"acl"` AclConf string `json:"aclConf"` + Debug bool `json:"-"` } type RouteInfo struct { @@ -49,30 +52,60 @@ var DefaultConfig *Config = &Config{ Acl: false, } -func ConfigureConfig() (*Config, error) { +func showHelp() { + fmt.Printf("%s\n", usageStr) + os.Exit(0) +} + +func ConfigureConfig(args []string) (*Config, error) { config 
:= &Config{} var ( + help bool configFile string ) - flag.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.") - flag.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.") - flag.StringVar(&config.Port, "port", "1883", "Port to listen on.") - flag.StringVar(&config.Port, "p", "1883", "Port to listen on.") - flag.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on.") - flag.StringVar(&config.Host, "h", "0.0.0.0", "Network host to listen on.") - flag.StringVar(&config.Cluster.Host, "cluster", "", "Cluster ip from which members can connect.") - flag.StringVar(&config.Cluster.Host, "cluster_listen", "", "Cluster ip from which members can connect.") - flag.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.") - flag.StringVar(&config.Cluster.Port, "cluster_port", "", "Cluster port from which members can connect.") - flag.StringVar(&config.Router, "r", "", "Router who maintenance cluster info") - flag.StringVar(&config.Router, "router", "", "Router who maintenance cluster info") - flag.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on") - flag.StringVar(&config.WsPort, "ws_port", "", "port for ws to listen on") - flag.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on") - flag.StringVar(&config.WsPath, "ws_path", "", "path for ws to listen on") - flag.StringVar(&configFile, "config", "", "config file for hmq") - flag.StringVar(&configFile, "c", "", "config file for hmq") - flag.Parse() + fs := flag.NewFlagSet("hmq-broker", flag.ExitOnError) + fs.Usage = showHelp + + fs.BoolVar(&help, "h", false, "Show this message.") + fs.BoolVar(&help, "help", false, "Show this message.") + fs.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.") + fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.") + fs.StringVar(&config.Port, "port", 
"1883", "Port to listen on.") + fs.StringVar(&config.Port, "p", "1883", "Port to listen on.") + fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on") + fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.") + fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.") + fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info") + fs.StringVar(&config.Router, "router", "", "Router who maintenance cluster info") + fs.StringVar(&config.WsPort, "ws", "", "port for ws to listen on") + fs.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on") + fs.StringVar(&config.WsPath, "wsp", "", "path for ws to listen on") + fs.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on") + fs.StringVar(&configFile, "config", "", "config file for hmq") + fs.StringVar(&configFile, "c", "", "config file for hmq") + fs.BoolVar(&config.Debug, "debug", false, "enable Debug logging.") + fs.BoolVar(&config.Debug, "d", false, "enable Debug logging.") + + fs.Bool("D", true, "enable Debug logging.") + + if err := fs.Parse(args); err != nil { + return nil, err + } + + if help { + showHelp() + return nil, nil + } + + fs.Visit(func(f *flag.Flag) { + switch f.Name { + case "D": + config.Debug = true + } + }) + + logger.InitLogger(config.Debug) + brokerLog = logger.Get().Named("Broker") if configFile != "" { tmpConfig, e := LoadConfig(configFile) @@ -95,15 +128,15 @@ func LoadConfig(filename string) (*Config, error) { content, err := ioutil.ReadFile(filename) if err != nil { - log.Error("Read config file error: ", zap.Error(err)) + brokerLog.Error("Read config file error: ", zap.Error(err)) return nil, err } - // log.Info(string(content)) + // brokerLog.Info(string(content)) var config Config err = json.Unmarshal(content, &config) if err != nil { - log.Error("Unmarshal config file error: ", zap.Error(err)) + brokerLog.Error("Unmarshal config file error: ", 
zap.Error(err)) return nil, err } @@ -116,8 +149,6 @@ func (config *Config) check() error { config.Worker = 1024 } - WorkNum = config.Worker - if config.Port != "" { if config.Host == "" { config.Host = "0.0.0.0" @@ -137,7 +168,7 @@ func (config *Config) check() error { if config.TlsPort != "" { if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { - log.Error("tls config error, no cert or key file.") + brokerLog.Error("tls config error, no cert or key file.") return errors.New("tls config error, no cert or key file.") } if config.TlsHost == "" { diff --git a/broker/dispatcher.go b/broker/dispatcher.go deleted file mode 100644 index 8fd11d6..0000000 --- a/broker/dispatcher.go +++ /dev/null @@ -1,44 +0,0 @@ -/* Copyright (c) 2018, joy.zhou - */ -package broker - -var WorkNum int - -type Dispatcher struct { - WorkerPool chan chan *Message -} - -func StartDispatcher() { - InitMessagePool() - dispatcher := NewDispatcher() - dispatcher.Run() -} - -func (d *Dispatcher) Run() { - // starting n number of workers - for i := 0; i < WorkNum; i++ { - worker := NewWorker(d.WorkerPool) - worker.Start() - } - go d.dispatch() -} - -func NewDispatcher() *Dispatcher { - pool := make(chan chan *Message, WorkNum) - return &Dispatcher{WorkerPool: pool} -} - -func (d *Dispatcher) dispatch() { - for i := 0; i < (MessagePoolNum + 3); i++ { - go func(idx int) { - for { - select { - case msg := <-MSGPool[idx].queue: - msgChannel := <-d.WorkerPool - msgChannel <- msg - } - } - }(i) - } - -} diff --git a/broker/info.go b/broker/info.go index 45c77a7..e167767 100644 --- a/broker/info.go +++ b/broker/info.go @@ -21,7 +21,7 @@ func (c *client) SendInfo() { infoMsg := NewInfo(c.broker.id, url, false) err := c.WriterPacket(infoMsg) if err != nil { - log.Error("send info message error, ", zap.Error(err)) + brokerLog.Error("send info message error, ", zap.Error(err)) return } } @@ -34,7 +34,7 @@ func (c *client) StartPing() { case <-timeTicker.C: err := c.WriterPacket(ping) if err != 
nil { - log.Error("ping error: ", zap.Error(err)) + brokerLog.Error("ping error: ", zap.Error(err)) c.Close() } case _, ok := <-c.closed: @@ -57,10 +57,10 @@ func (c *client) SendConnect() { m.Keepalive = uint16(60) err := c.WriterPacket(m) if err != nil { - log.Error("send connect message error, ", zap.Error(err)) + brokerLog.Error("send connect message error, ", zap.Error(err)) return } - log.Info("send connect success") + brokerLog.Info("send connect success") } func NewInfo(sid, url string, isforword bool) *packets.PublishPacket { @@ -69,7 +69,7 @@ func NewInfo(sid, url string, isforword bool) *packets.PublishPacket { pub.TopicName = BrokerInfoTopic pub.Retain = false info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url) - // log.Info("new info", string(info)) + // brokerLog.Info("new info", string(info)) pub.Payload = []byte(info) return pub } @@ -81,17 +81,17 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) { return } - log.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload))) + brokerLog.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload))) js, err := simplejson.NewJson(packet.Payload) if err != nil { - log.Warn("parse info message err", zap.Error(err)) + brokerLog.Warn("parse info message err", zap.Error(err)) return } routes, err := js.Get("data").Map() if routes == nil { - log.Error("receive info message error, ", zap.Error(err)) + brokerLog.Error("receive info message error, ", zap.Error(err)) return } diff --git a/broker/msgpool.go b/broker/msgpool.go deleted file mode 100644 index 68c8361..0000000 --- a/broker/msgpool.go +++ /dev/null @@ -1,64 +0,0 @@ -/* Copyright (c) 2018, joy.zhou - */ -package broker - -import ( - "sync" - - "github.com/eclipse/paho.mqtt.golang/packets" -) - -const ( - MaxUser = 1024 * 1024 - MessagePoolNum = 1024 - MessagePoolUser = MaxUser / MessagePoolNum - MessagePoolMessageNum = MaxUser / MessagePoolNum * 4 -) - -type Message struct { - client *client - packet 
packets.ControlPacket -} - -var ( - MSGPool []MessagePool -) - -type MessagePool struct { - l sync.Mutex - maxuser int - user int - queue chan *Message -} - -func InitMessagePool() { - MSGPool = make([]MessagePool, (MessagePoolNum + 3)) - for i := 0; i < (MessagePoolNum + 3); i++ { - MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum) - } -} - -func (p *MessagePool) Init(num int, maxusernum int) { - p.maxuser = maxusernum - p.queue = make(chan *Message, num) -} - -func (p *MessagePool) GetPool() *MessagePool { - p.l.Lock() - if p.user+1 < p.maxuser { - p.user += 1 - p.l.Unlock() - return p - } else { - p.l.Unlock() - return nil - } - -} - -func (p *MessagePool) Reduce() { - p.l.Lock() - p.user -= 1 - p.l.Unlock() - -} diff --git a/broker/retain.go b/broker/retain.go index 8943198..a13f8a8 100644 --- a/broker/retain.go +++ b/broker/retain.go @@ -39,7 +39,7 @@ func (r *RetainList) Insert(topic string, buf *packets.PublishPacket) error { if err != nil { return err } - // log.Info("insert tokens:", tokens) + // brokerLog.Info("insert tokens:", tokens) r.Lock() l := r.root @@ -72,7 +72,7 @@ func (r *RetainList) Match(topic string) []*packets.PublishPacket { l := r.root matchRLevel(l, tokens, results) r.Unlock() - // log.Info("results: ", results) + // brokerLog.Info("results: ", results) return results.msg } @@ -82,7 +82,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) { if l == nil { return } - // log.Info("l info :", l.nodes) + // brokerLog.Info("l info :", l.nodes) if t == "#" { for _, n := range l.nodes { n.GetAll(results) @@ -111,7 +111,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) { } func (r *rnode) GetAll(results *RetainResult) { - // log.Info("node 's message: ", string(r.msg)) + // brokerLog.Info("node 's message: ", string(r.msg)) if r.msg != nil { results.msg = append(results.msg, r.msg) } diff --git a/broker/sublist.go b/broker/sublist.go index 3ee0ff7..19f9cce 100644 --- a/broker/sublist.go +++ 
b/broker/sublist.go @@ -211,7 +211,7 @@ func (s *Sublist) Match(topic string) *SublistResult { tokens, err := PublishTopicCheckAndSpilt(topic) if err != nil { - log.Error("\tserver/sublist.go: ", zap.Error(err)) + brokerLog.Error("\tserver/sublist.go: ", zap.Error(err)) return nil } diff --git a/broker/usage.go b/broker/usage.go new file mode 100644 index 0000000..b1c7755 --- /dev/null +++ b/broker/usage.go @@ -0,0 +1,24 @@ +package broker + +var usageStr = ` +Usage: hmq [options] + +Broker Options: + -w, --worker Worker num to process message, perfer (client num)/10. (default 1024) + -p, --port Use port for clients (default: 1883) + --host Network host to listen on. (default "0.0.0.0") + -ws, --wsport Use port for websocket monitoring + -wsp,--wspath Use path for websocket monitoring + -c, --config Configuration file + +Logging Options: + -d, --debug Enable debugging output (default false) + -D Debug and trace + +Cluster Options: + -r, --router Router who maintenance cluster info + -cp, --clusterport Cluster listen port for others + +Common Options: + -h, --help Show this message +` diff --git a/broker/worker.go b/broker/worker.go deleted file mode 100644 index eb41c94..0000000 --- a/broker/worker.go +++ /dev/null @@ -1,39 +0,0 @@ -/* Copyright (c) 2018, joy.zhou - */ -package broker - -type Worker struct { - WorkerPool chan chan *Message - MsgChannel chan *Message - quit chan bool -} - -func NewWorker(workerPool chan chan *Message) Worker { - return Worker{ - WorkerPool: workerPool, - MsgChannel: make(chan *Message), - quit: make(chan bool)} -} - -func (w Worker) Start() { - go func() { - for { - // register the current worker into the worker queue. - w.WorkerPool <- w.MsgChannel - select { - case msg := <-w.MsgChannel: - // we have received a work request. - ProcessMessage(msg) - case <-w.quit: - return - } - } - }() -} - -// Stop signals the worker to stop listening for work requests. 
-func (w Worker) Stop() { - go func() { - w.quit <- true - }() -} diff --git a/logger/logger.go b/logger/logger.go index 34ee617..d3c72e8 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -9,7 +9,6 @@ import ( var ( // env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env - env string instance *zap.Logger logCfg zap.Config ) @@ -28,13 +27,13 @@ func NewProdLogger() (*zap.Logger, error) { return logCfg.Build() } -func init() { +func InitLogger(debug bool) { var err error var log *zap.Logger - if env == "prod" { - log, err = NewProdLogger() - } else { + if debug { log, err = NewDevLogger() + } else { + log, err = NewProdLogger() } if err != nil { panic("Unable to create a logger.") diff --git a/main.go b/main.go index 691505b..0062ff6 100644 --- a/main.go +++ b/main.go @@ -7,36 +7,31 @@ copyright notice and this permission notice appear in all copies. package main import ( + "fmt" "os" "os/signal" "runtime" "github.com/fhmq/hmq/broker" - "github.com/fhmq/hmq/logger" - "go.uber.org/zap" -) - -var ( - log = logger.Get().Named("Main") ) func main() { runtime.GOMAXPROCS(runtime.NumCPU()) - config, err := broker.ConfigureConfig() + config, err := broker.ConfigureConfig(os.Args[1:]) if err != nil { - log.Error("configure broker config error: ", zap.Error(err)) + fmt.Println("configure broker config error: ", err) return } b, err := broker.NewBroker(config) if err != nil { - log.Error("New Broker error: ", zap.Error(err)) + fmt.Println("New Broker error: ", err) return } b.Start() s := waitForSignal() - log.Info("signal received, broker closed.", zap.Any("signal", s)) + fmt.Println("signal received, broker closed.", s) } func waitForSignal() os.Signal { diff --git a/pool/pool.go b/pool/pool.go new file mode 100644 index 0000000..2ba7e46 --- /dev/null +++ b/pool/pool.go @@ -0,0 +1,166 @@ +package pool + +import "time" + +const ( + // This value is the size of the queue that workers register their + // availability to the 
dispatcher. There may be hundreds of workers, but + // only a small channel is needed to register some of the workers. + readyQueueSize = 16 + + // If worker pool receives no new work for this period of time, then stop + // a worker goroutine. + idleTimeoutSec = 5 +) + +type WorkerPool struct { + maxWorkers int + timeout time.Duration + taskQueue chan func() + readyWorkers chan chan func() + stoppedChan chan struct{} +} + +func New(maxWorkers int) *WorkerPool { + // There must be at least one worker. + if maxWorkers < 1 { + maxWorkers = 1 + } + + // taskQueue is unbuffered since items are always removed immediately. + pool := &WorkerPool{ + taskQueue: make(chan func()), + maxWorkers: maxWorkers, + readyWorkers: make(chan chan func(), readyQueueSize), + timeout: time.Second * idleTimeoutSec, + stoppedChan: make(chan struct{}), + } + + // Start the task dispatcher. + go pool.dispatch() + + return pool +} + +func (p *WorkerPool) Stop() { + if p.Stopped() { + return + } + close(p.taskQueue) + <-p.stoppedChan +} + +func (p *WorkerPool) Stopped() bool { + select { + case <-p.stoppedChan: + return true + default: + } + return false +} + +func (p *WorkerPool) Submit(task func()) { + if task != nil { + p.taskQueue <- task + } +} + +func (p *WorkerPool) SubmitWait(task func()) { + if task == nil { + return + } + doneChan := make(chan struct{}) + p.taskQueue <- func() { + task() + close(doneChan) + } + <-doneChan +} + +func (p *WorkerPool) dispatch() { + defer close(p.stoppedChan) + timeout := time.NewTimer(p.timeout) + var workerCount int + var task func() + var ok bool + var workerTaskChan chan func() + startReady := make(chan chan func()) +Loop: + for { + timeout.Reset(p.timeout) + select { + case task, ok = <-p.taskQueue: + if !ok { + break Loop + } + // Got a task to do. + select { + case workerTaskChan = <-p.readyWorkers: + // A worker is ready, so give task to worker. + workerTaskChan <- task + default: + // No workers ready. + // Create a new worker, if not at max. 
 + if workerCount < p.maxWorkers { + workerCount++ + go func(t func()) { + startWorker(startReady, p.readyWorkers) + // Submit the task when the new worker is ready. + taskChan := <-startReady + taskChan <- t + }(task) + } else { + // Start a goroutine to submit the task when an existing + // worker is ready. + go func(t func()) { + taskChan := <-p.readyWorkers + taskChan <- t + }(task) + } + } + case <-timeout.C: + // Timed out waiting for work to arrive. Kill a ready worker. + if workerCount > 0 { + select { + case workerTaskChan = <-p.readyWorkers: + // A worker is ready, so kill. + close(workerTaskChan) + workerCount-- + default: + // No work, but no ready workers. All workers are busy. + } + } + } + } + + // Stop all remaining workers as they become ready. + for workerCount > 0 { + workerTaskChan = <-p.readyWorkers + close(workerTaskChan) + workerCount-- + } +} + +func startWorker(startReady, readyWorkers chan chan func()) { + go func() { + taskChan := make(chan func()) + var task func() + var ok bool + // Register availability on startReady channel. + startReady <- taskChan + for { + // Read task from dispatcher. + task, ok = <-taskChan + if !ok { + // Dispatcher has told worker to stop. + break + } + + // Execute the task. + task() + + // Register availability on readyWorkers channel. + readyWorkers <- taskChan + } + }() +}