diff --git a/plugins/kafka/kafka.go b/plugins/kafka/kafka.go index 7539456..ab17293 100644 --- a/plugins/kafka/kafka.go +++ b/plugins/kafka/kafka.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "io/ioutil" + "regexp" "time" "github.com/Shopify/sarama" @@ -64,6 +65,10 @@ func connect() { }() } +const ( + _ThingModelTopicRegexp = `^/\$system/(.+)/(.+)/tmodel/(.*))$` +) + //Publish publish to kafka func Publish(e *plugins.Elements) { topic, key := "", "" @@ -87,6 +92,15 @@ func Publish(e *plugins.Elements) { if err != nil { log.Error("publish kafka error: ", zap.Error(err)) } + + match, _ := regexp.MatchString(_ThingModelTopicRegexp, topic) + if match { + topic := "tmodel.msg.upstream" + err := publish(topic, key, e) + if err != nil { + log.Error("publish kafka error: ", zap.Error(err)) + } + } } func publish(topic, key string, msg *plugins.Elements) error {