From a3fc611615713f643bdf03625ef1070c5da35d89 Mon Sep 17 00:00:00 2001 From: "joy.zhou" Date: Mon, 30 Sep 2019 11:04:46 +0800 Subject: [PATCH] fix issue --- Dockerfile | 1 - broker/auth.go | 3 --- broker/broker.go | 4 +++- broker/config.go | 29 ++++++++++++++++------------- broker/lib/topics/memtopics.go | 3 +++ conf/hmq.config | 1 + plugins/bridge/kafka.go | 9 ++++++++- 7 files changed, 31 insertions(+), 19 deletions(-) diff --git a/Dockerfile b/Dockerfile index fa67a21..5e45340 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,6 @@ FROM golang:1.12 as builder WORKDIR /go/src/github.com/fhmq/hmq COPY . . -COPY ./vendor . RUN CGO_ENABLED=0 go build -o hmq -a -ldflags '-extldflags "-static"' . diff --git a/broker/auth.go b/broker/auth.go index e87210a..bd7161f 100644 --- a/broker/auth.go +++ b/broker/auth.go @@ -34,9 +34,6 @@ func (b *Broker) CheckTopicAuth(action, username, topic string) bool { func (b *Broker) CheckConnectAuth(clientID, username, password string) bool { if b.auth != nil { - if clientID == "" || username == "" { - return false - } return b.auth.CheckConnect(clientID, username, password) } diff --git a/broker/broker.go b/broker/broker.go index 6ef80ec..f4fef82 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -123,7 +123,9 @@ func (b *Broker) Start() { return } - go InitHTTPMoniter(b) + if b.config.HTTPPort != "" { + go InitHTTPMoniter(b) + } //listen clinet over tcp if b.config.Port != "" { diff --git a/broker/config.go b/broker/config.go index 41e23cf..414f560 100644 --- a/broker/config.go +++ b/broker/config.go @@ -17,19 +17,20 @@ import ( ) type Config struct { - Worker int `json:"workerNum"` - Host string `json:"host"` - Port string `json:"port"` - Cluster RouteInfo `json:"cluster"` - Router string `json:"router"` - TlsHost string `json:"tlsHost"` - TlsPort string `json:"tlsPort"` - WsPath string `json:"wsPath"` - WsPort string `json:"wsPort"` - WsTLS bool `json:"wsTLS"` - TlsInfo TLSInfo `json:"tlsInfo"` - Debug bool `json:"debug"` - Plugin Plugins `json:"plugins"` + Worker int `json:"workerNum"` + HTTPPort string `json:"httpPort"` + Host string `json:"host"` + Port string `json:"port"` + Cluster RouteInfo `json:"cluster"` + Router string `json:"router"` + TlsHost string `json:"tlsHost"` + TlsPort string `json:"tlsPort"` + WsPath string `json:"wsPath"` + WsPort string `json:"wsPort"` + WsTLS bool `json:"wsTLS"` + TlsInfo TLSInfo `json:"tlsInfo"` + Debug bool `json:"debug"` + Plugin Plugins `json:"plugins"` } type Plugins struct { @@ -77,6 +78,8 @@ func ConfigureConfig(args []string) (*Config, error) { fs.BoolVar(&help, "help", false, "Show this message.") fs.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.") fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.") + fs.StringVar(&config.HTTPPort, "httpport", "8080", "Port to listen on.") + fs.StringVar(&config.HTTPPort, "hp", "8080", "Port to listen on.") fs.StringVar(&config.Port, "port", "1883", "Port to listen on.") fs.StringVar(&config.Port, "p", "1883", "Port to listen on.") fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on") diff --git a/broker/lib/topics/memtopics.go b/broker/lib/topics/memtopics.go index eb81345..47e958a 100644 --- a/broker/lib/topics/memtopics.go +++ b/broker/lib/topics/memtopics.go @@ -244,6 +244,9 @@ func (this *snode) smatch(topic []byte, qos byte, subs *[]interface{}, qoss *[]b // let's find the subscribers that match the qos and append them to the list. if len(topic) == 0 { this.matchQos(qos, subs, qoss) + if mwcn, _ := this.snodes[MWC]; mwcn != nil { + mwcn.matchQos(qos, subs, qoss) + } return nil } diff --git a/conf/hmq.config b/conf/hmq.config index d4d62c2..f50ca0c 100644 --- a/conf/hmq.config +++ b/conf/hmq.config @@ -7,6 +7,7 @@ "host": "0.0.0.0", "port": "1993" }, + "httpPort": "8080", "router": "127.0.0.1:9888", "tlsPort": "8883", "tlsHost": "0.0.0.0", diff --git a/plugins/bridge/kafka.go b/plugins/bridge/kafka.go index 54a5fa2..6f7c87d 100644 --- a/plugins/bridge/kafka.go +++ b/plugins/bridge/kafka.go @@ -5,6 +5,7 @@ import ( "errors" "io/ioutil" "strings" + "time" "github.com/Shopify/sarama" "go.uber.org/zap" @@ -109,11 +110,17 @@ func (k *kafka) publish(topics map[string]bool, key string, msg *Elements) error } for topic, _ := range topics { - k.kafkaClient.Input() <- &sarama.ProducerMessage{ + select { + case k.kafkaClient.Input() <- &sarama.ProducerMessage{ Topic: topic, Key: sarama.ByteEncoder(key), Value: sarama.ByteEncoder(payload), + }: + continue + case <-time.After(5 * time.Second): + return errors.New("write kafka timeout") } + } return nil