From 55576c1eb3ddf7889eb80a40ec2f9196de5db74e Mon Sep 17 00:00:00 2001 From: "joy.zhou" Date: Wed, 18 Sep 2019 14:00:19 +0800 Subject: [PATCH] update kafka plugins --- plugins/bridge/kafka.go | 55 +++++++++++++++++++++++++-------- plugins/bridge/kafka/kafka.json | 9 +++--- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/plugins/bridge/kafka.go b/plugins/bridge/kafka.go index 37e8024..f7f6730 100644 --- a/plugins/bridge/kafka.go +++ b/plugins/bridge/kafka.go @@ -4,7 +4,7 @@ import ( "encoding/json" "errors" "io/ioutil" - "regexp" + "strings" "github.com/Shopify/sarama" "go.uber.org/zap" @@ -17,7 +17,7 @@ type kafakConfig struct { PublishTopic string `json:"onPublish"` UnsubscribeTopic string `json:"onUnsubscribe"` DisconnectTopic string `json:"onDisconnect"` - RegexpMap map[string]string `json:"regexpMap"` + DeliverMap map[string]string `json:"deliverMap"` } type kafka struct { @@ -65,34 +65,34 @@ func (k *kafka) connect() { func (k *kafka) Publish(e *Elements) error { config := k.kafakConfig key := e.ClientID - var topics []string + var topics map[string]bool switch e.Action { case Connect: if config.ConnectTopic != "" { - topics = append(topics, config.ConnectTopic) + topics[config.ConnectTopic] = true } case Publish: if config.PublishTopic != "" { - topics = append(topics, config.PublishTopic) + topics[config.PublishTopic] = true } // foreach regexp map config - for reg, topic := range config.RegexpMap { - match, _ := regexp.MatchString(reg, e.Topic) + for reg, topic := range config.DeliverMap { + match := matchTopic(reg, e.Topic) if match { - topics = append(topics, topic) + topics[topic] = true } } case Subscribe: if config.SubscribeTopic != "" { - topics = append(topics, config.SubscribeTopic) + topics[config.SubscribeTopic] = true } case Unsubscribe: if config.UnsubscribeTopic != "" { - topics = append(topics, config.UnsubscribeTopic) + topics[config.UnsubscribeTopic] = true } case Disconnect: if config.DisconnectTopic != "" { - topics = append(topics, config.DisconnectTopic) + topics[config.DisconnectTopic] = true } default: return errors.New("error action: " + e.Action) @@ -102,13 +102,13 @@ func (k *kafka) Publish(e *Elements) error { } -func (k *kafka) publish(topics []string, key string, msg *Elements) error { +func (k *kafka) publish(topics map[string]bool, key string, msg *Elements) error { payload, err := json.Marshal(msg) if err != nil { return err } - for _, topic := range topics { + for topic, _ := range topics { k.kafkaClient.Input() <- &sarama.ProducerMessage{ Topic: topic, Key: sarama.ByteEncoder(key), @@ -118,3 +118,32 @@ func (k *kafka) publish(topics []string, key string, msg *Elements) error { return nil } + +func match(subTopic []string, topic []string) bool { + if len(subTopic) == 0 { + if len(topic) == 0 { + return true + } + return false + } + + if len(topic) == 0 { + if subTopic[0] == "#" { + return true + } + return false + } + + if subTopic[0] == "#" { + return true + } + + if (subTopic[0] == "+") || (subTopic[0] == topic[0]) { + return match(subTopic[1:], topic[1:]) + } + return false +} + +func matchTopic(subTopic string, topic string) bool { + return match(strings.Split(subTopic, "/"), strings.Split(topic, "/")) +} diff --git a/plugins/bridge/kafka/kafka.json b/plugins/bridge/kafka/kafka.json index a2fbdb7..75180a6 100644 --- a/plugins/bridge/kafka/kafka.json +++ b/plugins/bridge/kafka/kafka.json @@ -7,9 +7,8 @@ "onSubscribe": "onSubscribe", "onDisconnect": "onDisconnect", "onUnsubscribe": "onUnsubscribe", - "regexpMap": [ - { - "^/(.+)/(.+)/upload/(.*)$": "upload" - } - ] + "deliverMap": { + "#": "publish", + "/upload/+/#": "upload" + } } \ No newline at end of file