2 Commits

Author SHA1 Message Date
Marc Magnin
d52d8dda07 #17 Enable use of MQTT broker as a library -> passing a logger reference to the broker instance 2018-02-08 11:13:50 +01:00
Marc Magnin
148dbbb23c #13 introduced sync.Pool 2018-02-05 15:14:46 +01:00
12 changed files with 188 additions and 170 deletions

View File

@@ -44,10 +44,10 @@ func (b *Broker) handleFsEvent(event fsnotify.Event) error {
case b.config.AclConf: case b.config.AclConf:
if event.Op&fsnotify.Write == fsnotify.Write || if event.Op&fsnotify.Write == fsnotify.Write ||
event.Op&fsnotify.Create == fsnotify.Create { event.Op&fsnotify.Create == fsnotify.Create {
brokerLog.Info("text:handling acl config change event:", zap.String("filename", event.Name)) log.Info("text:handling acl config change event:", zap.String("filename", event.Name))
aclconfig, err := acl.AclConfigLoad(event.Name) aclconfig, err := acl.AclConfigLoad(event.Name)
if err != nil { if err != nil {
brokerLog.Error("aclconfig change failed, load acl conf error: ", zap.Error(err)) log.Error("aclconfig change failed, load acl conf error: ", zap.Error(err))
return err return err
} }
b.AclConfig = aclconfig b.AclConfig = aclconfig
@@ -60,24 +60,24 @@ func (b *Broker) StartAclWatcher() {
go func() { go func() {
wch, e := fsnotify.NewWatcher() wch, e := fsnotify.NewWatcher()
if e != nil { if e != nil {
brokerLog.Error("start monitor acl config file error,", zap.Error(e)) log.Error("start monitor acl config file error,", zap.Error(e))
return return
} }
defer wch.Close() defer wch.Close()
for _, i := range watchList { for _, i := range watchList {
if err := wch.Add(i); err != nil { if err := wch.Add(i); err != nil {
brokerLog.Error("start monitor acl config file error,", zap.Error(err)) log.Error("start monitor acl config file error,", zap.Error(err))
return return
} }
} }
brokerLog.Info("watching acl config file change...") log.Info("watching acl config file change...")
for { for {
select { select {
case evt := <-wch.Events: case evt := <-wch.Events:
b.handleFsEvent(evt) b.handleFsEvent(evt)
case err := <-wch.Errors: case err := <-wch.Errors:
brokerLog.Error("error:", zap.Error(err)) log.Error("error:", zap.Error(err))
} }
} }
}() }()

View File

@@ -12,7 +12,6 @@ import (
"time" "time"
"github.com/fhmq/hmq/lib/acl" "github.com/fhmq/hmq/lib/acl"
"github.com/fhmq/hmq/pool"
"github.com/eclipse/paho.mqtt.golang/packets" "github.com/eclipse/paho.mqtt.golang/packets"
"github.com/shirou/gopsutil/mem" "github.com/shirou/gopsutil/mem"
@@ -22,7 +21,8 @@ import (
) )
var ( var (
brokerLog *zap.Logger log *zap.Logger
messagePoolQueueSize = 4096
) )
type Message struct { type Message struct {
@@ -31,40 +31,37 @@ type Message struct {
} }
type Broker struct { type Broker struct {
id string id string
cid uint64 cid uint64
mu sync.Mutex mu sync.Mutex
config *Config config *Config
tlsConfig *tls.Config tlsConfig *tls.Config
AclConfig *acl.ACLConfig AclConfig *acl.ACLConfig
wpool *pool.WorkerPool dispatcher *Dispatcher
clients sync.Map clients sync.Map
routes sync.Map routes sync.Map
remotes sync.Map remotes sync.Map
nodes map[string]interface{} nodes map[string]interface{}
clusterPool chan *Message sl *Sublist
messagePool chan *Message rl *RetainList
sl *Sublist queues map[string]int
rl *RetainList
queues map[string]int
} }
func NewBroker(config *Config) (*Broker, error) { func NewBroker(config *Config, logger *zap.Logger) (*Broker, error) {
log = logger
b := &Broker{ b := &Broker{
id: GenUniqueId(), id: GenUniqueId(),
config: config, config: config,
wpool: pool.New(config.Worker), dispatcher: NewDispatcher(),
sl: NewSublist(), sl: NewSublist(),
rl: NewRetainList(), rl: NewRetainList(),
nodes: make(map[string]interface{}), nodes: make(map[string]interface{}),
queues: make(map[string]int), queues: make(map[string]int),
clusterPool: make(chan *Message),
messagePool: make(chan *Message),
} }
if b.config.TlsPort != "" { if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo) tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
if err != nil { if err != nil {
brokerLog.Error("new tlsConfig error", zap.Error(err)) log.Error("new tlsConfig error", zap.Error(err))
return nil, err return nil, err
} }
b.tlsConfig = tlsconfig b.tlsConfig = tlsconfig
@@ -72,7 +69,7 @@ func NewBroker(config *Config) (*Broker, error) {
if b.config.Acl { if b.config.Acl {
aclconfig, err := acl.AclConfigLoad(b.config.AclConf) aclconfig, err := acl.AclConfigLoad(b.config.AclConf)
if err != nil { if err != nil {
brokerLog.Error("Load acl conf error", zap.Error(err)) log.Error("Load acl conf error", zap.Error(err))
return nil, err return nil, err
} }
b.AclConfig = aclconfig b.AclConfig = aclconfig
@@ -81,26 +78,16 @@ func NewBroker(config *Config) (*Broker, error) {
return b, nil return b, nil
} }
func (b *Broker) StartDispatcher() { func (b *Broker) DispatchMessage(msg *Message) {
for { b.dispatcher.Dispatch(msg)
msg, ok := <-b.messagePool
if !ok {
brokerLog.Error("read message from client channel error")
return
}
b.wpool.Submit(func() {
ProcessMessage(msg)
})
}
} }
func (b *Broker) Start() { func (b *Broker) Start() {
if b == nil { if b == nil {
brokerLog.Error("broker is null") log.Error("broker is null")
return return
} }
go b.StartDispatcher()
//listen clinet over tcp //listen clinet over tcp
if b.config.Port != "" { if b.config.Port != "" {
@@ -124,7 +111,6 @@ func (b *Broker) Start() {
//connect on other node in cluster //connect on other node in cluster
if b.config.Router != "" { if b.config.Router != "" {
go b.processClusterInfo()
b.ConnectToDiscovery() b.ConnectToDiscovery()
} }
@@ -149,7 +135,7 @@ func StateMonitor() {
func (b *Broker) StartWebsocketListening() { func (b *Broker) StartWebsocketListening() {
path := b.config.WsPath path := b.config.WsPath
hp := ":" + b.config.WsPort hp := ":" + b.config.WsPort
brokerLog.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path)) log.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path))
http.Handle(path, websocket.Handler(b.wsHandler)) http.Handle(path, websocket.Handler(b.wsHandler))
var err error var err error
if b.config.WsTLS { if b.config.WsTLS {
@@ -158,7 +144,7 @@ func (b *Broker) StartWebsocketListening() {
err = http.ListenAndServe(hp, nil) err = http.ListenAndServe(hp, nil)
} }
if err != nil { if err != nil {
brokerLog.Error("ListenAndServe:" + err.Error()) log.Error("ListenAndServe:" + err.Error())
return return
} }
} }
@@ -177,14 +163,14 @@ func (b *Broker) StartClientListening(Tls bool) {
if Tls { if Tls {
hp = b.config.TlsHost + ":" + b.config.TlsPort hp = b.config.TlsHost + ":" + b.config.TlsPort
l, err = tls.Listen("tcp", hp, b.tlsConfig) l, err = tls.Listen("tcp", hp, b.tlsConfig)
brokerLog.Info("Start TLS Listening client on ", zap.String("hp", hp)) log.Info("Start TLS Listening client on ", zap.String("hp", hp))
} else { } else {
hp := b.config.Host + ":" + b.config.Port hp := b.config.Host + ":" + b.config.Port
l, err = net.Listen("tcp", hp) l, err = net.Listen("tcp", hp)
brokerLog.Info("Start Listening client on ", zap.String("hp", hp)) log.Info("Start Listening client on ", zap.String("hp", hp))
} }
if err != nil { if err != nil {
brokerLog.Error("Error listening on ", zap.Error(err)) log.Error("Error listening on ", zap.Error(err))
return return
} }
tmpDelay := 10 * ACCEPT_MIN_SLEEP tmpDelay := 10 * ACCEPT_MIN_SLEEP
@@ -192,7 +178,7 @@ func (b *Broker) StartClientListening(Tls bool) {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() { if ne, ok := err.(net.Error); ok && ne.Temporary() {
brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", log.Error("Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond))
time.Sleep(tmpDelay) time.Sleep(tmpDelay)
tmpDelay *= 2 tmpDelay *= 2
@@ -200,7 +186,7 @@ func (b *Broker) StartClientListening(Tls bool) {
tmpDelay = ACCEPT_MAX_SLEEP tmpDelay = ACCEPT_MAX_SLEEP
} }
} else { } else {
brokerLog.Error("Accept error: %v", zap.Error(err)) log.Error("Accept error: %v", zap.Error(err))
} }
continue continue
} }
@@ -219,7 +205,7 @@ func (b *Broker) Handshake(conn net.Conn) bool {
// Force handshake // Force handshake
if err := nc.Handshake(); err != nil { if err := nc.Handshake(); err != nil {
brokerLog.Error("TLS handshake error, ", zap.Error(err)) log.Error("TLS handshake error, ", zap.Error(err))
return false return false
} }
nc.SetReadDeadline(time.Time{}) nc.SetReadDeadline(time.Time{})
@@ -235,18 +221,18 @@ func TlsTimeout(conn *tls.Conn) {
} }
cs := nc.ConnectionState() cs := nc.ConnectionState()
if !cs.HandshakeComplete { if !cs.HandshakeComplete {
brokerLog.Error("TLS handshake timeout") log.Error("TLS handshake timeout")
nc.Close() nc.Close()
} }
} }
func (b *Broker) StartClusterListening() { func (b *Broker) StartClusterListening() {
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
brokerLog.Info("Start Listening cluster on ", zap.String("hp", hp)) log.Info("Start Listening cluster on ", zap.String("hp", hp))
l, e := net.Listen("tcp", hp) l, e := net.Listen("tcp", hp)
if e != nil { if e != nil {
brokerLog.Error("Error listening on ", zap.Error(e)) log.Error("Error listening on ", zap.Error(e))
return return
} }
@@ -256,7 +242,7 @@ func (b *Broker) StartClusterListening() {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() { if ne, ok := err.(net.Error); ok && ne.Temporary() {
brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", log.Error("Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond))
time.Sleep(tmpDelay) time.Sleep(tmpDelay)
tmpDelay *= 2 tmpDelay *= 2
@@ -264,7 +250,7 @@ func (b *Broker) StartClusterListening() {
tmpDelay = ACCEPT_MAX_SLEEP tmpDelay = ACCEPT_MAX_SLEEP
} }
} else { } else {
brokerLog.Error("Accept error: %v", zap.Error(err)) log.Error("Accept error: %v", zap.Error(err))
} }
continue continue
} }
@@ -278,16 +264,16 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
//process connect packet //process connect packet
packet, err := packets.ReadPacket(conn) packet, err := packets.ReadPacket(conn)
if err != nil { if err != nil {
brokerLog.Error("read connect packet error: ", zap.Error(err)) log.Error("read connect packet error: ", zap.Error(err))
return return
} }
if packet == nil { if packet == nil {
brokerLog.Error("received nil packet") log.Error("received nil packet")
return return
} }
msg, ok := packet.(*packets.ConnectPacket) msg, ok := packet.(*packets.ConnectPacket)
if !ok { if !ok {
brokerLog.Error("received msg that was not Connect") log.Error("received msg that was not Connect")
return return
} }
connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket) connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
@@ -295,7 +281,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
connack.SessionPresent = msg.CleanSession connack.SessionPresent = msg.CleanSession
err = connack.Write(conn) err = connack.Write(conn)
if err != nil { if err != nil {
brokerLog.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier))
return return
} }
@@ -335,7 +321,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
case CLIENT: case CLIENT:
old, exist = b.clients.Load(cid) old, exist = b.clients.Load(cid)
if exist { if exist {
brokerLog.Warn("client exist, close old...", zap.String("clientID", c.info.clientID)) log.Warn("client exist, close old...", zap.String("clientID", c.info.clientID))
ol, ok := old.(*client) ol, ok := old.(*client)
if ok { if ok {
ol.Close() ol.Close()
@@ -345,7 +331,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
case ROUTER: case ROUTER:
old, exist = b.routes.Load(cid) old, exist = b.routes.Load(cid)
if exist { if exist {
brokerLog.Warn("router exist, close old...") log.Warn("router exist, close old...")
ol, ok := old.(*client) ol, ok := old.(*client)
if ok { if ok {
ol.Close() ol.Close()
@@ -354,7 +340,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
b.routes.Store(cid, c) b.routes.Store(cid, c)
} }
c.readLoop(b.messagePool) c.readLoop()
} }
func (b *Broker) ConnectToDiscovery() { func (b *Broker) ConnectToDiscovery() {
@@ -364,8 +350,8 @@ func (b *Broker) ConnectToDiscovery() {
for { for {
conn, err = net.Dial("tcp", b.config.Router) conn, err = net.Dial("tcp", b.config.Router)
if err != nil { if err != nil {
brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) log.Error("Error trying to connect to route: ", zap.Error(err))
brokerLog.Debug("Connect to route timeout ,retry...") log.Debug("Connect to route timeout ,retry...")
if 0 == tempDelay { if 0 == tempDelay {
tempDelay = 1 * time.Second tempDelay = 1 * time.Second
@@ -381,7 +367,7 @@ func (b *Broker) ConnectToDiscovery() {
} }
break break
} }
brokerLog.Debug("connect to router success :", zap.String("Router", b.config.Router)) log.Debug("connect to router success :", zap.String("Router", b.config.Router))
cid := b.id cid := b.id
info := info{ info := info{
@@ -401,22 +387,10 @@ func (b *Broker) ConnectToDiscovery() {
c.SendConnect() c.SendConnect()
c.SendInfo() c.SendInfo()
go c.readLoop(b.clusterPool) go c.readLoop()
go c.StartPing() go c.StartPing()
} }
func (b *Broker) processClusterInfo() {
for {
msg, ok := <-b.clusterPool
if !ok {
brokerLog.Error("read message from cluster channel error")
return
}
ProcessMessage(msg)
}
}
func (b *Broker) connectRouter(id, addr string) { func (b *Broker) connectRouter(id, addr string) {
var conn net.Conn var conn net.Conn
var err error var err error
@@ -431,13 +405,13 @@ func (b *Broker) connectRouter(id, addr string) {
conn, err = net.Dial("tcp", addr) conn, err = net.Dial("tcp", addr)
if err != nil { if err != nil {
brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) log.Error("Error trying to connect to route: ", zap.Error(err))
if retryTimes > 50 { if retryTimes > 50 {
return return
} }
brokerLog.Debug("Connect to route timeout ,retry...") log.Debug("Connect to route timeout ,retry...")
if 0 == timeDelay { if 0 == timeDelay {
timeDelay = 1 * time.Second timeDelay = 1 * time.Second
@@ -477,7 +451,7 @@ func (b *Broker) connectRouter(id, addr string) {
c.SendConnect() c.SendConnect()
go c.readLoop(b.messagePool) go c.readLoop()
go c.StartPing() go c.StartPing()
} }
@@ -536,7 +510,7 @@ func (b *Broker) SendLocalSubsToRouter(c *client) {
if len(subInfo.Topics) > 0 { if len(subInfo.Topics) > 0 {
err := c.WriterPacket(subInfo) err := c.WriterPacket(subInfo)
if err != nil { if err != nil {
brokerLog.Error("Send localsubs To Router error :", zap.Error(err)) log.Error("Send localsubs To Router error :", zap.Error(err))
} }
} }
} }
@@ -553,7 +527,7 @@ func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacke
return true return true
}) })
// brokerLog.Info("BroadcastInfoMessage success ") // log.Info("BroadcastInfoMessage success ")
} }
func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) { func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
@@ -565,7 +539,7 @@ func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
} }
return true return true
}) })
// brokerLog.Info("BroadcastSubscribeMessage remotes: ", s.remotes) // log.Info("BroadcastSubscribeMessage remotes: ", s.remotes)
} }
func (b *Broker) removeClient(c *client) { func (b *Broker) removeClient(c *client) {
@@ -579,7 +553,7 @@ func (b *Broker) removeClient(c *client) {
case REMOTE: case REMOTE:
b.remotes.Delete(clientId) b.remotes.Delete(clientId)
} }
// brokerLog.Info("delete client ,", clientId) // log.Info("delete client ,", clientId)
} }
func (b *Broker) PublishMessage(packet *packets.PublishPacket) { func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
@@ -593,7 +567,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
if sub != nil { if sub != nil {
err := sub.client.WriterPacket(packet) err := sub.client.WriterPacket(packet)
if err != nil { if err != nil {
brokerLog.Error("process message for psub error, ", zap.Error(err)) log.Error("process message for psub error, ", zap.Error(err))
} }
} }
} }

