mirror of
https://github.com/fhmq/hmq.git
synced 2026-05-02 14:28:34 +00:00
Compare commits
3 Commits
muti_chann
...
pool
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f92087eada | ||
|
|
a8465585b9 | ||
|
|
8337e14452 |
60
README.md
60
README.md
@@ -16,43 +16,26 @@ $ go run main.go
|
|||||||
|
|
||||||
## Usage of hmq:
|
## Usage of hmq:
|
||||||
~~~
|
~~~
|
||||||
Usage of ./hmq:
|
Usage: hmq [options]
|
||||||
-w int
|
|
||||||
worker num to process message, perfer (client num)/10. (default 1024)
|
Broker Options:
|
||||||
-worker int
|
-w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
|
||||||
worker num to process message, perfer (client num)/10. (default 1024)
|
-p, --port <port> Use port for clients (default: 1883)
|
||||||
-h string
|
--host <host> Network host to listen on. (default "0.0.0.0")
|
||||||
Network host to listen on. (default "0.0.0.0")
|
-ws, --wsport <port> Use port for websocket monitoring
|
||||||
-host string
|
-wsp,--wspath <path> Use path for websocket monitoring
|
||||||
Network host to listen on. (default "0.0.0.0")
|
-c, --config <file> Configuration file
|
||||||
-p string
|
|
||||||
Port to listen on. (default "1883")
|
Logging Options:
|
||||||
-port string
|
-d, --debug <bool> Enable debugging output (default false)
|
||||||
Port to listen on. (default "1883")
|
-D Debug and trace
|
||||||
-c string
|
|
||||||
config file for hmq
|
Cluster Options:
|
||||||
-config string
|
-r, --router <rurl> Router who maintenance cluster info
|
||||||
config file for hmq
|
-cp, --clusterport <cluster-port> Cluster listen port for others
|
||||||
-cluster string
|
|
||||||
Cluster ip from which members can connect.
|
Common Options:
|
||||||
-cluster_listen string
|
-h, --help Show this message
|
||||||
Cluster ip from which members can connect.
|
|
||||||
-cluster_port string
|
|
||||||
Cluster port from which members can connect.
|
|
||||||
-cp string
|
|
||||||
Cluster port from which members can connect.
|
|
||||||
-r string
|
|
||||||
Router who maintenance cluster info
|
|
||||||
-router string
|
|
||||||
Router who maintenance cluster info
|
|
||||||
-ws_path string
|
|
||||||
path for ws to listen on
|
|
||||||
-ws_port string
|
|
||||||
port for ws to listen on
|
|
||||||
-wspath string
|
|
||||||
path for ws to listen on
|
|
||||||
-wsport string
|
|
||||||
port for ws to listen on
|
|
||||||
~~~
|
~~~
|
||||||
|
|
||||||
### hmq.config
|
### hmq.config
|
||||||
@@ -105,6 +88,9 @@ Usage of ./hmq:
|
|||||||
### Cluster
|
### Cluster
|
||||||
```bash
|
```bash
|
||||||
1, start router for hmq (https://github.com/fhmq/router.git)
|
1, start router for hmq (https://github.com/fhmq/router.git)
|
||||||
|
$ go get github.com/fhmq/router
|
||||||
|
$ cd $GOPATH/github.com/fhmq/router
|
||||||
|
$ go run main.go
|
||||||
2, config router in hmq.config ("router": "127.0.0.1:9888")
|
2, config router in hmq.config ("router": "127.0.0.1:9888")
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|||||||
115
broker/broker.go
115
broker/broker.go
@@ -19,17 +19,10 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
|
|
||||||
"github.com/fhmq/hmq/logger"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
brokerLog = logger.Get().Named("Broker")
|
brokerLog *zap.Logger
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
MessagePoolNum = 1024
|
|
||||||
MessageNum = 1024
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
@@ -38,44 +31,35 @@ type Message struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Broker struct {
|
type Broker struct {
|
||||||
id string
|
id string
|
||||||
cid uint64
|
cid uint64
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
config *Config
|
config *Config
|
||||||
tlsConfig *tls.Config
|
tlsConfig *tls.Config
|
||||||
AclConfig *acl.ACLConfig
|
AclConfig *acl.ACLConfig
|
||||||
wpool *pool.WorkerPool
|
wpool *pool.WorkerPool
|
||||||
clients sync.Map
|
clients sync.Map
|
||||||
routes sync.Map
|
routes sync.Map
|
||||||
remotes sync.Map
|
remotes sync.Map
|
||||||
nodes map[string]interface{}
|
nodes map[string]interface{}
|
||||||
clusterChannel chan *Message
|
clusterPool chan *Message
|
||||||
messagePool []chan *Message
|
messagePool chan *Message
|
||||||
sl *Sublist
|
sl *Sublist
|
||||||
rl *RetainList
|
rl *RetainList
|
||||||
queues map[string]int
|
queues map[string]int
|
||||||
}
|
|
||||||
|
|
||||||
func newMessagePool() []chan *Message {
|
|
||||||
mp := make([]chan *Message, 0)
|
|
||||||
for i := 0; i < MessagePoolNum; i++ {
|
|
||||||
tempCh := make(chan *Message, MessageNum)
|
|
||||||
mp = append(mp, tempCh)
|
|
||||||
}
|
|
||||||
return mp
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBroker(config *Config) (*Broker, error) {
|
func NewBroker(config *Config) (*Broker, error) {
|
||||||
b := &Broker{
|
b := &Broker{
|
||||||
id: GenUniqueId(),
|
id: GenUniqueId(),
|
||||||
config: config,
|
config: config,
|
||||||
wpool: pool.New(config.Worker),
|
wpool: pool.New(config.Worker),
|
||||||
sl: NewSublist(),
|
sl: NewSublist(),
|
||||||
rl: NewRetainList(),
|
rl: NewRetainList(),
|
||||||
nodes: make(map[string]interface{}),
|
nodes: make(map[string]interface{}),
|
||||||
queues: make(map[string]int),
|
queues: make(map[string]int),
|
||||||
clusterChannel: make(chan *Message),
|
clusterPool: make(chan *Message),
|
||||||
messagePool: newMessagePool(),
|
messagePool: make(chan *Message),
|
||||||
}
|
}
|
||||||
if b.config.TlsPort != "" {
|
if b.config.TlsPort != "" {
|
||||||
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
|
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
|
||||||
@@ -98,19 +82,15 @@ func NewBroker(config *Config) (*Broker, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) StartDispatcher() {
|
func (b *Broker) StartDispatcher() {
|
||||||
for i := 0; i < MessagePoolNum; i++ {
|
for {
|
||||||
go func(idx int) {
|
msg, ok := <-b.messagePool
|
||||||
for {
|
if !ok {
|
||||||
msg, ok := <-b.messagePool[idx]
|
brokerLog.Error("read message from client channel error")
|
||||||
if !ok {
|
return
|
||||||
brokerLog.Error("read message from client channel error")
|
}
|
||||||
return
|
b.wpool.Submit(func() {
|
||||||
}
|
ProcessMessage(msg)
|
||||||
b.wpool.Submit(func() {
|
})
|
||||||
ProcessMessage(msg)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -185,10 +165,9 @@ func (b *Broker) StartWebsocketListening() {
|
|||||||
|
|
||||||
func (b *Broker) wsHandler(ws *websocket.Conn) {
|
func (b *Broker) wsHandler(ws *websocket.Conn) {
|
||||||
// io.Copy(ws, ws)
|
// io.Copy(ws, ws)
|
||||||
|
atomic.AddUint64(&b.cid, 1)
|
||||||
ws.PayloadType = websocket.BinaryFrame
|
ws.PayloadType = websocket.BinaryFrame
|
||||||
|
b.handleConnection(CLIENT, ws, b.cid)
|
||||||
idx := atomic.AddUint64(&b.cid, 1)
|
|
||||||
b.handleConnection(CLIENT, ws, idx)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) StartClientListening(Tls bool) {
|
func (b *Broker) StartClientListening(Tls bool) {
|
||||||
@@ -226,8 +205,8 @@ func (b *Broker) StartClientListening(Tls bool) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tmpDelay = ACCEPT_MIN_SLEEP
|
tmpDelay = ACCEPT_MIN_SLEEP
|
||||||
idx := atomic.AddUint64(&b.cid, 1)
|
atomic.AddUint64(&b.cid, 1)
|
||||||
go b.handleConnection(CLIENT, conn, idx)
|
go b.handleConnection(CLIENT, conn, b.cid)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -271,6 +250,7 @@ func (b *Broker) StartClusterListening() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var idx uint64 = 0
|
||||||
tmpDelay := 10 * ACCEPT_MIN_SLEEP
|
tmpDelay := 10 * ACCEPT_MIN_SLEEP
|
||||||
for {
|
for {
|
||||||
conn, err := l.Accept()
|
conn, err := l.Accept()
|
||||||
@@ -289,7 +269,7 @@ func (b *Broker) StartClusterListening() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tmpDelay = ACCEPT_MIN_SLEEP
|
tmpDelay = ACCEPT_MIN_SLEEP
|
||||||
idx := atomic.AddUint64(&b.cid, 1)
|
|
||||||
go b.handleConnection(ROUTER, conn, idx)
|
go b.handleConnection(ROUTER, conn, idx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -374,9 +354,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
|
|||||||
b.routes.Store(cid, c)
|
b.routes.Store(cid, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
mpool := b.messagePool[idx%MessagePoolNum]
|
c.readLoop(b.messagePool)
|
||||||
|
|
||||||
c.readLoop(mpool)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) ConnectToDiscovery() {
|
func (b *Broker) ConnectToDiscovery() {
|
||||||
@@ -423,13 +401,13 @@ func (b *Broker) ConnectToDiscovery() {
|
|||||||
c.SendConnect()
|
c.SendConnect()
|
||||||
c.SendInfo()
|
c.SendInfo()
|
||||||
|
|
||||||
go c.readLoop(b.clusterChannel)
|
go c.readLoop(b.clusterPool)
|
||||||
go c.StartPing()
|
go c.StartPing()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Broker) processClusterInfo() {
|
func (b *Broker) processClusterInfo() {
|
||||||
for {
|
for {
|
||||||
msg, ok := <-b.clusterChannel
|
msg, ok := <-b.clusterPool
|
||||||
if !ok {
|
if !ok {
|
||||||
brokerLog.Error("read message from cluster channel error")
|
brokerLog.Error("read message from cluster channel error")
|
||||||
return
|
return
|
||||||
@@ -499,10 +477,7 @@ func (b *Broker) connectRouter(id, addr string) {
|
|||||||
|
|
||||||
c.SendConnect()
|
c.SendConnect()
|
||||||
|
|
||||||
idx := atomic.AddUint64(&b.cid, 1)
|
go c.readLoop(b.messagePool)
|
||||||
mpool := b.messagePool[idx%MessagePoolNum]
|
|
||||||
|
|
||||||
go c.readLoop(mpool)
|
|
||||||
go c.StartPing()
|
go c.StartPing()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,9 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/fhmq/hmq/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -28,6 +30,7 @@ type Config struct {
|
|||||||
TlsInfo TLSInfo `json:"tlsInfo"`
|
TlsInfo TLSInfo `json:"tlsInfo"`
|
||||||
Acl bool `json:"acl"`
|
Acl bool `json:"acl"`
|
||||||
AclConf string `json:"aclConf"`
|
AclConf string `json:"aclConf"`
|
||||||
|
Debug bool `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type RouteInfo struct {
|
type RouteInfo struct {
|
||||||
@@ -49,30 +52,60 @@ var DefaultConfig *Config = &Config{
|
|||||||
Acl: false,
|
Acl: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
func ConfigureConfig() (*Config, error) {
|
func showHelp() {
|
||||||
|
fmt.Printf("%s\n", usageStr)
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConfigureConfig(args []string) (*Config, error) {
|
||||||
config := &Config{}
|
config := &Config{}
|
||||||
var (
|
var (
|
||||||
|
help bool
|
||||||
configFile string
|
configFile string
|
||||||
)
|
)
|
||||||
flag.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.")
|
fs := flag.NewFlagSet("hmq-broker", flag.ExitOnError)
|
||||||
flag.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
|
fs.Usage = showHelp
|
||||||
flag.StringVar(&config.Port, "port", "1883", "Port to listen on.")
|
|
||||||
flag.StringVar(&config.Port, "p", "1883", "Port to listen on.")
|
fs.BoolVar(&help, "h", false, "Show this message.")
|
||||||
flag.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on.")
|
fs.BoolVar(&help, "help", false, "Show this message.")
|
||||||
flag.StringVar(&config.Host, "h", "0.0.0.0", "Network host to listen on.")
|
fs.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.")
|
||||||
flag.StringVar(&config.Cluster.Host, "cluster", "", "Cluster ip from which members can connect.")
|
fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
|
||||||
flag.StringVar(&config.Cluster.Host, "cluster_listen", "", "Cluster ip from which members can connect.")
|
fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
|
||||||
flag.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
|
fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
|
||||||
flag.StringVar(&config.Cluster.Port, "cluster_port", "", "Cluster port from which members can connect.")
|
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
|
||||||
flag.StringVar(&config.Router, "r", "", "Router who maintenance cluster info")
|
fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
|
||||||
flag.StringVar(&config.Router, "router", "", "Router who maintenance cluster info")
|
fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
|
||||||
flag.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on")
|
fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info")
|
||||||
flag.StringVar(&config.WsPort, "ws_port", "", "port for ws to listen on")
|
fs.StringVar(&config.Router, "router", "", "Router who maintenance cluster info")
|
||||||
flag.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on")
|
fs.StringVar(&config.WsPort, "ws", "", "port for ws to listen on")
|
||||||
flag.StringVar(&config.WsPath, "ws_path", "", "path for ws to listen on")
|
fs.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on")
|
||||||
flag.StringVar(&configFile, "config", "", "config file for hmq")
|
fs.StringVar(&config.WsPath, "wsp", "", "path for ws to listen on")
|
||||||
flag.StringVar(&configFile, "c", "", "config file for hmq")
|
fs.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on")
|
||||||
flag.Parse()
|
fs.StringVar(&configFile, "config", "", "config file for hmq")
|
||||||
|
fs.StringVar(&configFile, "c", "", "config file for hmq")
|
||||||
|
fs.BoolVar(&config.Debug, "debug", false, "enable Debug logging.")
|
||||||
|
fs.BoolVar(&config.Debug, "d", false, "enable Debug logging.")
|
||||||
|
|
||||||
|
fs.Bool("D", true, "enable Debug logging.")
|
||||||
|
|
||||||
|
if err := fs.Parse(args); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if help {
|
||||||
|
showHelp()
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fs.Visit(func(f *flag.Flag) {
|
||||||
|
switch f.Name {
|
||||||
|
case "D":
|
||||||
|
config.Debug = true
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.InitLogger(config.Debug)
|
||||||
|
brokerLog = logger.Get().Named("Broker")
|
||||||
|
|
||||||
if configFile != "" {
|
if configFile != "" {
|
||||||
tmpConfig, e := LoadConfig(configFile)
|
tmpConfig, e := LoadConfig(configFile)
|
||||||
|
|||||||
24
broker/usage.go
Normal file
24
broker/usage.go
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
package broker
|
||||||
|
|
||||||
|
var usageStr = `
|
||||||
|
Usage: hmq [options]
|
||||||
|
|
||||||
|
Broker Options:
|
||||||
|
-w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
|
||||||
|
-p, --port <port> Use port for clients (default: 1883)
|
||||||
|
--host <host> Network host to listen on. (default "0.0.0.0")
|
||||||
|
-ws, --wsport <port> Use port for websocket monitoring
|
||||||
|
-wsp,--wspath <path> Use path for websocket monitoring
|
||||||
|
-c, --config <file> Configuration file
|
||||||
|
|
||||||
|
Logging Options:
|
||||||
|
-d, --debug <bool> Enable debugging output (default false)
|
||||||
|
-D Debug and trace
|
||||||
|
|
||||||
|
Cluster Options:
|
||||||
|
-r, --router <rurl> Router who maintenance cluster info
|
||||||
|
-cp, --clusterport <cluster-port> Cluster listen port for others
|
||||||
|
|
||||||
|
Common Options:
|
||||||
|
-h, --help Show this message
|
||||||
|
`
|
||||||
@@ -9,7 +9,6 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
// env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env
|
// env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env
|
||||||
env string
|
|
||||||
instance *zap.Logger
|
instance *zap.Logger
|
||||||
logCfg zap.Config
|
logCfg zap.Config
|
||||||
)
|
)
|
||||||
@@ -28,13 +27,13 @@ func NewProdLogger() (*zap.Logger, error) {
|
|||||||
return logCfg.Build()
|
return logCfg.Build()
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func InitLogger(debug bool) {
|
||||||
var err error
|
var err error
|
||||||
var log *zap.Logger
|
var log *zap.Logger
|
||||||
if env == "prod" {
|
if debug {
|
||||||
log, err = NewProdLogger()
|
|
||||||
} else {
|
|
||||||
log, err = NewDevLogger()
|
log, err = NewDevLogger()
|
||||||
|
} else {
|
||||||
|
log, err = NewProdLogger()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("Unable to create a logger.")
|
panic("Unable to create a logger.")
|
||||||
|
|||||||
15
main.go
15
main.go
@@ -7,36 +7,31 @@ copyright notice and this permission notice appear in all copies.
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
"github.com/fhmq/hmq/broker"
|
"github.com/fhmq/hmq/broker"
|
||||||
"github.com/fhmq/hmq/logger"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
log = logger.Get().Named("Main")
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||||
config, err := broker.ConfigureConfig()
|
config, err := broker.ConfigureConfig(os.Args[1:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("configure broker config error: ", zap.Error(err))
|
fmt.Println("configure broker config error: ", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := broker.NewBroker(config)
|
b, err := broker.NewBroker(config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("New Broker error: ", zap.Error(err))
|
fmt.Println("New Broker error: ", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
b.Start()
|
b.Start()
|
||||||
|
|
||||||
s := waitForSignal()
|
s := waitForSignal()
|
||||||
log.Info("signal received, broker closed.", zap.Any("signal", s))
|
fmt.Println("signal received, broker closed.", s)
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForSignal() os.Signal {
|
func waitForSignal() os.Signal {
|
||||||
|
|||||||
Reference in New Issue
Block a user