14 Commits

Author SHA1 Message Date
joy.zhou
0d69f13e48 Revert "op: low performance code (#102)"
This reverts commit c2248bed2b.
2021-01-07 16:22:51 +08:00
c2248bed2b op: low performance code (#102)
thanks
2021-01-07 14:12:28 +08:00
turtletramp
6be79cbe88 Bugfix - authfile plugin did wrongly use username as IP and IP as username in ACL checks (#100)
* adding test + fix issue with wrong order in acl check

* reduce to featureset from original fork
2020-12-02 10:05:46 +08:00
sngyai
6cb307d252 Feature qos1&qos2 (#99)
* client publish qos2

* server dispatch qos1&qos2

* Use at most one timer for each client

* Use at most one timer for each client
2020-11-30 11:34:03 +08:00
joy,zhou
b8bacb4c3d fixed bug #96 2020-08-26 17:24:22 +08:00
chujiangke
481a61c520 fix (#90) 2020-06-24 15:14:25 +08:00
Rajiv Shah
4782f76048 Replace satori/go.uuid with google/uuid (#89) 2020-06-09 10:13:37 +08:00
Aleksey Myasnikov
1a374f9734 Update comm.go (#85) 2020-05-08 11:26:44 +08:00
janson
3f60d23e85 fix fail in cluster deploy (#86)
Co-authored-by: janson <janson@gmail.com>
2020-05-08 11:26:26 +08:00
yu
3cf90d5231 add websocket client ip 2020-04-16 14:08:51 +08:00
gerdstolpmann
a1bf3d93b2 only set a read deadline when the keep-alive value is positive (#83) 2020-04-16 10:33:17 +08:00
gerdstolpmann
af7db83bdc do not try to set remoteIP for websocket connections (#81) 2020-04-04 10:41:36 +08:00
gerdstolpmann
839041e912 do not expect "Origin" header for websocket connections (#80)
* websocket: do not check the presence of the "Origin" header

* avoid using http.DefaultServeMux
2020-04-04 10:40:12 +08:00
gerdstolpmann
17dac26996 if used as library, allow that the auth and bridge plugins can be set by (#79)
struct, and not only by name
2020-04-03 14:49:50 +08:00
12 changed files with 308 additions and 45 deletions

3
.gitignore vendored
View File

@@ -1,4 +1,5 @@
hmq
log
log/*
*.test
*.test
.vscode/settings.json

View File

@@ -4,5 +4,8 @@
"--enable=errcheck,varcheck,deadcode",
"--enable=varcheck",
"--enable=deadcode"
],
"cSpell.words": [
"Authorised"
]
}

View File

@@ -1,4 +1,4 @@
FROM golang:1.12 as builder
FROM golang:1.14 as builder
WORKDIR /go/src/github.com/fhmq/hmq
COPY . .
RUN CGO_ENABLED=0 go build -o hmq -a -ldflags '-extldflags "-static"' .

View File

@@ -92,8 +92,8 @@ func NewBroker(config *Config) (*Broker, error) {
b.tlsConfig = tlsconfig
}
b.auth = auth.NewAuth(b.config.Plugin.Auth)
b.bridgeMQ = bridge.NewBridgeMQ(b.config.Plugin.Bridge)
b.auth = b.config.Plugin.Auth
b.bridgeMQ = b.config.Plugin.Bridge
return b, nil
}
@@ -123,7 +123,7 @@ func (b *Broker) Start() {
go InitHTTPMoniter(b)
}
//listen clinet over tcp
//listen client over tcp
if b.config.Port != "" {
go b.StartClientListening(false)
}
@@ -155,12 +155,14 @@ func (b *Broker) StartWebsocketListening() {
path := b.config.WsPath
hp := ":" + b.config.WsPort
log.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path))
http.Handle(path, websocket.Handler(b.wsHandler))
ws := &websocket.Server{Handler: websocket.Handler(b.wsHandler)}
mux := http.NewServeMux()
mux.Handle(path, ws)
var err error
if b.config.WsTLS {
err = http.ListenAndServeTLS(hp, b.config.TlsInfo.CertFile, b.config.TlsInfo.KeyFile, nil)
err = http.ListenAndServeTLS(hp, b.config.TlsInfo.CertFile, b.config.TlsInfo.KeyFile, mux)
} else {
err = http.ListenAndServe(hp, nil)
err = http.ListenAndServe(hp, mux)
}
if err != nil {
log.Error("ListenAndServe:" + err.Error())

View File

@@ -3,6 +3,7 @@ package broker
import (
"context"
"errors"
"github.com/eapache/queue"
"math/rand"
"net"
"reflect"
@@ -14,6 +15,7 @@ import (
"github.com/fhmq/hmq/broker/lib/sessions"
"github.com/fhmq/hmq/broker/lib/topics"
"github.com/fhmq/hmq/plugins/bridge"
"golang.org/x/net/websocket"
"github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap"
@@ -40,29 +42,52 @@ const (
Disconnected = 2
)
const (
awaitRelTimeout int64 = 20
retryInterval int64 = 20
)
var (
groupCompile = regexp.MustCompile(_GroupTopicRegexp)
)
type client struct {
typ int
mu sync.Mutex
broker *Broker
conn net.Conn
info info
route route
status int
ctx context.Context
cancelFunc context.CancelFunc
session *sessions.Session
subMap map[string]*subscription
topicsMgr *topics.Manager
subs []interface{}
qoss []byte
rmsgs []*packets.PublishPacket
routeSubMap map[string]uint64
typ int
mu sync.Mutex
broker *Broker
conn net.Conn
info info
route route
status int
ctx context.Context
cancelFunc context.CancelFunc
session *sessions.Session
subMap map[string]*subscription
topicsMgr *topics.Manager
subs []interface{}
qoss []byte
rmsgs []*packets.PublishPacket
routeSubMap map[string]uint64
awaitingRel map[uint16]int64
maxAwaitingRel int
inflight map[uint16]*inflightElem
mqueue *queue.Queue
retryTimer *time.Timer
retryTimerLock sync.Mutex
}
type InflightStatus uint8
const (
Publish InflightStatus = 0
Pubrel InflightStatus = 1
)
type inflightElem struct {
status InflightStatus
packet *packets.PublishPacket
timestamp int64
}
type subscription struct {
client *client
topic string
@@ -87,17 +112,29 @@ type route struct {
}
var (
DisconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
r = rand.New(rand.NewSource(time.Now().UnixNano()))
DisconnectedPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
r = rand.New(rand.NewSource(time.Now().UnixNano()))
)
func (c *client) init() {
c.status = Connected
c.info.localIP, _, _ = net.SplitHostPort(c.conn.LocalAddr().String())
c.info.remoteIP, _, _ = net.SplitHostPort(c.conn.RemoteAddr().String())
remoteAddr := c.conn.RemoteAddr()
remoteNetwork := remoteAddr.Network()
c.info.remoteIP = ""
if remoteNetwork != "websocket" {
c.info.remoteIP, _, _ = net.SplitHostPort(remoteAddr.String())
} else {
ws := c.conn.(*websocket.Conn)
c.info.remoteIP, _, _ = net.SplitHostPort(ws.Request().RemoteAddr)
}
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
c.subMap = make(map[string]*subscription)
c.topicsMgr = c.broker.topicsMgr
c.routeSubMap = make(map[string]uint64)
c.awaitingRel = make(map[uint16]int64)
c.inflight = make(map[uint16]*inflightElem)
c.mqueue = queue.New()
}
func (c *client) readLoop() {
@@ -116,14 +153,16 @@ func (c *client) readLoop() {
return
default:
//add read timeout
if err := nc.SetReadDeadline(time.Now().Add(timeOut)); err != nil {
log.Error("set read timeout error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{
client: c,
packet: DisconnectdPacket,
if keepAlive > 0 {
if err := nc.SetReadDeadline(time.Now().Add(timeOut)); err != nil {
log.Error("set read timeout error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{
client: c,
packet: DisconnectedPacket,
}
b.SubmitWork(c.info.clientID, msg)
return
}
b.SubmitWork(c.info.clientID, msg)
return
}
packet, err := packets.ReadPacket(nc)
@@ -131,7 +170,7 @@ func (c *client) readLoop() {
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{
client: c,
packet: DisconnectdPacket,
packet: DisconnectedPacket,
}
b.SubmitWork(c.info.clientID, msg)
return
@@ -165,9 +204,43 @@ func ProcessMessage(msg *Message) {
packet := ca.(*packets.PublishPacket)
c.ProcessPublish(packet)
case *packets.PubackPacket:
packet := ca.(*packets.PubackPacket)
if _, found := c.inflight[packet.MessageID]; found {
delete(c.inflight, packet.MessageID)
} else {
log.Error("Duplicated PUBACK PacketId", zap.Uint16("MessageID", packet.MessageID))
}
case *packets.PubrecPacket:
packet := ca.(*packets.PubrecPacket)
if _, found := c.inflight[packet.MessageID]; found {
if c.inflight[packet.MessageID].status == Publish {
c.inflight[packet.MessageID].status = Pubrel
c.inflight[packet.MessageID].timestamp = time.Now().Unix()
} else if c.inflight[packet.MessageID].status == Pubrel {
log.Error("Duplicated PUBREC PacketId", zap.Uint16("MessageID", packet.MessageID))
}
} else {
log.Error("The PUBREC PacketId is not found.", zap.Uint16("MessageID", packet.MessageID))
}
pubrel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket)
pubrel.MessageID = packet.MessageID
if err := c.WriterPacket(pubrel); err != nil {
log.Error("send pubrel error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
case *packets.PubrelPacket:
packet := ca.(*packets.PubrelPacket)
c.pubRel(packet.MessageID)
pubcomp := packets.NewControlPacket(packets.Pubcomp).(*packets.PubcompPacket)
pubcomp.MessageID = packet.MessageID
if err := c.WriterPacket(pubcomp); err != nil {
log.Error("send pubcomp error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
case *packets.PubcompPacket:
packet := ca.(*packets.PubcompPacket)
delete(c.inflight, packet.MessageID)
case *packets.SubscribePacket:
packet := ca.(*packets.SubscribePacket)
c.ProcessSubscribe(packet)
@@ -267,6 +340,17 @@ func (c *client) processClientPublish(packet *packets.PublishPacket) {
}
c.ProcessPublishMessage(packet)
case QosExactlyOnce:
if err := c.registerPublishPacketId(packet.MessageID); err != nil {
return
} else {
pubrec := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
pubrec.MessageID = packet.MessageID
if err := c.WriterPacket(pubrec); err != nil {
log.Error("send pubrec error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
c.ProcessPublishMessage(packet)
}
return
default:
log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
@@ -332,6 +416,8 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
case CLIENT:
c.processClientSubscribe(packet)
case ROUTER:
fallthrough
case REMOTE:
c.processRouterSubscribe(packet)
}
}
@@ -648,6 +734,11 @@ func (c *client) Close() {
}
func (c *client) WriterPacket(packet packets.ControlPacket) error {
defer func() {
if err := recover(); err != nil {
log.Error("recover error, ", zap.Any("recover", r))
}
}()
if c.status == Disconnected {
return nil
}
@@ -665,3 +756,50 @@ func (c *client) WriterPacket(packet packets.ControlPacket) error {
c.mu.Unlock()
return err
}
func (c *client) registerPublishPacketId(packetId uint16) error {
if c.isAwaitingFull() {
log.Error("Dropped qos2 packet for too many awaiting_rel", zap.Uint16("id", packetId))
return errors.New("DROPPED_QOS2_PACKET_FOR_TOO_MANY_AWAITING_REL")
}
if _, found := c.awaitingRel[packetId]; found {
return errors.New("RC_PACKET_IDENTIFIER_IN_USE")
}
c.awaitingRel[packetId] = time.Now().Unix()
time.AfterFunc(time.Duration(awaitRelTimeout)*time.Second, c.expireAwaitingRel)
return nil
}
func (c *client) isAwaitingFull() bool {
if c.maxAwaitingRel == 0 {
return false
}
if len(c.awaitingRel) < c.maxAwaitingRel {
return false
}
return true
}
func (c *client) expireAwaitingRel() {
if len(c.awaitingRel) == 0 {
return
}
now := time.Now().Unix()
for packetId, Timestamp := range c.awaitingRel {
if now-Timestamp >= awaitRelTimeout {
log.Error("Dropped qos2 packet for await_rel_timeout", zap.Uint16("id", packetId))
delete(c.awaitingRel, packetId)
}
}
}
func (c *client) pubRel(packetId uint16) error {
if _, found := c.awaitingRel[packetId]; found {
delete(c.awaitingRel, packetId)
} else {
log.Error("The PUBREL PacketId is not found", zap.Uint16("id", packetId))
return errors.New("RC_PACKET_IDENTIFIER_NOT_FOUND")
}
return nil
}

View File

@@ -9,7 +9,7 @@ import (
"go.uber.org/zap"
"github.com/eclipse/paho.mqtt.golang/packets"
uuid "github.com/satori/go.uuid"
uuid "github.com/google/uuid"
)
const (
@@ -114,7 +114,11 @@ func delSubMap(m map[string]uint64, topic string) uint64 {
}
func GenUniqueId() string {
return uuid.NewV4().String()
id, err := uuid.NewRandom()
if err != nil {
log.Error("uuid.NewRandom() returned an error: " + err.Error())
}
return id.String()
}
func wrapPublishPacket(packet *packets.PublishPacket) *packets.PublishPacket {
@@ -148,8 +152,78 @@ func publish(sub *subscription, packet *packets.PublishPacket) {
// log.Error("process message for psub error, ", zap.Error(err))
// }
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", zap.Error(err))
switch packet.Qos {
case QosAtMostOnce:
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", zap.Error(err))
}
case QosAtLeastOnce, QosExactlyOnce:
sub.client.inflight[packet.MessageID] = &inflightElem{status: Publish, packet: packet, timestamp: time.Now().Unix()}
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", zap.Error(err))
}
sub.client.ensureRetryTimer()
default:
log.Error("publish with unknown qos", zap.String("ClientID", sub.client.info.clientID))
return
}
}
// timer for retry delivery
func (c *client) ensureRetryTimer(interval ...int64) {
if c.retryTimer != nil {
return
}
if len(interval) > 1 {
return
}
timerInterval := retryInterval
if len(interval) == 1 {
timerInterval = interval[0]
}
c.retryTimerLock.Lock()
c.retryTimer = time.AfterFunc(time.Duration(timerInterval)*time.Second, c.retryDelivery)
c.retryTimerLock.Unlock()
return
}
func (c *client) resetRetryTimer() {
if c.retryTimer == nil {
return
}
// reset timer
c.retryTimerLock.Lock()
c.retryTimer = nil
c.retryTimerLock.Unlock()
}
func (c *client) retryDelivery() {
c.resetRetryTimer()
if c.conn == nil || len(c.inflight) == 0 { //Reset timer when client offline OR inflight is empty
return
}
now := time.Now().Unix()
for _, infEle := range c.inflight {
age := now - infEle.timestamp
if age >= retryInterval {
if infEle.status == Publish {
c.WriterPacket(infEle.packet)
c.inflight[infEle.packet.MessageID].timestamp = now
} else if infEle.status == Pubrel {
pubrel := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket)
pubrel.MessageID = infEle.packet.MessageID
c.WriterPacket(pubrel)
c.inflight[infEle.packet.MessageID].timestamp = now
}
} else {
if age < 0 {
age = 0
}
c.ensureRetryTimer(retryInterval - age)
}
}
c.ensureRetryTimer()
}

View File

@@ -11,6 +11,8 @@ import (
"os"
"github.com/fhmq/hmq/logger"
"github.com/fhmq/hmq/plugins/auth"
"github.com/fhmq/hmq/plugins/bridge"
"go.uber.org/zap"
)
@@ -32,6 +34,11 @@ type Config struct {
}
type Plugins struct {
Auth auth.Auth
Bridge bridge.BridgeMQ
}
type NamedPlugins struct {
Auth string
Bridge string
}
@@ -152,6 +159,18 @@ func LoadConfig(filename string) (*Config, error) {
return &config, nil
}
func (p *Plugins) UnmarshalJSON(b []byte) error {
var named NamedPlugins
err := json.Unmarshal(b, &named)
if err != nil {
return err
}
p.Auth = auth.NewAuth(named.Auth)
p.Bridge = bridge.NewBridgeMQ(named.Bridge)
return nil
}
func (config *Config) check() error {
if config.Worker == 0 {

View File

@@ -46,6 +46,8 @@ func (c *client) SendConnect() {
return
}
m := packets.NewControlPacket(packets.Connect).(*packets.ConnectPacket)
m.ProtocolName = "MQIsdp"
m.ProtocolVersion = 3
m.CleanSession = true
m.ClientIdentifier = c.info.clientID

5
go.mod
View File

@@ -3,16 +3,17 @@ module github.com/fhmq/hmq
go 1.12
require (
github.com/Shopify/sarama v1.26.1
github.com/Shopify/sarama v1.23.0
github.com/bitly/go-simplejson v0.5.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/eapache/queue v1.1.0
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/gin-gonic/gin v1.4.0
github.com/golang/protobuf v1.3.2 // indirect
github.com/google/uuid v1.1.1
github.com/kr/pretty v0.1.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.8.1 // indirect
github.com/satori/go.uuid v1.2.0
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/stretchr/testify v1.3.0
github.com/tidwall/gjson v1.3.0

4
go.sum
View File

@@ -30,6 +30,8 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
@@ -58,8 +60,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e h1:uO75wNGioszjmIzcY/tvdDYKRLVvzggtAmmJkn9j4GQ=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@@ -19,5 +19,5 @@ func (a *aclAuth) CheckConnect(clientID, username, password string) bool {
}
func (a *aclAuth) CheckACL(action, clientID, username, ip, topic string) bool {
return checkTopicAuth(a.config, action, username, ip, clientID, topic)
return checkTopicAuth(a.config, action, ip, username, clientID, topic)
}

View File

@@ -0,0 +1,23 @@
//+build test
package acl
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func TestOrigAcls(t *testing.T) {
pwd, _ := os.Getwd()
os.Chdir("../../../")
aclOrig := Init()
os.Chdir(pwd)
// rule: allow ip 127.0.0.1 2 $SYS/#
origAllowed := aclOrig.CheckACL(PUB, "dummyClientID", "dummyUser", "127.0.0.1", "$SYS/something")
assert.True(t, origAllowed)
origAllowed = aclOrig.CheckACL(SUB, "dummyClientID", "dummyUser", "127.0.0.1", "$SYS/something")
assert.False(t, origAllowed)
}