View File

@@ -86,7 +86,7 @@ func (c *client) init() {
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
} }
func (c *client) keepAlive(ch chan int, mpool chan *Message) { func (c *client) keepAlive(ch chan int) {
defer close(ch) defer close(ch)
keepalive := time.Duration(c.info.keepalive*3/2) * time.Second keepalive := time.Duration(c.info.keepalive*3/2) * time.Second
timer := time.NewTimer(keepalive) timer := time.NewTimer(keepalive)
@@ -100,9 +100,8 @@ func (c *client) keepAlive(ch chan int, mpool chan *Message) {
timer.Reset(keepalive) timer.Reset(keepalive)
continue continue
} }
brokerLog.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
msg := &Message{client: c, packet: DisconnectdPacket} c.broker.DispatchMessage(&Message{client: c, packet: DisconnectdPacket})
mpool <- msg
timer.Stop() timer.Stop()
return return
case _, ok := <-c.closed: case _, ok := <-c.closed:
@@ -113,33 +112,31 @@ func (c *client) keepAlive(ch chan int, mpool chan *Message) {
} }
} }
func (c *client) readLoop(mpool chan *Message) { func (c *client) readLoop() {
nc := c.conn nc := c.conn
if nc == nil || mpool == nil { if nc == nil {
return return
} }
ch := make(chan int, 1000) ch := make(chan int, 1000)
go c.keepAlive(ch, mpool) go c.keepAlive(ch)
for { for {
packet, err := packets.ReadPacket(nc) packet, err := packets.ReadPacket(nc)
if err != nil { if err != nil {
brokerLog.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
break break
} }
// keepalive channel // keepalive channel
ch <- 1 ch <- 1
msg := &Message{ c.broker.DispatchMessage(&Message{
client: c, client: c,
packet: packet, packet: packet,
} })
mpool <- msg
} }
msg := &Message{client: c, packet: DisconnectdPacket} c.broker.DispatchMessage(&Message{client: c, packet: DisconnectdPacket})
mpool <- msg
} }
func ProcessMessage(msg *Message) { func ProcessMessage(msg *Message) {
@@ -149,7 +146,7 @@ func ProcessMessage(msg *Message) {
return return
} }
brokerLog.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID)) log.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID))
switch ca.(type) { switch ca.(type) {
case *packets.ConnackPacket: case *packets.ConnackPacket:
case *packets.ConnectPacket: case *packets.ConnectPacket:
@@ -174,7 +171,7 @@ func ProcessMessage(msg *Message) {
case *packets.DisconnectPacket: case *packets.DisconnectPacket:
c.Close() c.Close()
default: default:
brokerLog.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID)) log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID))
} }
} }
@@ -190,7 +187,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
} }
if !c.CheckTopicAuth(PUB, topic) { if !c.CheckTopicAuth(PUB, topic) {
brokerLog.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
return return
} }
@@ -201,21 +198,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
puback.MessageID = packet.MessageID puback.MessageID = packet.MessageID
if err := c.WriterPacket(puback); err != nil { if err := c.WriterPacket(puback); err != nil {
brokerLog.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
c.ProcessPublishMessage(packet) c.ProcessPublishMessage(packet)
case QosExactlyOnce: case QosExactlyOnce:
return return
default: default:
brokerLog.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID)) log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
return return
} }
if packet.Retain { if packet.Retain {
if b := c.broker; b != nil { if b := c.broker; b != nil {
err := b.rl.Insert(topic, packet) err := b.rl.Insert(topic, packet)
if err != nil { if err != nil {
brokerLog.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
} }
@@ -235,7 +232,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
topic := packet.TopicName topic := packet.TopicName
r := b.sl.Match(topic) r := b.sl.Match(topic)
// brokerLog.Info("psubs num: ", len(r.psubs)) // log.Info("psubs num: ", len(r.psubs))
if len(r.qsubs) == 0 && len(r.psubs) == 0 { if len(r.qsubs) == 0 && len(r.psubs) == 0 {
return return
} }
@@ -249,7 +246,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil { if sub != nil {
err := sub.client.WriterPacket(packet) err := sub.client.WriterPacket(packet)
if err != nil { if err != nil {
brokerLog.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
} }
@@ -259,7 +256,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
t := "$queue/" + topic t := "$queue/" + topic
cnt, exist := b.queues[t] cnt, exist := b.queues[t]
if exist { if exist {
// brokerLog.Info("queue index : ", cnt) // log.Info("queue index : ", cnt)
for _, sub := range r.qsubs { for _, sub := range r.qsubs {
if sub.client.typ == ROUTER { if sub.client.typ == ROUTER {
if typ != CLIENT { if typ != CLIENT {
@@ -275,7 +272,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil { if sub != nil {
err := sub.client.WriterPacket(packet) err := sub.client.WriterPacket(packet)
if err != nil { if err != nil {
brokerLog.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
@@ -329,7 +326,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
t := topic t := topic
//check topic auth for client //check topic auth for client
if !c.CheckTopicAuth(SUB, topic) { if !c.CheckTopicAuth(SUB, topic) {
brokerLog.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure) retcodes = append(retcodes, QosFailure)
continue continue
} }
@@ -376,7 +373,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
} }
err := b.sl.Insert(sub) err := b.sl.Insert(sub)
if err != nil { if err != nil {
brokerLog.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure) retcodes = append(retcodes, QosFailure)
} else { } else {
retcodes = append(retcodes, qoss[i]) retcodes = append(retcodes, qoss[i])
@@ -386,7 +383,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
err := c.WriterPacket(suback) err := c.WriterPacket(suback)
if err != nil { if err != nil {
brokerLog.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
//broadcast subscribe message //broadcast subscribe message
@@ -398,7 +395,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
for _, t := range topics { for _, t := range topics {
packets := b.rl.Match(t) packets := b.rl.Match(t)
for _, packet := range packets { for _, packet := range packets {
brokerLog.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID)) log.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID))
if packet != nil { if packet != nil {
c.WriterPacket(packet) c.WriterPacket(packet)
} }
@@ -445,7 +442,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
err := c.WriterPacket(unsuback) err := c.WriterPacket(unsuback)
if err != nil { if err != nil {
brokerLog.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
// //process ubsubscribe message // //process ubsubscribe message
@@ -474,7 +471,7 @@ func (c *client) ProcessPing() {
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket) resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
err := c.WriterPacket(resp) err := c.WriterPacket(resp)
if err != nil { if err != nil {
brokerLog.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
} }
@@ -505,7 +502,7 @@ func (c *client) Close() {
for _, sub := range subs { for _, sub := range subs {
err := b.sl.Remove(sub) err := b.sl.Remove(sub)
if err != nil { if err != nil {
brokerLog.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
if c.typ == CLIENT { if c.typ == CLIENT {

View File

@@ -105,7 +105,7 @@ func ConfigureConfig(args []string) (*Config, error) {
}) })
logger.InitLogger(config.Debug) logger.InitLogger(config.Debug)
brokerLog = logger.Get().Named("Broker") log = logger.Get().Named("Broker")
if configFile != "" { if configFile != "" {
tmpConfig, e := LoadConfig(configFile) tmpConfig, e := LoadConfig(configFile)
@@ -128,15 +128,15 @@ func LoadConfig(filename string) (*Config, error) {
content, err := ioutil.ReadFile(filename) content, err := ioutil.ReadFile(filename)
if err != nil { if err != nil {
brokerLog.Error("Read config file error: ", zap.Error(err)) log.Error("Read config file error: ", zap.Error(err))
return nil, err return nil, err
} }
// brokerLog.Info(string(content)) // log.Info(string(content))
var config Config var config Config
err = json.Unmarshal(content, &config) err = json.Unmarshal(content, &config)
if err != nil { if err != nil {
brokerLog.Error("Unmarshal config file error: ", zap.Error(err)) log.Error("Unmarshal config file error: ", zap.Error(err))
return nil, err return nil, err
} }
@@ -168,7 +168,7 @@ func (config *Config) check() error {
if config.TlsPort != "" { if config.TlsPort != "" {
if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" {
brokerLog.Error("tls config error, no cert or key file.") log.Error("tls config error, no cert or key file.")
return errors.New("tls config error, no cert or key file.") return errors.New("tls config error, no cert or key file.")
} }
if config.TlsHost == "" { if config.TlsHost == "" {

25
broker/dispatcher.go Normal file
View File

@@ -0,0 +1,25 @@
package broker
import (
"sync"
)
// Dispatcher will delegate ProcessMessage func to multiple goroutines
type Dispatcher struct {
workerPool *sync.Pool
}
// NewDispatcher create a *Dispatcher instance
func NewDispatcher() *Dispatcher {
return &Dispatcher{workerPool: &sync.Pool{
New: func() interface{} {
return NewWorker()
},
},
}
}
// Dispatch a message to the workers
func (d *Dispatcher) Dispatch(message *Message) {
d.workerPool.Get().(Worker).WorkerChannel <- Work{WorkerPool: d.workerPool, Message: message}
}

View File

@@ -21,7 +21,7 @@ func (c *client) SendInfo() {
infoMsg := NewInfo(c.broker.id, url, false) infoMsg := NewInfo(c.broker.id, url, false)
err := c.WriterPacket(infoMsg) err := c.WriterPacket(infoMsg)
if err != nil { if err != nil {
brokerLog.Error("send info message error, ", zap.Error(err)) log.Error("send info message error, ", zap.Error(err))
return return
} }
} }
@@ -34,7 +34,7 @@ func (c *client) StartPing() {
case <-timeTicker.C: case <-timeTicker.C:
err := c.WriterPacket(ping) err := c.WriterPacket(ping)
if err != nil { if err != nil {
brokerLog.Error("ping error: ", zap.Error(err)) log.Error("ping error: ", zap.Error(err))
c.Close() c.Close()
} }
case _, ok := <-c.closed: case _, ok := <-c.closed:
@@ -57,10 +57,10 @@ func (c *client) SendConnect() {
m.Keepalive = uint16(60) m.Keepalive = uint16(60)
err := c.WriterPacket(m) err := c.WriterPacket(m)
if err != nil { if err != nil {
brokerLog.Error("send connect message error, ", zap.Error(err)) log.Error("send connect message error, ", zap.Error(err))
return return
} }
brokerLog.Info("send connect success") log.Info("send connect success")
} }
func NewInfo(sid, url string, isforword bool) *packets.PublishPacket { func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
@@ -69,7 +69,7 @@ func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
pub.TopicName = BrokerInfoTopic pub.TopicName = BrokerInfoTopic
pub.Retain = false pub.Retain = false
info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url) info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url)
// brokerLog.Info("new info", string(info)) // log.Info("new info", string(info))
pub.Payload = []byte(info) pub.Payload = []byte(info)
return pub return pub
} }
@@ -81,17 +81,17 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) {
return return
} }
brokerLog.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload))) log.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload)))
js, err := simplejson.NewJson(packet.Payload) js, err := simplejson.NewJson(packet.Payload)
if err != nil { if err != nil {
brokerLog.Warn("parse info message err", zap.Error(err)) log.Warn("parse info message err", zap.Error(err))
return return
} }
routes, err := js.Get("data").Map() routes, err := js.Get("data").Map()
if routes == nil { if routes == nil {
brokerLog.Error("receive info message error, ", zap.Error(err)) log.Error("receive info message error, ", zap.Error(err))
return return
} }

View File

@@ -39,7 +39,7 @@ func (r *RetainList) Insert(topic string, buf *packets.PublishPacket) error {
if err != nil { if err != nil {
return err return err
} }
// brokerLog.Info("insert tokens:", tokens) // log.Info("insert tokens:", tokens)
r.Lock() r.Lock()
l := r.root l := r.root
@@ -72,7 +72,7 @@ func (r *RetainList) Match(topic string) []*packets.PublishPacket {
l := r.root l := r.root
matchRLevel(l, tokens, results) matchRLevel(l, tokens, results)
r.Unlock() r.Unlock()
// brokerLog.Info("results: ", results) // log.Info("results: ", results)
return results.msg return results.msg
} }
@@ -82,7 +82,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) {
if l == nil { if l == nil {
return return
} }
// brokerLog.Info("l info :", l.nodes) // log.Info("l info :", l.nodes)
if t == "#" { if t == "#" {
for _, n := range l.nodes { for _, n := range l.nodes {
n.GetAll(results) n.GetAll(results)
@@ -111,7 +111,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) {
} }
func (r *rnode) GetAll(results *RetainResult) { func (r *rnode) GetAll(results *RetainResult) {
// brokerLog.Info("node 's message: ", string(r.msg)) // log.Info("node 's message: ", string(r.msg))
if r.msg != nil { if r.msg != nil {
results.msg = append(results.msg, r.msg) results.msg = append(results.msg, r.msg)
} }

View File

@@ -211,7 +211,7 @@ func (s *Sublist) Match(topic string) *SublistResult {
tokens, err := PublishTopicCheckAndSpilt(topic) tokens, err := PublishTopicCheckAndSpilt(topic)
if err != nil { if err != nil {
brokerLog.Error("\tserver/sublist.go: ", zap.Error(err)) log.Error("\tserver/sublist.go: ", zap.Error(err))
return nil return nil
} }

28
broker/worker.go Normal file
View File

@@ -0,0 +1,28 @@
package broker
import "sync"
type Work struct {
WorkerPool *sync.Pool
Message *Message
}
type Worker struct {
WorkerChannel chan Work
}
func NewWorker() Worker {
w := Worker{WorkerChannel: make(chan Work)}
return w.Start()
}
func (w Worker) Start() Worker {
go func() {
for work := range w.WorkerChannel {
ProcessMessage(work.Message)
// put the worker back
work.WorkerPool.Put(w)
}
}()
return w
}

View File

@@ -8,43 +8,40 @@ import (
) )
var ( var (
// env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env logInstance *zap.Logger
instance *zap.Logger
logCfg zap.Config
) )
// NewDevLogger return a logger for dev builds // InitDevLogger instanciate a logger for dev builds
func NewDevLogger() (*zap.Logger, error) { func InitDevLogger() {
logCfg := zap.NewDevelopmentConfig() logCfg := zap.NewDevelopmentConfig()
return logCfg.Build() logInstance, _ = logCfg.Build()
} }
// NewProdLogger return a logger for production builds // InitProdLogger instanciate a logger for production builds
func NewProdLogger() (*zap.Logger, error) { func InitProdLogger() {
logCfg := zap.NewProductionConfig() logCfg := zap.NewProductionConfig()
logCfg.DisableStacktrace = true logCfg.DisableStacktrace = true
logCfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel) logCfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
return logCfg.Build() logInstance, _ = logCfg.Build()
} }
func InitLogger(debug bool) { func InitLogger(debug bool) {
var err error var err error
var log *zap.Logger
if debug { if debug {
log, err = NewDevLogger() InitDevLogger()
} else { } else {
log, err = NewProdLogger() InitProdLogger()
} }
if err != nil { if err != nil {
panic("Unable to create a logger.") panic("Unable to create a logger.")
} }
defer log.Sync() logInstance.Debug("Logger initialization succeeded")
log.Debug("Logger initialization succeeded")
instance = log.Named("hmq")
} }
// Get return a *zap.Logger instance // Get the existing *zap.Logger instance. If none have been created, it'll instanciate de dev logger
func Get() *zap.Logger { func Get() *zap.Logger {
return instance if logInstance == nil {
InitDevLogger()
}
return logInstance
} }

View File

@@ -19,15 +19,11 @@ func TestGet(t *testing.T) {
} }
func TestNewDevLogger(t *testing.T) { func TestNewDevLogger(t *testing.T) {
logger, err := NewDevLogger() InitDevLogger()
assert.True(t, Get().Core().Enabled(zap.DebugLevel))
assert.Nil(t, err)
assert.True(t, logger.Core().Enabled(zap.DebugLevel))
} }
func TestNewProdLogger(t *testing.T) { func TestNewProdLogger(t *testing.T) {
logger, err := NewProdLogger() InitProdLogger()
assert.False(t, Get().Core().Enabled(zap.DebugLevel))
assert.Nil(t, err)
assert.False(t, logger.Core().Enabled(zap.DebugLevel))
} }

View File

@@ -13,6 +13,7 @@ import (
"runtime" "runtime"
"github.com/fhmq/hmq/broker" "github.com/fhmq/hmq/broker"
"github.com/fhmq/hmq/logger"
) )
func main() { func main() {
@@ -22,8 +23,8 @@ func main() {
fmt.Println("configure broker config error: ", err) fmt.Println("configure broker config error: ", err)
return return
} }
logger.InitLogger(config.Debug)
b, err := broker.NewBroker(config) b, err := broker.NewBroker(config, logger.Get())
if err != nil { if err != nil {
fmt.Println("New Broker error: ", err) fmt.Println("New Broker error: ", err)
return return