This commit is contained in:
joy.zhou
2019-07-12 18:39:59 +08:00
parent c394ee47d2
commit eca34813bf
+14
View File
@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"io/ioutil" "io/ioutil"
"regexp"
"time" "time"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
@@ -64,6 +65,10 @@ func connect() {
}() }()
} }
const (
_ThingModelTopicRegexp = `^/\$system/(.+)/(.+)/tmodel/(.*))$`
)
//Publish publish to kafka //Publish publish to kafka
func Publish(e *plugins.Elements) { func Publish(e *plugins.Elements) {
topic, key := "", "" topic, key := "", ""
@@ -87,6 +92,15 @@ func Publish(e *plugins.Elements) {
if err != nil { if err != nil {
log.Error("publish kafka error: ", zap.Error(err)) log.Error("publish kafka error: ", zap.Error(err))
} }
match, _ := regexp.MatchString(_ThingModelTopicRegexp, topic)
if match {
topic := "tmodel.msg.upstream"
err := publish(topic, key, e)
if err != nil {
log.Error("publish kafka error: ", zap.Error(err))
}
}
} }
func publish(topic, key string, msg *plugins.Elements) error { func publish(topic, key string, msg *plugins.Elements) error {