8 Commits

Author SHA1 Message Date
zhouyuyan
7ce2e3d185 context 2018-04-28 10:59:36 +08:00
zhouyuyan
684584b208 fix write logic 2018-04-28 09:37:37 +08:00
zhouyuyan
56fb4a2d54 fix issue 25 2018-04-28 09:08:28 +08:00
joy.zhou
5ed4728575 Wpool (#23)
* pool

* pool

* wpool
2018-04-04 13:49:52 +08:00
zhouyuyan
c0fea6a5ba modify_message_pool 2018-02-24 13:19:43 +08:00
zhouyuyan
47500910e1 fix broker out painc 2018-02-06 11:01:06 +08:00
joy.zhou
0ff20b6ee2 Update README.md 2018-02-03 13:11:53 +08:00
joy.zhou
7155667f6c Pool (#16)
* add pool

* elastic workerpool

* del buf

* modify usage

* modify readme
2018-02-03 12:42:25 +08:00
11 changed files with 318 additions and 274 deletions

View File

@@ -16,43 +16,26 @@ $ go run main.go
## Usage of hmq: ## Usage of hmq:
~~~ ~~~
Usage of ./hmq: Usage: hmq [options]
-w int
worker num to process message, perfer (client num)/10. (default 1024) Broker Options:
-worker int -w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
worker num to process message, perfer (client num)/10. (default 1024) -p, --port <port> Use port for clients (default: 1883)
-h string --host <host> Network host to listen on. (default "0.0.0.0")
Network host to listen on. (default "0.0.0.0") -ws, --wsport <port> Use port for websocket monitoring
-host string -wsp,--wspath <path> Use path for websocket monitoring
Network host to listen on. (default "0.0.0.0") -c, --config <file> Configuration file
-p string
Port to listen on. (default "1883") Logging Options:
-port string -d, --debug <bool> Enable debugging output (default false)
Port to listen on. (default "1883") -D Debug enabled
-c string
config file for hmq Cluster Options:
-config string -r, --router <rurl> Router who maintenance cluster info
config file for hmq -cp, --clusterport <cluster-port> Cluster listen port for others
-cluster string
Cluster ip from which members can connect. Common Options:
-cluster_listen string -h, --help Show this message
Cluster ip from which members can connect.
-cluster_port string
Cluster port from which members can connect.
-cp string
Cluster port from which members can connect.
-r string
Router who maintenance cluster info
-router string
Router who maintenance cluster info
-ws_path string
path for ws to listen on
-ws_port string
port for ws to listen on
-wspath string
path for ws to listen on
-wsport string
port for ws to listen on
~~~ ~~~
### hmq.config ### hmq.config
@@ -105,6 +88,9 @@ Usage of ./hmq:
### Cluster ### Cluster
```bash ```bash
1, start router for hmq (https://github.com/fhmq/router.git) 1, start router for hmq (https://github.com/fhmq/router.git)
$ go get github.com/fhmq/router
$ cd $GOPATH/github.com/fhmq/router
$ go run main.go
2, config router in hmq.config ("router": "127.0.0.1:9888") 2, config router in hmq.config ("router": "127.0.0.1:9888")
``` ```

View File

@@ -3,13 +3,10 @@
package broker package broker
import ( import (
"strings"
"github.com/fhmq/hmq/lib/acl" "github.com/fhmq/hmq/lib/acl"
"go.uber.org/zap"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"go.uber.org/zap"
"strings"
) )
const ( const (
@@ -44,10 +41,10 @@ func (b *Broker) handleFsEvent(event fsnotify.Event) error {
case b.config.AclConf: case b.config.AclConf:
if event.Op&fsnotify.Write == fsnotify.Write || if event.Op&fsnotify.Write == fsnotify.Write ||
event.Op&fsnotify.Create == fsnotify.Create { event.Op&fsnotify.Create == fsnotify.Create {
brokerLog.Info("text:handling acl config change event:", zap.String("filename", event.Name)) log.Info("text:handling acl config change event:", zap.String("filename", event.Name))
aclconfig, err := acl.AclConfigLoad(event.Name) aclconfig, err := acl.AclConfigLoad(event.Name)
if err != nil { if err != nil {
brokerLog.Error("aclconfig change failed, load acl conf error: ", zap.Error(err)) log.Error("aclconfig change failed, load acl conf error: ", zap.Error(err))
return err return err
} }
b.AclConfig = aclconfig b.AclConfig = aclconfig
@@ -60,24 +57,24 @@ func (b *Broker) StartAclWatcher() {
go func() { go func() {
wch, e := fsnotify.NewWatcher() wch, e := fsnotify.NewWatcher()
if e != nil { if e != nil {
brokerLog.Error("start monitor acl config file error,", zap.Error(e)) log.Error("start monitor acl config file error,", zap.Error(e))
return return
} }
defer wch.Close() defer wch.Close()
for _, i := range watchList { for _, i := range watchList {
if err := wch.Add(i); err != nil { if err := wch.Add(i); err != nil {
brokerLog.Error("start monitor acl config file error,", zap.Error(err)) log.Error("start monitor acl config file error,", zap.Error(err))
return return
} }
} }
brokerLog.Info("watching acl config file change...") log.Info("watching acl config file change...")
for { for {
select { select {
case evt := <-wch.Events: case evt := <-wch.Events:
b.handleFsEvent(evt) b.handleFsEvent(evt)
case err := <-wch.Errors: case err := <-wch.Errors:
brokerLog.Error("error:", zap.Error(err)) log.Error("error:", zap.Error(err))
} }
} }
}() }()

View File

@@ -11,25 +11,17 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/fhmq/hmq/lib/acl" "github.com/fhmq/hmq/lib/acl"
"github.com/fhmq/hmq/pool" "github.com/fhmq/hmq/pool"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/shirou/gopsutil/mem" "github.com/shirou/gopsutil/mem"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"github.com/fhmq/hmq/logger"
)
var (
brokerLog = logger.Get().Named("Broker")
) )
const ( const (
MessagePoolNum = 1024 MessagePoolNum = 1024
MessageNum = 1024 MessagePoolMessageNum = 1024
) )
type Message struct { type Message struct {
@@ -38,49 +30,49 @@ type Message struct {
} }
type Broker struct { type Broker struct {
id string id string
cid uint64 cid uint64
mu sync.Mutex mu sync.Mutex
config *Config config *Config
tlsConfig *tls.Config tlsConfig *tls.Config
AclConfig *acl.ACLConfig AclConfig *acl.ACLConfig
wpool *pool.WorkerPool wpool *pool.WorkerPool
clients sync.Map clients sync.Map
routes sync.Map routes sync.Map
remotes sync.Map remotes sync.Map
nodes map[string]interface{} nodes map[string]interface{}
clusterChannel chan *Message clusterPool chan *Message
messagePool []chan *Message sl *Sublist
sl *Sublist rl *RetainList
rl *RetainList queues map[string]int
queues map[string]int // messagePool []chan *Message
} }
func newMessagePool() []chan *Message { func newMessagePool() []chan *Message {
mp := make([]chan *Message, 0) pool := make([]chan *Message, 0)
for i := 0; i < MessagePoolNum; i++ { for i := 0; i < MessagePoolNum; i++ {
tempCh := make(chan *Message, MessageNum) ch := make(chan *Message, MessagePoolMessageNum)
mp = append(mp, tempCh) pool = append(pool, ch)
} }
return mp return pool
} }
func NewBroker(config *Config) (*Broker, error) { func NewBroker(config *Config) (*Broker, error) {
b := &Broker{ b := &Broker{
id: GenUniqueId(), id: GenUniqueId(),
config: config, config: config,
wpool: pool.New(config.Worker), wpool: pool.New(config.Worker),
sl: NewSublist(), sl: NewSublist(),
rl: NewRetainList(), rl: NewRetainList(),
nodes: make(map[string]interface{}), nodes: make(map[string]interface{}),
queues: make(map[string]int), queues: make(map[string]int),
clusterChannel: make(chan *Message), clusterPool: make(chan *Message),
messagePool: newMessagePool(), // messagePool: newMessagePool(),
} }
if b.config.TlsPort != "" { if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo) tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
if err != nil { if err != nil {
brokerLog.Error("new tlsConfig error", zap.Error(err)) log.Error("new tlsConfig error", zap.Error(err))
return nil, err return nil, err
} }
b.tlsConfig = tlsconfig b.tlsConfig = tlsconfig
@@ -88,7 +80,7 @@ func NewBroker(config *Config) (*Broker, error) {
if b.config.Acl { if b.config.Acl {
aclconfig, err := acl.AclConfigLoad(b.config.AclConf) aclconfig, err := acl.AclConfigLoad(b.config.AclConf)
if err != nil { if err != nil {
brokerLog.Error("Load acl conf error", zap.Error(err)) log.Error("Load acl conf error", zap.Error(err))
return nil, err return nil, err
} }
b.AclConfig = aclconfig b.AclConfig = aclconfig
@@ -97,30 +89,26 @@ func NewBroker(config *Config) (*Broker, error) {
return b, nil return b, nil
} }
func (b *Broker) StartDispatcher() { func (b *Broker) SubmitWork(msg *Message) {
for i := 0; i < MessagePoolNum; i++ { if b.wpool == nil {
go func(idx int) { b.wpool = pool.New(b.config.Worker)
for { }
msg, ok := <-b.messagePool[idx]
if !ok { if msg.client.typ == CLUSTER {
brokerLog.Error("read message from client channel error") b.clusterPool <- msg
return } else {
} b.wpool.Submit(func() {
b.wpool.Submit(func() { ProcessMessage(msg)
ProcessMessage(msg) })
})
}
}(i)
} }
} }
func (b *Broker) Start() { func (b *Broker) Start() {
if b == nil { if b == nil {
brokerLog.Error("broker is null") log.Error("broker is null")
return return
} }
go b.StartDispatcher()
//listen clinet over tcp //listen clinet over tcp
if b.config.Port != "" { if b.config.Port != "" {
@@ -169,7 +157,7 @@ func StateMonitor() {
func (b *Broker) StartWebsocketListening() { func (b *Broker) StartWebsocketListening() {
path := b.config.WsPath path := b.config.WsPath
hp := ":" + b.config.WsPort hp := ":" + b.config.WsPort
brokerLog.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path)) log.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path))
http.Handle(path, websocket.Handler(b.wsHandler)) http.Handle(path, websocket.Handler(b.wsHandler))
var err error var err error
if b.config.WsTLS { if b.config.WsTLS {
@@ -178,17 +166,16 @@ func (b *Broker) StartWebsocketListening() {
err = http.ListenAndServe(hp, nil) err = http.ListenAndServe(hp, nil)
} }
if err != nil { if err != nil {
brokerLog.Error("ListenAndServe:" + err.Error()) log.Error("ListenAndServe:" + err.Error())
return return
} }
} }
func (b *Broker) wsHandler(ws *websocket.Conn) { func (b *Broker) wsHandler(ws *websocket.Conn) {
// io.Copy(ws, ws) // io.Copy(ws, ws)
atomic.AddUint64(&b.cid, 1)
ws.PayloadType = websocket.BinaryFrame ws.PayloadType = websocket.BinaryFrame
b.handleConnection(CLIENT, ws)
idx := atomic.AddUint64(&b.cid, 1)
b.handleConnection(CLIENT, ws, idx)
} }
func (b *Broker) StartClientListening(Tls bool) { func (b *Broker) StartClientListening(Tls bool) {
@@ -198,14 +185,14 @@ func (b *Broker) StartClientListening(Tls bool) {
if Tls { if Tls {
hp = b.config.TlsHost + ":" + b.config.TlsPort hp = b.config.TlsHost + ":" + b.config.TlsPort
l, err = tls.Listen("tcp", hp, b.tlsConfig) l, err = tls.Listen("tcp", hp, b.tlsConfig)
brokerLog.Info("Start TLS Listening client on ", zap.String("hp", hp)) log.Info("Start TLS Listening client on ", zap.String("hp", hp))
} else { } else {
hp := b.config.Host + ":" + b.config.Port hp := b.config.Host + ":" + b.config.Port
l, err = net.Listen("tcp", hp) l, err = net.Listen("tcp", hp)
brokerLog.Info("Start Listening client on ", zap.String("hp", hp)) log.Info("Start Listening client on ", zap.String("hp", hp))
} }
if err != nil { if err != nil {
brokerLog.Error("Error listening on ", zap.Error(err)) log.Error("Error listening on ", zap.Error(err))
return return
} }
tmpDelay := 10 * ACCEPT_MIN_SLEEP tmpDelay := 10 * ACCEPT_MIN_SLEEP
@@ -213,7 +200,7 @@ func (b *Broker) StartClientListening(Tls bool) {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() { if ne, ok := err.(net.Error); ok && ne.Temporary() {
brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", log.Error("Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond))
time.Sleep(tmpDelay) time.Sleep(tmpDelay)
tmpDelay *= 2 tmpDelay *= 2
@@ -221,13 +208,13 @@ func (b *Broker) StartClientListening(Tls bool) {
tmpDelay = ACCEPT_MAX_SLEEP tmpDelay = ACCEPT_MAX_SLEEP
} }
} else { } else {
brokerLog.Error("Accept error: %v", zap.Error(err)) log.Error("Accept error: %v", zap.Error(err))
} }
continue continue
} }
tmpDelay = ACCEPT_MIN_SLEEP tmpDelay = ACCEPT_MIN_SLEEP
idx := atomic.AddUint64(&b.cid, 1) atomic.AddUint64(&b.cid, 1)
go b.handleConnection(CLIENT, conn, idx) go b.handleConnection(CLIENT, conn)
} }
} }
@@ -240,7 +227,7 @@ func (b *Broker) Handshake(conn net.Conn) bool {
// Force handshake // Force handshake
if err := nc.Handshake(); err != nil { if err := nc.Handshake(); err != nil {
brokerLog.Error("TLS handshake error, ", zap.Error(err)) log.Error("TLS handshake error, ", zap.Error(err))
return false return false
} }
nc.SetReadDeadline(time.Time{}) nc.SetReadDeadline(time.Time{})
@@ -256,18 +243,18 @@ func TlsTimeout(conn *tls.Conn) {
} }
cs := nc.ConnectionState() cs := nc.ConnectionState()
if !cs.HandshakeComplete { if !cs.HandshakeComplete {
brokerLog.Error("TLS handshake timeout") log.Error("TLS handshake timeout")
nc.Close() nc.Close()
} }
} }
func (b *Broker) StartClusterListening() { func (b *Broker) StartClusterListening() {
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
brokerLog.Info("Start Listening cluster on ", zap.String("hp", hp)) log.Info("Start Listening cluster on ", zap.String("hp", hp))
l, e := net.Listen("tcp", hp) l, e := net.Listen("tcp", hp)
if e != nil { if e != nil {
brokerLog.Error("Error listening on ", zap.Error(e)) log.Error("Error listening on ", zap.Error(e))
return return
} }
@@ -276,7 +263,7 @@ func (b *Broker) StartClusterListening() {
conn, err := l.Accept() conn, err := l.Accept()
if err != nil { if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() { if ne, ok := err.(net.Error); ok && ne.Temporary() {
brokerLog.Error("Temporary Client Accept Error(%v), sleeping %dms", log.Error("Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond)) zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond))
time.Sleep(tmpDelay) time.Sleep(tmpDelay)
tmpDelay *= 2 tmpDelay *= 2
@@ -284,30 +271,30 @@ func (b *Broker) StartClusterListening() {
tmpDelay = ACCEPT_MAX_SLEEP tmpDelay = ACCEPT_MAX_SLEEP
} }
} else { } else {
brokerLog.Error("Accept error: %v", zap.Error(err)) log.Error("Accept error: %v", zap.Error(err))
} }
continue continue
} }
tmpDelay = ACCEPT_MIN_SLEEP tmpDelay = ACCEPT_MIN_SLEEP
idx := atomic.AddUint64(&b.cid, 1)
go b.handleConnection(ROUTER, conn, idx) go b.handleConnection(ROUTER, conn)
} }
} }
func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { func (b *Broker) handleConnection(typ int, conn net.Conn) {
//process connect packet //process connect packet
packet, err := packets.ReadPacket(conn) packet, err := packets.ReadPacket(conn)
if err != nil { if err != nil {
brokerLog.Error("read connect packet error: ", zap.Error(err)) log.Error("read connect packet error: ", zap.Error(err))
return return
} }
if packet == nil { if packet == nil {
brokerLog.Error("received nil packet") log.Error("received nil packet")
return return
} }
msg, ok := packet.(*packets.ConnectPacket) msg, ok := packet.(*packets.ConnectPacket)
if !ok { if !ok {
brokerLog.Error("received msg that was not Connect") log.Error("received msg that was not Connect")
return return
} }
connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket) connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
@@ -315,7 +302,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
connack.SessionPresent = msg.CleanSession connack.SessionPresent = msg.CleanSession
err = connack.Write(conn) err = connack.Write(conn)
if err != nil { if err != nil {
brokerLog.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier))
return return
} }
@@ -355,7 +342,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
case CLIENT: case CLIENT:
old, exist = b.clients.Load(cid) old, exist = b.clients.Load(cid)
if exist { if exist {
brokerLog.Warn("client exist, close old...", zap.String("clientID", c.info.clientID)) log.Warn("client exist, close old...", zap.String("clientID", c.info.clientID))
ol, ok := old.(*client) ol, ok := old.(*client)
if ok { if ok {
ol.Close() ol.Close()
@@ -365,7 +352,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
case ROUTER: case ROUTER:
old, exist = b.routes.Load(cid) old, exist = b.routes.Load(cid)
if exist { if exist {
brokerLog.Warn("router exist, close old...") log.Warn("router exist, close old...")
ol, ok := old.(*client) ol, ok := old.(*client)
if ok { if ok {
ol.Close() ol.Close()
@@ -374,9 +361,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
b.routes.Store(cid, c) b.routes.Store(cid, c)
} }
mpool := b.messagePool[idx%MessagePoolNum] // mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
c.readLoop(mpool) c.readLoop()
} }
func (b *Broker) ConnectToDiscovery() { func (b *Broker) ConnectToDiscovery() {
@@ -386,8 +373,8 @@ func (b *Broker) ConnectToDiscovery() {
for { for {
conn, err = net.Dial("tcp", b.config.Router) conn, err = net.Dial("tcp", b.config.Router)
if err != nil { if err != nil {
brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) log.Error("Error trying to connect to route: ", zap.Error(err))
brokerLog.Debug("Connect to route timeout ,retry...") log.Debug("Connect to route timeout ,retry...")
if 0 == tempDelay { if 0 == tempDelay {
tempDelay = 1 * time.Second tempDelay = 1 * time.Second
@@ -403,7 +390,7 @@ func (b *Broker) ConnectToDiscovery() {
} }
break break
} }
brokerLog.Debug("connect to router success :", zap.String("Router", b.config.Router)) log.Debug("connect to router success :", zap.String("Router", b.config.Router))
cid := b.id cid := b.id
info := info{ info := info{
@@ -423,15 +410,15 @@ func (b *Broker) ConnectToDiscovery() {
c.SendConnect() c.SendConnect()
c.SendInfo() c.SendInfo()
go c.readLoop(b.clusterChannel) go c.readLoop()
go c.StartPing() go c.StartPing()
} }
func (b *Broker) processClusterInfo() { func (b *Broker) processClusterInfo() {
for { for {
msg, ok := <-b.clusterChannel msg, ok := <-b.clusterPool
if !ok { if !ok {
brokerLog.Error("read message from cluster channel error") log.Error("read message from cluster channel error")
return return
} }
ProcessMessage(msg) ProcessMessage(msg)
@@ -453,13 +440,13 @@ func (b *Broker) connectRouter(id, addr string) {
conn, err = net.Dial("tcp", addr) conn, err = net.Dial("tcp", addr)
if err != nil { if err != nil {
brokerLog.Error("Error trying to connect to route: ", zap.Error(err)) log.Error("Error trying to connect to route: ", zap.Error(err))
if retryTimes > 50 { if retryTimes > 50 {
return return
} }
brokerLog.Debug("Connect to route timeout ,retry...") log.Debug("Connect to route timeout ,retry...")
if 0 == timeDelay { if 0 == timeDelay {
timeDelay = 1 * time.Second timeDelay = 1 * time.Second
@@ -499,10 +486,8 @@ func (b *Broker) connectRouter(id, addr string) {
c.SendConnect() c.SendConnect()
idx := atomic.AddUint64(&b.cid, 1) // mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
mpool := b.messagePool[idx%MessagePoolNum] go c.readLoop()
go c.readLoop(mpool)
go c.StartPing() go c.StartPing()
} }
@@ -561,7 +546,7 @@ func (b *Broker) SendLocalSubsToRouter(c *client) {
if len(subInfo.Topics) > 0 { if len(subInfo.Topics) > 0 {
err := c.WriterPacket(subInfo) err := c.WriterPacket(subInfo)
if err != nil { if err != nil {
brokerLog.Error("Send localsubs To Router error :", zap.Error(err)) log.Error("Send localsubs To Router error :", zap.Error(err))
} }
} }
} }
@@ -578,7 +563,7 @@ func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacke
return true return true
}) })
// brokerLog.Info("BroadcastInfoMessage success ") // log.Info("BroadcastInfoMessage success ")
} }
func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) { func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
@@ -590,7 +575,7 @@ func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
} }
return true return true
}) })
// brokerLog.Info("BroadcastSubscribeMessage remotes: ", s.remotes) // log.Info("BroadcastSubscribeMessage remotes: ", s.remotes)
} }
func (b *Broker) removeClient(c *client) { func (b *Broker) removeClient(c *client) {
@@ -604,7 +589,7 @@ func (b *Broker) removeClient(c *client) {
case REMOTE: case REMOTE:
b.remotes.Delete(clientId) b.remotes.Delete(clientId)
} }
// brokerLog.Info("delete client ,", clientId) // log.Info("delete client ,", clientId)
} }
func (b *Broker) PublishMessage(packet *packets.PublishPacket) { func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
@@ -618,7 +603,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
if sub != nil { if sub != nil {
err := sub.client.WriterPacket(packet) err := sub.client.WriterPacket(packet)
if err != nil { if err != nil {
brokerLog.Error("process message for psub error, ", zap.Error(err)) log.Error("process message for psub error, ", zap.Error(err))
} }
} }
} }

View File

@@ -3,6 +3,8 @@
package broker package broker
import ( import (
"context"
"errors"
"net" "net"
"reflect" "reflect"
"strings" "strings"
@@ -30,17 +32,18 @@ const (
) )
type client struct { type client struct {
typ int typ int
mu sync.Mutex mu sync.Mutex
broker *Broker broker *Broker
conn net.Conn conn net.Conn
info info info info
route route route route
status int status int
closed chan int smu sync.RWMutex
smu sync.RWMutex subs map[string]*subscription
subs map[string]*subscription rsubs map[string]*subInfo
rsubs map[string]*subInfo ctx context.Context
cancelFunc context.CancelFunc
} }
type subInfo struct { type subInfo struct {
@@ -78,16 +81,18 @@ func (c *client) init() {
c.smu.Lock() c.smu.Lock()
defer c.smu.Unlock() defer c.smu.Unlock()
c.status = Connected c.status = Connected
c.closed = make(chan int, 1)
c.rsubs = make(map[string]*subInfo) c.rsubs = make(map[string]*subInfo)
c.subs = make(map[string]*subscription, 10) c.subs = make(map[string]*subscription, 10)
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0] c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
} }
func (c *client) keepAlive(ch chan int, mpool chan *Message) { func (c *client) keepAlive(ch chan int) {
defer close(ch) defer close(ch)
b := c.broker
keepalive := time.Duration(c.info.keepalive*3/2) * time.Second keepalive := time.Duration(c.info.keepalive*3/2) * time.Second
timer := time.NewTimer(keepalive) timer := time.NewTimer(keepalive)
@@ -100,46 +105,52 @@ func (c *client) keepAlive(ch chan int, mpool chan *Message) {
timer.Reset(keepalive) timer.Reset(keepalive)
continue continue
} }
brokerLog.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
msg := &Message{client: c, packet: DisconnectdPacket} msg := &Message{client: c, packet: DisconnectdPacket}
mpool <- msg b.SubmitWork(msg)
timer.Stop() timer.Stop()
return return
case _, ok := <-c.closed: case <-c.ctx.Done():
if !ok { return
return
}
} }
} }
} }
func (c *client) readLoop(mpool chan *Message) { func (c *client) readLoop() {
nc := c.conn nc := c.conn
if nc == nil || mpool == nil { b := c.broker
if nc == nil || b == nil {
return return
} }
ch := make(chan int, 1000) ch := make(chan int, 1000)
go c.keepAlive(ch, mpool) go c.keepAlive(ch)
for { for {
packet, err := packets.ReadPacket(nc) select {
if err != nil { case <-c.ctx.Done():
brokerLog.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) return
break default:
} packet, err := packets.ReadPacket(nc)
// keepalive channel if err != nil {
ch <- 1 log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
return
}
// keepalive channel
ch <- 1
msg := &Message{ msg := &Message{
client: c, client: c,
packet: packet, packet: packet,
}
b.SubmitWork(msg)
} }
mpool <- msg
} }
msg := &Message{client: c, packet: DisconnectdPacket}
mpool <- msg
} }
func ProcessMessage(msg *Message) { func ProcessMessage(msg *Message) {
@@ -149,7 +160,7 @@ func ProcessMessage(msg *Message) {
return return
} }
brokerLog.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID)) log.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID))
switch ca.(type) { switch ca.(type) {
case *packets.ConnackPacket: case *packets.ConnackPacket:
case *packets.ConnectPacket: case *packets.ConnectPacket:
@@ -174,7 +185,7 @@ func ProcessMessage(msg *Message) {
case *packets.DisconnectPacket: case *packets.DisconnectPacket:
c.Close() c.Close()
default: default:
brokerLog.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID)) log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID))
} }
} }
@@ -190,7 +201,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
} }
if !c.CheckTopicAuth(PUB, topic) { if !c.CheckTopicAuth(PUB, topic) {
brokerLog.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
return return
} }
@@ -201,21 +212,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket) puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
puback.MessageID = packet.MessageID puback.MessageID = packet.MessageID
if err := c.WriterPacket(puback); err != nil { if err := c.WriterPacket(puback); err != nil {
brokerLog.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
c.ProcessPublishMessage(packet) c.ProcessPublishMessage(packet)
case QosExactlyOnce: case QosExactlyOnce:
return return
default: default:
brokerLog.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID)) log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
return return
} }
if packet.Retain { if packet.Retain {
if b := c.broker; b != nil { if b := c.broker; b != nil {
err := b.rl.Insert(topic, packet) err := b.rl.Insert(topic, packet)
if err != nil { if err != nil {
brokerLog.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
} }
@@ -235,7 +246,11 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
topic := packet.TopicName topic := packet.TopicName
r := b.sl.Match(topic) r := b.sl.Match(topic)
// brokerLog.Info("psubs num: ", len(r.psubs)) if r == nil {
return
}
// log.Info("psubs num: ", len(r.psubs))
if len(r.qsubs) == 0 && len(r.psubs) == 0 { if len(r.qsubs) == 0 && len(r.psubs) == 0 {
return return
} }
@@ -249,7 +264,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil { if sub != nil {
err := sub.client.WriterPacket(packet) err := sub.client.WriterPacket(packet)
if err != nil { if err != nil {
brokerLog.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
} }
@@ -259,7 +274,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
t := "$queue/" + topic t := "$queue/" + topic
cnt, exist := b.queues[t] cnt, exist := b.queues[t]
if exist { if exist {
// brokerLog.Info("queue index : ", cnt) // log.Info("queue index : ", cnt)
for _, sub := range r.qsubs { for _, sub := range r.qsubs {
if sub.client.typ == ROUTER { if sub.client.typ == ROUTER {
if typ != CLIENT { if typ != CLIENT {
@@ -275,7 +290,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil { if sub != nil {
err := sub.client.WriterPacket(packet) err := sub.client.WriterPacket(packet)
if err != nil { if err != nil {
brokerLog.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
@@ -329,7 +344,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
t := topic t := topic
//check topic auth for client //check topic auth for client
if !c.CheckTopicAuth(SUB, topic) { if !c.CheckTopicAuth(SUB, topic) {
brokerLog.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID)) log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure) retcodes = append(retcodes, QosFailure)
continue continue
} }
@@ -376,7 +391,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
} }
err := b.sl.Insert(sub) err := b.sl.Insert(sub)
if err != nil { if err != nil {
brokerLog.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure) retcodes = append(retcodes, QosFailure)
} else { } else {
retcodes = append(retcodes, qoss[i]) retcodes = append(retcodes, qoss[i])
@@ -386,7 +401,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
err := c.WriterPacket(suback) err := c.WriterPacket(suback)
if err != nil { if err != nil {
brokerLog.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
//broadcast subscribe message //broadcast subscribe message
@@ -397,8 +412,11 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
//process retain message //process retain message
for _, t := range topics { for _, t := range topics {
packets := b.rl.Match(t) packets := b.rl.Match(t)
if packets == nil {
continue
}
for _, packet := range packets { for _, packet := range packets {
brokerLog.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID)) log.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID))
if packet != nil { if packet != nil {
c.WriterPacket(packet) c.WriterPacket(packet)
} }
@@ -445,7 +463,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
err := c.WriterPacket(unsuback) err := c.WriterPacket(unsuback)
if err != nil { if err != nil {
brokerLog.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
// //process ubsubscribe message // //process ubsubscribe message
@@ -474,7 +492,7 @@ func (c *client) ProcessPing() {
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket) resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
err := c.WriterPacket(resp) err := c.WriterPacket(resp)
if err != nil { if err != nil {
brokerLog.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return return
} }
} }
@@ -485,9 +503,13 @@ func (c *client) Close() {
c.smu.Unlock() c.smu.Unlock()
return return
} }
c.cancelFunc()
c.status = Disconnected
//wait for message complete //wait for message complete
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
c.status = Disconnected // c.status = Disconnected
if c.conn != nil { if c.conn != nil {
c.conn.Close() c.conn.Close()
@@ -496,8 +518,6 @@ func (c *client) Close() {
c.smu.Unlock() c.smu.Unlock()
close(c.closed)
b := c.broker b := c.broker
subs := c.subs subs := c.subs
if b != nil { if b != nil {
@@ -505,7 +525,7 @@ func (c *client) Close() {
for _, sub := range subs { for _, sub := range subs {
err := b.sl.Remove(sub) err := b.sl.Remove(sub)
if err != nil { if err != nil {
brokerLog.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID)) log.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
} }
} }
if c.typ == CLIENT { if c.typ == CLIENT {
@@ -527,9 +547,17 @@ func (c *client) Close() {
} }
func (c *client) WriterPacket(packet packets.ControlPacket) error { func (c *client) WriterPacket(packet packets.ControlPacket) error {
if c.status == Disconnected {
return nil
}
if packet == nil { if packet == nil {
return nil return nil
} }
if c.conn == nil {
c.Close()
return errors.New("connect lost ....")
}
c.mu.Lock() c.mu.Lock()
err := packet.Write(c.conn) err := packet.Write(c.conn)

View File

@@ -9,9 +9,10 @@ import (
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "github.com/fhmq/hmq/logger"
"go.uber.org/zap" "go.uber.org/zap"
"io/ioutil"
"os"
) )
type Config struct { type Config struct {
@@ -28,6 +29,7 @@ type Config struct {
TlsInfo TLSInfo `json:"tlsInfo"` TlsInfo TLSInfo `json:"tlsInfo"`
Acl bool `json:"acl"` Acl bool `json:"acl"`
AclConf string `json:"aclConf"` AclConf string `json:"aclConf"`
Debug bool `json:"-"`
} }
type RouteInfo struct { type RouteInfo struct {
@@ -49,30 +51,64 @@ var DefaultConfig *Config = &Config{
Acl: false, Acl: false,
} }
func ConfigureConfig() (*Config, error) { var (
log *zap.Logger
)
func showHelp() {
fmt.Printf("%s\n", usageStr)
os.Exit(0)
}
func ConfigureConfig(args []string) (*Config, error) {
config := &Config{} config := &Config{}
var ( var (
help bool
configFile string configFile string
) )
flag.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.") fs := flag.NewFlagSet("hmq-broker", flag.ExitOnError)
flag.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.") fs.Usage = showHelp
flag.StringVar(&config.Port, "port", "1883", "Port to listen on.")
flag.StringVar(&config.Port, "p", "1883", "Port to listen on.") fs.BoolVar(&help, "h", false, "Show this message.")
flag.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on.") fs.BoolVar(&help, "help", false, "Show this message.")
flag.StringVar(&config.Host, "h", "0.0.0.0", "Network host to listen on.") fs.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.")
flag.StringVar(&config.Cluster.Host, "cluster", "", "Cluster ip from which members can connect.") fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
flag.StringVar(&config.Cluster.Host, "cluster_listen", "", "Cluster ip from which members can connect.") fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
flag.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.") fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
flag.StringVar(&config.Cluster.Port, "cluster_port", "", "Cluster port from which members can connect.") fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
flag.StringVar(&config.Router, "r", "", "Router who maintenance cluster info") fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
flag.StringVar(&config.Router, "router", "", "Router who maintenance cluster info") fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
flag.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on") fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info")
flag.StringVar(&config.WsPort, "ws_port", "", "port for ws to listen on") fs.StringVar(&config.Router, "router", "", "Router who maintenance cluster info")
flag.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on") fs.StringVar(&config.WsPort, "ws", "", "port for ws to listen on")
flag.StringVar(&config.WsPath, "ws_path", "", "path for ws to listen on") fs.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on")
flag.StringVar(&configFile, "config", "", "config file for hmq") fs.StringVar(&config.WsPath, "wsp", "", "path for ws to listen on")
flag.StringVar(&configFile, "c", "", "config file for hmq") fs.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on")
flag.Parse() fs.StringVar(&configFile, "config", "", "config file for hmq")
fs.StringVar(&configFile, "c", "", "config file for hmq")
fs.BoolVar(&config.Debug, "debug", false, "enable Debug logging.")
fs.BoolVar(&config.Debug, "d", false, "enable Debug logging.")
fs.Bool("D", true, "enable Debug logging.")
if err := fs.Parse(args); err != nil {
return nil, err
}
if help {
showHelp()
return nil, nil
}
fs.Visit(func(f *flag.Flag) {
switch f.Name {
case "D":
config.Debug = true
}
})
logger.InitLogger(config.Debug)
log = logger.Get().Named("Broker")
if configFile != "" { if configFile != "" {
tmpConfig, e := LoadConfig(configFile) tmpConfig, e := LoadConfig(configFile)
@@ -95,15 +131,15 @@ func LoadConfig(filename string) (*Config, error) {
content, err := ioutil.ReadFile(filename) content, err := ioutil.ReadFile(filename)
if err != nil { if err != nil {
brokerLog.Error("Read config file error: ", zap.Error(err)) log.Error("Read config file error: ", zap.Error(err))
return nil, err return nil, err
} }
// brokerLog.Info(string(content)) // log.Info(string(content))
var config Config var config Config
err = json.Unmarshal(content, &config) err = json.Unmarshal(content, &config)
if err != nil { if err != nil {
brokerLog.Error("Unmarshal config file error: ", zap.Error(err)) log.Error("Unmarshal config file error: ", zap.Error(err))
return nil, err return nil, err
} }
@@ -135,7 +171,7 @@ func (config *Config) check() error {
if config.TlsPort != "" { if config.TlsPort != "" {
if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" {
brokerLog.Error("tls config error, no cert or key file.") log.Error("tls config error, no cert or key file.")
return errors.New("tls config error, no cert or key file.") return errors.New("tls config error, no cert or key file.")
} }
if config.TlsHost == "" { if config.TlsHost == "" {

View File

@@ -6,10 +6,9 @@ import (
"fmt" "fmt"
"time" "time"
simplejson "github.com/bitly/go-simplejson"
"github.com/eclipse/paho.mqtt.golang/packets" "github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap" "go.uber.org/zap"
simplejson "github.com/bitly/go-simplejson"
) )
func (c *client) SendInfo() { func (c *client) SendInfo() {
@@ -21,7 +20,7 @@ func (c *client) SendInfo() {
infoMsg := NewInfo(c.broker.id, url, false) infoMsg := NewInfo(c.broker.id, url, false)
err := c.WriterPacket(infoMsg) err := c.WriterPacket(infoMsg)
if err != nil { if err != nil {
brokerLog.Error("send info message error, ", zap.Error(err)) log.Error("send info message error, ", zap.Error(err))
return return
} }
} }
@@ -34,13 +33,11 @@ func (c *client) StartPing() {
case <-timeTicker.C: case <-timeTicker.C:
err := c.WriterPacket(ping) err := c.WriterPacket(ping)
if err != nil { if err != nil {
brokerLog.Error("ping error: ", zap.Error(err)) log.Error("ping error: ", zap.Error(err))
c.Close() c.Close()
} }
case _, ok := <-c.closed: case <-c.ctx.Done():
if !ok { return
return
}
} }
} }
} }
@@ -57,10 +54,10 @@ func (c *client) SendConnect() {
m.Keepalive = uint16(60) m.Keepalive = uint16(60)
err := c.WriterPacket(m) err := c.WriterPacket(m)
if err != nil { if err != nil {
brokerLog.Error("send connect message error, ", zap.Error(err)) log.Error("send connect message error, ", zap.Error(err))
return return
} }
brokerLog.Info("send connect success") log.Info("send connect success")
} }
func NewInfo(sid, url string, isforword bool) *packets.PublishPacket { func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
@@ -69,7 +66,7 @@ func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
pub.TopicName = BrokerInfoTopic pub.TopicName = BrokerInfoTopic
pub.Retain = false pub.Retain = false
info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url) info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url)
// brokerLog.Info("new info", string(info)) // log.Info("new info", string(info))
pub.Payload = []byte(info) pub.Payload = []byte(info)
return pub return pub
} }
@@ -81,17 +78,17 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) {
return return
} }
brokerLog.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload))) log.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload)))
js, err := simplejson.NewJson(packet.Payload) js, err := simplejson.NewJson(packet.Payload)
if err != nil { if err != nil {
brokerLog.Warn("parse info message err", zap.Error(err)) log.Warn("parse info message err", zap.Error(err))
return return
} }
routes, err := js.Get("data").Map() routes, err := js.Get("data").Map()
if routes == nil { if routes == nil {
brokerLog.Error("receive info message error, ", zap.Error(err)) log.Error("receive info message error, ", zap.Error(err))
return return
} }

View File

@@ -1,9 +1,8 @@
package broker package broker
import ( import (
"sync"
"github.com/eclipse/paho.mqtt.golang/packets" "github.com/eclipse/paho.mqtt.golang/packets"
"sync"
) )
type RetainList struct { type RetainList struct {
@@ -39,7 +38,7 @@ func (r *RetainList) Insert(topic string, buf *packets.PublishPacket) error {
if err != nil { if err != nil {
return err return err
} }
// brokerLog.Info("insert tokens:", tokens) // log.Info("insert tokens:", tokens)
r.Lock() r.Lock()
l := r.root l := r.root
@@ -72,7 +71,7 @@ func (r *RetainList) Match(topic string) []*packets.PublishPacket {
l := r.root l := r.root
matchRLevel(l, tokens, results) matchRLevel(l, tokens, results)
r.Unlock() r.Unlock()
// brokerLog.Info("results: ", results) // log.Info("results: ", results)
return results.msg return results.msg
} }
@@ -82,7 +81,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) {
if l == nil { if l == nil {
return return
} }
// brokerLog.Info("l info :", l.nodes) // log.Info("l info :", l.nodes)
if t == "#" { if t == "#" {
for _, n := range l.nodes { for _, n := range l.nodes {
n.GetAll(results) n.GetAll(results)
@@ -111,7 +110,7 @@ func matchRLevel(l *rlevel, toks []string, results *RetainResult) {
} }
func (r *rnode) GetAll(results *RetainResult) { func (r *rnode) GetAll(results *RetainResult) {
// brokerLog.Info("node 's message: ", string(r.msg)) // log.Info("node 's message: ", string(r.msg))
if r.msg != nil { if r.msg != nil {
results.msg = append(results.msg, r.msg) results.msg = append(results.msg, r.msg)
} }

View File

@@ -4,9 +4,8 @@ package broker
import ( import (
"errors" "errors"
"sync"
"go.uber.org/zap" "go.uber.org/zap"
"sync"
) )
// A result structure better optimized for queue subs. // A result structure better optimized for queue subs.
@@ -211,7 +210,7 @@ func (s *Sublist) Match(topic string) *SublistResult {
tokens, err := PublishTopicCheckAndSpilt(topic) tokens, err := PublishTopicCheckAndSpilt(topic)
if err != nil { if err != nil {
brokerLog.Error("\tserver/sublist.go: ", zap.Error(err)) log.Error("\tserver/sublist.go: ", zap.Error(err))
return nil return nil
} }

24
broker/usage.go Normal file
View File

@@ -0,0 +1,24 @@
package broker
var usageStr = `
Usage: hmq [options]
Broker Options:
-w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
-p, --port <port> Use port for clients (default: 1883)
--host <host> Network host to listen on. (default "0.0.0.0")
-ws, --wsport <port> Use port for websocket monitoring
-wsp,--wspath <path> Use path for websocket monitoring
-c, --config <file> Configuration file
Logging Options:
-d, --debug <bool> Enable debugging output (default false)
-D Debug and trace
Cluster Options:
-r, --router <rurl> Router who maintenance cluster info
-cp, --clusterport <cluster-port> Cluster listen port for others
Common Options:
-h, --help Show this message
`

View File

@@ -9,7 +9,6 @@ import (
var ( var (
// env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env // env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env
env string
instance *zap.Logger instance *zap.Logger
logCfg zap.Config logCfg zap.Config
) )
@@ -28,13 +27,13 @@ func NewProdLogger() (*zap.Logger, error) {
return logCfg.Build() return logCfg.Build()
} }
func init() { func InitLogger(debug bool) {
var err error var err error
var log *zap.Logger var log *zap.Logger
if env == "prod" { if debug {
log, err = NewProdLogger()
} else {
log, err = NewDevLogger() log, err = NewDevLogger()
} else {
log, err = NewProdLogger()
} }
if err != nil { if err != nil {
panic("Unable to create a logger.") panic("Unable to create a logger.")

18
main.go
View File

@@ -7,36 +7,30 @@ copyright notice and this permission notice appear in all copies.
package main package main
import ( import (
"fmt"
"github.com/fhmq/hmq/broker"
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
"github.com/fhmq/hmq/broker"
"github.com/fhmq/hmq/logger"
"go.uber.org/zap"
)
var (
log = logger.Get().Named("Main")
) )
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
config, err := broker.ConfigureConfig() config, err := broker.ConfigureConfig(os.Args[1:])
if err != nil { if err != nil {
log.Error("configure broker config error: ", zap.Error(err)) fmt.Println("configure broker config error: ", err)
return return
} }
b, err := broker.NewBroker(config) b, err := broker.NewBroker(config)
if err != nil { if err != nil {
log.Error("New Broker error: ", zap.Error(err)) fmt.Println("New Broker error: ", err)
return return
} }
b.Start() b.Start()
s := waitForSignal() s := waitForSignal()
log.Info("signal received, broker closed.", zap.Any("signal", s)) fmt.Println("signal received, broker closed.", s)
} }
func waitForSignal() os.Signal { func waitForSignal() os.Signal {