mirror of
https://github.com/fhmq/hmq.git
synced 2026-05-04 07:08:32 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
026cfbe123 | ||
|
|
6f26df7a9a | ||
|
|
fd2adb7a26 | ||
|
|
c0fea6a5ba | ||
|
|
47500910e1 |
@@ -3,13 +3,10 @@
|
|||||||
package broker
|
package broker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/fhmq/hmq/lib/acl"
|
"github.com/fhmq/hmq/lib/acl"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/fsnotify/fsnotify"
|
"github.com/fsnotify/fsnotify"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
@@ -11,18 +11,17 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/fhmq/hmq/lib/acl"
|
|
||||||
|
|
||||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||||
|
"github.com/fhmq/hmq/lib/acl"
|
||||||
|
"github.com/fhmq/hmq/pool"
|
||||||
"github.com/shirou/gopsutil/mem"
|
"github.com/shirou/gopsutil/mem"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
const (
|
||||||
log *zap.Logger
|
MessagePoolNum = 1024
|
||||||
messagePoolQueueSize = 4096
|
MessagePoolMessageNum = 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
@@ -37,26 +36,38 @@ type Broker struct {
|
|||||||
config *Config
|
config *Config
|
||||||
tlsConfig *tls.Config
|
tlsConfig *tls.Config
|
||||||
AclConfig *acl.ACLConfig
|
AclConfig *acl.ACLConfig
|
||||||
dispatcher *Dispatcher
|
wpool *pool.WorkerPool
|
||||||
clients sync.Map
|
clients sync.Map
|
||||||
routes sync.Map
|
routes sync.Map
|
||||||
remotes sync.Map
|
remotes sync.Map
|
||||||
nodes map[string]interface{}
|
nodes map[string]interface{}
|
||||||
|
clusterPool chan *Message
|
||||||
sl *Sublist
|
sl *Sublist
|
||||||
rl *RetainList
|
rl *RetainList
|
||||||
queues map[string]int
|
queues map[string]int
|
||||||
|
// messagePool []chan *Message
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBroker(config *Config, logger *zap.Logger) (*Broker, error) {
|
func newMessagePool() []chan *Message {
|
||||||
log = logger
|
pool := make([]chan *Message, 0)
|
||||||
|
for i := 0; i < MessagePoolNum; i++ {
|
||||||
|
ch := make(chan *Message, MessagePoolMessageNum)
|
||||||
|
pool = append(pool, ch)
|
||||||
|
}
|
||||||
|
return pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBroker(config *Config) (*Broker, error) {
|
||||||
b := &Broker{
|
b := &Broker{
|
||||||
id: GenUniqueId(),
|
id: GenUniqueId(),
|
||||||
config: config,
|
config: config,
|
||||||
dispatcher: NewDispatcher(),
|
wpool: pool.New(config.Worker),
|
||||||
sl: NewSublist(),
|
sl: NewSublist(),
|
||||||
rl: NewRetainList(),
|
rl: NewRetainList(),
|
||||||
nodes: make(map[string]interface{}),
|
nodes: make(map[string]interface{}),
|
||||||
queues: make(map[string]int),
|
queues: make(map[string]int),
|
||||||
|
clusterPool: make(chan *Message),
|
||||||
|
// messagePool: newMessagePool(),
|
||||||
}
|
}
|
||||||
if b.config.TlsPort != "" {
|
if b.config.TlsPort != "" {
|
||||||
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
|
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
|
||||||
@@ -78,8 +89,18 @@ func NewBroker(config *Config, logger *zap.Logger) (*Broker, error) {
|
|||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) DispatchMessage(msg *Message) {
|
func (b *Broker) SubmitWork(msg *Message) {
|
||||||
b.dispatcher.Dispatch(msg)
|
if b.wpool == nil {
|
||||||
|
b.wpool = pool.New(b.config.Worker)
|
||||||
|
}
|
||||||
|
|
||||||
|
if msg.client.typ == CLUSTER {
|
||||||
|
b.clusterPool <- msg
|
||||||
|
} else {
|
||||||
|
b.wpool.Submit(func() {
|
||||||
|
ProcessMessage(msg)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,6 +132,7 @@ func (b *Broker) Start() {
|
|||||||
|
|
||||||
//connect on other node in cluster
|
//connect on other node in cluster
|
||||||
if b.config.Router != "" {
|
if b.config.Router != "" {
|
||||||
|
go b.processClusterInfo()
|
||||||
b.ConnectToDiscovery()
|
b.ConnectToDiscovery()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -153,7 +175,7 @@ func (b *Broker) wsHandler(ws *websocket.Conn) {
|
|||||||
// io.Copy(ws, ws)
|
// io.Copy(ws, ws)
|
||||||
atomic.AddUint64(&b.cid, 1)
|
atomic.AddUint64(&b.cid, 1)
|
||||||
ws.PayloadType = websocket.BinaryFrame
|
ws.PayloadType = websocket.BinaryFrame
|
||||||
b.handleConnection(CLIENT, ws, b.cid)
|
b.handleConnection(CLIENT, ws)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) StartClientListening(Tls bool) {
|
func (b *Broker) StartClientListening(Tls bool) {
|
||||||
@@ -192,7 +214,7 @@ func (b *Broker) StartClientListening(Tls bool) {
|
|||||||
}
|
}
|
||||||
tmpDelay = ACCEPT_MIN_SLEEP
|
tmpDelay = ACCEPT_MIN_SLEEP
|
||||||
atomic.AddUint64(&b.cid, 1)
|
atomic.AddUint64(&b.cid, 1)
|
||||||
go b.handleConnection(CLIENT, conn, b.cid)
|
go b.handleConnection(CLIENT, conn)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -236,7 +258,6 @@ func (b *Broker) StartClusterListening() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var idx uint64 = 0
|
|
||||||
tmpDelay := 10 * ACCEPT_MIN_SLEEP
|
tmpDelay := 10 * ACCEPT_MIN_SLEEP
|
||||||
for {
|
for {
|
||||||
conn, err := l.Accept()
|
conn, err := l.Accept()
|
||||||
@@ -256,11 +277,11 @@ func (b *Broker) StartClusterListening() {
|
|||||||
}
|
}
|
||||||
tmpDelay = ACCEPT_MIN_SLEEP
|
tmpDelay = ACCEPT_MIN_SLEEP
|
||||||
|
|
||||||
go b.handleConnection(ROUTER, conn, idx)
|
go b.handleConnection(ROUTER, conn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
|
func (b *Broker) handleConnection(typ int, conn net.Conn) {
|
||||||
//process connect packet
|
//process connect packet
|
||||||
packet, err := packets.ReadPacket(conn)
|
packet, err := packets.ReadPacket(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -340,6 +361,8 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
|
|||||||
b.routes.Store(cid, c)
|
b.routes.Store(cid, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
|
||||||
|
|
||||||
c.readLoop()
|
c.readLoop()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -391,6 +414,18 @@ func (b *Broker) ConnectToDiscovery() {
|
|||||||
go c.StartPing()
|
go c.StartPing()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Broker) processClusterInfo() {
|
||||||
|
for {
|
||||||
|
msg, ok := <-b.clusterPool
|
||||||
|
if !ok {
|
||||||
|
log.Error("read message from cluster channel error")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ProcessMessage(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Broker) connectRouter(id, addr string) {
|
func (b *Broker) connectRouter(id, addr string) {
|
||||||
var conn net.Conn
|
var conn net.Conn
|
||||||
var err error
|
var err error
|
||||||
@@ -451,6 +486,7 @@ func (b *Broker) connectRouter(id, addr string) {
|
|||||||
|
|
||||||
c.SendConnect()
|
c.SendConnect()
|
||||||
|
|
||||||
|
// mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
|
||||||
go c.readLoop()
|
go c.readLoop()
|
||||||
go c.StartPing()
|
go c.StartPing()
|
||||||
|
|
||||||
|
|||||||
@@ -88,6 +88,9 @@ func (c *client) init() {
|
|||||||
|
|
||||||
func (c *client) keepAlive(ch chan int) {
|
func (c *client) keepAlive(ch chan int) {
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
|
|
||||||
|
b := c.broker
|
||||||
|
|
||||||
keepalive := time.Duration(c.info.keepalive*3/2) * time.Second
|
keepalive := time.Duration(c.info.keepalive*3/2) * time.Second
|
||||||
timer := time.NewTimer(keepalive)
|
timer := time.NewTimer(keepalive)
|
||||||
|
|
||||||
@@ -101,7 +104,10 @@ func (c *client) keepAlive(ch chan int) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
|
log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
|
||||||
c.broker.DispatchMessage(&Message{client: c, packet: DisconnectdPacket})
|
|
||||||
|
msg := &Message{client: c, packet: DisconnectdPacket}
|
||||||
|
b.SubmitWork(msg)
|
||||||
|
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
return
|
return
|
||||||
case _, ok := <-c.closed:
|
case _, ok := <-c.closed:
|
||||||
@@ -114,7 +120,8 @@ func (c *client) keepAlive(ch chan int) {
|
|||||||
|
|
||||||
func (c *client) readLoop() {
|
func (c *client) readLoop() {
|
||||||
nc := c.conn
|
nc := c.conn
|
||||||
if nc == nil {
|
b := c.broker
|
||||||
|
if nc == nil || b == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,13 +137,15 @@ func (c *client) readLoop() {
|
|||||||
// keepalive channel
|
// keepalive channel
|
||||||
ch <- 1
|
ch <- 1
|
||||||
|
|
||||||
c.broker.DispatchMessage(&Message{
|
msg := &Message{
|
||||||
client: c,
|
client: c,
|
||||||
packet: packet,
|
packet: packet,
|
||||||
})
|
}
|
||||||
|
b.SubmitWork(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.broker.DispatchMessage(&Message{client: c, packet: DisconnectdPacket})
|
msg := &Message{client: c, packet: DisconnectdPacket}
|
||||||
|
b.SubmitWork(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ProcessMessage(msg *Message) {
|
func ProcessMessage(msg *Message) {
|
||||||
@@ -524,7 +533,7 @@ func (c *client) Close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *client) WriterPacket(packet packets.ControlPacket) error {
|
func (c *client) WriterPacket(packet packets.ControlPacket) error {
|
||||||
if packet == nil {
|
if c == nil || packet == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,11 +9,10 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/fhmq/hmq/logger"
|
"github.com/fhmq/hmq/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@@ -52,6 +51,10 @@ var DefaultConfig *Config = &Config{
|
|||||||
Acl: false,
|
Acl: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
log *zap.Logger
|
||||||
|
)
|
||||||
|
|
||||||
func showHelp() {
|
func showHelp() {
|
||||||
fmt.Printf("%s\n", usageStr)
|
fmt.Printf("%s\n", usageStr)
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
|
|||||||
@@ -1,25 +0,0 @@
|
|||||||
package broker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Dispatcher will delegate ProcessMessage func to multiple goroutines
|
|
||||||
type Dispatcher struct {
|
|
||||||
workerPool *sync.Pool
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDispatcher create a *Dispatcher instance
|
|
||||||
func NewDispatcher() *Dispatcher {
|
|
||||||
return &Dispatcher{workerPool: &sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return NewWorker()
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dispatch a message to the workers
|
|
||||||
func (d *Dispatcher) Dispatch(message *Message) {
|
|
||||||
d.workerPool.Get().(Worker).WorkerChannel <- Work{WorkerPool: d.workerPool, Message: message}
|
|
||||||
}
|
|
||||||
@@ -4,12 +4,10 @@ package broker
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
simplejson "github.com/bitly/go-simplejson"
|
||||||
|
|
||||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"time"
|
||||||
simplejson "github.com/bitly/go-simplejson"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *client) SendInfo() {
|
func (c *client) SendInfo() {
|
||||||
|
|||||||
@@ -1,9 +1,8 @@
|
|||||||
package broker
|
package broker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RetainList struct {
|
type RetainList struct {
|
||||||
|
|||||||
@@ -4,9 +4,8 @@ package broker
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A result structure better optimized for queue subs.
|
// A result structure better optimized for queue subs.
|
||||||
|
|||||||
@@ -1,28 +0,0 @@
|
|||||||
package broker
|
|
||||||
|
|
||||||
import "sync"
|
|
||||||
|
|
||||||
type Work struct {
|
|
||||||
WorkerPool *sync.Pool
|
|
||||||
Message *Message
|
|
||||||
}
|
|
||||||
|
|
||||||
type Worker struct {
|
|
||||||
WorkerChannel chan Work
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewWorker() Worker {
|
|
||||||
w := Worker{WorkerChannel: make(chan Work)}
|
|
||||||
return w.Start()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w Worker) Start() Worker {
|
|
||||||
go func() {
|
|
||||||
for work := range w.WorkerChannel {
|
|
||||||
ProcessMessage(work.Message)
|
|
||||||
// put the worker back
|
|
||||||
work.WorkerPool.Put(w)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return w
|
|
||||||
}
|
|
||||||
@@ -8,40 +8,43 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
logInstance *zap.Logger
|
// env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env
|
||||||
|
instance *zap.Logger
|
||||||
|
logCfg zap.Config
|
||||||
)
|
)
|
||||||
|
|
||||||
// InitDevLogger instanciate a logger for dev builds
|
// NewDevLogger return a logger for dev builds
|
||||||
func InitDevLogger() {
|
func NewDevLogger() (*zap.Logger, error) {
|
||||||
logCfg := zap.NewDevelopmentConfig()
|
logCfg := zap.NewDevelopmentConfig()
|
||||||
logInstance, _ = logCfg.Build()
|
return logCfg.Build()
|
||||||
}
|
}
|
||||||
|
|
||||||
// InitProdLogger instanciate a logger for production builds
|
// NewProdLogger return a logger for production builds
|
||||||
func InitProdLogger() {
|
func NewProdLogger() (*zap.Logger, error) {
|
||||||
logCfg := zap.NewProductionConfig()
|
logCfg := zap.NewProductionConfig()
|
||||||
logCfg.DisableStacktrace = true
|
logCfg.DisableStacktrace = true
|
||||||
logCfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
|
logCfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
|
||||||
logInstance, _ = logCfg.Build()
|
return logCfg.Build()
|
||||||
}
|
}
|
||||||
|
|
||||||
func InitLogger(debug bool) {
|
func InitLogger(debug bool) {
|
||||||
var err error
|
var err error
|
||||||
|
var log *zap.Logger
|
||||||
if debug {
|
if debug {
|
||||||
InitDevLogger()
|
log, err = NewDevLogger()
|
||||||
} else {
|
} else {
|
||||||
InitProdLogger()
|
log, err = NewProdLogger()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("Unable to create a logger.")
|
panic("Unable to create a logger.")
|
||||||
}
|
}
|
||||||
logInstance.Debug("Logger initialization succeeded")
|
defer log.Sync()
|
||||||
|
|
||||||
|
log.Debug("Logger initialization succeeded")
|
||||||
|
instance = log.Named("hmq")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the existing *zap.Logger instance. If none have been created, it'll instanciate de dev logger
|
// Get return a *zap.Logger instance
|
||||||
func Get() *zap.Logger {
|
func Get() *zap.Logger {
|
||||||
if logInstance == nil {
|
return instance
|
||||||
InitDevLogger()
|
|
||||||
}
|
|
||||||
return logInstance
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,11 +19,15 @@ func TestGet(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestNewDevLogger(t *testing.T) {
|
func TestNewDevLogger(t *testing.T) {
|
||||||
InitDevLogger()
|
logger, err := NewDevLogger()
|
||||||
assert.True(t, Get().Core().Enabled(zap.DebugLevel))
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.True(t, logger.Core().Enabled(zap.DebugLevel))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewProdLogger(t *testing.T) {
|
func TestNewProdLogger(t *testing.T) {
|
||||||
InitProdLogger()
|
logger, err := NewProdLogger()
|
||||||
assert.False(t, Get().Core().Enabled(zap.DebugLevel))
|
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.False(t, logger.Core().Enabled(zap.DebugLevel))
|
||||||
}
|
}
|
||||||
|
|||||||
8
main.go
8
main.go
@@ -8,12 +8,10 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/fhmq/hmq/broker"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
"github.com/fhmq/hmq/broker"
|
|
||||||
"github.com/fhmq/hmq/logger"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -23,8 +21,8 @@ func main() {
|
|||||||
fmt.Println("configure broker config error: ", err)
|
fmt.Println("configure broker config error: ", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.InitLogger(config.Debug)
|
|
||||||
b, err := broker.NewBroker(config, logger.Get())
|
b, err := broker.NewBroker(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("New Broker error: ", err)
|
fmt.Println("New Broker error: ", err)
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user