replace plugin

This commit is contained in:
joy.zhou
2019-08-16 18:18:19 +08:00
parent e4e736d1e2
commit 148738800b
9 changed files with 147 additions and 91 deletions

23
plugins/auth/auth.go Normal file
View File

@@ -0,0 +1,23 @@
package auth
import (
"github.com/fhmq/rhmq/plugins/auth/authhttp"
)
const (
AuthHTTP = "authhttp"
)
type Auth interface {
CheckACL(action, username, topic string) bool
CheckConnect(clientID, username, password string) bool
}
func NewAuth(name string) Auth {
switch name {
case AuthHTTP:
return authhttp.Init()
default:
return &mockAuth{}
}
}

View File

@@ -10,21 +10,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/fhmq/hmq/logger" "github.com/fhmq/rhmq/logger"
"go.uber.org/zap" "go.uber.org/zap"
) )
const (
//AuthHTTP plugin name
AuthHTTP = "authhttp"
)
var (
config Config
log = logger.Get().Named("http")
httpClient *http.Client
)
//Config device kafka config //Config device kafka config
type Config struct { type Config struct {
AuthURL string `json:"auth"` AuthURL string `json:"auth"`
@@ -32,9 +21,19 @@ type Config struct {
SuperURL string `json:"super"` SuperURL string `json:"super"`
} }
type authHTTP struct {
client *http.Client
}
var (
config Config
log = logger.Get().Named("authhttp")
httpClient *http.Client
)
//Init init kafak client //Init init kafak client
func Init() { func Init() *authHTTP {
content, err := ioutil.ReadFile("./plugins/authhttp/http.json") content, err := ioutil.ReadFile("./plugins/auth/authhttp/http.json")
if err != nil { if err != nil {
log.Fatal("Read config file error: ", zap.Error(err)) log.Fatal("Read config file error: ", zap.Error(err))
} }
@@ -54,11 +53,11 @@ func Init() {
}, },
Timeout: time.Second * 100, Timeout: time.Second * 100,
} }
return &authHTTP{client: httpClient}
} }
//CheckAuth check mqtt connect //CheckAuth check mqtt connect
func CheckAuth(clientID, username, password string) bool { func (a *authHTTP) CheckConnect(clientID, username, password string) bool {
action := "connect" action := "connect"
{ {
aCache := checkCache(action, clientID, username, password, "") aCache := checkCache(action, clientID, username, password, "")
@@ -82,7 +81,7 @@ func CheckAuth(clientID, username, password string) bool {
req.Header.Add("Content-Type", "application/x-www-form-urlencoded") req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode())))
resp, err := httpClient.Do(req) resp, err := a.client.Do(req)
if err != nil { if err != nil {
log.Error("request super: ", zap.Error(err)) log.Error("request super: ", zap.Error(err))
return false return false
@@ -98,48 +97,48 @@ func CheckAuth(clientID, username, password string) bool {
return false return false
} }
//CheckSuper check mqtt connect // //CheckSuper check mqtt connect
func CheckSuper(clientID, username, password string) bool { // func CheckSuper(clientID, username, password string) bool {
action := "connect" // action := "connect"
{ // {
aCache := checkCache(action, clientID, username, password, "") // aCache := checkCache(action, clientID, username, password, "")
if aCache != nil { // if aCache != nil {
if aCache.password == password && aCache.username == username && aCache.action == action { // if aCache.password == password && aCache.username == username && aCache.action == action {
return true // return true
} // }
} // }
} // }
data := url.Values{} // data := url.Values{}
data.Add("username", username) // data.Add("username", username)
data.Add("clientid", clientID) // data.Add("clientid", clientID)
data.Add("password", password) // data.Add("password", password)
req, err := http.NewRequest("POST", config.SuperURL, strings.NewReader(data.Encode())) // req, err := http.NewRequest("POST", config.SuperURL, strings.NewReader(data.Encode()))
if err != nil { // if err != nil {
log.Error("new request super: ", zap.Error(err)) // log.Error("new request super: ", zap.Error(err))
return false // return false
} // }
req.Header.Add("Content-Type", "application/x-www-form-urlencoded") // req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) // req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode())))
resp, err := httpClient.Do(req) // resp, err := httpClient.Do(req)
if err != nil { // if err != nil {
log.Error("request super: ", zap.Error(err)) // log.Error("request super: ", zap.Error(err))
return false // return false
} // }
defer resp.Body.Close() // defer resp.Body.Close()
io.Copy(ioutil.Discard, resp.Body) // io.Copy(ioutil.Discard, resp.Body)
if resp.StatusCode == http.StatusOK { // if resp.StatusCode == http.StatusOK {
return true // return true
} // }
return false // return false
} // }
//CheckACL check mqtt connect //CheckACL check mqtt connect
func CheckACL(username, access, topic string) bool { func (a *authHTTP) CheckACL(username, access, topic string) bool {
action := access action := access
{ {
aCache := checkCache(action, "", username, "", topic) aCache := checkCache(action, "", username, "", topic)
@@ -163,7 +162,7 @@ func CheckACL(username, access, topic string) bool {
data.Add("access", access) data.Add("access", access)
req.URL.RawQuery = data.Encode() req.URL.RawQuery = data.Encode()
// fmt.Println("req:", req) // fmt.Println("req:", req)
resp, err := httpClient.Do(req) resp, err := a.client.Do(req)
if err != nil { if err != nil {
log.Error("request acl: ", zap.Error(err)) log.Error("request acl: ", zap.Error(err))
return false return false

11
plugins/auth/mock.go Normal file
View File

@@ -0,0 +1,11 @@
package auth
type mockAuth struct{}
func (m *mockAuth) CheckACL(action, username, topic string) bool {
return true
}
func (m *mockAuth) CheckConnect(clientID, username, password string) bool {
return true
}

View File

@@ -1,4 +1,6 @@
package plugins package bridge
import "github.com/fhmq/rhmq/logger"
const ( const (
//Connect mqtt connect //Connect mqtt connect
@@ -13,6 +15,10 @@ const (
Disconnect = "disconnect" Disconnect = "disconnect"
) )
var (
log = logger.Get().Named("bridge")
)
//Elements kafka publish elements //Elements kafka publish elements
type Elements struct { type Elements struct {
ClientID string `json:"clientid"` ClientID string `json:"clientid"`
@@ -23,3 +29,21 @@ type Elements struct {
Size int32 `json:"size"` Size int32 `json:"size"`
Action string `json:"action"` Action string `json:"action"`
} }
const (
//Kafka plugin name
Kafka = "kafka"
)
type BridgeMQ interface {
Publish(e *Elements) error
}
func NewBridgeMQ(name string) BridgeMQ {
switch name {
case Kafka:
return InitKafka()
default:
return &mockMQ{}
}
}

View File

@@ -1,29 +1,16 @@
package kafka package bridge
import ( import (
"encoding/json" "encoding/json"
"errors"
"io/ioutil" "io/ioutil"
"regexp" "regexp"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/fhmq/hmq/logger"
"github.com/fhmq/hmq/plugins"
"go.uber.org/zap" "go.uber.org/zap"
) )
const ( type kafakConfig struct {
//Kafka plugin name
Kafka = "kafka"
)
var (
kafkaClient sarama.AsyncProducer
config Config
log = logger.Get().Named("kafka")
)
//Config device kafka config
type Config struct {
Addr []string `json:"addr"` Addr []string `json:"addr"`
ConnectTopic string `json:"onConnect"` ConnectTopic string `json:"onConnect"`
SubscribeTopic string `json:"onSubscribe"` SubscribeTopic string `json:"onSubscribe"`
@@ -33,28 +20,34 @@ type Config struct {
RegexpMap map[string]string `json:"regexpMap"` RegexpMap map[string]string `json:"regexpMap"`
} }
type kafka struct {
kafakConfig kafakConfig
kafkaClient sarama.AsyncProducer
}
//Init init kafak client //Init init kafak client
func Init() { func InitKafka() *kafka {
log.Info("start connect kafka....") log.Info("start connect kafka....")
content, err := ioutil.ReadFile("./plugins/kafka/kafka.json") content, err := ioutil.ReadFile("./plugins/mq/kafka/kafka.json")
if err != nil { if err != nil {
log.Fatal("Read config file error: ", zap.Error(err)) log.Fatal("Read config file error: ", zap.Error(err))
} }
// log.Info(string(content)) // log.Info(string(content))
var config kafakConfig
err = json.Unmarshal(content, &config) err = json.Unmarshal(content, &config)
if err != nil { if err != nil {
log.Fatal("Unmarshal config file error: ", zap.Error(err)) log.Fatal("Unmarshal config file error: ", zap.Error(err))
} }
connect() c := &kafka{kafakConfig: config}
c.connect()
return c
} }
//connect //connect
func connect() { func (k *kafka) connect() {
var err error
conf := sarama.NewConfig() conf := sarama.NewConfig()
conf.Version = sarama.V1_1_1_0 conf.Version = sarama.V1_1_1_0
kafkaClient, err = sarama.NewAsyncProducer(config.Addr, conf) kafkaClient, err := sarama.NewAsyncProducer(k.kafakConfig.Addr, conf)
if err != nil { if err != nil {
log.Fatal("create kafka async producer failed: ", zap.Error(err)) log.Fatal("create kafka async producer failed: ", zap.Error(err))
} }
@@ -64,18 +57,21 @@ func connect() {
log.Error("send msg to kafka failed: ", zap.Error(err)) log.Error("send msg to kafka failed: ", zap.Error(err))
} }
}() }()
k.kafkaClient = kafkaClient
} }
//Publish publish to kafka //Publish publish to kafka
func Publish(e *plugins.Elements) { func (k *kafka) Publish(e *Elements) error {
config := k.kafakConfig
key := e.ClientID key := e.ClientID
var topics []string var topics []string
switch e.Action { switch e.Action {
case plugins.Connect: case Connect:
if config.ConnectTopic != "" { if config.ConnectTopic != "" {
topics = append(topics, config.ConnectTopic) topics = append(topics, config.ConnectTopic)
} }
case plugins.Publish: case Publish:
if config.PublishTopic != "" { if config.PublishTopic != "" {
topics = append(topics, config.PublishTopic) topics = append(topics, config.PublishTopic)
} }
@@ -86,38 +82,34 @@ func Publish(e *plugins.Elements) {
topics = append(topics, topic) topics = append(topics, topic)
} }
} }
case plugins.Subscribe: case Subscribe:
if config.SubscribeTopic != "" { if config.SubscribeTopic != "" {
topics = append(topics, config.SubscribeTopic) topics = append(topics, config.SubscribeTopic)
} }
case plugins.Unsubscribe: case Unsubscribe:
if config.UnsubscribeTopic != "" { if config.UnsubscribeTopic != "" {
topics = append(topics, config.UnsubscribeTopic) topics = append(topics, config.UnsubscribeTopic)
} }
case plugins.Disconnect: case Disconnect:
if config.DisconnectTopic != "" { if config.DisconnectTopic != "" {
topics = append(topics, config.DisconnectTopic) topics = append(topics, config.DisconnectTopic)
} }
default: default:
log.Error("error action: ", zap.String("action", e.Action)) return errors.New("error action: " + e.Action)
return
} }
err := publish(topics, key, e) return k.publish(topics, key, e)
if err != nil {
log.Error("publish kafka error: ", zap.Error(err))
}
} }
func publish(topics []string, key string, msg *plugins.Elements) error { func (k *kafka) publish(topics []string, key string, msg *Elements) error {
payload, err := json.Marshal(msg) payload, err := json.Marshal(msg)
if err != nil { if err != nil {
return err return err
} }
for _, topic := range topics { for _, topic := range topics {
kafkaClient.Input() <- &sarama.ProducerMessage{ k.kafkaClient.Input() <- &sarama.ProducerMessage{
Topic: topic, Topic: topic,
Key: sarama.ByteEncoder(key), Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(payload), Value: sarama.ByteEncoder(payload),

7
plugins/bridge/mock.go Normal file
View File

@@ -0,0 +1,7 @@
package bridge
type mockMQ struct{}
func (m *mockMQ) Publish(e *Elements) error {
return nil
}