mirror of
https://github.com/fhmq/hmq.git
synced 2026-05-02 14:28:34 +00:00
Compare commits
3 Commits
muti_chann
...
pool
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f92087eada | ||
|
|
a8465585b9 | ||
|
|
8337e14452 |
60
README.md
60
README.md
@@ -16,43 +16,26 @@ $ go run main.go
|
||||
|
||||
## Usage of hmq:
|
||||
~~~
|
||||
Usage of ./hmq:
|
||||
-w int
|
||||
worker num to process message, perfer (client num)/10. (default 1024)
|
||||
-worker int
|
||||
worker num to process message, perfer (client num)/10. (default 1024)
|
||||
-h string
|
||||
Network host to listen on. (default "0.0.0.0")
|
||||
-host string
|
||||
Network host to listen on. (default "0.0.0.0")
|
||||
-p string
|
||||
Port to listen on. (default "1883")
|
||||
-port string
|
||||
Port to listen on. (default "1883")
|
||||
-c string
|
||||
config file for hmq
|
||||
-config string
|
||||
config file for hmq
|
||||
-cluster string
|
||||
Cluster ip from which members can connect.
|
||||
-cluster_listen string
|
||||
Cluster ip from which members can connect.
|
||||
-cluster_port string
|
||||
Cluster port from which members can connect.
|
||||
-cp string
|
||||
Cluster port from which members can connect.
|
||||
-r string
|
||||
Router who maintenance cluster info
|
||||
-router string
|
||||
Router who maintenance cluster info
|
||||
-ws_path string
|
||||
path for ws to listen on
|
||||
-ws_port string
|
||||
port for ws to listen on
|
||||
-wspath string
|
||||
path for ws to listen on
|
||||
-wsport string
|
||||
port for ws to listen on
|
||||
Usage: hmq [options]
|
||||
|
||||
Broker Options:
|
||||
-w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
|
||||
-p, --port <port> Use port for clients (default: 1883)
|
||||
--host <host> Network host to listen on. (default "0.0.0.0")
|
||||
-ws, --wsport <port> Use port for websocket monitoring
|
||||
-wsp,--wspath <path> Use path for websocket monitoring
|
||||
-c, --config <file> Configuration file
|
||||
|
||||
Logging Options:
|
||||
-d, --debug <bool> Enable debugging output (default false)
|
||||
-D Debug and trace
|
||||
|
||||
Cluster Options:
|
||||
-r, --router <rurl> Router who maintenance cluster info
|
||||
-cp, --clusterport <cluster-port> Cluster listen port for others
|
||||
|
||||
Common Options:
|
||||
-h, --help Show this message
|
||||
~~~
|
||||
|
||||
### hmq.config
|
||||
@@ -105,6 +88,9 @@ Usage of ./hmq:
|
||||
### Cluster
|
||||
```bash
|
||||
1, start router for hmq (https://github.com/fhmq/router.git)
|
||||
$ go get github.com/fhmq/router
|
||||
$ cd $GOPATH/github.com/fhmq/router
|
||||
$ go run main.go
|
||||
2, config router in hmq.config ("router": "127.0.0.1:9888")
|
||||
|
||||
```
|
||||
|
||||
115
broker/broker.go
115
broker/broker.go
@@ -19,17 +19,10 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
|
||||
"github.com/fhmq/hmq/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
brokerLog = logger.Get().Named("Broker")
|
||||
)
|
||||
|
||||
const (
|
||||
MessagePoolNum = 1024
|
||||
MessageNum = 1024
|
||||
brokerLog *zap.Logger
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
@@ -38,44 +31,35 @@ type Message struct {
|
||||
}
|
||||
|
||||
type Broker struct {
|
||||
id string
|
||||
cid uint64
|
||||
mu sync.Mutex
|
||||
config *Config
|
||||
tlsConfig *tls.Config
|
||||
AclConfig *acl.ACLConfig
|
||||
wpool *pool.WorkerPool
|
||||
clients sync.Map
|
||||
routes sync.Map
|
||||
remotes sync.Map
|
||||
nodes map[string]interface{}
|
||||
clusterChannel chan *Message
|
||||
messagePool []chan *Message
|
||||
sl *Sublist
|
||||
rl *RetainList
|
||||
queues map[string]int
|
||||
}
|
||||
|
||||
func newMessagePool() []chan *Message {
|
||||
mp := make([]chan *Message, 0)
|
||||
for i := 0; i < MessagePoolNum; i++ {
|
||||
tempCh := make(chan *Message, MessageNum)
|
||||
mp = append(mp, tempCh)
|
||||
}
|
||||
return mp
|
||||
id string
|
||||
cid uint64
|
||||
mu sync.Mutex
|
||||
config *Config
|
||||
tlsConfig *tls.Config
|
||||
AclConfig *acl.ACLConfig
|
||||
wpool *pool.WorkerPool
|
||||
clients sync.Map
|
||||
routes sync.Map
|
||||
remotes sync.Map
|
||||
nodes map[string]interface{}
|
||||
clusterPool chan *Message
|
||||
messagePool chan *Message
|
||||
sl *Sublist
|
||||
rl *RetainList
|
||||
queues map[string]int
|
||||
}
|
||||
|
||||
func NewBroker(config *Config) (*Broker, error) {
|
||||
b := &Broker{
|
||||
id: GenUniqueId(),
|
||||
config: config,
|
||||
wpool: pool.New(config.Worker),
|
||||
sl: NewSublist(),
|
||||
rl: NewRetainList(),
|
||||
nodes: make(map[string]interface{}),
|
||||
queues: make(map[string]int),
|
||||
clusterChannel: make(chan *Message),
|
||||
messagePool: newMessagePool(),
|
||||
id: GenUniqueId(),
|
||||
config: config,
|
||||
wpool: pool.New(config.Worker),
|
||||
sl: NewSublist(),
|
||||
rl: NewRetainList(),
|
||||
nodes: make(map[string]interface{}),
|
||||
queues: make(map[string]int),
|
||||
clusterPool: make(chan *Message),
|
||||
messagePool: make(chan *Message),
|
||||
}
|
||||
if b.config.TlsPort != "" {
|
||||
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
|
||||
@@ -98,19 +82,15 @@ func NewBroker(config *Config) (*Broker, error) {
|
||||
}
|
||||
|
||||
func (b *Broker) StartDispatcher() {
|
||||
for i := 0; i < MessagePoolNum; i++ {
|
||||
go func(idx int) {
|
||||
for {
|
||||
msg, ok := <-b.messagePool[idx]
|
||||
if !ok {
|
||||
brokerLog.Error("read message from client channel error")
|
||||
return
|
||||
}
|
||||
b.wpool.Submit(func() {
|
||||
ProcessMessage(msg)
|
||||
})
|
||||
}
|
||||
}(i)
|
||||
for {
|
||||
msg, ok := <-b.messagePool
|
||||
if !ok {
|
||||
brokerLog.Error("read message from client channel error")
|
||||
return
|
||||
}
|
||||
b.wpool.Submit(func() {
|
||||
ProcessMessage(msg)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
@@ -185,10 +165,9 @@ func (b *Broker) StartWebsocketListening() {
|
||||
|
||||
func (b *Broker) wsHandler(ws *websocket.Conn) {
|
||||
// io.Copy(ws, ws)
|
||||
atomic.AddUint64(&b.cid, 1)
|
||||
ws.PayloadType = websocket.BinaryFrame
|
||||
|
||||
idx := atomic.AddUint64(&b.cid, 1)
|
||||
b.handleConnection(CLIENT, ws, idx)
|
||||
b.handleConnection(CLIENT, ws, b.cid)
|
||||
}
|
||||
|
||||
func (b *Broker) StartClientListening(Tls bool) {
|
||||
@@ -226,8 +205,8 @@ func (b *Broker) StartClientListening(Tls bool) {
|
||||
continue
|
||||
}
|
||||
tmpDelay = ACCEPT_MIN_SLEEP
|
||||
idx := atomic.AddUint64(&b.cid, 1)
|
||||
go b.handleConnection(CLIENT, conn, idx)
|
||||
atomic.AddUint64(&b.cid, 1)
|
||||
go b.handleConnection(CLIENT, conn, b.cid)
|
||||
|
||||
}
|
||||
}
|
||||
@@ -271,6 +250,7 @@ func (b *Broker) StartClusterListening() {
|
||||
return
|
||||
}
|
||||
|
||||
var idx uint64 = 0
|
||||
tmpDelay := 10 * ACCEPT_MIN_SLEEP
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
@@ -289,7 +269,7 @@ func (b *Broker) StartClusterListening() {
|
||||
continue
|
||||
}
|
||||
tmpDelay = ACCEPT_MIN_SLEEP
|
||||
idx := atomic.AddUint64(&b.cid, 1)
|
||||
|
||||
go b.handleConnection(ROUTER, conn, idx)
|
||||
}
|
||||
}
|
||||
@@ -374,9 +354,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
|
||||
b.routes.Store(cid, c)
|
||||
}
|
||||
|
||||
mpool := b.messagePool[idx%MessagePoolNum]
|
||||
|
||||
c.readLoop(mpool)
|
||||
c.readLoop(b.messagePool)
|
||||
}
|
||||
|
||||
func (b *Broker) ConnectToDiscovery() {
|
||||
@@ -423,13 +401,13 @@ func (b *Broker) ConnectToDiscovery() {
|
||||
c.SendConnect()
|
||||
c.SendInfo()
|
||||
|
||||
go c.readLoop(b.clusterChannel)
|
||||
go c.readLoop(b.clusterPool)
|
||||
go c.StartPing()
|
||||
}
|
||||
|
||||
func (b *Broker) processClusterInfo() {
|
||||
for {
|
||||
msg, ok := <-b.clusterChannel
|
||||
msg, ok := <-b.clusterPool
|
||||
if !ok {
|
||||
brokerLog.Error("read message from cluster channel error")
|
||||
return
|
||||
@@ -499,10 +477,7 @@ func (b *Broker) connectRouter(id, addr string) {
|
||||
|
||||
c.SendConnect()
|
||||
|
||||
idx := atomic.AddUint64(&b.cid, 1)
|
||||
mpool := b.messagePool[idx%MessagePoolNum]
|
||||
|
||||
go c.readLoop(mpool)
|
||||
go c.readLoop(b.messagePool)
|
||||
go c.StartPing()
|
||||
|
||||
}
|
||||
|
||||
@@ -10,7 +10,9 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"github.com/fhmq/hmq/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@@ -28,6 +30,7 @@ type Config struct {
|
||||
TlsInfo TLSInfo `json:"tlsInfo"`
|
||||
Acl bool `json:"acl"`
|
||||
AclConf string `json:"aclConf"`
|
||||
Debug bool `json:"-"`
|
||||
}
|
||||
|
||||
type RouteInfo struct {
|
||||
@@ -49,30 +52,60 @@ var DefaultConfig *Config = &Config{
|
||||
Acl: false,
|
||||
}
|
||||
|
||||
func ConfigureConfig() (*Config, error) {
|
||||
func showHelp() {
|
||||
fmt.Printf("%s\n", usageStr)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
func ConfigureConfig(args []string) (*Config, error) {
|
||||
config := &Config{}
|
||||
var (
|
||||
help bool
|
||||
configFile string
|
||||
)
|
||||
flag.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.")
|
||||
flag.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
|
||||
flag.StringVar(&config.Port, "port", "1883", "Port to listen on.")
|
||||
flag.StringVar(&config.Port, "p", "1883", "Port to listen on.")
|
||||
flag.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on.")
|
||||
flag.StringVar(&config.Host, "h", "0.0.0.0", "Network host to listen on.")
|
||||
flag.StringVar(&config.Cluster.Host, "cluster", "", "Cluster ip from which members can connect.")
|
||||
flag.StringVar(&config.Cluster.Host, "cluster_listen", "", "Cluster ip from which members can connect.")
|
||||
flag.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
|
||||
flag.StringVar(&config.Cluster.Port, "cluster_port", "", "Cluster port from which members can connect.")
|
||||
flag.StringVar(&config.Router, "r", "", "Router who maintenance cluster info")
|
||||
flag.StringVar(&config.Router, "router", "", "Router who maintenance cluster info")
|
||||
flag.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on")
|
||||
flag.StringVar(&config.WsPort, "ws_port", "", "port for ws to listen on")
|
||||
flag.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on")
|
||||
flag.StringVar(&config.WsPath, "ws_path", "", "path for ws to listen on")
|
||||
flag.StringVar(&configFile, "config", "", "config file for hmq")
|
||||
flag.StringVar(&configFile, "c", "", "config file for hmq")
|
||||
flag.Parse()
|
||||
fs := flag.NewFlagSet("hmq-broker", flag.ExitOnError)
|
||||
fs.Usage = showHelp
|
||||
|
||||
fs.BoolVar(&help, "h", false, "Show this message.")
|
||||
fs.BoolVar(&help, "help", false, "Show this message.")
|
||||
fs.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.")
|
||||
fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
|
||||
fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
|
||||
fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
|
||||
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
|
||||
fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
|
||||
fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
|
||||
fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info")
|
||||
fs.StringVar(&config.Router, "router", "", "Router who maintenance cluster info")
|
||||
fs.StringVar(&config.WsPort, "ws", "", "port for ws to listen on")
|
||||
fs.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on")
|
||||
fs.StringVar(&config.WsPath, "wsp", "", "path for ws to listen on")
|
||||
fs.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on")
|
||||
fs.StringVar(&configFile, "config", "", "config file for hmq")
|
||||
fs.StringVar(&configFile, "c", "", "config file for hmq")
|
||||
fs.BoolVar(&config.Debug, "debug", false, "enable Debug logging.")
|
||||
fs.BoolVar(&config.Debug, "d", false, "enable Debug logging.")
|
||||
|
||||
fs.Bool("D", true, "enable Debug logging.")
|
||||
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if help {
|
||||
showHelp()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
fs.Visit(func(f *flag.Flag) {
|
||||
switch f.Name {
|
||||
case "D":
|
||||
config.Debug = true
|
||||
}
|
||||
})
|
||||
|
||||
logger.InitLogger(config.Debug)
|
||||
brokerLog = logger.Get().Named("Broker")
|
||||
|
||||
if configFile != "" {
|
||||
tmpConfig, e := LoadConfig(configFile)
|
||||
|
||||
24
broker/usage.go
Normal file
24
broker/usage.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package broker
|
||||
|
||||
var usageStr = `
|
||||
Usage: hmq [options]
|
||||
|
||||
Broker Options:
|
||||
-w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
|
||||
-p, --port <port> Use port for clients (default: 1883)
|
||||
--host <host> Network host to listen on. (default "0.0.0.0")
|
||||
-ws, --wsport <port> Use port for websocket monitoring
|
||||
-wsp,--wspath <path> Use path for websocket monitoring
|
||||
-c, --config <file> Configuration file
|
||||
|
||||
Logging Options:
|
||||
-d, --debug <bool> Enable debugging output (default false)
|
||||
-D Debug and trace
|
||||
|
||||
Cluster Options:
|
||||
-r, --router <rurl> Router who maintenance cluster info
|
||||
-cp, --clusterport <cluster-port> Cluster listen port for others
|
||||
|
||||
Common Options:
|
||||
-h, --help Show this message
|
||||
`
|
||||
@@ -9,7 +9,6 @@ import (
|
||||
|
||||
var (
|
||||
// env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env
|
||||
env string
|
||||
instance *zap.Logger
|
||||
logCfg zap.Config
|
||||
)
|
||||
@@ -28,13 +27,13 @@ func NewProdLogger() (*zap.Logger, error) {
|
||||
return logCfg.Build()
|
||||
}
|
||||
|
||||
func init() {
|
||||
func InitLogger(debug bool) {
|
||||
var err error
|
||||
var log *zap.Logger
|
||||
if env == "prod" {
|
||||
log, err = NewProdLogger()
|
||||
} else {
|
||||
if debug {
|
||||
log, err = NewDevLogger()
|
||||
} else {
|
||||
log, err = NewProdLogger()
|
||||
}
|
||||
if err != nil {
|
||||
panic("Unable to create a logger.")
|
||||
|
||||
15
main.go
15
main.go
@@ -7,36 +7,31 @@ copyright notice and this permission notice appear in all copies.
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
|
||||
"github.com/fhmq/hmq/broker"
|
||||
"github.com/fhmq/hmq/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
log = logger.Get().Named("Main")
|
||||
)
|
||||
|
||||
func main() {
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
config, err := broker.ConfigureConfig()
|
||||
config, err := broker.ConfigureConfig(os.Args[1:])
|
||||
if err != nil {
|
||||
log.Error("configure broker config error: ", zap.Error(err))
|
||||
fmt.Println("configure broker config error: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := broker.NewBroker(config)
|
||||
if err != nil {
|
||||
log.Error("New Broker error: ", zap.Error(err))
|
||||
fmt.Println("New Broker error: ", err)
|
||||
return
|
||||
}
|
||||
b.Start()
|
||||
|
||||
s := waitForSignal()
|
||||
log.Info("signal received, broker closed.", zap.Any("signal", s))
|
||||
fmt.Println("signal received, broker closed.", s)
|
||||
}
|
||||
|
||||
func waitForSignal() os.Signal {
|
||||
|
||||
Reference in New Issue
Block a user