5 Commits

Author SHA1 Message Date
zhouyuyan
026cfbe123 wpool 2018-03-02 13:29:18 +08:00
zhouyuyan
6f26df7a9a pool 2018-03-02 12:52:23 +08:00
zhouyuyan
fd2adb7a26 pool 2018-03-02 12:42:04 +08:00
zhouyuyan
c0fea6a5ba modify_message_pool 2018-02-24 13:19:43 +08:00
zhouyuyan
47500910e1 fix broker out painc 2018-02-06 11:01:06 +08:00
12 changed files with 128 additions and 135 deletions

View File

@@ -3,13 +3,10 @@
package broker
import (
"strings"
"github.com/fhmq/hmq/lib/acl"
"go.uber.org/zap"
"github.com/fsnotify/fsnotify"
"go.uber.org/zap"
"strings"
)
const (

View File

@@ -11,18 +11,17 @@ import (
"sync/atomic"
"time"
"github.com/fhmq/hmq/lib/acl"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/fhmq/hmq/lib/acl"
"github.com/fhmq/hmq/pool"
"github.com/shirou/gopsutil/mem"
"go.uber.org/zap"
"golang.org/x/net/websocket"
)
var (
log *zap.Logger
messagePoolQueueSize = 4096
const (
MessagePoolNum = 1024
MessagePoolMessageNum = 1024
)
type Message struct {
@@ -31,32 +30,44 @@ type Message struct {
}
type Broker struct {
id string
cid uint64
mu sync.Mutex
config *Config
tlsConfig *tls.Config
AclConfig *acl.ACLConfig
dispatcher *Dispatcher
clients sync.Map
routes sync.Map
remotes sync.Map
nodes map[string]interface{}
sl *Sublist
rl *RetainList
queues map[string]int
id string
cid uint64
mu sync.Mutex
config *Config
tlsConfig *tls.Config
AclConfig *acl.ACLConfig
wpool *pool.WorkerPool
clients sync.Map
routes sync.Map
remotes sync.Map
nodes map[string]interface{}
clusterPool chan *Message
sl *Sublist
rl *RetainList
queues map[string]int
// messagePool []chan *Message
}
func NewBroker(config *Config, logger *zap.Logger) (*Broker, error) {
log = logger
func newMessagePool() []chan *Message {
pool := make([]chan *Message, 0)
for i := 0; i < MessagePoolNum; i++ {
ch := make(chan *Message, MessagePoolMessageNum)
pool = append(pool, ch)
}
return pool
}
func NewBroker(config *Config) (*Broker, error) {
b := &Broker{
id: GenUniqueId(),
config: config,
dispatcher: NewDispatcher(),
sl: NewSublist(),
rl: NewRetainList(),
nodes: make(map[string]interface{}),
queues: make(map[string]int),
id: GenUniqueId(),
config: config,
wpool: pool.New(config.Worker),
sl: NewSublist(),
rl: NewRetainList(),
nodes: make(map[string]interface{}),
queues: make(map[string]int),
clusterPool: make(chan *Message),
// messagePool: newMessagePool(),
}
if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
@@ -78,8 +89,18 @@ func NewBroker(config *Config, logger *zap.Logger) (*Broker, error) {
return b, nil
}
func (b *Broker) DispatchMessage(msg *Message) {
b.dispatcher.Dispatch(msg)
func (b *Broker) SubmitWork(msg *Message) {
if b.wpool == nil {
b.wpool = pool.New(b.config.Worker)
}
if msg.client.typ == CLUSTER {
b.clusterPool <- msg
} else {
b.wpool.Submit(func() {
ProcessMessage(msg)
})
}
}
@@ -111,6 +132,7 @@ func (b *Broker) Start() {
//connect on other node in cluster
if b.config.Router != "" {
go b.processClusterInfo()
b.ConnectToDiscovery()
}
@@ -153,7 +175,7 @@ func (b *Broker) wsHandler(ws *websocket.Conn) {
// io.Copy(ws, ws)
atomic.AddUint64(&b.cid, 1)
ws.PayloadType = websocket.BinaryFrame
b.handleConnection(CLIENT, ws, b.cid)
b.handleConnection(CLIENT, ws)
}
func (b *Broker) StartClientListening(Tls bool) {
@@ -192,7 +214,7 @@ func (b *Broker) StartClientListening(Tls bool) {
}
tmpDelay = ACCEPT_MIN_SLEEP
atomic.AddUint64(&b.cid, 1)
go b.handleConnection(CLIENT, conn, b.cid)
go b.handleConnection(CLIENT, conn)
}
}
@@ -236,7 +258,6 @@ func (b *Broker) StartClusterListening() {
return
}
var idx uint64 = 0
tmpDelay := 10 * ACCEPT_MIN_SLEEP
for {
conn, err := l.Accept()
@@ -256,11 +277,11 @@ func (b *Broker) StartClusterListening() {
}
tmpDelay = ACCEPT_MIN_SLEEP
go b.handleConnection(ROUTER, conn, idx)
go b.handleConnection(ROUTER, conn)
}
}
func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
func (b *Broker) handleConnection(typ int, conn net.Conn) {
//process connect packet
packet, err := packets.ReadPacket(conn)
if err != nil {
@@ -340,6 +361,8 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
b.routes.Store(cid, c)
}
// mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
c.readLoop()
}
@@ -391,6 +414,18 @@ func (b *Broker) ConnectToDiscovery() {
go c.StartPing()
}
func (b *Broker) processClusterInfo() {
for {
msg, ok := <-b.clusterPool
if !ok {
log.Error("read message from cluster channel error")
return
}
ProcessMessage(msg)
}
}
func (b *Broker) connectRouter(id, addr string) {
var conn net.Conn
var err error
@@ -451,6 +486,7 @@ func (b *Broker) connectRouter(id, addr string) {
c.SendConnect()
// mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
go c.readLoop()
go c.StartPing()

View File

@@ -88,6 +88,9 @@ func (c *client) init() {
func (c *client) keepAlive(ch chan int) {
defer close(ch)
b := c.broker
keepalive := time.Duration(c.info.keepalive*3/2) * time.Second
timer := time.NewTimer(keepalive)
@@ -101,7 +104,10 @@ func (c *client) keepAlive(ch chan int) {
continue
}
log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
c.broker.DispatchMessage(&Message{client: c, packet: DisconnectdPacket})
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
timer.Stop()
return
case _, ok := <-c.closed:
@@ -114,7 +120,8 @@ func (c *client) keepAlive(ch chan int) {
func (c *client) readLoop() {
nc := c.conn
if nc == nil {
b := c.broker
if nc == nil || b == nil {
return
}
@@ -130,13 +137,15 @@ func (c *client) readLoop() {
// keepalive channel
ch <- 1
c.broker.DispatchMessage(&Message{
msg := &Message{
client: c,
packet: packet,
})
}
b.SubmitWork(msg)
}
c.broker.DispatchMessage(&Message{client: c, packet: DisconnectdPacket})
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
}
func ProcessMessage(msg *Message) {
@@ -524,7 +533,7 @@ func (c *client) Close() {
}
func (c *client) WriterPacket(packet packets.ControlPacket) error {
if packet == nil {
if c == nil || packet == nil {
return nil
}

View File

@@ -9,11 +9,10 @@ import (
"errors"
"flag"
"fmt"
"io/ioutil"
"os"
"github.com/fhmq/hmq/logger"
"go.uber.org/zap"
"io/ioutil"
"os"
)
type Config struct {
@@ -52,6 +51,10 @@ var DefaultConfig *Config = &Config{
Acl: false,
}
var (
log *zap.Logger
)
func showHelp() {
fmt.Printf("%s\n", usageStr)
os.Exit(0)

View File

@@ -1,25 +0,0 @@
package broker
import (
"sync"
)
// Dispatcher will delegate ProcessMessage func to multiple goroutines
type Dispatcher struct {
workerPool *sync.Pool
}
// NewDispatcher create a *Dispatcher instance
func NewDispatcher() *Dispatcher {
return &Dispatcher{workerPool: &sync.Pool{
New: func() interface{} {
return NewWorker()
},
},
}
}
// Dispatch a message to the workers
func (d *Dispatcher) Dispatch(message *Message) {
d.workerPool.Get().(Worker).WorkerChannel <- Work{WorkerPool: d.workerPool, Message: message}
}

View File

@@ -4,12 +4,10 @@ package broker
import (
"fmt"
"time"
simplejson "github.com/bitly/go-simplejson"
"github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap"
simplejson "github.com/bitly/go-simplejson"
"time"
)
func (c *client) SendInfo() {

View File

@@ -1,9 +1,8 @@
package broker
import (
"sync"
"github.com/eclipse/paho.mqtt.golang/packets"
"sync"
)
type RetainList struct {

View File

@@ -4,9 +4,8 @@ package broker
import (
"errors"
"sync"
"go.uber.org/zap"
"sync"
)
// A result structure better optimized for queue subs.

View File

@@ -1,28 +0,0 @@
package broker
import "sync"
type Work struct {
WorkerPool *sync.Pool
Message *Message
}
type Worker struct {
WorkerChannel chan Work
}
func NewWorker() Worker {
w := Worker{WorkerChannel: make(chan Work)}
return w.Start()
}
func (w Worker) Start() Worker {
go func() {
for work := range w.WorkerChannel {
ProcessMessage(work.Message)
// put the worker back
work.WorkerPool.Put(w)
}
}()
return w
}

View File

@@ -8,40 +8,43 @@ import (
)
var (
logInstance *zap.Logger
// env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env
instance *zap.Logger
logCfg zap.Config
)
// InitDevLogger instanciate a logger for dev builds
func InitDevLogger() {
// NewDevLogger return a logger for dev builds
func NewDevLogger() (*zap.Logger, error) {
logCfg := zap.NewDevelopmentConfig()
logInstance, _ = logCfg.Build()
return logCfg.Build()
}
// InitProdLogger instanciate a logger for production builds
func InitProdLogger() {
// NewProdLogger return a logger for production builds
func NewProdLogger() (*zap.Logger, error) {
logCfg := zap.NewProductionConfig()
logCfg.DisableStacktrace = true
logCfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
logInstance, _ = logCfg.Build()
return logCfg.Build()
}
func InitLogger(debug bool) {
var err error
var log *zap.Logger
if debug {
InitDevLogger()
log, err = NewDevLogger()
} else {
InitProdLogger()
log, err = NewProdLogger()
}
if err != nil {
panic("Unable to create a logger.")
}
logInstance.Debug("Logger initialization succeeded")
defer log.Sync()
log.Debug("Logger initialization succeeded")
instance = log.Named("hmq")
}
// Get the existing *zap.Logger instance. If none have been created, it'll instanciate de dev logger
// Get return a *zap.Logger instance
func Get() *zap.Logger {
if logInstance == nil {
InitDevLogger()
}
return logInstance
return instance
}

View File

@@ -19,11 +19,15 @@ func TestGet(t *testing.T) {
}
func TestNewDevLogger(t *testing.T) {
InitDevLogger()
assert.True(t, Get().Core().Enabled(zap.DebugLevel))
logger, err := NewDevLogger()
assert.Nil(t, err)
assert.True(t, logger.Core().Enabled(zap.DebugLevel))
}
func TestNewProdLogger(t *testing.T) {
InitProdLogger()
assert.False(t, Get().Core().Enabled(zap.DebugLevel))
logger, err := NewProdLogger()
assert.Nil(t, err)
assert.False(t, logger.Core().Enabled(zap.DebugLevel))
}

View File

@@ -8,12 +8,10 @@ package main
import (
"fmt"
"github.com/fhmq/hmq/broker"
"os"
"os/signal"
"runtime"
"github.com/fhmq/hmq/broker"
"github.com/fhmq/hmq/logger"
)
func main() {
@@ -23,8 +21,8 @@ func main() {
fmt.Println("configure broker config error: ", err)
return
}
logger.InitLogger(config.Debug)
b, err := broker.NewBroker(config, logger.Get())
b, err := broker.NewBroker(config)
if err != nil {
fmt.Println("New Broker error: ", err)
return