modify_message_pool

This commit is contained in:
zhouyuyan
2018-02-24 13:19:43 +08:00
parent 47500910e1
commit c0fea6a5ba
8 changed files with 134 additions and 126 deletions

View File

@@ -3,13 +3,10 @@
package broker package broker
import ( import (
"strings"
"github.com/fhmq/hmq/lib/acl" "github.com/fhmq/hmq/lib/acl"
"go.uber.org/zap"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"go.uber.org/zap"
"strings"
) )
const ( const (
@@ -44,10 +41,10 @@ func (b *Broker) handleFsEvent(event fsnotify.Event) error {
case b.config.AclConf: case b.config.AclConf:
if event.Op&fsnotify.Write == fsnotify.Write || if event.Op&fsnotify.Write == fsnotify.Write ||
event.Op&fsnotify.Create == fsnotify.Create { event.Op&fsnotify.Create == fsnotify.Create {
brokerLog.Info("text:handling acl config change event:", zap.String("filename", event.Name)) log.Info("text:handling acl config change event:", zap.String("filename", event.Name))
aclconfig, err := acl.AclConfigLoad(event.Name) aclconfig, err := acl.AclConfigLoad(event.Name)
if err != nil { if err != nil {
brokerLog.Error("aclconfig change failed, load acl conf error: ", zap.Error(err)) log.Error("aclconfig change failed, load acl conf error: ", zap.Error(err))
return err return err
} }
b.AclConfig = aclconfig b.AclConfig = aclconfig
@@ -60,24 +57,24 @@ func (b *Broker) StartAclWatcher() {
go func() { go func() {
wch, e := fsnotify.NewWatcher() wch, e := fsnotify.NewWatcher()
if e != nil { if e != nil {
brokerLog.Error("start monitor acl config file error,", zap.Error(e)) log.Error("start monitor acl config file error,", zap.Error(e))
return return
} }
defer wch.Close() defer wch.Close()
for _, i := range watchList { for _, i := range watchList {
if err := wch.Add(i); err != nil { if err := wch.Add(i); err != nil {
brokerLog.Error("start monitor acl config file error,", zap.Error(err)) log.Error("start monitor acl config file error,", zap.Error(err))
return return
} }
} }
brokerLog.Info("watching acl config file change...") log.Info("watching acl config file change...")
for { for {
select { select {
case evt := <-wch.Events: case evt := <-wch.Events:
b.handleFsEvent(evt) b.handleFsEvent(evt)
case err := <-wch.Errors: case err := <-wch.Errors:
brokerLog.Error("error:", zap.Error(err)) log.Error("error:", zap.Error(err))
} }
} }
}() }()

View File

@@ -4,25 +4,24 @@ package broker
import ( import (
"crypto/tls" "crypto/tls"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/fhmq/hmq/lib/acl"
"github.com/fhmq/hmq/pool"
"github.com/segmentio/fasthash/fnv1a"
"github.com/shirou/gopsutil/mem"
"go.uber.org/zap"
"golang.org/x/net/websocket"
"net" "net"
"net/http" "net/http"
"runtime/debug" "runtime/debug"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/fhmq/hmq/lib/acl"
"github.com/fhmq/hmq/pool"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/shirou/gopsutil/mem"
"go.uber.org/zap"
"golang.org/x/net/websocket"
) )
var ( const (
brokerLog *zap.Logger MessagePoolNum = 1024
MessagePoolMessageNum = 1024
) )
type Message struct { type Message struct {
@@ -43,12 +42,21 @@ type Broker struct {
remotes sync.Map remotes sync.Map
nodes map[string]interface{} nodes map[string]interface{}
clusterPool chan *Message clusterPool chan *Message
messagePool chan *Message messagePool []chan *Message
sl *Sublist sl *Sublist
rl *RetainList rl *RetainList
queues map[string]int queues map[string]int
} }
func newMessagePool() []chan *Message {
pool := make([]chan *Message, 0)
for i := 0; i < MessagePoolNum; i++ {
ch := make(chan *Message, MessagePoolMessageNum)
pool = append(pool, ch)
}
return pool
}
func NewBroker(config *Config) (*Broker, error) { func NewBroker(config *Config) (*Broker, error) {
b := &Broker{ b := &Broker{
id: GenUniqueId(), id: GenUniqueId(),
@@ -59,12 +67,12 @@ func NewBroker(config *Config) (*Broker, error) {
nodes: make(map[string]interface{}), nodes: make(map[string]interface{}),
queues: make(map[string]int), queues: make(map[string]int),
clusterPool: make(chan *Message), clusterPool: make(chan *Message),
messagePool: make(chan *Message), messagePool: newMessagePool(),
} }
if b.config.TlsPort != "" { if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo) tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
if err != nil { if err != nil {
brokerLog.Error("new tlsConfig error", zap.Error(err)) log.Error("new tlsConfig error", zap.Error(err))
return nil, err return nil, err
} }
b.tlsConfig = tlsconfig b.tlsConfig = tlsconfig
@@ -72,7 +80,7 @@ func NewBroker(config *Config) (*Broker, error) {
if b.config.Acl { if b.config.Acl {
aclconfig, err := acl.AclConfigLoad(b.config.AclConf) aclconfig, err := acl.AclConfigLoad(b.config.AclConf)
if err != nil { if err != nil {
brokerLog.Error("Load acl conf error", zap.Error(err)) log.Error("Load acl conf error", zap.Error(err))
return nil, err return nil, err
} }
b.AclConfig = aclconfig b.AclConfig = aclconfig
@@ -82,22 +90,26 @@ func NewBroker(config *Config) (*Broker, error) {
} }
func (b *Broker) StartDispatcher() { func (b *Broker) StartDispatcher() {
for { for _, mpool := range b.messagePool {
msg, ok := <-b.messagePool go func(ch chan *Message) {
if !ok { for {
brokerLog.Error("read message from client channel error") msg, ok := <-ch
return if !ok {
} log.Error("read message from client channel error")
b.wpool.Submit(func() { return
ProcessMessage(msg) }
}) b.wpool.Submit(func() {
ProcessMessage(msg)
})
}
}(mpool)
} }
} }
func (b *Broker) Start() { func (b *Broker) Start() {
if b == nil { if b == nil {
brokerLog.Error("broker is null") log.Error("broker is null")
return return
} }
go b.StartDispatcher() go b.StartDispatcher()
@@ -149,7 +161,7 @@ func StateMonitor() {
func (b *Broker) StartWebsocketListening() { func (b *Broker) StartWebsocketListening() {
path := b.config.WsPath path := b.config.WsPath
hp := ":" + b.config.WsPort hp := ":" + b.config.WsPort
brokerLog.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path)) log.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path))
http.Handle(path, websocket.Handler(b.wsHandler)) http.Handle(path, websocket.Handler(b.wsHandler))
var err error var err error
if b.config.WsTLS { if b.config.WsTLS {
@@ -158,7 +170,7 @@ func (b *Broker) StartWebsocketListening() {
err = http.ListenAndServe(hp, nil) err = http.ListenAndServe(hp, nil)
} }
if err != nil { if err != nil {
brokerLog.Error("ListenAndServe:" + err.Error()) log.Error("ListenAndServe:" + err.Error())
return return
} }
} }
@@ -167,7 +179,7 @@ func (b *Broker) wsHandler(ws *websocket.Conn) {
// io.Copy(ws, ws) // io.Copy(ws, ws)
atomic.AddUint64(&b.cid, 1) atomic.AddUint64(&b.cid, 1)
ws.PayloadType = websocket.BinaryFrame ws.PayloadType = websocket.BinaryFrame
b.handleConnection(CLIENT, ws, b.cid) b.handleConnection(CLIENT, ws)
} }
func (b *Broker) StartClientListening(Tls bool) { func (b *Broker) StartClientListening(Tls bool) {
@@ -177,14 +189,14 @@ func (b *Broker) StartClientListening(Tls bool) {
if Tls { if Tls {
hp = b.config.TlsHost + ":" + b.config.TlsPort hp = b.config.TlsHost + ":" + b.config.TlsPort
l, err = tls.Listen("tcp", hp, b.tlsConfig) l, err = tls.Listen("tcp", hp, b.tlsConfig)
brokerLog.Info("Start TLS Listening client on ", zap.String("hp", hp)) log.Info("Start TLS Listening client on ", zap.String("hp", hp))
} else { } else {
hp := b.config.Host + ":" + b.config.Port hp := b.config.Host + ":" + b.config.Port
l, err = net.Listen("tcp", hp) l, err = net.Listen("tcp", hp)
brokerLog.Info("Start Listening client on ", zap.String("hp", hp)) log.Info("Start Listening client on ", zap.String("hp", hp))
} }
if err != nil { if err != nil {
brokerLog.Error("Error listening on ", zap.Error(err)) log.Error("Error listening on ", zap.Error(err))
return return
} }
tmpDelay := 10 * ACCEPT_MIN_SLEEP tmpDelay := 10 * ACCEPT_MIN_SLEEP
@@ -192,7 +204,7 @@ func (b *Broker) StartClientListening(Tls bool) {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() { if ne, ok := err.(net.Error); ok && ne.Temporary() {
brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", log.Error("Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond))
time.Sleep(tmpDelay) time.Sleep(tmpDelay)
tmpDelay *= 2 tmpDelay *= 2
@@ -200,13 +212,13 @@ func (b *Broker) StartClientListening(Tls bool) {
tmpDelay = ACCEPT_MAX_SLEEP tmpDelay = ACCEPT_MAX_SLEEP
} }
} else { } else {
brokerLog.Error("Accept error: %v", zap.Error(err)) log.Error("Accept error: %v", zap.Error(err))
} }
continue continue
} }
tmpDelay = ACCEPT_MIN_SLEEP tmpDelay = ACCEPT_MIN_SLEEP
atomic.AddUint64(&b.cid, 1) atomic.AddUint64(&b.cid, 1)
go b.handleConnection(CLIENT, conn, b.cid) go b.handleConnection(CLIENT, conn)
} }
} }
@@ -219,7 +231,7 @@ func (b *Broker) Handshake(conn net.Conn) bool {
// Force handshake // Force handshake
if err := nc.Handshake(); err != nil { if err := nc.Handshake(); err != nil {
brokerLog.Error("TLS handshake error, ", zap.Error(err)) log.Error("TLS handshake error, ", zap.Error(err))
return false return false
} }
nc.SetReadDeadline(time.Time{}) nc.SetReadDeadline(time.Time{})
@@ -235,28 +247,27 @@ func TlsTimeout(conn *tls.Conn) {
} }
cs := nc.ConnectionState() cs := nc.ConnectionState()
if !cs.HandshakeComplete { if !cs.HandshakeComplete {
brokerLog.Error("TLS handshake timeout") log.Error("TLS handshake timeout")
nc.Close() nc.Close()
} }
} }
func (b *Broker) StartClusterListening() { func (b *Broker) StartClusterListening() {
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
brokerLog.Info("Start Listening cluster on ", zap.String("hp", hp)) log.Info("Start Listening cluster on ", zap.String("hp", hp))
l, e := net.Listen("tcp", hp) l, e := net.Listen("tcp", hp)
if e != nil { if e != nil {
brokerLog.Error("Error listening on ", zap.Error(e)) log.Error("Error listening on ", zap.Error(e))
return return
} }
var idx uint64 = 0
tmpDelay := 10 * ACCEPT_MIN_SLEEP tmpDelay := 10 * ACCEPT_MIN_SLEEP
for { for {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() { if ne, ok := err.(net.Error); ok && ne.Temporary() {
brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", log.Error("Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond))
time.Sleep(tmpDelay) time.Sleep(tmpDelay)
tmpDelay *= 2 tmpDelay *= 2
@@ -264,30 +275,30 @@ func (b *Broker) StartClusterListening() {
tmpDelay = ACCEPT_MAX_SLEEP tmpDelay = ACCEPT_MAX_SLEEP
} }
} else { } else {
brokerLog.Error("Accept error: %v", zap.Error(err)) log.Error("Accept error: %v", zap.Error(err))
} }
continue continue
} }
tmpDelay = ACCEPT_MIN_SLEEP tmpDelay = ACCEPT_MIN_SLEEP
go b.handleConnection(ROUTER, conn, idx) go b.handleConnection(ROUTER, conn)
} }
} }
func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { func (b *Broker) handleConnection(typ int, conn net.Conn) {
//process connect packet //process connect packet
packet, err := packets.ReadPacket(conn) packet, err := packets.ReadPacket(conn)
if err != nil { if err != nil {
brokerLog.Error("read connect packet error: ", zap.Error(err)) log.Error("read connect packet error: ", zap.Error(err))
return return
} }
if packet == nil { if packet == nil {
brokerLog.Error("received nil packet") log.Error("received nil packet")
return return
} }
msg, ok := packet.(*packets.ConnectPacket) msg, ok := packet.(*packets.ConnectPacket)
if !ok { if !ok {
brokerLog.Error("received msg that was not Connect") log.Error("received msg that was not Connect")
return return
} }
connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket) connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
@@ -295,7 +306,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
connack.SessionPresent = msg.CleanSession connack.SessionPresent = msg.CleanSession
err = connack.Write(conn) err = connack.Write(conn)
if err != nil { if err != nil {
brokerLog.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier))
return return
} }
@@ -335,7 +346,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
case CLIENT: case CLIENT:
old, exist = b.clients.Load(cid) old, exist = b.clients.Load(cid)
if exist { if exist {
brokerLog.Warn("client exist, close old...", zap.String("clientID", c.info.clientID)) log.Warn("client exist, close old...", zap.String("clientID", c.info.clientID))
ol, ok := old.(*client) ol, ok := old.(*client)
if ok { if ok {
ol.Close() ol.Close()
@@ -345,7 +356,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
case ROUTER: case ROUTER:
old, exist = b.routes.Load(cid) old, exist = b.routes.Load(cid)
if exist { if exist {
brokerLog.Warn("router exist, close old...") log.Warn("router exist, close old...")
ol, ok := old.(*client) ol, ok := old.(*client)
if ok { if ok {
ol.Close() ol.Close()
@@ -354,7 +365,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
b.routes.Store(cid, c) b.routes.Store(cid, c)
} }
c.readLoop(b.messagePool) mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
c.readLoop(mpool)
} }
func (b *Broker) ConnectToDiscovery() { func (b *Broker) ConnectToDiscovery() {
@@ -364,8 +377,8 @@ func (b *Broker) ConnectToDiscovery() {
for { for {
conn, err = net.Dial("tcp", b.config.Router) conn, err = net.Dial("tcp", b.config.Router)
if err != nil { if err != nil {
brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) log.Error("Error trying to connect to route: ", zap.Error(err))
brokerLog.Debug("Connect to route timeout ,retry...") log.Debug("Connect to route timeout ,retry...")
if 0 == tempDelay { if 0 == tempDelay {
tempDelay = 1 * time.Second tempDelay = 1 * time.Second
@@ -381,7 +394,7 @@ func (b *Broker) ConnectToDiscovery() {
} }
break break
} }
brokerLog.Debug("connect to router success :", zap.String("Router", b.config.Router)) log.Debug("connect to router success :", zap.String("Router", b.config.Router))
cid := b.id cid := b.id
info := info{ info := info{
@@ -409,7 +422,7 @@ func (b *Broker) processClusterInfo() {
for { for {
msg, ok := <-b.clusterPool msg, ok := <-b.clusterPool
if !ok { if !ok {
brokerLog.Error("read message from cluster channel error") log.Error("read message from cluster channel error")
return return
} }
ProcessMessage(msg) ProcessMessage(msg)
@@ -431,13 +444,13 @@ func (b *Broker) connectRouter(id, addr string) {
conn, err = net.Dial("tcp", addr) conn, err = net.Dial("tcp", addr)
if err != nil { if err != nil {
brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) log.Error("Error trying to connect to route: ", zap.Error(err))
if retryTimes > 50 { if retryTimes > 50 {
return return
} }
brokerLog.Debug("Connect to route timeout ,retry...") log.Debug("Connect to route timeout ,retry...")
if 0 == timeDelay { if 0 == timeDelay {
timeDelay = 1 * time.Second timeDelay = 1 * time.Second
@@ -477,7 +490,8 @@ func (b *Broker) connectRouter(id, addr string) {
c.SendConnect() c.SendConnect()
go c.readLoop(b.messagePool) mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
go c.readLoop(mpool)
go c.StartPing() go c.StartPing()
} }
@@ -536,7 +550,7 @@ func (b *Broker) SendLocalSubsToRouter(c *client) {
if len(subInfo.Topics) > 0 { if len(subInfo.Topics) > 0 {
err := c.WriterPacket(subInfo) err := c.WriterPacket(subInfo)
if err != nil { if err != nil {
brokerLog.Error("Send localsubs To Router error :", zap.Error(err)) log.Error("Send localsubs To Router error :", zap.Error(err))
} }
} }
} }
@@ -553,7 +567,7 @@ func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacke
return true return true
}) })
// brokerLog.Info("BroadcastInfoMessage success ") // log.Info("BroadcastInfoMessage success ")
} }
func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) { func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
@@ -565,7 +579,7 @@ func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
} }
return true return true
}) })
// brokerLog.Info("BroadcastSubscribeMessage remotes: ", s.remotes) // log.Info("BroadcastSubscribeMessage remotes: ", s.remotes)
} }
func (b *Broker) removeClient(c *client) { func (b *Broker) removeClient(c *client) {
@@ -579,7 +593,7 @@ func (b *Broker) removeClient(c *client) {
case REMOTE: case REMOTE:
b.remotes.Delete(clientId) b.remotes.Delete(clientId)
} }
// brokerLog.Info("delete client ,", clientId) // log.Info("delete client ,", clientId)
} }
func (b *Broker) PublishMessage(packet *packets.PublishPacket) { func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
@@ -593,7 +607,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
if sub != nil { if sub != nil {
err := sub.client.WriterPacket(packet) err := sub.client.WriterPacket(packet)
if err != nil { if err != nil {
brokerLog.Error("process message for psub error, ", zap.Error(err)) log.Error("process message for psub error, ", zap.Error(err))
} }
} }
} }

