diff --git a/broker/broker.go b/broker/broker.go index 6ebb04e..0e63507 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -12,11 +12,11 @@ import ( "sync" "time" + "github.com/fhmq/hmq/broker/lib/sessions" + "github.com/fhmq/hmq/broker/lib/topics" "github.com/fhmq/hmq/plugins" "github.com/eclipse/paho.mqtt.golang/packets" - "github.com/fhmq/hmq/lib/sessions" - "github.com/fhmq/hmq/lib/topics" "github.com/fhmq/hmq/plugins/authhttp" "github.com/fhmq/hmq/plugins/kafka" "github.com/fhmq/hmq/pool" diff --git a/broker/client.go b/broker/client.go index 55adc64..d8e462f 100644 --- a/broker/client.go +++ b/broker/client.go @@ -13,11 +13,11 @@ import ( "sync" "time" + "github.com/fhmq/hmq/broker/lib/sessions" + "github.com/fhmq/hmq/broker/lib/topics" "github.com/fhmq/hmq/plugins" "github.com/eclipse/paho.mqtt.golang/packets" - "github.com/fhmq/hmq/lib/sessions" - "github.com/fhmq/hmq/lib/topics" "go.uber.org/zap" ) diff --git a/lib/acl/acl.go b/broker/lib/acl/acl.go similarity index 100% rename from lib/acl/acl.go rename to broker/lib/acl/acl.go diff --git a/lib/acl/aclcheck.go b/broker/lib/acl/aclcheck.go similarity index 100% rename from lib/acl/aclcheck.go rename to broker/lib/acl/aclcheck.go diff --git a/lib/acl/spilt.go b/broker/lib/acl/spilt.go similarity index 100% rename from lib/acl/spilt.go rename to broker/lib/acl/spilt.go diff --git a/lib/sessions/memprovider.go b/broker/lib/sessions/memprovider.go similarity index 100% rename from lib/sessions/memprovider.go rename to broker/lib/sessions/memprovider.go diff --git a/lib/sessions/session.go b/broker/lib/sessions/session.go similarity index 100% rename from lib/sessions/session.go rename to broker/lib/sessions/session.go diff --git a/lib/sessions/sessions.go b/broker/lib/sessions/sessions.go similarity index 100% rename from lib/sessions/sessions.go rename to broker/lib/sessions/sessions.go diff --git a/lib/topics/memtopics.go b/broker/lib/topics/memtopics.go similarity index 100% rename from lib/topics/memtopics.go rename to broker/lib/topics/memtopics.go diff --git a/lib/topics/topics.go b/broker/lib/topics/topics.go similarity index 100% rename from lib/topics/topics.go rename to broker/lib/topics/topics.go diff --git a/deploy/config.yaml b/deploy/config.yaml index 04a36bc..91fae61 100644 --- a/deploy/config.yaml +++ b/deploy/config.yaml @@ -23,11 +23,11 @@ data: "onUnsubscribe": "onUnsubscribe" } - authhttp.json: | - { - "auth": "http://127.0.0.1:9090/mqtt/auth", - "acl": "http://127.0.0.1:9090/mqtt/acl", - "super": "http://127.0.0.1:9090/mqtt/superuser" - } + authhttp.json: | + { + "auth": "http://127.0.0.1:9090/mqtt/auth", + "acl": "http://127.0.0.1:9090/mqtt/acl", + "super": "http://127.0.0.1:9090/mqtt/superuser" + } diff --git a/plugins/kafka/kafka.go b/plugins/kafka/kafka.go index ce491c3..f521db7 100644 --- a/plugins/kafka/kafka.go +++ b/plugins/kafka/kafka.go @@ -2,10 +2,8 @@ package kafka import ( "encoding/json" - "errors" "io/ioutil" "regexp" - "time" "github.com/Shopify/sarama" "github.com/fhmq/hmq/logger" @@ -26,12 +24,13 @@ var ( //Config device kafka config type Config struct { - Addr []string `json:"addr"` - ConnectTopic string `json:"onConnect"` - SubscribeTopic string `json:"onSubscribe"` - PublishTopic string `json:"onPublish"` - UnsubscribeTopic string `json:"onUnsubscribe"` - DisconnectTopic string `json:"onDisconnect"` + Addr []string `json:"addr"` + ConnectTopic string `json:"onConnect"` + SubscribeTopic string `json:"onSubscribe"` + PublishTopic string `json:"onPublish"` + UnsubscribeTopic string `json:"onUnsubscribe"` + DisconnectTopic string `json:"onDisconnect"` + RegexpMap map[string]string `json:"regexpMap"` } //Init init kafak client @@ -67,56 +66,63 @@ func connect() { }() } -const ( - _ThingModelTopicRegexp = `^/\$system/(.+)/(.+)/tmodel/(.*)$` -) - //Publish publish to kafka func Publish(e *plugins.Elements) { - topic, key := "", e.ClientID + key := e.ClientID + var topics []string switch e.Action { case plugins.Connect: - topic = config.ConnectTopic + if config.ConnectTopic != "" { + topics = append(topics, config.ConnectTopic) + } case plugins.Publish: - topic = config.PublishTopic + if config.PublishTopic != "" { + topics = append(topics, config.PublishTopic) + } + // foreach regexp map config + for reg, topic := range config.RegexpMap { + match, _ := regexp.MatchString(reg, e.Topic) + if match { + topics = append(topics, topic) + } + } case plugins.Subscribe: - topic = config.SubscribeTopic + if config.SubscribeTopic != "" { + topics = append(topics, config.SubscribeTopic) + } case plugins.Unsubscribe: - topic = config.UnsubscribeTopic + if config.UnsubscribeTopic != "" { + topics = append(topics, config.UnsubscribeTopic) + } case plugins.Disconnect: - topic = config.DisconnectTopic + if config.DisconnectTopic != "" { + topics = append(topics, config.DisconnectTopic) + } default: log.Error("error action: ", zap.String("action", e.Action)) return } - // fmt.Println("publish kafka: ", topic, key) - err := publish(topic, key, e) + + err := publish(topics, key, e) if err != nil { log.Error("publish kafka error: ", zap.Error(err)) } - match, _ := regexp.MatchString(_ThingModelTopicRegexp, e.Topic) - if match && e.Action == plugins.Publish { - topic := "tmodel.msg.upstream" - err := publish(topic, key, e) - if err != nil { - log.Error("publish kafka error: ", zap.Error(err)) - } - } } -func publish(topic, key string, msg *plugins.Elements) error { +func publish(topics []string, key string, msg *plugins.Elements) error { payload, err := json.Marshal(msg) if err != nil { return err } - select { - case kafkaClient.Input() <- &sarama.ProducerMessage{ - Topic: topic, - Key: sarama.ByteEncoder(key), - Value: sarama.ByteEncoder(payload)}: - return nil - case <-time.After(1 * time.Minute): - return errors.New("send to kafka time out") + + for _, topic := range topics { + kafkaClient.Input() <- &sarama.ProducerMessage{ + Topic: topic, + Key: sarama.ByteEncoder(key), + Value: sarama.ByteEncoder(payload), + } } + + return nil } diff --git a/plugins/kafka/kafka.json b/plugins/kafka/kafka.json index d6a7dc3..a2fbdb7 100644 --- a/plugins/kafka/kafka.json +++ b/plugins/kafka/kafka.json @@ -6,5 +6,10 @@ "onPublish": "onPublish", "onSubscribe": "onSubscribe", "onDisconnect": "onDisconnect", - "onUnsubscribe": "onUnsubscribe" + "onUnsubscribe": "onUnsubscribe", + "regexpMap": [ + { + "^/(.+)/(.+)/upload/(.*)$": "upload" + } + ] } \ No newline at end of file diff --git a/thing_model b/thing_model deleted file mode 100755 index 7f3608a..0000000 Binary files a/thing_model and /dev/null differ