3 Commits

Author SHA1 Message Date
zhouyuyan
f92087eada modify readme 2018-02-02 16:50:56 +08:00
zhouyuyan
a8465585b9 modify usage 2018-02-02 09:54:48 +08:00
zhouyuyan
8337e14452 del buf 2018-02-01 16:57:56 +08:00
6 changed files with 154 additions and 142 deletions

View File

@@ -16,43 +16,26 @@ $ go run main.go
## Usage of hmq: ## Usage of hmq:
~~~ ~~~
Usage of ./hmq: Usage: hmq [options]
-w int
worker num to process message, perfer (client num)/10. (default 1024) Broker Options:
-worker int -w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
worker num to process message, perfer (client num)/10. (default 1024) -p, --port <port> Use port for clients (default: 1883)
-h string --host <host> Network host to listen on. (default "0.0.0.0")
Network host to listen on. (default "0.0.0.0") -ws, --wsport <port> Use port for websocket monitoring
-host string -wsp,--wspath <path> Use path for websocket monitoring
Network host to listen on. (default "0.0.0.0") -c, --config <file> Configuration file
-p string
Port to listen on. (default "1883") Logging Options:
-port string -d, --debug <bool> Enable debugging output (default false)
Port to listen on. (default "1883") -D Debug and trace
-c string
config file for hmq Cluster Options:
-config string -r, --router <rurl> Router who maintenance cluster info
config file for hmq -cp, --clusterport <cluster-port> Cluster listen port for others
-cluster string
Cluster ip from which members can connect. Common Options:
-cluster_listen string -h, --help Show this message
Cluster ip from which members can connect.
-cluster_port string
Cluster port from which members can connect.
-cp string
Cluster port from which members can connect.
-r string
Router who maintenance cluster info
-router string
Router who maintenance cluster info
-ws_path string
path for ws to listen on
-ws_port string
port for ws to listen on
-wspath string
path for ws to listen on
-wsport string
port for ws to listen on
~~~ ~~~
### hmq.config ### hmq.config
@@ -105,6 +88,9 @@ Usage of ./hmq:
### Cluster ### Cluster
```bash ```bash
1, start router for hmq (https://github.com/fhmq/router.git) 1, start router for hmq (https://github.com/fhmq/router.git)
$ go get github.com/fhmq/router
$ cd $GOPATH/github.com/fhmq/router
$ go run main.go
2, config router in hmq.config ("router": "127.0.0.1:9888") 2, config router in hmq.config ("router": "127.0.0.1:9888")
``` ```

View File

@@ -19,17 +19,10 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"github.com/fhmq/hmq/logger"
) )
var ( var (
brokerLog = logger.Get().Named("Broker") brokerLog *zap.Logger
)
const (
MessagePoolNum = 1024
MessageNum = 1024
) )
type Message struct { type Message struct {
@@ -38,44 +31,35 @@ type Message struct {
} }
type Broker struct { type Broker struct {
id string id string
cid uint64 cid uint64
mu sync.Mutex mu sync.Mutex
config *Config config *Config
tlsConfig *tls.Config tlsConfig *tls.Config
AclConfig *acl.ACLConfig AclConfig *acl.ACLConfig
wpool *pool.WorkerPool wpool *pool.WorkerPool
clients sync.Map clients sync.Map
routes sync.Map routes sync.Map
remotes sync.Map remotes sync.Map
nodes map[string]interface{} nodes map[string]interface{}
clusterChannel chan *Message clusterPool chan *Message
messagePool []chan *Message messagePool chan *Message
sl *Sublist sl *Sublist
rl *RetainList rl *RetainList
queues map[string]int queues map[string]int
}
func newMessagePool() []chan *Message {
mp := make([]chan *Message, 0)
for i := 0; i < MessagePoolNum; i++ {
tempCh := make(chan *Message, MessageNum)
mp = append(mp, tempCh)
}
return mp
} }
func NewBroker(config *Config) (*Broker, error) { func NewBroker(config *Config) (*Broker, error) {
b := &Broker{ b := &Broker{
id: GenUniqueId(), id: GenUniqueId(),
config: config, config: config,
wpool: pool.New(config.Worker), wpool: pool.New(config.Worker),
sl: NewSublist(), sl: NewSublist(),
rl: NewRetainList(), rl: NewRetainList(),
nodes: make(map[string]interface{}), nodes: make(map[string]interface{}),
queues: make(map[string]int), queues: make(map[string]int),
clusterChannel: make(chan *Message), clusterPool: make(chan *Message),
messagePool: newMessagePool(), messagePool: make(chan *Message),
} }
if b.config.TlsPort != "" { if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo) tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
@@ -98,19 +82,15 @@ func NewBroker(config *Config) (*Broker, error) {
} }
func (b *Broker) StartDispatcher() { func (b *Broker) StartDispatcher() {
for i := 0; i < MessagePoolNum; i++ { for {
go func(idx int) { msg, ok := <-b.messagePool
for { if !ok {
msg, ok := <-b.messagePool[idx] brokerLog.Error("read message from client channel error")
if !ok { return
brokerLog.Error("read message from client channel error") }
return b.wpool.Submit(func() {
} ProcessMessage(msg)
b.wpool.Submit(func() { })
ProcessMessage(msg)
})
}
}(i)
} }
} }
@@ -185,10 +165,9 @@ func (b *Broker) StartWebsocketListening() {
func (b *Broker) wsHandler(ws *websocket.Conn) { func (b *Broker) wsHandler(ws *websocket.Conn) {
// io.Copy(ws, ws) // io.Copy(ws, ws)
atomic.AddUint64(&b.cid, 1)
ws.PayloadType = websocket.BinaryFrame ws.PayloadType = websocket.BinaryFrame
b.handleConnection(CLIENT, ws, b.cid)
idx := atomic.AddUint64(&b.cid, 1)
b.handleConnection(CLIENT, ws, idx)
} }
func (b *Broker) StartClientListening(Tls bool) { func (b *Broker) StartClientListening(Tls bool) {
@@ -226,8 +205,8 @@ func (b *Broker) StartClientListening(Tls bool) {
continue continue
} }
tmpDelay = ACCEPT_MIN_SLEEP tmpDelay = ACCEPT_MIN_SLEEP
idx := atomic.AddUint64(&b.cid, 1) atomic.AddUint64(&b.cid, 1)
go b.handleConnection(CLIENT, conn, idx) go b.handleConnection(CLIENT, conn, b.cid)
} }
} }
@@ -271,6 +250,7 @@ func (b *Broker) StartClusterListening() {
return return
} }
var idx uint64 = 0
tmpDelay := 10 * ACCEPT_MIN_SLEEP tmpDelay := 10 * ACCEPT_MIN_SLEEP
for { for {
conn, err := l.Accept() conn, err := l.Accept()
@@ -289,7 +269,7 @@ func (b *Broker) StartClusterListening() {
continue continue
} }
tmpDelay = ACCEPT_MIN_SLEEP tmpDelay = ACCEPT_MIN_SLEEP
idx := atomic.AddUint64(&b.cid, 1)
go b.handleConnection(ROUTER, conn, idx) go b.handleConnection(ROUTER, conn, idx)
} }
} }
@@ -374,9 +354,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
b.routes.Store(cid, c) b.routes.Store(cid, c)
} }
mpool := b.messagePool[idx%MessagePoolNum] c.readLoop(b.messagePool)
c.readLoop(mpool)
} }
func (b *Broker) ConnectToDiscovery() { func (b *Broker) ConnectToDiscovery() {
@@ -423,13 +401,13 @@ func (b *Broker) ConnectToDiscovery() {
c.SendConnect() c.SendConnect()
c.SendInfo() c.SendInfo()
go c.readLoop(b.clusterChannel) go c.readLoop(b.clusterPool)
go c.StartPing() go c.StartPing()
} }
func (b *Broker) processClusterInfo() { func (b *Broker) processClusterInfo() {
for { for {
msg, ok := <-b.clusterChannel msg, ok := <-b.clusterPool
if !ok { if !ok {
brokerLog.Error("read message from cluster channel error") brokerLog.Error("read message from cluster channel error")
return return
@@ -499,10 +477,7 @@ func (b *Broker) connectRouter(id, addr string) {
c.SendConnect() c.SendConnect()
idx := atomic.AddUint64(&b.cid, 1) go c.readLoop(b.messagePool)
mpool := b.messagePool[idx%MessagePoolNum]
go c.readLoop(mpool)
go c.StartPing() go c.StartPing()
} }

View File

@@ -10,7 +10,9 @@ import (
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os"
"github.com/fhmq/hmq/logger"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -28,6 +30,7 @@ type Config struct {
TlsInfo TLSInfo `json:"tlsInfo"` TlsInfo TLSInfo `json:"tlsInfo"`
Acl bool `json:"acl"` Acl bool `json:"acl"`
AclConf string `json:"aclConf"` AclConf string `json:"aclConf"`
Debug bool `json:"-"`
} }
type RouteInfo struct { type RouteInfo struct {
@@ -49,30 +52,60 @@ var DefaultConfig *Config = &Config{
Acl: false, Acl: false,
} }
func ConfigureConfig() (*Config, error) { func showHelp() {
fmt.Printf("%s\n", usageStr)
os.Exit(0)
}
func ConfigureConfig(args []string) (*Config, error) {
config := &Config{} config := &Config{}
var ( var (
help bool
configFile string configFile string
) )
flag.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.") fs := flag.NewFlagSet("hmq-broker", flag.ExitOnError)
flag.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.") fs.Usage = showHelp
flag.StringVar(&config.Port, "port", "1883", "Port to listen on.")
flag.StringVar(&config.Port, "p", "1883", "Port to listen on.") fs.BoolVar(&help, "h", false, "Show this message.")
flag.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on.") fs.BoolVar(&help, "help", false, "Show this message.")
flag.StringVar(&config.Host, "h", "0.0.0.0", "Network host to listen on.") fs.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.")
flag.StringVar(&config.Cluster.Host, "cluster", "", "Cluster ip from which members can connect.") fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
flag.StringVar(&config.Cluster.Host, "cluster_listen", "", "Cluster ip from which members can connect.") fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
flag.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.") fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
flag.StringVar(&config.Cluster.Port, "cluster_port", "", "Cluster port from which members can connect.") fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
flag.StringVar(&config.Router, "r", "", "Router who maintenance cluster info") fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
flag.StringVar(&config.Router, "router", "", "Router who maintenance cluster info") fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
flag.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on") fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info")
flag.StringVar(&config.WsPort, "ws_port", "", "port for ws to listen on") fs.StringVar(&config.Router, "router", "", "Router who maintenance cluster info")
flag.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on") fs.StringVar(&config.WsPort, "ws", "", "port for ws to listen on")
flag.StringVar(&config.WsPath, "ws_path", "", "path for ws to listen on") fs.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on")
flag.StringVar(&configFile, "config", "", "config file for hmq") fs.StringVar(&config.WsPath, "wsp", "", "path for ws to listen on")
flag.StringVar(&configFile, "c", "", "config file for hmq") fs.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on")
flag.Parse() fs.StringVar(&configFile, "config", "", "config file for hmq")
fs.StringVar(&configFile, "c", "", "config file for hmq")
fs.BoolVar(&config.Debug, "debug", false, "enable Debug logging.")
fs.BoolVar(&config.Debug, "d", false, "enable Debug logging.")
fs.Bool("D", true, "enable Debug logging.")
if err := fs.Parse(args); err != nil {
return nil, err
}
if help {
showHelp()
return nil, nil
}
fs.Visit(func(f *flag.Flag) {
switch f.Name {
case "D":
config.Debug = true
}
})
logger.InitLogger(config.Debug)
brokerLog = logger.Get().Named("Broker")
if configFile != "" { if configFile != "" {
tmpConfig, e := LoadConfig(configFile) tmpConfig, e := LoadConfig(configFile)

24
broker/usage.go Normal file
View File

@@ -0,0 +1,24 @@
package broker
var usageStr = `
Usage: hmq [options]
Broker Options:
-w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
-p, --port <port> Use port for clients (default: 1883)
--host <host> Network host to listen on. (default "0.0.0.0")
-ws, --wsport <port> Use port for websocket monitoring
-wsp,--wspath <path> Use path for websocket monitoring
-c, --config <file> Configuration file
Logging Options:
-d, --debug <bool> Enable debugging output (default false)
-D Debug and trace
Cluster Options:
-r, --router <rurl> Router who maintenance cluster info
-cp, --clusterport <cluster-port> Cluster listen port for others
Common Options:
-h, --help Show this message
`

View File

@@ -9,7 +9,6 @@ import (
var ( var (
// env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env // env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env
env string
instance *zap.Logger instance *zap.Logger
logCfg zap.Config logCfg zap.Config
) )
@@ -28,13 +27,13 @@ func NewProdLogger() (*zap.Logger, error) {
return logCfg.Build() return logCfg.Build()
} }
func init() { func InitLogger(debug bool) {
var err error var err error
var log *zap.Logger var log *zap.Logger
if env == "prod" { if debug {
log, err = NewProdLogger()
} else {
log, err = NewDevLogger() log, err = NewDevLogger()
} else {
log, err = NewProdLogger()
} }
if err != nil { if err != nil {
panic("Unable to create a logger.") panic("Unable to create a logger.")

15
main.go
View File

@@ -7,36 +7,31 @@ copyright notice and this permission notice appear in all copies.
package main package main
import ( import (
"fmt"
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
"github.com/fhmq/hmq/broker" "github.com/fhmq/hmq/broker"
"github.com/fhmq/hmq/logger"
"go.uber.org/zap"
)
var (
log = logger.Get().Named("Main")
) )
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
config, err := broker.ConfigureConfig() config, err := broker.ConfigureConfig(os.Args[1:])
if err != nil { if err != nil {
log.Error("configure broker config error: ", zap.Error(err)) fmt.Println("configure broker config error: ", err)
return return
} }
b, err := broker.NewBroker(config) b, err := broker.NewBroker(config)
if err != nil { if err != nil {
log.Error("New Broker error: ", zap.Error(err)) fmt.Println("New Broker error: ", err)
return return
} }
b.Start() b.Start()
s := waitForSignal() s := waitForSignal()
log.Info("signal received, broker closed.", zap.Any("signal", s)) fmt.Println("signal received, broker closed.", s)
} }
func waitForSignal() os.Signal { func waitForSignal() os.Signal {