mirror of
https://github.com/fhmq/hmq.git
synced 2026-05-02 14:28:34 +00:00
modify
This commit is contained in:
8
.vscode/settings.json
vendored
Normal file
8
.vscode/settings.json
vendored
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
{
|
||||||
|
"go.lintFlags": [
|
||||||
|
"--disable=all",
|
||||||
|
"--enable=errcheck,varcheck,deadcode",
|
||||||
|
"--enable=varcheck",
|
||||||
|
"--enable=deadcode"
|
||||||
|
]
|
||||||
|
}
|
||||||
@@ -3,10 +3,7 @@
|
|||||||
package broker
|
package broker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/fhmq/hmq/lib/acl"
|
"github.com/fhmq/hmq/plugins/authhttp"
|
||||||
"github.com/fsnotify/fsnotify"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -15,67 +12,17 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (c *client) CheckTopicAuth(typ int, topic string) bool {
|
func (c *client) CheckTopicAuth(typ int, topic string) bool {
|
||||||
if c.typ != CLIENT || !c.broker.config.Acl {
|
if c.typ != CLIENT || !c.broker.pluginAuthHTTP {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if strings.HasPrefix(topic, "$queue/") {
|
access := "sub"
|
||||||
topic = string([]byte(topic)[7:])
|
switch typ {
|
||||||
if topic == "" {
|
case 1:
|
||||||
return false
|
access = "pub"
|
||||||
}
|
case 2:
|
||||||
|
access = "sub"
|
||||||
}
|
}
|
||||||
ip := c.info.remoteIP
|
|
||||||
username := string(c.info.username)
|
username := string(c.info.username)
|
||||||
clientid := string(c.info.clientID)
|
return authhttp.CheckACL(username, access, topic)
|
||||||
aclInfo := c.broker.AclConfig
|
|
||||||
return acl.CheckTopicAuth(aclInfo, typ, ip, username, clientid, topic)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
watchList = []string{"./conf"}
|
|
||||||
)
|
|
||||||
|
|
||||||
func (b *Broker) handleFsEvent(event fsnotify.Event) error {
|
|
||||||
switch event.Name {
|
|
||||||
case b.config.AclConf:
|
|
||||||
if event.Op&fsnotify.Write == fsnotify.Write ||
|
|
||||||
event.Op&fsnotify.Create == fsnotify.Create {
|
|
||||||
log.Info("text:handling acl config change event:", zap.String("filename", event.Name))
|
|
||||||
aclconfig, err := acl.AclConfigLoad(event.Name)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("aclconfig change failed, load acl conf error: ", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
b.AclConfig = aclconfig
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *Broker) StartAclWatcher() {
|
|
||||||
go func() {
|
|
||||||
wch, e := fsnotify.NewWatcher()
|
|
||||||
if e != nil {
|
|
||||||
log.Error("start monitor acl config file error,", zap.Error(e))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer wch.Close()
|
|
||||||
|
|
||||||
for _, i := range watchList {
|
|
||||||
if err := wch.Add(i); err != nil {
|
|
||||||
log.Error("start monitor acl config file error,", zap.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
log.Info("watching acl config file change...")
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case evt := <-wch.Events:
|
|
||||||
b.handleFsEvent(evt)
|
|
||||||
case err := <-wch.Errors:
|
|
||||||
log.Error("error:", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -13,10 +13,13 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/fhmq/hmq/plugins"
|
||||||
|
|
||||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||||
"github.com/fhmq/hmq/lib/acl"
|
|
||||||
"github.com/fhmq/hmq/lib/sessions"
|
"github.com/fhmq/hmq/lib/sessions"
|
||||||
"github.com/fhmq/hmq/lib/topics"
|
"github.com/fhmq/hmq/lib/topics"
|
||||||
|
"github.com/fhmq/hmq/plugins/authhttp"
|
||||||
|
"github.com/fhmq/hmq/plugins/kafka"
|
||||||
"github.com/fhmq/hmq/pool"
|
"github.com/fhmq/hmq/pool"
|
||||||
"github.com/shirou/gopsutil/mem"
|
"github.com/shirou/gopsutil/mem"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@@ -34,21 +37,23 @@ type Message struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Broker struct {
|
type Broker struct {
|
||||||
id string
|
id string
|
||||||
cid uint64
|
cid uint64
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
config *Config
|
config *Config
|
||||||
tlsConfig *tls.Config
|
tlsConfig *tls.Config
|
||||||
AclConfig *acl.ACLConfig
|
wpool *pool.WorkerPool
|
||||||
wpool *pool.WorkerPool
|
clients sync.Map
|
||||||
clients sync.Map
|
routes sync.Map
|
||||||
routes sync.Map
|
remotes sync.Map
|
||||||
remotes sync.Map
|
nodes map[string]interface{}
|
||||||
nodes map[string]interface{}
|
clusterPool chan *Message
|
||||||
clusterPool chan *Message
|
queues map[string]int
|
||||||
queues map[string]int
|
topicsMgr *topics.Manager
|
||||||
topicsMgr *topics.Manager
|
sessionMgr *sessions.Manager
|
||||||
sessionMgr *sessions.Manager
|
pluginAuthHTTP bool
|
||||||
|
pluginKafka bool
|
||||||
|
|
||||||
// messagePool []chan *Message
|
// messagePool []chan *Message
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -92,15 +97,18 @@ func NewBroker(config *Config) (*Broker, error) {
|
|||||||
}
|
}
|
||||||
b.tlsConfig = tlsconfig
|
b.tlsConfig = tlsconfig
|
||||||
}
|
}
|
||||||
if b.config.Acl {
|
|
||||||
aclconfig, err := acl.AclConfigLoad(b.config.AclConf)
|
for _, plugin := range b.config.Plugins {
|
||||||
if err != nil {
|
switch plugin {
|
||||||
log.Error("Load acl conf error", zap.Error(err))
|
case authhttp.AuthHTTP:
|
||||||
return nil, err
|
authhttp.Init()
|
||||||
|
b.pluginAuthHTTP = true
|
||||||
|
case kafka.Kafka:
|
||||||
|
kafka.Init()
|
||||||
|
b.pluginKafka = true
|
||||||
}
|
}
|
||||||
b.AclConfig = aclconfig
|
|
||||||
b.StartAclWatcher()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -328,12 +336,30 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
|
|||||||
connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
|
connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
|
||||||
connack.ReturnCode = packets.Accepted
|
connack.ReturnCode = packets.Accepted
|
||||||
connack.SessionPresent = msg.CleanSession
|
connack.SessionPresent = msg.CleanSession
|
||||||
|
|
||||||
|
if b.pluginAuthHTTP == true && authhttp.CheckAuth(string(msg.ClientIdentifier), string(msg.Username), string(msg.Password)) {
|
||||||
|
err = connack.Write(conn)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = connack.Write(conn)
|
err = connack.Write(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier))
|
log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if b.pluginKafka && typ == CLIENT {
|
||||||
|
kafka.Publish(&plugins.Elements{
|
||||||
|
ClientID: string(msg.ClientIdentifier),
|
||||||
|
Username: string(msg.Username),
|
||||||
|
Action: plugins.Connect,
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
willmsg := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
|
willmsg := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
|
||||||
if msg.WillFlag {
|
if msg.WillFlag {
|
||||||
willmsg.Qos = msg.WillQos
|
willmsg.Qos = msg.WillQos
|
||||||
|
|||||||
@@ -11,6 +11,9 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/fhmq/hmq/plugins"
|
||||||
|
"github.com/fhmq/hmq/plugins/kafka"
|
||||||
|
|
||||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||||
"github.com/fhmq/hmq/lib/sessions"
|
"github.com/fhmq/hmq/lib/sessions"
|
||||||
"github.com/fhmq/hmq/lib/topics"
|
"github.com/fhmq/hmq/lib/topics"
|
||||||
@@ -176,6 +179,17 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.broker.pluginKafka && c.typ == CLIENT {
|
||||||
|
kafka.Publish(&plugins.Elements{
|
||||||
|
ClientID: c.info.clientID,
|
||||||
|
Username: c.info.username,
|
||||||
|
Action: plugins.Publish,
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
|
Payload: string(packet.Payload),
|
||||||
|
Topic: topic,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
switch packet.Qos {
|
switch packet.Qos {
|
||||||
case QosAtMostOnce:
|
case QosAtMostOnce:
|
||||||
c.ProcessPublishMessage(packet)
|
c.ProcessPublishMessage(packet)
|
||||||
@@ -266,6 +280,16 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.broker.pluginKafka && c.typ == CLIENT {
|
||||||
|
kafka.Publish(&plugins.Elements{
|
||||||
|
ClientID: c.info.clientID,
|
||||||
|
Username: c.info.username,
|
||||||
|
Action: plugins.Subscribe,
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
|
Topic: topic,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
sub := &subscription{
|
sub := &subscription{
|
||||||
topic: t,
|
topic: t,
|
||||||
qos: qoss[i],
|
qos: qoss[i],
|
||||||
@@ -324,6 +348,16 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
|
|||||||
c.session.RemoveTopic(topic)
|
c.session.RemoveTopic(topic)
|
||||||
delete(c.subMap, topic)
|
delete(c.subMap, topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.broker.pluginKafka && c.typ == CLIENT {
|
||||||
|
kafka.Publish(&plugins.Elements{
|
||||||
|
ClientID: c.info.clientID,
|
||||||
|
Username: c.info.username,
|
||||||
|
Action: plugins.Unsubscribe,
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
|
Topic: topic,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unsuback := packets.NewControlPacket(packets.Unsuback).(*packets.UnsubackPacket)
|
unsuback := packets.NewControlPacket(packets.Unsuback).(*packets.UnsubackPacket)
|
||||||
@@ -364,6 +398,15 @@ func (c *client) Close() {
|
|||||||
// time.Sleep(1 * time.Second)
|
// time.Sleep(1 * time.Second)
|
||||||
// c.status = Disconnected
|
// c.status = Disconnected
|
||||||
|
|
||||||
|
if c.broker.pluginKafka && c.typ == CLIENT {
|
||||||
|
kafka.Publish(&plugins.Elements{
|
||||||
|
ClientID: c.info.clientID,
|
||||||
|
Username: c.info.username,
|
||||||
|
Action: plugins.Disconnect,
|
||||||
|
Timestamp: time.Now().Unix(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if c.conn != nil {
|
if c.conn != nil {
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
c.conn = nil
|
c.conn = nil
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ type Config struct {
|
|||||||
Acl bool `json:"acl"`
|
Acl bool `json:"acl"`
|
||||||
AclConf string `json:"aclConf"`
|
AclConf string `json:"aclConf"`
|
||||||
Debug bool `json:"-"`
|
Debug bool `json:"-"`
|
||||||
|
Plugins []string `json:"plugins"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type RouteInfo struct {
|
type RouteInfo struct {
|
||||||
|
|||||||
@@ -18,6 +18,5 @@
|
|||||||
"certFile": "ssl/server/cert.pem",
|
"certFile": "ssl/server/cert.pem",
|
||||||
"keyFile": "ssl/server/key.pem"
|
"keyFile": "ssl/server/key.pem"
|
||||||
},
|
},
|
||||||
"acl": false,
|
"plugins": ["authhttp","kafka"]
|
||||||
"aclConf": "conf/acl.conf"
|
|
||||||
}
|
}
|
||||||
|
|||||||
1
go.mod
1
go.mod
@@ -19,4 +19,5 @@ require (
|
|||||||
golang.org/x/net v0.0.0-20190424024845-afe8014c977f
|
golang.org/x/net v0.0.0-20190424024845-afe8014c977f
|
||||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 // indirect
|
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 // indirect
|
||||||
golang.org/x/tools v0.0.0-20190424031103-cb2dda6eabdf // indirect
|
golang.org/x/tools v0.0.0-20190424031103-cb2dda6eabdf // indirect
|
||||||
|
github.com/Shopify/sarama v1.23.0
|
||||||
)
|
)
|
||||||
|
|||||||
93
plugins/authhttp/authhttp.go
Normal file
93
plugins/authhttp/authhttp.go
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
package authhttp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/fhmq/hmq/logger"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
//AuthHTTP plugin name
|
||||||
|
AuthHTTP = "authhttp"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
config Config
|
||||||
|
log = logger.Get().Named("http")
|
||||||
|
)
|
||||||
|
|
||||||
|
//Config device kafka config
|
||||||
|
type Config struct {
|
||||||
|
AuthURL string `json:"auth"`
|
||||||
|
ACLURL string `json:"onSubscribe"`
|
||||||
|
SuperURL string `json:"onPublish"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//Init init kafak client
|
||||||
|
func Init() {
|
||||||
|
content, err := ioutil.ReadFile("../../plugins/kafka/conf.json")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Read config file error: ", zap.Error(err))
|
||||||
|
}
|
||||||
|
// log.Info(string(content))
|
||||||
|
|
||||||
|
err = json.Unmarshal(content, &config)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Unmarshal config file error: ", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//CheckAuth check mqtt connect
|
||||||
|
func CheckAuth(clientID, username, password string) bool {
|
||||||
|
payload := fmt.Sprintf("username=%s&password=%s&clientid=%s", username, password, clientID)
|
||||||
|
resp, err := http.Post(config.AuthURL,
|
||||||
|
"application/x-www-form-urlencoded",
|
||||||
|
strings.NewReader(payload))
|
||||||
|
if err != nil {
|
||||||
|
log.Error("request acl: ", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode == http.StatusOK {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
//CheckSuper check mqtt connect
|
||||||
|
func CheckSuper(clientID, username, password string) bool {
|
||||||
|
payload := fmt.Sprintf("username=%s&password=%s&clientid=%s", username, password, clientID)
|
||||||
|
resp, err := http.Post(config.SuperURL,
|
||||||
|
"application/x-www-form-urlencoded",
|
||||||
|
strings.NewReader(payload))
|
||||||
|
if err != nil {
|
||||||
|
log.Error("request acl: ", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode == http.StatusOK {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
//CheckACL check mqtt connect
|
||||||
|
func CheckACL(username, access, topic string) bool {
|
||||||
|
url := fmt.Sprintf(config.ACLURL+"?username=%s&access=%s&topic=%s", username, access, topic)
|
||||||
|
resp, err := http.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
// handle error
|
||||||
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode == http.StatusOK {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
5
plugins/authhttp/conf.json
Normal file
5
plugins/authhttp/conf.json
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
{
|
||||||
|
"auth": "http://127.0.0.1:9090/mqtt/auth",
|
||||||
|
"acl": "http://127.0.0.1:9090/mqtt/acl",
|
||||||
|
"super": "http://127.0.0.1:9090/mqtt/superuser"
|
||||||
|
}
|
||||||
25
plugins/elements.go
Normal file
25
plugins/elements.go
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
package plugins
|
||||||
|
|
||||||
|
const (
|
||||||
|
//Connect mqtt connect
|
||||||
|
Connect = "connect"
|
||||||
|
//Publish mqtt publish
|
||||||
|
Publish = "publish"
|
||||||
|
//Subscribe mqtt sub
|
||||||
|
Subscribe = "subscribe"
|
||||||
|
//Unsubscribe mqtt sub
|
||||||
|
Unsubscribe = "unsubscribe"
|
||||||
|
//Disconnect mqtt disconenct
|
||||||
|
Disconnect = "disconnect"
|
||||||
|
)
|
||||||
|
|
||||||
|
//Elements kafka publish elements
|
||||||
|
type Elements struct {
|
||||||
|
ClientID string `json:"clientid"`
|
||||||
|
Username string `json:"username"`
|
||||||
|
Topic string `json:"topic"`
|
||||||
|
Payload string `json:"payload"`
|
||||||
|
Timestamp int64 `json:"ts"`
|
||||||
|
Size int32 `json:"size"`
|
||||||
|
Action string `json:"action"`
|
||||||
|
}
|
||||||
10
plugins/kafka/conf.json
Normal file
10
plugins/kafka/conf.json
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
{
|
||||||
|
"addr": [
|
||||||
|
"127.0.0.1:9090"
|
||||||
|
],
|
||||||
|
"onConnect": "onConnect",
|
||||||
|
"onPublish": "onPublish",
|
||||||
|
"onSubscribe": "onSubscribe",
|
||||||
|
"onDisconnect": "onDisconnect",
|
||||||
|
"onUnsubscribe": "onUnsubscribe"
|
||||||
|
}
|
||||||
106
plugins/kafka/kafka.go
Normal file
106
plugins/kafka/kafka.go
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
package kafka
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"io/ioutil"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Shopify/sarama"
|
||||||
|
"github.com/fhmq/hmq/logger"
|
||||||
|
"github.com/fhmq/hmq/plugins"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
//Kafka plugin name
|
||||||
|
Kafka = "kafka"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
kafkaClient sarama.AsyncProducer
|
||||||
|
config Config
|
||||||
|
log = logger.Get().Named("kafka")
|
||||||
|
)
|
||||||
|
|
||||||
|
//Config device kafka config
|
||||||
|
type Config struct {
|
||||||
|
Addr []string `json:"addr"`
|
||||||
|
ConnectTopic string `json:"onConnect"`
|
||||||
|
SubscribeTopic string `json:"onSubscribe"`
|
||||||
|
PublishTopic string `json:"onPublish"`
|
||||||
|
UnsubscribeTopic string `json:"onUnsubscribe"`
|
||||||
|
DisconnectTopic string `json:"onDisconnect"`
|
||||||
|
}
|
||||||
|
|
||||||
|
//Init init kafak client
|
||||||
|
func Init() {
|
||||||
|
content, err := ioutil.ReadFile("../../plugins/kafka/conf.json")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Read config file error: ", zap.Error(err))
|
||||||
|
}
|
||||||
|
// log.Info(string(content))
|
||||||
|
|
||||||
|
err = json.Unmarshal(content, &config)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("Unmarshal config file error: ", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//connect
|
||||||
|
func connect() {
|
||||||
|
var err error
|
||||||
|
conf := sarama.NewConfig()
|
||||||
|
kafkaClient, err = sarama.NewAsyncProducer(config.Addr, conf)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("create kafka async producer failed: ", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for err := range kafkaClient.Errors() {
|
||||||
|
log.Error("send msg to kafka failed: ", zap.Error(err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
//Publish publish to kafka
|
||||||
|
func Publish(e *plugins.Elements) {
|
||||||
|
topic, key := "", ""
|
||||||
|
switch e.Action {
|
||||||
|
case plugins.Connect:
|
||||||
|
topic = config.ConnectTopic
|
||||||
|
case plugins.Publish:
|
||||||
|
topic = config.PublishTopic
|
||||||
|
case plugins.Subscribe:
|
||||||
|
topic = config.SubscribeTopic
|
||||||
|
case plugins.Unsubscribe:
|
||||||
|
topic = config.UnsubscribeTopic
|
||||||
|
case plugins.Disconnect:
|
||||||
|
topic = config.DisconnectTopic
|
||||||
|
default:
|
||||||
|
log.Error("error action: ", zap.String("action", e.Action))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key = e.Username
|
||||||
|
err := publish(topic, key, e)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("publish kafka error: ", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func publish(topic, key string, msg *plugins.Elements) error {
|
||||||
|
payload, err := json.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case kafkaClient.Input() <- &sarama.ProducerMessage{
|
||||||
|
Topic: topic,
|
||||||
|
Key: sarama.ByteEncoder(key),
|
||||||
|
Value: sarama.ByteEncoder(payload)}:
|
||||||
|
return nil
|
||||||
|
case <-time.After(1 * time.Minute):
|
||||||
|
return errors.New("send to kafka time out")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user