update plugin

This commit is contained in:
joy.zhou
2019-08-19 10:33:19 +08:00
parent 148738800b
commit 69a26f8cd9
176 changed files with 18008 additions and 2772 deletions

View File

@@ -4,8 +4,6 @@ package broker
import (
"strings"
"github.com/fhmq/hmq/plugins/authhttp"
)
const (
@@ -14,16 +12,20 @@ const (
)
func (b *Broker) CheckTopicAuth(action, username, topic string) bool {
if b.pluginAuthHTTP {
if b.auth != nil {
if strings.HasPrefix(topic, "$SYS/broker/connection/clients/") {
return true
}
if strings.HasPrefix(topic, "$queue/") {
topic = strings.TrimPrefix(topic, "$queue/")
if strings.HasPrefix(topic, "$share/") && action == SUB {
substr := groupCompile.FindStringSubmatch(topic)
if len(substr) != 3 {
return false
}
topic = substr[2]
}
return authhttp.CheckACL(username, action, topic)
return b.auth.CheckACL(action, username, topic)
}
return true
@@ -31,11 +33,11 @@ func (b *Broker) CheckTopicAuth(action, username, topic string) bool {
}
func (b *Broker) CheckConnectAuth(clientID, username, password string) bool {
if b.pluginAuthHTTP {
if b.auth != nil {
if clientID == "" || username == "" {
return false
}
return authhttp.CheckAuth(clientID, username, password)
return b.auth.CheckConnect(clientID, username, password)
}
return true

15
broker/bridge.go Normal file
View File

@@ -0,0 +1,15 @@
package broker
import (
"github.com/fhmq/hmq/plugins/bridge"
"go.uber.org/zap"
)
func (b *Broker) Publish(e *bridge.Elements) {
if b.bridgeMQ != nil {
err := b.bridgeMQ.Publish(e)
if err != nil {
log.Error("send message to mq error.", zap.Error(err))
}
}
}

View File

@@ -7,18 +7,18 @@ import (
"fmt"
"net"
"net/http"
_ "net/http/pprof"
"runtime/debug"
"sync"
"time"
"github.com/fhmq/hmq/plugins/bridge"
"github.com/fhmq/hmq/plugins/auth"
"github.com/fhmq/hmq/broker/lib/sessions"
"github.com/fhmq/hmq/broker/lib/topics"
"github.com/fhmq/hmq/plugins"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/fhmq/hmq/plugins/authhttp"
"github.com/fhmq/hmq/plugins/kafka"
"github.com/fhmq/hmq/pool"
"github.com/shirou/gopsutil/mem"
"go.uber.org/zap"
@@ -36,20 +36,20 @@ type Message struct {
}
type Broker struct {
id string
mu sync.Mutex
config *Config
tlsConfig *tls.Config
wpool *pool.WorkerPool
clients sync.Map
routes sync.Map
remotes sync.Map
nodes map[string]interface{}
clusterPool chan *Message
topicsMgr *topics.Manager
sessionMgr *sessions.Manager
pluginAuthHTTP bool
pluginKafka bool
id string
mu sync.Mutex
config *Config
tlsConfig *tls.Config
wpool *pool.WorkerPool
clients sync.Map
routes sync.Map
remotes sync.Map
nodes map[string]interface{}
clusterPool chan *Message
topicsMgr *topics.Manager
sessionMgr *sessions.Manager
auth auth.Auth
bridgeMQ bridge.BridgeMQ
}
func newMessagePool() []chan *Message {
@@ -96,16 +96,8 @@ func NewBroker(config *Config) (*Broker, error) {
b.tlsConfig = tlsconfig
}
for _, plugin := range b.config.Plugins {
switch plugin {
case authhttp.AuthHTTP:
authhttp.Init()
b.pluginAuthHTTP = true
case kafka.Kafka:
kafka.Init()
b.pluginKafka = true
}
}
b.auth = auth.NewAuth(b.config.Plugin.Auth)
b.bridgeMQ = bridge.NewBridgeMQ(b.config.Plugin.Bridge)
return b, nil
}
@@ -162,15 +154,6 @@ func (b *Broker) Start() {
//system monitor
go StateMonitor()
// if b.config.Debug {
// startPProf()
// }
}
func startPProf() {
go func() {
http.ListenAndServe(":10060", nil)
}()
}
func StateMonitor() {
@@ -360,10 +343,10 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
}
if typ == CLIENT {
b.Publish(&plugins.Elements{
b.Publish(&bridge.Elements{
ClientID: string(msg.ClientIdentifier),
Username: string(msg.Username),
Action: plugins.Connect,
Action: bridge.Connect,
Timestamp: time.Now().Unix(),
})
}

View File

@@ -15,7 +15,7 @@ import (
"github.com/fhmq/hmq/broker/lib/sessions"
"github.com/fhmq/hmq/broker/lib/topics"
"github.com/fhmq/hmq/plugins"
"github.com/fhmq/hmq/plugins/bridge"
"github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap"
@@ -239,9 +239,6 @@ func (c *client) processRouterPublish(packet *packets.PublishPacket) {
}
func (c *client) processClientPublish(packet *packets.PublishPacket) {
if c.status == Disconnected {
return
}
topic := packet.TopicName
@@ -251,10 +248,10 @@ func (c *client) processClientPublish(packet *packets.PublishPacket) {
}
//publish kafka
c.broker.Publish(&plugins.Elements{
c.broker.Publish(&bridge.Elements{
ClientID: c.info.clientID,
Username: c.info.username,
Action: plugins.Publish,
Action: bridge.Publish,
Timestamp: time.Now().Unix(),
Payload: string(packet.Payload),
Topic: topic,
@@ -366,10 +363,10 @@ func (c *client) processClientSubscribe(packet *packets.SubscribePacket) {
continue
}
b.Publish(&plugins.Elements{
b.Publish(&bridge.Elements{
ClientID: c.info.clientID,
Username: c.info.username,
Action: plugins.Subscribe,
Action: bridge.Subscribe,
Timestamp: time.Now().Unix(),
Topic: topic,
})
@@ -547,10 +544,10 @@ func (c *client) processClientUnSubscribe(packet *packets.UnsubscribePacket) {
{
//publish kafka
b.Publish(&plugins.Elements{
b.Publish(&bridge.Elements{
ClientID: c.info.clientID,
Username: c.info.username,
Action: plugins.Unsubscribe,
Action: bridge.Unsubscribe,
Timestamp: time.Now().Unix(),
Topic: topic,
})
@@ -603,10 +600,10 @@ func (c *client) Close() {
// c.status = Disconnected
b := c.broker
b.Publish(&plugins.Elements{
b.Publish(&bridge.Elements{
ClientID: c.info.clientID,
Username: c.info.username,
Action: plugins.Disconnect,
Action: bridge.Disconnect,
Timestamp: time.Now().Unix(),
})

View File

@@ -139,13 +139,18 @@ func unWrapPublishPacket(packet *packets.PublishPacket) *packets.PublishPacket {
}
func publish(sub *subscription, packet *packets.PublishPacket) {
var p *packets.PublishPacket
if sub.client.info.username != "root" {
p = unWrapPublishPacket(packet)
} else {
p = wrapPublishPacket(packet)
}
err := sub.client.WriterPacket(p)
// var p *packets.PublishPacket
// if sub.client.info.username != "root" {
// p = unWrapPublishPacket(packet)
// } else {
// p = wrapPublishPacket(packet)
// }
// err := sub.client.WriterPacket(p)
// if err != nil {
// log.Error("process message for psub error, ", zap.Error(err))
// }
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", zap.Error(err))
}

View File

@@ -31,7 +31,12 @@ type Config struct {
Acl bool `json:"acl"`
AclConf string `json:"aclConf"`
Debug bool `json:"debug"`
Plugins []string `json:"plugins"`
Plugin Plugins `json:"plugins"`
}
type Plugins struct {
Auth string
Bridge string
}
type RouteInfo struct {

View File

@@ -1,12 +0,0 @@
package broker
import (
"github.com/fhmq/hmq/plugins"
"github.com/fhmq/hmq/plugins/kafka"
)
func (b *Broker) Publish(e *plugins.Elements) {
if b.pluginKafka {
kafka.Publish(e)
}
}