fix issue

This commit is contained in:
joy.zhou
2019-09-30 11:04:46 +08:00
parent e74b9facd1
commit a3fc611615
7 changed files with 31 additions and 19 deletions

View File

@@ -1,7 +1,6 @@
FROM golang:1.12 as builder
WORKDIR /go/src/github.com/fhmq/hmq
COPY . .
COPY ./vendor .
RUN CGO_ENABLED=0 go build -o hmq -a -ldflags '-extldflags "-static"' .

View File

@@ -34,9 +34,6 @@ func (b *Broker) CheckTopicAuth(action, username, topic string) bool {
func (b *Broker) CheckConnectAuth(clientID, username, password string) bool {
if b.auth != nil {
if clientID == "" || username == "" {
return false
}
return b.auth.CheckConnect(clientID, username, password)
}

View File

@@ -123,7 +123,9 @@ func (b *Broker) Start() {
return
}
go InitHTTPMoniter(b)
if b.config.HTTPPort != "" {
go InitHTTPMoniter(b)
}
//listen clinet over tcp
if b.config.Port != "" {

View File

@@ -17,19 +17,20 @@ import (
)
type Config struct {
Worker int `json:"workerNum"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
}
type Plugins struct {
@@ -77,6 +78,8 @@ func ConfigureConfig(args []string) (*Config, error) {
fs.BoolVar(&help, "help", false, "Show this message.")
fs.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.")
fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
fs.StringVar(&config.HTTPPort, "httpport", "8080", "Port to listen on.")
fs.StringVar(&config.HTTPPort, "hp", "8080", "Port to listen on.")
fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")

View File

@@ -244,6 +244,9 @@ func (this *snode) smatch(topic []byte, qos byte, subs *[]interface{}, qoss *[]b
// let's find the subscribers that match the qos and append them to the list.
if len(topic) == 0 {
this.matchQos(qos, subs, qoss)
if mwcn, _ := this.snodes[MWC]; mwcn != nil {
mwcn.matchQos(qos, subs, qoss)
}
return nil
}

View File

@@ -7,6 +7,7 @@
"host": "0.0.0.0",
"port": "1993"
},
"httpPort": "8080",
"router": "127.0.0.1:9888",
"tlsPort": "8883",
"tlsHost": "0.0.0.0",

View File

@@ -5,6 +5,7 @@ import (
"errors"
"io/ioutil"
"strings"
"time"
"github.com/Shopify/sarama"
"go.uber.org/zap"
@@ -109,11 +110,17 @@ func (k *kafka) publish(topics map[string]bool, key string, msg *Elements) error
}
for topic, _ := range topics {
k.kafkaClient.Input() <- &sarama.ProducerMessage{
select {
case k.kafkaClient.Input() <- &sarama.ProducerMessage{
Topic: topic,
Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(payload),
}:
continue
case <-time.After(5 * time.Second):
return errors.New("write kafka timeout")
}
}
return nil