From 7bce53fa2b9b7d4f90dba79f2c39f471d9732230 Mon Sep 17 00:00:00 2001 From: "joy.zhou" Date: Wed, 10 Jul 2019 20:34:15 +0800 Subject: [PATCH] modify --- .vscode/settings.json | 8 +++ broker/auth.go | 71 +++-------------------- broker/broker.go | 72 ++++++++++++++++-------- broker/client.go | 43 ++++++++++++++ broker/config.go | 1 + conf/hmq.config | 3 +- go.mod | 1 + plugins/authhttp/authhttp.go | 93 ++++++++++++++++++++++++++++++ plugins/authhttp/conf.json | 5 ++ plugins/elements.go | 25 +++++++++ plugins/kafka/conf.json | 10 ++++ plugins/kafka/kafka.go | 106 +++++++++++++++++++++++++++++++++++ 12 files changed, 351 insertions(+), 87 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 plugins/authhttp/authhttp.go create mode 100644 plugins/authhttp/conf.json create mode 100644 plugins/elements.go create mode 100644 plugins/kafka/conf.json create mode 100644 plugins/kafka/kafka.go diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..b147d20 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,8 @@ +{ + "go.lintFlags": [ + "--disable=all", + "--enable=errcheck,varcheck,deadcode", + "--enable=varcheck", + "--enable=deadcode" + ] +} \ No newline at end of file diff --git a/broker/auth.go b/broker/auth.go index a4b5d36..045e1d4 100644 --- a/broker/auth.go +++ b/broker/auth.go @@ -3,10 +3,7 @@ package broker import ( - "github.com/fhmq/hmq/lib/acl" - "github.com/fsnotify/fsnotify" - "go.uber.org/zap" - "strings" + "github.com/fhmq/hmq/plugins/authhttp" ) const ( @@ -15,67 +12,17 @@ const ( ) func (c *client) CheckTopicAuth(typ int, topic string) bool { - if c.typ != CLIENT || !c.broker.config.Acl { + if c.typ != CLIENT || !c.broker.pluginAuthHTTP { return true } - if strings.HasPrefix(topic, "$queue/") { - topic = string([]byte(topic)[7:]) - if topic == "" { - return false - } + access := "sub" + switch typ { + case 1: + access = "pub" + case 2: + access = "sub" } - ip := c.info.remoteIP username := 
string(c.info.username) - clientid := string(c.info.clientID) - aclInfo := c.broker.AclConfig - return acl.CheckTopicAuth(aclInfo, typ, ip, username, clientid, topic) + return authhttp.CheckACL(username, access, topic) } - -var ( - watchList = []string{"./conf"} -) - -func (b *Broker) handleFsEvent(event fsnotify.Event) error { - switch event.Name { - case b.config.AclConf: - if event.Op&fsnotify.Write == fsnotify.Write || - event.Op&fsnotify.Create == fsnotify.Create { - log.Info("text:handling acl config change event:", zap.String("filename", event.Name)) - aclconfig, err := acl.AclConfigLoad(event.Name) - if err != nil { - log.Error("aclconfig change failed, load acl conf error: ", zap.Error(err)) - return err - } - b.AclConfig = aclconfig - } - } - return nil -} - -func (b *Broker) StartAclWatcher() { - go func() { - wch, e := fsnotify.NewWatcher() - if e != nil { - log.Error("start monitor acl config file error,", zap.Error(e)) - return - } - defer wch.Close() - - for _, i := range watchList { - if err := wch.Add(i); err != nil { - log.Error("start monitor acl config file error,", zap.Error(err)) - return - } - } - log.Info("watching acl config file change...") - for { - select { - case evt := <-wch.Events: - b.handleFsEvent(evt) - case err := <-wch.Errors: - log.Error("error:", zap.Error(err)) - } - } - }() -} diff --git a/broker/broker.go b/broker/broker.go index 74fa818..eebe72a 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -13,10 +13,13 @@ import ( "sync/atomic" "time" + "github.com/fhmq/hmq/plugins" + "github.com/eclipse/paho.mqtt.golang/packets" - "github.com/fhmq/hmq/lib/acl" "github.com/fhmq/hmq/lib/sessions" "github.com/fhmq/hmq/lib/topics" + "github.com/fhmq/hmq/plugins/authhttp" + "github.com/fhmq/hmq/plugins/kafka" "github.com/fhmq/hmq/pool" "github.com/shirou/gopsutil/mem" "go.uber.org/zap" @@ -34,21 +37,23 @@ type Message struct { } type Broker struct { - id string - cid uint64 - mu sync.Mutex - config *Config - tlsConfig *tls.Config - 
AclConfig *acl.ACLConfig - wpool *pool.WorkerPool - clients sync.Map - routes sync.Map - remotes sync.Map - nodes map[string]interface{} - clusterPool chan *Message - queues map[string]int - topicsMgr *topics.Manager - sessionMgr *sessions.Manager + id string + cid uint64 + mu sync.Mutex + config *Config + tlsConfig *tls.Config + wpool *pool.WorkerPool + clients sync.Map + routes sync.Map + remotes sync.Map + nodes map[string]interface{} + clusterPool chan *Message + queues map[string]int + topicsMgr *topics.Manager + sessionMgr *sessions.Manager + pluginAuthHTTP bool + pluginKafka bool + // messagePool []chan *Message } @@ -92,15 +97,18 @@ func NewBroker(config *Config) (*Broker, error) { } b.tlsConfig = tlsconfig } - if b.config.Acl { - aclconfig, err := acl.AclConfigLoad(b.config.AclConf) - if err != nil { - log.Error("Load acl conf error", zap.Error(err)) - return nil, err + + for _, plugin := range b.config.Plugins { + switch plugin { + case authhttp.AuthHTTP: + authhttp.Init() + b.pluginAuthHTTP = true + case kafka.Kafka: + kafka.Init() + b.pluginKafka = true } - b.AclConfig = aclconfig - b.StartAclWatcher() } + return b, nil } @@ -328,12 +336,30 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) { connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket) connack.ReturnCode = packets.Accepted connack.SessionPresent = msg.CleanSession + + if b.pluginAuthHTTP && !authhttp.CheckAuth(string(msg.ClientIdentifier), string(msg.Username), string(msg.Password)) { + connack.ReturnCode = packets.ErrRefusedBadUsernameOrPassword + if err = connack.Write(conn); err != nil { + log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) + } + return + } + err = connack.Write(conn) if err != nil { log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier)) return } + if b.pluginKafka && typ == CLIENT { + kafka.Publish(&plugins.Elements{ + ClientID: string(msg.ClientIdentifier), + Username: string(msg.Username), + Action: 
plugins.Connect, + Timestamp: time.Now().Unix(), + }) + } + willmsg := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) if msg.WillFlag { willmsg.Qos = msg.WillQos diff --git a/broker/client.go b/broker/client.go index da1af9b..35f36bf 100644 --- a/broker/client.go +++ b/broker/client.go @@ -11,6 +11,9 @@ import ( "sync" "time" + "github.com/fhmq/hmq/plugins" + "github.com/fhmq/hmq/plugins/kafka" + "github.com/eclipse/paho.mqtt.golang/packets" "github.com/fhmq/hmq/lib/sessions" "github.com/fhmq/hmq/lib/topics" @@ -176,6 +179,17 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { return } + if c.broker.pluginKafka && c.typ == CLIENT { + kafka.Publish(&plugins.Elements{ + ClientID: c.info.clientID, + Username: c.info.username, + Action: plugins.Publish, + Timestamp: time.Now().Unix(), + Payload: string(packet.Payload), + Topic: topic, + }) + } + switch packet.Qos { case QosAtMostOnce: c.ProcessPublishMessage(packet) @@ -266,6 +280,16 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { continue } + if c.broker.pluginKafka && c.typ == CLIENT { + kafka.Publish(&plugins.Elements{ + ClientID: c.info.clientID, + Username: c.info.username, + Action: plugins.Subscribe, + Timestamp: time.Now().Unix(), + Topic: topic, + }) + } + sub := &subscription{ topic: t, qos: qoss[i], @@ -324,6 +348,16 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) { c.session.RemoveTopic(topic) delete(c.subMap, topic) } + + if c.broker.pluginKafka && c.typ == CLIENT { + kafka.Publish(&plugins.Elements{ + ClientID: c.info.clientID, + Username: c.info.username, + Action: plugins.Unsubscribe, + Timestamp: time.Now().Unix(), + Topic: topic, + }) + } } unsuback := packets.NewControlPacket(packets.Unsuback).(*packets.UnsubackPacket) @@ -364,6 +398,15 @@ func (c *client) Close() { // time.Sleep(1 * time.Second) // c.status = Disconnected + if c.broker.pluginKafka && c.typ == CLIENT { + kafka.Publish(&plugins.Elements{ + ClientID: 
c.info.clientID, + Username: c.info.username, + Action: plugins.Disconnect, + Timestamp: time.Now().Unix(), + }) + } + if c.conn != nil { c.conn.Close() c.conn = nil diff --git a/broker/config.go b/broker/config.go index a567a20..b5b0f59 100644 --- a/broker/config.go +++ b/broker/config.go @@ -31,6 +31,7 @@ type Config struct { Acl bool `json:"acl"` AclConf string `json:"aclConf"` Debug bool `json:"-"` + Plugins []string `json:"plugins"` } type RouteInfo struct { diff --git a/conf/hmq.config b/conf/hmq.config index 6c8711c..72792a6 100644 --- a/conf/hmq.config +++ b/conf/hmq.config @@ -18,6 +18,5 @@ "certFile": "ssl/server/cert.pem", "keyFile": "ssl/server/key.pem" }, - "acl": false, - "aclConf": "conf/acl.conf" + "plugins": ["authhttp","kafka"] } diff --git a/go.mod b/go.mod index d5edef6..8788e3f 100644 --- a/go.mod +++ b/go.mod @@ -19,4 +19,5 @@ require ( golang.org/x/net v0.0.0-20190424024845-afe8014c977f golang.org/x/sys v0.0.0-20190422165155-953cdadca894 // indirect golang.org/x/tools v0.0.0-20190424031103-cb2dda6eabdf // indirect + github.com/Shopify/sarama v1.23.0 ) diff --git a/plugins/authhttp/authhttp.go b/plugins/authhttp/authhttp.go new file mode 100644 index 0000000..4252d2e --- /dev/null +++ b/plugins/authhttp/authhttp.go @@ -0,0 +1,93 @@ +package authhttp + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" + + "github.com/fhmq/hmq/logger" + "go.uber.org/zap" +) + +const ( + //AuthHTTP plugin name + AuthHTTP = "authhttp" +) + +var ( + config Config + log = logger.Get().Named("http") +) + +//Config authhttp plugin config +type Config struct { + AuthURL string `json:"auth"` + ACLURL string `json:"acl"` + SuperURL string `json:"super"` +} + +//Init init authhttp plugin +func Init() { + content, err := ioutil.ReadFile("./plugins/authhttp/conf.json") + if err != nil { + log.Fatal("Read config file error: ", zap.Error(err)) + } + // log.Info(string(content)) + + err = json.Unmarshal(content, &config) + if err != nil { + 
log.Fatal("Unmarshal config file error: ", zap.Error(err)) + } + +} + +//CheckAuth check mqtt connect +func CheckAuth(clientID, username, password string) bool { + payload := fmt.Sprintf("username=%s&password=%s&clientid=%s", username, password, clientID) + resp, err := http.Post(config.AuthURL, + "application/x-www-form-urlencoded", + strings.NewReader(payload)) + if err != nil { + log.Error("request auth: ", zap.Error(err)); return false + } + + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return true + } + return false +} + +//CheckSuper check mqtt connect +func CheckSuper(clientID, username, password string) bool { + payload := fmt.Sprintf("username=%s&password=%s&clientid=%s", username, password, clientID) + resp, err := http.Post(config.SuperURL, + "application/x-www-form-urlencoded", + strings.NewReader(payload)) + if err != nil { + log.Error("request super: ", zap.Error(err)); return false + } + + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return true + } + return false +} + +//CheckACL check mqtt connect +func CheckACL(username, access, topic string) bool { + url := fmt.Sprintf(config.ACLURL+"?username=%s&access=%s&topic=%s", username, access, topic) + resp, err := http.Get(url) + if err != nil { + log.Error("request acl: ", zap.Error(err)); return false + } + + defer resp.Body.Close() + if resp.StatusCode == http.StatusOK { + return true + } + return false +} diff --git a/plugins/authhttp/conf.json b/plugins/authhttp/conf.json new file mode 100644 index 0000000..e0bdae7 --- /dev/null +++ b/plugins/authhttp/conf.json @@ -0,0 +1,5 @@ +{ + "auth": "http://127.0.0.1:9090/mqtt/auth", + "acl": "http://127.0.0.1:9090/mqtt/acl", + "super": "http://127.0.0.1:9090/mqtt/superuser" +} \ No newline at end of file diff --git a/plugins/elements.go b/plugins/elements.go new file mode 100644 index 0000000..a521ff4 --- /dev/null +++ b/plugins/elements.go @@ -0,0 +1,25 @@ +package plugins + +const ( + //Connect mqtt connect + Connect = "connect" + //Publish mqtt publish + Publish = "publish" + 
//Subscribe mqtt sub + Subscribe = "subscribe" + //Unsubscribe mqtt sub + Unsubscribe = "unsubscribe" + //Disconnect mqtt disconenct + Disconnect = "disconnect" +) + +//Elements kafka publish elements +type Elements struct { + ClientID string `json:"clientid"` + Username string `json:"username"` + Topic string `json:"topic"` + Payload string `json:"payload"` + Timestamp int64 `json:"ts"` + Size int32 `json:"size"` + Action string `json:"action"` +} diff --git a/plugins/kafka/conf.json b/plugins/kafka/conf.json new file mode 100644 index 0000000..d6a7dc3 --- /dev/null +++ b/plugins/kafka/conf.json @@ -0,0 +1,10 @@ +{ + "addr": [ + "127.0.0.1:9090" + ], + "onConnect": "onConnect", + "onPublish": "onPublish", + "onSubscribe": "onSubscribe", + "onDisconnect": "onDisconnect", + "onUnsubscribe": "onUnsubscribe" +} \ No newline at end of file diff --git a/plugins/kafka/kafka.go b/plugins/kafka/kafka.go new file mode 100644 index 0000000..7539456 --- /dev/null +++ b/plugins/kafka/kafka.go @@ -0,0 +1,106 @@ +package kafka + +import ( + "encoding/json" + "errors" + "io/ioutil" + "time" + + "github.com/Shopify/sarama" + "github.com/fhmq/hmq/logger" + "github.com/fhmq/hmq/plugins" + "go.uber.org/zap" +) + +const ( + //Kafka plugin name + Kafka = "kafka" +) + +var ( + kafkaClient sarama.AsyncProducer + config Config + log = logger.Get().Named("kafka") +) + +//Config device kafka config +type Config struct { + Addr []string `json:"addr"` + ConnectTopic string `json:"onConnect"` + SubscribeTopic string `json:"onSubscribe"` + PublishTopic string `json:"onPublish"` + UnsubscribeTopic string `json:"onUnsubscribe"` + DisconnectTopic string `json:"onDisconnect"` +} + +//Init init kafak client +func Init() { + content, err := ioutil.ReadFile("../../plugins/kafka/conf.json") + if err != nil { + log.Fatal("Read config file error: ", zap.Error(err)) + } + // log.Info(string(content)) + + err = json.Unmarshal(content, &config) + if err != nil { + log.Fatal("Unmarshal config file error: ", 
zap.Error(err)) + } + + connect() +} + +//connect +func connect() { + var err error + conf := sarama.NewConfig() + kafkaClient, err = sarama.NewAsyncProducer(config.Addr, conf) + if err != nil { + log.Fatal("create kafka async producer failed: ", zap.Error(err)) + } + + go func() { + for err := range kafkaClient.Errors() { + log.Error("send msg to kafka failed: ", zap.Error(err)) + } + }() +} + +//Publish publish to kafka +func Publish(e *plugins.Elements) { + topic, key := "", "" + switch e.Action { + case plugins.Connect: + topic = config.ConnectTopic + case plugins.Publish: + topic = config.PublishTopic + case plugins.Subscribe: + topic = config.SubscribeTopic + case plugins.Unsubscribe: + topic = config.UnsubscribeTopic + case plugins.Disconnect: + topic = config.DisconnectTopic + default: + log.Error("error action: ", zap.String("action", e.Action)) + return + } + key = e.Username + err := publish(topic, key, e) + if err != nil { + log.Error("publish kafka error: ", zap.Error(err)) + } +} + +func publish(topic, key string, msg *plugins.Elements) error { + payload, err := json.Marshal(msg) + if err != nil { + return err + } + select { + case kafkaClient.Input() <- &sarama.ProducerMessage{ + Topic: topic, + Key: sarama.ByteEncoder(key), + Value: sarama.ByteEncoder(payload)}: + return nil + case <-time.After(1 * time.Minute): + return errors.New("send to kafka time out") + } +}