update kafka plugins

This commit is contained in:
joy.zhou
2019-09-18 14:00:19 +08:00
parent 80b64b147e
commit 55576c1eb3
2 changed files with 46 additions and 18 deletions
+42 -13
View File
@@ -4,7 +4,7 @@ import (
"encoding/json"
"errors"
"io/ioutil"
"regexp"
"strings"
"github.com/Shopify/sarama"
"go.uber.org/zap"
@@ -17,7 +17,7 @@ type kafakConfig struct {
PublishTopic string `json:"onPublish"`
UnsubscribeTopic string `json:"onUnsubscribe"`
DisconnectTopic string `json:"onDisconnect"`
RegexpMap map[string]string `json:"regexpMap"`
DeliverMap map[string]string `json:"deliverMap"`
}
type kafka struct {
@@ -65,34 +65,34 @@ func (k *kafka) connect() {
func (k *kafka) Publish(e *Elements) error {
config := k.kafakConfig
key := e.ClientID
var topics []string
var topics map[string]bool
switch e.Action {
case Connect:
if config.ConnectTopic != "" {
topics = append(topics, config.ConnectTopic)
topics[config.ConnectTopic] = true
}
case Publish:
if config.PublishTopic != "" {
topics = append(topics, config.PublishTopic)
topics[config.PublishTopic] = true
}
// foreach regexp map config
for reg, topic := range config.RegexpMap {
match, _ := regexp.MatchString(reg, e.Topic)
for reg, topic := range config.DeliverMap {
match := matchTopic(reg, e.Topic)
if match {
topics = append(topics, topic)
topics[topic] = true
}
}
case Subscribe:
if config.SubscribeTopic != "" {
topics = append(topics, config.SubscribeTopic)
topics[config.SubscribeTopic] = true
}
case Unsubscribe:
if config.UnsubscribeTopic != "" {
topics = append(topics, config.UnsubscribeTopic)
topics[config.UnsubscribeTopic] = true
}
case Disconnect:
if config.DisconnectTopic != "" {
topics = append(topics, config.DisconnectTopic)
topics[config.DisconnectTopic] = true
}
default:
return errors.New("error action: " + e.Action)
@@ -102,13 +102,13 @@ func (k *kafka) Publish(e *Elements) error {
}
func (k *kafka) publish(topics []string, key string, msg *Elements) error {
func (k *kafka) publish(topics map[string]bool, key string, msg *Elements) error {
payload, err := json.Marshal(msg)
if err != nil {
return err
}
for _, topic := range topics {
for topic, _ := range topics {
k.kafkaClient.Input() <- &sarama.ProducerMessage{
Topic: topic,
Key: sarama.ByteEncoder(key),
@@ -118,3 +118,32 @@ func (k *kafka) publish(topics []string, key string, msg *Elements) error {
return nil
}
func match(subTopic []string, topic []string) bool {
if len(subTopic) == 0 {
if len(topic) == 0 {
return true
}
return false
}
if len(topic) == 0 {
if subTopic[0] == "#" {
return true
}
return false
}
if subTopic[0] == "#" {
return true
}
if (subTopic[0] == "+") || (subTopic[0] == topic[0]) {
return match(subTopic[1:], topic[1:])
}
return false
}
func matchTopic(subTopic string, topic string) bool {
return match(strings.Split(subTopic, "/"), strings.Split(topic, "/"))
}
+4 -5
View File
@@ -7,9 +7,8 @@
"onSubscribe": "onSubscribe",
"onDisconnect": "onDisconnect",
"onUnsubscribe": "onUnsubscribe",
"regexpMap": [
{
"^/(.+)/(.+)/upload/(.*)$": "upload"
}
]
"deliverMap": {
"#": "publish",
"/upload/+/#": "upload"
}
}