From 148738800ba7b410ecc9e2db88441ae8ee97b515 Mon Sep 17 00:00:00 2001 From: "joy.zhou" Date: Fri, 16 Aug 2019 18:18:19 +0800 Subject: [PATCH] replace plugin --- plugins/auth/auth.go | 23 +++++ plugins/{ => auth}/authhttp/authhttp.go | 105 +++++++++++----------- plugins/{ => auth}/authhttp/cache.go | 0 plugins/{ => auth}/authhttp/http.json | 0 plugins/auth/mock.go | 11 +++ plugins/{elements.go => bridge/bridge.go} | 26 +++++- plugins/{kafka => bridge}/kafka.go | 66 ++++++-------- plugins/{ => bridge}/kafka/kafka.json | 0 plugins/bridge/mock.go | 7 ++ 9 files changed, 147 insertions(+), 91 deletions(-) create mode 100644 plugins/auth/auth.go rename plugins/{ => auth}/authhttp/authhttp.go (62%) rename plugins/{ => auth}/authhttp/cache.go (100%) rename plugins/{ => auth}/authhttp/http.json (100%) create mode 100644 plugins/auth/mock.go rename plugins/{elements.go => bridge/bridge.go} (62%) rename plugins/{kafka => bridge}/kafka.go (69%) rename plugins/{ => bridge}/kafka/kafka.json (100%) create mode 100644 plugins/bridge/mock.go diff --git a/plugins/auth/auth.go b/plugins/auth/auth.go new file mode 100644 index 0000000..7ddeab4 --- /dev/null +++ b/plugins/auth/auth.go @@ -0,0 +1,23 @@ +package auth + +import ( + "github.com/fhmq/rhmq/plugins/auth/authhttp" +) + +const ( + AuthHTTP = "authhttp" +) + +type Auth interface { + CheckACL(action, username, topic string) bool + CheckConnect(clientID, username, password string) bool +} + +func NewAuth(name string) Auth { + switch name { + case AuthHTTP: + return authhttp.Init() + default: + return &mockAuth{} + } +} diff --git a/plugins/authhttp/authhttp.go b/plugins/auth/authhttp/authhttp.go similarity index 62% rename from plugins/authhttp/authhttp.go rename to plugins/auth/authhttp/authhttp.go index a125155..33b52d5 100644 --- a/plugins/authhttp/authhttp.go +++ b/plugins/auth/authhttp/authhttp.go @@ -10,21 +10,10 @@ import ( "strings" "time" - "github.com/fhmq/hmq/logger" + "github.com/fhmq/rhmq/logger" "go.uber.org/zap" ) -const ( - //AuthHTTP plugin name - AuthHTTP = "authhttp" -) - -var ( - config Config - log = logger.Get().Named("http") - httpClient *http.Client -) - //Config device kafka config type Config struct { AuthURL string `json:"auth"` @@ -32,9 +21,19 @@ type Config struct { SuperURL string `json:"super"` } +type authHTTP struct { + client *http.Client +} + +var ( + config Config + log = logger.Get().Named("authhttp") + httpClient *http.Client +) + //Init init kafak client -func Init() { - content, err := ioutil.ReadFile("./plugins/authhttp/http.json") +func Init() *authHTTP { + content, err := ioutil.ReadFile("./plugins/auth/authhttp/http.json") if err != nil { log.Fatal("Read config file error: ", zap.Error(err)) } @@ -54,11 +53,11 @@ func Init() { }, Timeout: time.Second * 100, } - + return &authHTTP{client: httpClient} } //CheckAuth check mqtt connect -func CheckAuth(clientID, username, password string) bool { +func (a *authHTTP) CheckConnect(clientID, username, password string) bool { action := "connect" { aCache := checkCache(action, clientID, username, password, "") @@ -82,7 +81,7 @@ func CheckAuth(clientID, username, password string) bool { req.Header.Add("Content-Type", "application/x-www-form-urlencoded") req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) - resp, err := httpClient.Do(req) + resp, err := a.client.Do(req) if err != nil { log.Error("request super: ", zap.Error(err)) return false @@ -98,48 +97,48 @@ func CheckAuth(clientID, username, password string) bool { return false } -//CheckSuper check mqtt connect -func CheckSuper(clientID, username, password string) bool { - action := "connect" - { - aCache := checkCache(action, clientID, username, password, "") - if aCache != nil { - if aCache.password == password && aCache.username == username && aCache.action == action { - return true - } - } - } +// //CheckSuper check mqtt connect +// func CheckSuper(clientID, username, password string) bool { +// action := "connect" +// { +// aCache := checkCache(action, clientID, username, password, "") +// if aCache != nil { +// if aCache.password == password && aCache.username == username && aCache.action == action { +// return true +// } +// } +// } - data := url.Values{} - data.Add("username", username) - data.Add("clientid", clientID) - data.Add("password", password) +// data := url.Values{} +// data.Add("username", username) +// data.Add("clientid", clientID) +// data.Add("password", password) - req, err := http.NewRequest("POST", config.SuperURL, strings.NewReader(data.Encode())) - if err != nil { - log.Error("new request super: ", zap.Error(err)) - return false - } - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) +// req, err := http.NewRequest("POST", config.SuperURL, strings.NewReader(data.Encode())) +// if err != nil { +// log.Error("new request super: ", zap.Error(err)) +// return false +// } +// req.Header.Add("Content-Type", "application/x-www-form-urlencoded") +// req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) - resp, err := httpClient.Do(req) - if err != nil { - log.Error("request super: ", zap.Error(err)) - return false - } +// resp, err := httpClient.Do(req) +// if err != nil { +// log.Error("request super: ", zap.Error(err)) +// return false +// } - defer resp.Body.Close() - io.Copy(ioutil.Discard, resp.Body) +// defer resp.Body.Close() +// io.Copy(ioutil.Discard, resp.Body) - if resp.StatusCode == http.StatusOK { - return true - } - return false -} +// if resp.StatusCode == http.StatusOK { +// return true +// } +// return false +// } //CheckACL check mqtt connect -func CheckACL(username, access, topic string) bool { +func (a *authHTTP) CheckACL(username, access, topic string) bool { action := access { aCache := checkCache(action, "", username, "", topic) @@ -163,7 +162,7 @@ func CheckACL(username, access, topic string) bool { data.Add("access", access) req.URL.RawQuery = data.Encode() // fmt.Println("req:", req) - resp, err := httpClient.Do(req) + resp, err := a.client.Do(req) if err != nil { log.Error("request acl: ", zap.Error(err)) return false diff --git a/plugins/authhttp/cache.go b/plugins/auth/authhttp/cache.go similarity index 100% rename from plugins/authhttp/cache.go rename to plugins/auth/authhttp/cache.go diff --git a/plugins/authhttp/http.json b/plugins/auth/authhttp/http.json similarity index 100% rename from plugins/authhttp/http.json rename to plugins/auth/authhttp/http.json diff --git a/plugins/auth/mock.go b/plugins/auth/mock.go new file mode 100644 index 0000000..ed002fd --- /dev/null +++ b/plugins/auth/mock.go @@ -0,0 +1,11 @@ +package auth + +type mockAuth struct{} + +func (m *mockAuth) CheckACL(action, username, topic string) bool { + return true +} + +func (m *mockAuth) CheckConnect(clientID, username, password string) bool { + return true +} diff --git a/plugins/elements.go b/plugins/bridge/bridge.go similarity index 62% rename from plugins/elements.go rename to plugins/bridge/bridge.go index a521ff4..9343b93 100644 --- a/plugins/elements.go +++ b/plugins/bridge/bridge.go @@ -1,4 +1,6 @@ -package plugins +package bridge + +import "github.com/fhmq/rhmq/logger" const ( //Connect mqtt connect @@ -13,6 +15,10 @@ const ( Disconnect = "disconnect" ) +var ( + log = logger.Get().Named("bridge") +) + //Elements kafka publish elements type Elements struct { ClientID string `json:"clientid"` @@ -23,3 +29,21 @@ type Elements struct { Size int32 `json:"size"` Action string `json:"action"` } + +const ( + //Kafka plugin name + Kafka = "kafka" +) + +type BridgeMQ interface { + Publish(e *Elements) error +} + +func NewBridgeMQ(name string) BridgeMQ { + switch name { + case Kafka: + return InitKafka() + default: + return &mockMQ{} + } +} diff --git a/plugins/kafka/kafka.go b/plugins/bridge/kafka.go similarity index 69% rename from plugins/kafka/kafka.go rename to plugins/bridge/kafka.go index f521db7..37e8024 100644 --- a/plugins/kafka/kafka.go +++ b/plugins/bridge/kafka.go @@ -1,29 +1,16 @@ -package kafka +package bridge import ( "encoding/json" + "errors" "io/ioutil" "regexp" "github.com/Shopify/sarama" - "github.com/fhmq/hmq/logger" - "github.com/fhmq/hmq/plugins" "go.uber.org/zap" ) -const ( - //Kafka plugin name - Kafka = "kafka" -) - -var ( - kafkaClient sarama.AsyncProducer - config Config - log = logger.Get().Named("kafka") -) - -//Config device kafka config -type Config struct { +type kafakConfig struct { Addr []string `json:"addr"` ConnectTopic string `json:"onConnect"` SubscribeTopic string `json:"onSubscribe"` @@ -33,28 +20,34 @@ type Config struct { RegexpMap map[string]string `json:"regexpMap"` } +type kafka struct { + kafakConfig kafakConfig + kafkaClient sarama.AsyncProducer +} + //Init init kafak client -func Init() { +func InitKafka() *kafka { log.Info("start connect kafka....") - content, err := ioutil.ReadFile("./plugins/kafka/kafka.json") + content, err := ioutil.ReadFile("./plugins/mq/kafka/kafka.json") if err != nil { log.Fatal("Read config file error: ", zap.Error(err)) } // log.Info(string(content)) - + var config kafakConfig err = json.Unmarshal(content, &config) if err != nil { log.Fatal("Unmarshal config file error: ", zap.Error(err)) } - connect() + c := &kafka{kafakConfig: config} + c.connect() + return c } //connect -func connect() { - var err error +func (k *kafka) connect() { conf := sarama.NewConfig() conf.Version = sarama.V1_1_1_0 - kafkaClient, err = sarama.NewAsyncProducer(config.Addr, conf) + kafkaClient, err := sarama.NewAsyncProducer(k.kafakConfig.Addr, conf) if err != nil { log.Fatal("create kafka async producer failed: ", zap.Error(err)) } @@ -64,18 +57,21 @@ func connect() { log.Error("send msg to kafka failed: ", zap.Error(err)) } }() + + k.kafkaClient = kafkaClient } //Publish publish to kafka -func Publish(e *plugins.Elements) { +func (k *kafka) Publish(e *Elements) error { + config := k.kafakConfig key := e.ClientID var topics []string switch e.Action { - case plugins.Connect: + case Connect: if config.ConnectTopic != "" { topics = append(topics, config.ConnectTopic) } - case plugins.Publish: + case Publish: if config.PublishTopic != "" { topics = append(topics, config.PublishTopic) } @@ -86,38 +82,34 @@ func Publish(e *plugins.Elements) { topics = append(topics, topic) } } - case plugins.Subscribe: + case Subscribe: if config.SubscribeTopic != "" { topics = append(topics, config.SubscribeTopic) } - case plugins.Unsubscribe: + case Unsubscribe: if config.UnsubscribeTopic != "" { topics = append(topics, config.UnsubscribeTopic) } - case plugins.Disconnect: + case Disconnect: if config.DisconnectTopic != "" { topics = append(topics, config.DisconnectTopic) } default: - log.Error("error action: ", zap.String("action", e.Action)) - return + return errors.New("error action: " + e.Action) } - err := publish(topics, key, e) - if err != nil { - log.Error("publish kafka error: ", zap.Error(err)) - } + return k.publish(topics, key, e) } -func publish(topics []string, key string, msg *plugins.Elements) error { +func (k *kafka) publish(topics []string, key string, msg *Elements) error { payload, err := json.Marshal(msg) if err != nil { return err } for _, topic := range topics { - kafkaClient.Input() <- &sarama.ProducerMessage{ + k.kafkaClient.Input() <- &sarama.ProducerMessage{ Topic: topic, Key: sarama.ByteEncoder(key), Value: sarama.ByteEncoder(payload), diff --git a/plugins/kafka/kafka.json b/plugins/bridge/kafka/kafka.json similarity index 100% rename from plugins/kafka/kafka.json rename to plugins/bridge/kafka/kafka.json diff --git a/plugins/bridge/mock.go b/plugins/bridge/mock.go new file mode 100644 index 0000000..8d097f7 --- /dev/null +++ b/plugins/bridge/mock.go @@ -0,0 +1,7 @@ +package bridge + +type mockMQ struct{} + +func (m *mockMQ) Publish(e *Elements) error { + return nil +}