View File

@@ -3,14 +3,13 @@
package broker package broker
import ( import (
"github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap"
"net" "net"
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap"
) )
const ( const (
@@ -100,7 +99,7 @@ func (c *client) keepAlive(ch chan int, mpool chan *Message) {
timer.Reset(keepalive) timer.Reset(keepalive)
continue continue
} }
brokerLog.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
msg := &Message{client: c, packet: DisconnectdPacket} msg := &Message{client: c, packet: DisconnectdPacket}
mpool <- msg mpool <- msg
timer.Stop() timer.Stop()
@@ -125,7 +124,7 @@ func (c *client) readLoop(mpool chan *Message) {
for { for {
packet, err := packets.ReadPacket(nc) packet, err := packets.ReadPacket(nc)
if err != nil { if err != nil {
brokerLog.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
break break
} }
// keepalive channel // keepalive channel
@@ -149,7 +148,7 @@ func ProcessMessage(msg *Message) {
return return
} }
brokerLog.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID)) log.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID))
switch ca.(type) { switch ca.(type) {
case *packets.ConnackPacket: case *packets.ConnackPacket:
case *packets.ConnectPacket: case *packets.ConnectPacket:
@@ -174,7 +173,7 @@ func ProcessMessage(msg *Message) {
case *packets.DisconnectPacket: case *packets.DisconnectPacket:
c.Close() c.Close()
default: default:
brokerLog.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID)) log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID))
} }
} }
@@ -190,7 +189,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
} }
if !c.CheckTopicAuth(PUB, topic) { if !c.CheckTopicAuth(PUB, topic) {
brokerLog.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
return return
} }
@@ -201,21 +200,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
puback.MessageID = packet.MessageID puback.MessageID = packet.MessageID
if err := c.WriterPacket(puback); err != nil { if err := c.WriterPacket(puback); err != nil {
brokerLog.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
c.ProcessPublishMessage(packet) c.ProcessPublishMessage(packet)
case QosExactlyOnce: case QosExactlyOnce:
return return
default: default:
brokerLog.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID)) log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
return return
} }
if packet.Retain { if packet.Retain {
if b := c.broker; b != nil { if b := c.broker; b != nil {
err := b.rl.Insert(topic, packet) err := b.rl.Insert(topic, packet)
if err != nil { if err != nil {
brokerLog.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
} }
@@ -235,7 +234,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
topic := packet.TopicName topic := packet.TopicName
r := b.sl.Match(topic) r := b.sl.Match(topic)
// brokerLog.Info("psubs num: ", len(r.psubs)) // log.Info("psubs num: ", len(r.psubs))
if len(r.qsubs) == 0 && len(r.psubs) == 0 { if len(r.qsubs) == 0 && len(r.psubs) == 0 {
return return
} }
@@ -249,7 +248,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil { if sub != nil {
err := sub.client.WriterPacket(packet) err := sub.client.WriterPacket(packet)
if err != nil { if err != nil {
brokerLog.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
} }
@@ -259,7 +258,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
t := "$queue/" + topic t := "$queue/" + topic
cnt, exist := b.queues[t] cnt, exist := b.queues[t]
if exist { if exist {
// brokerLog.Info("queue index : ", cnt) // log.Info("queue index : ", cnt)
for _, sub := range r.qsubs { for _, sub := range r.qsubs {
if sub.client.typ == ROUTER { if sub.client.typ == ROUTER {
if typ != CLIENT { if typ != CLIENT {
@@ -275,7 +274,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil { if sub != nil {
err := sub.client.WriterPacket(packet) err := sub.client.WriterPacket(packet)
if err != nil { if err != nil {
brokerLog.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
@@ -329,7 +328,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
t := topic t := topic
//check topic auth for client //check topic auth for client
if !c.CheckTopicAuth(SUB, topic) { if !c.CheckTopicAuth(SUB, topic) {
brokerLog.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure) retcodes = append(retcodes, QosFailure)
continue continue
} }
@@ -376,7 +375,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
} }
err := b.sl.Insert(sub) err := b.sl.Insert(sub)
if err != nil { if err != nil {
brokerLog.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure) retcodes = append(retcodes, QosFailure)
} else { } else {
retcodes = append(retcodes, qoss[i]) retcodes = append(retcodes, qoss[i])
@@ -386,7 +385,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
err := c.WriterPacket(suback) err := c.WriterPacket(suback)
if err != nil { if err != nil {
brokerLog.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
//broadcast subscribe message //broadcast subscribe message
@@ -398,7 +397,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
for _, t := range topics { for _, t := range topics {
packets := b.rl.Match(t) packets := b.rl.Match(t)
for _, packet := range packets { for _, packet := range packets {
brokerLog.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID)) log.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID))
if packet != nil { if packet != nil {
c.WriterPacket(packet) c.WriterPacket(packet)
} }
@@ -445,7 +444,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
err := c.WriterPacket(unsuback) err := c.WriterPacket(unsuback)
if err != nil { if err != nil {
brokerLog.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
// //process ubsubscribe message // //process ubsubscribe message
@@ -474,7 +473,7 @@ func (c *client) ProcessPing() {
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket) resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
err := c.WriterPacket(resp) err := c.WriterPacket(resp)
if err != nil { if err != nil {
brokerLog.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
} }
@@ -505,7 +504,7 @@ func (c *client) Close() {
for _, sub := range subs { for _, sub := range subs {
err := b.sl.Remove(sub) err := b.sl.Remove(sub)
if err != nil { if err != nil {
brokerLog.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
if c.typ == CLIENT { if c.typ == CLIENT {

View File

@@ -9,11 +9,10 @@ import (
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"os"
"github.com/fhmq/hmq/logger" "github.com/fhmq/hmq/logger"
"go.uber.org/zap" "go.uber.org/zap"
"io/ioutil"
"os"
) )
type Config struct { type Config struct {
@@ -52,6 +51,10 @@ var DefaultConfig *Config = &Config{
Acl: false, Acl: false,
} }
var (
log *zap.Logger
)
func showHelp() { func showHelp() {
fmt.Printf("%s\n", usageStr) fmt.Printf("%s\n", usageStr)
os.Exit(0) os.Exit(0)
@@ -105,7 +108,7 @@ func ConfigureConfig(args []string) (*Config, error) {
}) })
logger.InitLogger(config.Debug) logger.InitLogger(config.Debug)
brokerLog = logger.Get().Named("Broker") log = logger.Get().Named("Broker")
if configFile != "" { if configFile != "" {
tmpConfig, e := LoadConfig(configFile) tmpConfig, e := LoadConfig(configFile)
@@ -128,15 +131,15 @@ func LoadConfig(filename string) (*Config, error) {
content, err := ioutil.ReadFile(filename) content, err := ioutil.ReadFile(filename)
if err != nil { if err != nil {
brokerLog.Error("Read config file error: ", zap.Error(err)) log.Error("Read config file error: ", zap.Error(err))
return nil, err return nil, err
} }
// brokerLog.Info(string(content)) // log.Info(string(content))
var config Config var config Config
err = json.Unmarshal(content, &config) err = json.Unmarshal(content, &config)
if err != nil { if err != nil {
brokerLog.Error("Unmarshal config file error: ", zap.Error(err)) log.Error("Unmarshal config file error: ", zap.Error(err))
return nil, err return nil, err
} }
@@ -168,7 +171,7 @@ func (config *Config) check() error {
if config.TlsPort != "" { if config.TlsPort != "" {
if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" {
brokerLog.Error("tls config error, no cert or key file.") log.Error("tls config error, no cert or key file.")
return errors.New("tls config error, no cert or key file.") return errors.New("tls config error, no cert or key file.")
} }
if config.TlsHost == "" { if config.TlsHost == "" {

View File

@@ -4,12 +4,10 @@ package broker
import ( import (
"fmt" "fmt"
"time" simplejson "github.com/bitly/go-simplejson"
"github.com/eclipse/paho.mqtt.golang/packets" "github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap" "go.uber.org/zap"
"time"
simplejson "github.com/bitly/go-simplejson"
) )
func (c *client) SendInfo() { func (c *client) SendInfo() {
@@ -21,7 +19,7 @@ func (c *client) SendInfo() {
infoMsg := NewInfo(c.broker.id, url, false) infoMsg := NewInfo(c.broker.id, url, false)
err := c.WriterPacket(infoMsg) err := c.WriterPacket(infoMsg)
if err != nil { if err != nil {
brokerLog.Error("send info message error, ", zap.Error(err)) log.Error("send info message error, ", zap.Error(err))
return return
} }
} }
@@ -34,7 +32,7 @@ func (c *client) StartPing() {
case <-timeTicker.C: case <-timeTicker.C:
err := c.WriterPacket(ping) err := c.WriterPacket(ping)
if err != nil { if err != nil {
brokerLog.Error("ping error: ", zap.Error(err)) log.Error("ping error: ", zap.Error(err))
c.Close() c.Close()
} }
case _, ok := <-c.closed: case _, ok := <-c.closed:
@@ -57,10 +55,10 @@ func (c *client) SendConnect() {
m.Keepalive = uint16(60) m.Keepalive = uint16(60)
err := c.WriterPacket(m) err := c.WriterPacket(m)
if err != nil { if err != nil {
brokerLog.Error("send connect message error, ", zap.Error(err)) log.Error("send connect message error, ", zap.Error(err))
return return
} }
brokerLog.Info("send connect success") log.Info("send connect success")
} }
func NewInfo(sid, url string, isforword bool) *packets.PublishPacket { func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
@@ -69,7 +67,7 @@ func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
pub.TopicName = BrokerInfoTopic pub.TopicName = BrokerInfoTopic
pub.Retain = false pub.Retain = false
info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url) info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url)
// brokerLog.Info("new info", string(info)) // log.Info("new info", string(info))
pub.Payload = []byte(info) pub.Payload = []byte(info)
return pub return pub
} }
@@ -81,17 +79,17 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) {
return return
} }
brokerLog.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload))) log.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload)))
js, err := simplejson.NewJson(packet.Payload) js, err := simplejson.NewJson(packet.Payload)
if err != nil { if err != nil {
brokerLog.Warn("parse info message err", zap.Error(err)) log.Warn("parse info message err", zap.Error(err))
return return
} }
routes, err := js.Get("data").Map() routes, err := js.Get("data").Map()
if routes == nil { if routes == nil {
brokerLog.Error("receive info message error, ", zap.Error(err)) log.Error("receive info message error, ", zap.Error(err))
return return
} }

View File

@@ -1,9 +1,8 @@
package broker package broker
import ( import (
"sync"
"github.com/eclipse/paho.mqtt.golang/packets" "github.com/eclipse/paho.mqtt.golang/packets"
"sync"
) )
type RetainList struct { type RetainList struct {
@@ -39,7 +38,7 @@ func (r *RetainList) Insert(topic string, buf *packets.PublishPacket) error {
if err != nil { if err != nil {
return err return err
} }
// brokerLog.Info("insert tokens:", tokens) // log.Info("insert tokens:", tokens)
r.Lock() r.Lock()
l := r.root l := r.root
@@ -72,7 +71,7 @@ func (r *RetainList) Match(topic string) []*packets.PublishPacket {
l := r.root l := r.root
matchRLevel(l, tokens, results) matchRLevel(l, tokens, results)
r.Unlock() r.Unlock()
// brokerLog.Info("results: ", results) // log.Info("results: ", results)
return results.msg return results.msg
} }
@@ -82,7 +81,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) {
if l == nil { if l == nil {
return return
} }
// brokerLog.Info("l info :", l.nodes) // log.Info("l info :", l.nodes)
if t == "#" { if t == "#" {
for _, n := range l.nodes { for _, n := range l.nodes {
n.GetAll(results) n.GetAll(results)
@@ -111,7 +110,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) {
} }
func (r *rnode) GetAll(results *RetainResult) { func (r *rnode) GetAll(results *RetainResult) {
// brokerLog.Info("node 's message: ", string(r.msg)) // log.Info("node 's message: ", string(r.msg))
if r.msg != nil { if r.msg != nil {
results.msg = append(results.msg, r.msg) results.msg = append(results.msg, r.msg)
} }

View File

@@ -4,9 +4,8 @@ package broker
import ( import (
"errors" "errors"
"sync"
"go.uber.org/zap" "go.uber.org/zap"
"sync"
) )
// A result structure better optimized for queue subs. // A result structure better optimized for queue subs.
@@ -211,7 +210,7 @@ func (s *Sublist) Match(topic string) *SublistResult {
tokens, err := PublishTopicCheckAndSpilt(topic) tokens, err := PublishTopicCheckAndSpilt(topic)
if err != nil { if err != nil {
brokerLog.Error("\tserver/sublist.go: ", zap.Error(err)) log.Error("\tserver/sublist.go: ", zap.Error(err))
return nil return nil
} }

View File

@@ -8,11 +8,10 @@ package main
import ( import (
"fmt" "fmt"
"github.com/fhmq/hmq/broker"
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
"github.com/fhmq/hmq/broker"
) )
func main() { func main() {