mirror of
https://github.com/fhmq/hmq.git
synced 2026-05-04 07:08:32 +00:00
Compare commits
14 Commits
new
...
revert-102
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d69f13e48 | ||
|
|
c2248bed2b | ||
|
|
6be79cbe88 | ||
|
|
6cb307d252 | ||
|
|
b8bacb4c3d | ||
|
|
481a61c520 | ||
|
|
4782f76048 | ||
|
|
1a374f9734 | ||
|
|
3f60d23e85 | ||
|
|
3cf90d5231 | ||
|
|
a1bf3d93b2 | ||
|
|
af7db83bdc | ||
|
|
839041e912 | ||
|
|
17dac26996 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,4 +1,5 @@
|
||||
hmq
|
||||
log
|
||||
log/*
|
||||
*.test
|
||||
*.test
|
||||
.vscode/settings.json
|
||||
|
||||
3
.vscode/settings.json
vendored
3
.vscode/settings.json
vendored
@@ -4,5 +4,8 @@
|
||||
"--enable=errcheck,varcheck,deadcode",
|
||||
"--enable=varcheck",
|
||||
"--enable=deadcode"
|
||||
],
|
||||
"cSpell.words": [
|
||||
"Authorised"
|
||||
]
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM golang:1.12 as builder
|
||||
FROM golang:1.14 as builder
|
||||
WORKDIR /go/src/github.com/fhmq/hmq
|
||||
COPY . .
|
||||
RUN CGO_ENABLED=0 go build -o hmq -a -ldflags '-extldflags "-static"' .
|
||||
|
||||
@@ -92,8 +92,8 @@ func NewBroker(config *Config) (*Broker, error) {
|
||||
b.tlsConfig = tlsconfig
|
||||
}
|
||||
|
||||
b.auth = auth.NewAuth(b.config.Plugin.Auth)
|
||||
b.bridgeMQ = bridge.NewBridgeMQ(b.config.Plugin.Bridge)
|
||||
b.auth = b.config.Plugin.Auth
|
||||
b.bridgeMQ = b.config.Plugin.Bridge
|
||||
|
||||
return b, nil
|
||||
}
|
||||
@@ -123,7 +123,7 @@ func (b *Broker) Start() {
|
||||
go InitHTTPMoniter(b)
|
||||
}
|
||||
|
||||
//listen clinet over tcp
|
||||
//listen client over tcp
|
||||
if b.config.Port != "" {
|
||||
go b.StartClientListening(false)
|
||||
}
|
||||
@@ -155,12 +155,14 @@ func (b *Broker) StartWebsocketListening() {
|
||||
path := b.config.WsPath
|
||||
hp := ":" + b.config.WsPort
|
||||
log.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path))
|
||||
http.Handle(path, websocket.Handler(b.wsHandler))
|
||||
ws := &websocket.Server{Handler: websocket.Handler(b.wsHandler)}
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle(path, ws)
|
||||
var err error
|
||||
if b.config.WsTLS {
|
||||
err = http.ListenAndServeTLS(hp, b.config.TlsInfo.CertFile, b.config.TlsInfo.KeyFile, nil)
|
||||
err = http.ListenAndServeTLS(hp, b.config.TlsInfo.CertFile, b.config.TlsInfo.KeyFile, mux)
|
||||
} else {
|
||||
err = http.ListenAndServe(hp, nil)
|
||||
err = http.ListenAndServe(hp, mux)
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("ListenAndServe:" + err.Error())
|
||||
|
||||
192
broker/client.go
192
broker/client.go
@@ -3,6 +3,7 @@ package broker
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/eapache/queue"
|
||||
"math/rand"
|
||||
"net"
|
||||
"reflect"
|
||||
@@ -14,6 +15,7 @@ import (
|
||||
"github.com/fhmq/hmq/broker/lib/sessions"
|
||||
"github.com/fhmq/hmq/broker/lib/topics"
|
||||
"github.com/fhmq/hmq/plugins/bridge"
|
||||
"golang.org/x/net/websocket"
|
||||
|
||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||
"go.uber.org/zap"
|
||||
@@ -40,29 +42,52 @@ const (
|
||||
Disconnected = 2
|
||||
)
|
||||
|
||||
const (
|
||||
awaitRelTimeout int64 = 20
|
||||
retryInterval int64 = 20
|
||||
)
|
||||
|
||||
var (
|
||||
groupCompile = regexp.MustCompile(_GroupTopicRegexp)
|
||||
)
|
||||
|
||||
type client struct {
|
||||
typ int
|
||||
mu sync.Mutex
|
||||
broker *Broker
|
||||
conn net.Conn
|
||||
info info
|
||||
route route
|
||||
status int
|
||||
ctx context.Context
|
||||
cancelFunc context.CancelFunc
|
||||
session *sessions.Session
|
||||
subMap map[string]*subscription
|
||||
topicsMgr *topics.Manager
|
||||
subs []interface{}
|
||||
qoss []byte
|
||||
rmsgs []*packets.PublishPacket
|
||||
routeSubMap map[string]uint64
|
||||
typ int
|
||||
mu sync.Mutex
|
||||
broker *Broker
|
||||
conn net.Conn
|
||||
info info
|
||||
route route
|
||||
status int
|
||||
ctx context.Context
|
||||
cancelFunc context.CancelFunc
|
||||
session *sessions.Session
|
||||
subMap map[string]*subscription
|
||||
topicsMgr *topics.Manager
|
||||
subs []interface{}
|
||||
qoss []byte
|
||||
rmsgs []*packets.PublishPacket
|
||||
routeSubMap map[string]uint64
|
||||
awaitingRel map[uint16]int64
|
||||
maxAwaitingRel int
|
||||
inflight map[uint16]*inflightElem
|
||||
mqueue *queue.Queue
|
||||
retryTimer *time.Timer
|
||||
retryTimerLock sync.Mutex
|
||||
}
|
||||
|
||||
type InflightStatus uint8
|
||||
|
||||
const (
|
||||
Publish InflightStatus = 0
|
||||
Pubrel InflightStatus = 1
|
||||
)
|
||||
|
||||
type inflightElem struct {
|
||||
status InflightStatus
|
||||
packet *packets.PublishPacket
|
||||
timestamp int64
|
||||
}
|
||||
type subscription struct {
|
||||
client *client
|
||||
topic string
|
||||
@@ -87,17 +112,29 @@ type route struct {
|
||||
}
|
||||
|
||||
var (
|
||||
DisconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
|
||||
r = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
DisconnectedPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
|
||||
r = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
)
|
||||
|
||||
func (c *client) init() {
|
||||
c.status = Connected
|
||||
c.info.localIP, _, _ = net.SplitHostPort(c.conn.LocalAddr().String())
|
||||
c.info.remoteIP, _, _ = net.SplitHostPort(c.conn.RemoteAddr().String())
|
||||
remoteAddr := c.conn.RemoteAddr()
|
||||
remoteNetwork := remoteAddr.Network()
|
||||
c.info.remoteIP = ""
|
||||
if remoteNetwork != "websocket" {
|
||||
c.info.remoteIP, _, _ = net.SplitHostPort(remoteAddr.String())
|
||||
} else {
|
||||
ws := c.conn.(*websocket.Conn)
|
||||
c.info.remoteIP, _, _ = net.SplitHostPort(ws.Request().RemoteAddr)
|
||||
}
|
||||
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
|
||||
c.subMap = make(map[string]*subscription)
|
||||
c.topicsMgr = c.broker.topicsMgr
|
||||
c.routeSubMap = make(map[string]uint64)
|
||||
c.awaitingRel = make(map[uint16]int64)
|
||||
c.inflight = make(map[uint16]*inflightElem)
|
||||
c.mqueue = queue.New()
|
||||
}
|
||||
|
||||
func (c *client) readLoop() {
|
||||
@@ -116,14 +153,16 @@ func (c *client) readLoop() {
|
||||
return
|
||||
default:
|
||||
//add read timeout
|
||||
if err := nc.SetReadDeadline(time.Now().Add(timeOut)); err != nil {
|
||||
log.Error("set read timeout error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
msg := &Message{
|
||||
client: c,
|
||||
packet: DisconnectdPacket,
|
||||
if keepAlive > 0 {
|
||||
if err := nc.SetReadDeadline(time.Now().Add(timeOut)); err != nil {
|
||||
log.Error("set read timeout error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
msg := &Message{
|
||||
client: c,
|
||||
packet: DisconnectedPacket,
|
||||
}
|
||||
b.SubmitWork(c.info.clientID, msg)
|
||||
return
|
||||
}
|
||||
b.SubmitWork(c.info.clientID, msg)
|
||||
return
|
||||
}
|
||||
|
||||
packet, err := packets.ReadPacket(nc)
|
||||
@@ -131,7 +170,7 @@ func (c *client) readLoop() {
|
||||
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
msg := &Message{
|
||||
client: c,
|
||||
packet: DisconnectdPacket,
|
||||
packet: DisconnectedPacket,
|
||||
}
|
||||
b.SubmitWork(c.info.clientID, msg)
|
||||
return
|
||||
@@ -165,9 +204,43 @@ func ProcessMessage(msg *Message) {
|
||||
packet := ca.(*packets.PublishPacket)
|
||||
c.ProcessPublish(packet)
|
||||
case *packets.PubackPacket:
|
||||
packet := ca.(*packets.PubackPacket)
|
||||
if _, found := c.inflight[packet.MessageID]; found {
|
||||
delete(c.inflight, packet.MessageID)
|
||||
} else {
|
||||
log.Error("Duplicated PUBACK PacketId", zap.Uint16("MessageID", packet.MessageID))
|
||||
}
|
||||
case *packets.PubrecPacket:
|
||||
packet := ca.(*packets.PubrecPacket)
|
||||
if _, found := c.inflight[packet.MessageID]; found {
|
||||
if c.inflight[packet.MessageID].status == Publish {
|
||||
c.inflight[packet.MessageID].status = Pubrel
|
||||
c.inflight[packet.MessageID].timestamp = time.Now().Unix()
|
||||
} else if c.inflight[packet.MessageID].status == Pubrel {
|
||||
log.Error("Duplicated PUBREC PacketId", zap.Uint16("MessageID", packet.MessageID))
|
||||
}
|
||||
} else {
|
||||
log.Error("The PUBREC PacketId is not found.", zap.Uint16("MessageID", packet.MessageID))
|
||||
}
|
||||
|
||||
pubrel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket)
|
||||
pubrel.MessageID = packet.MessageID
|
||||
if err := c.WriterPacket(pubrel); err != nil {
|
||||
log.Error("send pubrel error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
return
|
||||
}
|
||||
case *packets.PubrelPacket:
|
||||
packet := ca.(*packets.PubrelPacket)
|
||||
c.pubRel(packet.MessageID)
|
||||
pubcomp := packets.NewControlPacket(packets.Pubcomp).(*packets.PubcompPacket)
|
||||
pubcomp.MessageID = packet.MessageID
|
||||
if err := c.WriterPacket(pubcomp); err != nil {
|
||||
log.Error("send pubcomp error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
return
|
||||
}
|
||||
case *packets.PubcompPacket:
|
||||
packet := ca.(*packets.PubcompPacket)
|
||||
delete(c.inflight, packet.MessageID)
|
||||
case *packets.SubscribePacket:
|
||||
packet := ca.(*packets.SubscribePacket)
|
||||
c.ProcessSubscribe(packet)
|
||||
@@ -267,6 +340,17 @@ func (c *client) processClientPublish(packet *packets.PublishPacket) {
|
||||
}
|
||||
c.ProcessPublishMessage(packet)
|
||||
case QosExactlyOnce:
|
||||
if err := c.registerPublishPacketId(packet.MessageID); err != nil {
|
||||
return
|
||||
} else {
|
||||
pubrec := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
|
||||
pubrec.MessageID = packet.MessageID
|
||||
if err := c.WriterPacket(pubrec); err != nil {
|
||||
log.Error("send pubrec error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
return
|
||||
}
|
||||
c.ProcessPublishMessage(packet)
|
||||
}
|
||||
return
|
||||
default:
|
||||
log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
|
||||
@@ -332,6 +416,8 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
|
||||
case CLIENT:
|
||||
c.processClientSubscribe(packet)
|
||||
case ROUTER:
|
||||
fallthrough
|
||||
case REMOTE:
|
||||
c.processRouterSubscribe(packet)
|
||||
}
|
||||
}
|
||||
@@ -648,6 +734,11 @@ func (c *client) Close() {
|
||||
}
|
||||
|
||||
func (c *client) WriterPacket(packet packets.ControlPacket) error {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
log.Error("recover error, ", zap.Any("recover", r))
|
||||
}
|
||||
}()
|
||||
if c.status == Disconnected {
|
||||
return nil
|
||||
}
|
||||
@@ -665,3 +756,50 @@ func (c *client) WriterPacket(packet packets.ControlPacket) error {
|
||||
c.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *client) registerPublishPacketId(packetId uint16) error {
|
||||
if c.isAwaitingFull() {
|
||||
log.Error("Dropped qos2 packet for too many awaiting_rel", zap.Uint16("id", packetId))
|
||||
return errors.New("DROPPED_QOS2_PACKET_FOR_TOO_MANY_AWAITING_REL")
|
||||
}
|
||||
|
||||
if _, found := c.awaitingRel[packetId]; found {
|
||||
return errors.New("RC_PACKET_IDENTIFIER_IN_USE")
|
||||
}
|
||||
c.awaitingRel[packetId] = time.Now().Unix()
|
||||
time.AfterFunc(time.Duration(awaitRelTimeout)*time.Second, c.expireAwaitingRel)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) isAwaitingFull() bool {
|
||||
if c.maxAwaitingRel == 0 {
|
||||
return false
|
||||
}
|
||||
if len(c.awaitingRel) < c.maxAwaitingRel {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *client) expireAwaitingRel() {
|
||||
if len(c.awaitingRel) == 0 {
|
||||
return
|
||||
}
|
||||
now := time.Now().Unix()
|
||||
for packetId, Timestamp := range c.awaitingRel {
|
||||
if now-Timestamp >= awaitRelTimeout {
|
||||
log.Error("Dropped qos2 packet for await_rel_timeout", zap.Uint16("id", packetId))
|
||||
delete(c.awaitingRel, packetId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) pubRel(packetId uint16) error {
|
||||
if _, found := c.awaitingRel[packetId]; found {
|
||||
delete(c.awaitingRel, packetId)
|
||||
} else {
|
||||
log.Error("The PUBREL PacketId is not found", zap.Uint16("id", packetId))
|
||||
return errors.New("RC_PACKET_IDENTIFIER_NOT_FOUND")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
uuid "github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -114,7 +114,11 @@ func delSubMap(m map[string]uint64, topic string) uint64 {
|
||||
}
|
||||
|
||||
func GenUniqueId() string {
|
||||
return uuid.NewV4().String()
|
||||
id, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
log.Error("uuid.NewRandom() returned an error: " + err.Error())
|
||||
}
|
||||
return id.String()
|
||||
}
|
||||
|
||||
func wrapPublishPacket(packet *packets.PublishPacket) *packets.PublishPacket {
|
||||
@@ -148,8 +152,78 @@ func publish(sub *subscription, packet *packets.PublishPacket) {
|
||||
// log.Error("process message for psub error, ", zap.Error(err))
|
||||
// }
|
||||
|
||||
err := sub.client.WriterPacket(packet)
|
||||
if err != nil {
|
||||
log.Error("process message for psub error, ", zap.Error(err))
|
||||
switch packet.Qos {
|
||||
case QosAtMostOnce:
|
||||
err := sub.client.WriterPacket(packet)
|
||||
if err != nil {
|
||||
log.Error("process message for psub error, ", zap.Error(err))
|
||||
}
|
||||
case QosAtLeastOnce, QosExactlyOnce:
|
||||
sub.client.inflight[packet.MessageID] = &inflightElem{status: Publish, packet: packet, timestamp: time.Now().Unix()}
|
||||
err := sub.client.WriterPacket(packet)
|
||||
if err != nil {
|
||||
log.Error("process message for psub error, ", zap.Error(err))
|
||||
}
|
||||
sub.client.ensureRetryTimer()
|
||||
default:
|
||||
log.Error("publish with unknown qos", zap.String("ClientID", sub.client.info.clientID))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// timer for retry delivery
|
||||
func (c *client) ensureRetryTimer(interval ...int64) {
|
||||
if c.retryTimer != nil {
|
||||
return
|
||||
}
|
||||
if len(interval) > 1 {
|
||||
return
|
||||
}
|
||||
timerInterval := retryInterval
|
||||
if len(interval) == 1 {
|
||||
timerInterval = interval[0]
|
||||
}
|
||||
c.retryTimerLock.Lock()
|
||||
c.retryTimer = time.AfterFunc(time.Duration(timerInterval)*time.Second, c.retryDelivery)
|
||||
c.retryTimerLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (c *client) resetRetryTimer() {
|
||||
if c.retryTimer == nil {
|
||||
return
|
||||
}
|
||||
// reset timer
|
||||
c.retryTimerLock.Lock()
|
||||
c.retryTimer = nil
|
||||
c.retryTimerLock.Unlock()
|
||||
|
||||
}
|
||||
|
||||
func (c *client) retryDelivery() {
|
||||
c.resetRetryTimer()
|
||||
if c.conn == nil || len(c.inflight) == 0 { //Reset timer when client offline OR inflight is empty
|
||||
return
|
||||
}
|
||||
now := time.Now().Unix()
|
||||
for _, infEle := range c.inflight {
|
||||
age := now - infEle.timestamp
|
||||
if age >= retryInterval {
|
||||
if infEle.status == Publish {
|
||||
c.WriterPacket(infEle.packet)
|
||||
c.inflight[infEle.packet.MessageID].timestamp = now
|
||||
} else if infEle.status == Pubrel {
|
||||
pubrel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket)
|
||||
pubrel.MessageID = infEle.packet.MessageID
|
||||
c.WriterPacket(pubrel)
|
||||
c.inflight[infEle.packet.MessageID].timestamp = now
|
||||
}
|
||||
} else {
|
||||
if age < 0 {
|
||||
age = 0
|
||||
}
|
||||
c.ensureRetryTimer(retryInterval - age)
|
||||
}
|
||||
}
|
||||
c.ensureRetryTimer()
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
"os"
|
||||
|
||||
"github.com/fhmq/hmq/logger"
|
||||
"github.com/fhmq/hmq/plugins/auth"
|
||||
"github.com/fhmq/hmq/plugins/bridge"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@@ -32,6 +34,11 @@ type Config struct {
|
||||
}
|
||||
|
||||
type Plugins struct {
|
||||
Auth auth.Auth
|
||||
Bridge bridge.BridgeMQ
|
||||
}
|
||||
|
||||
type NamedPlugins struct {
|
||||
Auth string
|
||||
Bridge string
|
||||
}
|
||||
@@ -152,6 +159,18 @@ func LoadConfig(filename string) (*Config, error) {
|
||||
return &config, nil
|
||||
}
|
||||
|
||||
|
||||
func (p *Plugins) UnmarshalJSON(b []byte) error {
|
||||
var named NamedPlugins
|
||||
err := json.Unmarshal(b, &named)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.Auth = auth.NewAuth(named.Auth)
|
||||
p.Bridge = bridge.NewBridgeMQ(named.Bridge)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (config *Config) check() error {
|
||||
|
||||
if config.Worker == 0 {
|
||||
|
||||
@@ -46,6 +46,8 @@ func (c *client) SendConnect() {
|
||||
return
|
||||
}
|
||||
m := packets.NewControlPacket(packets.Connect).(*packets.ConnectPacket)
|
||||
m.ProtocolName = "MQIsdp"
|
||||
m.ProtocolVersion = 3
|
||||
|
||||
m.CleanSession = true
|
||||
m.ClientIdentifier = c.info.clientID
|
||||
|
||||
5
go.mod
5
go.mod
@@ -3,16 +3,17 @@ module github.com/fhmq/hmq
|
||||
go 1.12
|
||||
|
||||
require (
|
||||
github.com/Shopify/sarama v1.26.1
|
||||
github.com/Shopify/sarama v1.23.0
|
||||
github.com/bitly/go-simplejson v0.5.0
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
|
||||
github.com/eapache/queue v1.1.0
|
||||
github.com/eclipse/paho.mqtt.golang v1.2.0
|
||||
github.com/gin-gonic/gin v1.4.0
|
||||
github.com/golang/protobuf v1.3.2 // indirect
|
||||
github.com/google/uuid v1.1.1
|
||||
github.com/kr/pretty v0.1.0 // indirect
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/pkg/errors v0.8.1 // indirect
|
||||
github.com/satori/go.uuid v1.2.0
|
||||
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
|
||||
github.com/stretchr/testify v1.3.0
|
||||
github.com/tidwall/gjson v1.3.0
|
||||
|
||||
4
go.sum
4
go.sum
@@ -30,6 +30,8 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs
|
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
|
||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
|
||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
|
||||
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
|
||||
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
|
||||
@@ -58,8 +60,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
|
||||
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
|
||||
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e h1:uO75wNGioszjmIzcY/tvdDYKRLVvzggtAmmJkn9j4GQ=
|
||||
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
|
||||
@@ -19,5 +19,5 @@ func (a *aclAuth) CheckConnect(clientID, username, password string) bool {
|
||||
}
|
||||
|
||||
func (a *aclAuth) CheckACL(action, clientID, username, ip, topic string) bool {
|
||||
return checkTopicAuth(a.config, action, username, ip, clientID, topic)
|
||||
return checkTopicAuth(a.config, action, ip, username, clientID, topic)
|
||||
}
|
||||
|
||||
23
plugins/auth/authfile/acl_test.go
Normal file
23
plugins/auth/authfile/acl_test.go
Normal file
@@ -0,0 +1,23 @@
|
||||
//+build test
|
||||
|
||||
package acl
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestOrigAcls(t *testing.T) {
|
||||
pwd, _ := os.Getwd()
|
||||
os.Chdir("../../../")
|
||||
aclOrig := Init()
|
||||
os.Chdir(pwd)
|
||||
|
||||
// rule: allow ip 127.0.0.1 2 $SYS/#
|
||||
origAllowed := aclOrig.CheckACL(PUB, "dummyClientID", "dummyUser", "127.0.0.1", "$SYS/something")
|
||||
assert.True(t, origAllowed)
|
||||
origAllowed = aclOrig.CheckACL(SUB, "dummyClientID", "dummyUser", "127.0.0.1", "$SYS/something")
|
||||
assert.False(t, origAllowed)
|
||||
}
|
||||
Reference in New Issue
Block a user