diff --git a/broker/broker.go b/broker/broker.go index 8d51929..a9eab7e 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -17,6 +17,8 @@ import ( "github.com/fhmq/hmq/pool" "github.com/eclipse/paho.mqtt.golang/packets" + + v5packets "github.com/eclipse/paho.golang/packets" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/net/websocket" @@ -322,6 +324,123 @@ func (b *Broker) DisConnClientByClientId(clientId string) { conn.Close() } +func (b *Broker) handleV5Connection(typ int, conn net.Conn) error { + //process connect packet + cp, err := v5packets.ReadPacket(conn) + if err != nil { + return errors.New(fmt.Sprintf("read connect packet error:%v", err)) + } + if cp == nil { + return errors.New("received nil packet") + } + if cp.Type != v5packets.CONNECT { + return errors.New("received msg that was not Connect") + } + + msg := cp.Content.(*v5packets.Connect) + + log.Info("read connect from ", getAdditionalLogFields(msg.ClientID, conn)...) + + connack := &v5packets.Connack{ + SessionPresent: msg.CleanStart, + } + + if typ == CLIENT && !b.CheckConnectAuth(msg.ClientID, msg.Username, string(msg.Password)) { + connack.ReasonCode = v5packets.ConnackBadUsernameOrPassword + if _, err := connack.WriteTo(conn); err != nil { + return fmt.Errorf("send connack error:%v,clientID:%v,conn:%v", err, msg.ClientID, conn) + } + return fmt.Errorf("connect packet CheckConnectAuth failed with connack.ReturnCode%v", connack.ReasonCode) + } + + if _, err := connack.WriteTo(conn); err != nil { + return fmt.Errorf("send connack error:%v,clientID:%v,conn:%v", err, msg.ClientID, conn) + } + + willmsg := &v5packets.Publish{} + if msg.WillFlag { + willmsg.QoS = msg.WillQOS + willmsg.Topic = msg.WillTopic + willmsg.Retain = msg.WillRetain + willmsg.Payload = msg.WillMessage + } else { + willmsg = nil + } + info := info{ + clientID: msg.ClientID, + username: msg.Username, + password: msg.Password, + keepalive: msg.KeepAlive, + willMsg: willmsg, + } + + c := &client{ + typ: typ, + broker: b, + conn: conn, + info: info, + } + + c.init() + + if err := b.getSession(c, msg, connack); err != nil { + return fmt.Errorf("get session error:%v,clientID:%v,conn:%v", err, msg.ClientIdentifier, conn) + } + + cid := c.info.clientID + + var exists bool + var old interface{} + + switch typ { + case CLIENT: + old, exists = b.clients.Load(cid) + if exists { + if ol, ok := old.(*client); ok { + log.Warn("client exists, close old client", getAdditionalLogFields(ol.info.clientID, ol.conn)...) + ol.Close() + } + } + b.clients.Store(cid, c) + + var pubPack = PubPacket{} + if willmsg != nil { + pubPack.TopicName = info.willMsg.TopicName + pubPack.Payload = info.willMsg.Payload + } + + pubInfo := Info{ + ClientID: info.clientID, + Username: info.username, + Password: info.password, + Keepalive: info.keepalive, + WillMsg: pubPack, + } + + b.OnlineOfflineNotification(pubInfo, true, c.lastMsgTime) + { + b.Publish(&bridge.Elements{ + ClientID: msg.ClientIdentifier, + Username: msg.Username, + Action: bridge.Connect, + Timestamp: time.Now().Unix(), + }) + } + case ROUTER: + old, exists = b.routes.Load(cid) + if exists { + if ol, ok := old.(*client); ok { + log.Warn("router exists, close old router", getAdditionalLogFields(ol.info.clientID, ol.conn)...) + ol.Close() + } + } + b.routes.Store(cid, c) + } + + c.readLoop() + return nil +} + func (b *Broker) handleConnection(typ int, conn net.Conn) error { //process connect packet packet, err := packets.ReadPacket(conn) diff --git a/broker/config.go b/broker/config.go index 2aa22e2..1c155d0 100644 --- a/broker/config.go +++ b/broker/config.go @@ -6,7 +6,6 @@ import ( "errors" "flag" "fmt" - "io/ioutil" "os" "github.com/fhmq/hmq/logger" @@ -22,6 +21,7 @@ type Config struct { Worker int `json:"workerNum"` HTTPPort string `json:"httpPort"` Host string `json:"host"` + V5Port string `json:"v5Port"` Port string `json:"port"` Cluster RouteInfo `json:"cluster"` Router string `json:"router"` @@ -89,6 +89,8 @@ func ConfigureConfig(args []string) (*Config, error) { fs.StringVar(&config.HTTPPort, "hp", "8080", "Port to listen on.") fs.StringVar(&config.Port, "port", "1883", "Port to listen on.") fs.StringVar(&config.Port, "p", "1883", "Port to listen on.") + fs.StringVar(&config.V5Port, "v5port", "", "Port v5 to listen on.") + fs.StringVar(&config.V5Port, "p5", "", "Port v5 to listen on.") fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on") fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.") fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.") @@ -144,7 +146,7 @@ func ConfigureConfig(args []string) (*Config, error) { func LoadConfig(filename string) (*Config, error) { - content, err := ioutil.ReadFile(filename) + content, err := os.ReadFile(filename) if err != nil { // log.Error("Read config file error: ", zap.Error(err)) return nil, err @@ -231,7 +233,7 @@ func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error) { } // Add in CAs if applicable. if tlsInfo.CaFile != "" { - rootPEM, err := ioutil.ReadFile(tlsInfo.CaFile) + rootPEM, err := os.ReadFile(tlsInfo.CaFile) if err != nil || rootPEM == nil { return nil, err } diff --git a/conf/hmq.config b/conf/hmq.config index 4256f2a..7528e73 100644 --- a/conf/hmq.config +++ b/conf/hmq.config @@ -1,6 +1,7 @@ { "workerNum": 4096, "port": "1883", + "v5Port": "1885", "host": "0.0.0.0", "cluster": { "host": "0.0.0.0", @@ -11,6 +12,7 @@ "tlsHost": "0.0.0.0", "wsPort": "1888", "wsPath": "/ws", + "ws5Path": "/ws5", "wsTLS": false, "tlsInfo": { "verify": true, diff --git a/go.mod b/go.mod index 4c3ff0f..3605b77 100644 --- a/go.mod +++ b/go.mod @@ -7,12 +7,13 @@ require ( github.com/bitly/go-simplejson v0.5.0 github.com/cespare/xxhash/v2 v2.1.2 github.com/eapache/queue v1.1.0 - github.com/eclipse/paho.mqtt.golang v1.4.2 + github.com/eclipse/paho.golang v0.12.0 + github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/gin-gonic/gin v1.9.1 github.com/google/uuid v1.3.0 github.com/json-iterator/go v1.1.12 github.com/patrickmn/go-cache v2.1.0+incompatible - github.com/stretchr/testify v1.8.3 + github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.24.0 golang.org/x/net v0.17.0 ) @@ -41,7 +42,6 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.15.14 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect - github.com/kr/text v0.2.0 // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index e09bbc6..1068131 100644 --- a/go.sum +++ b/go.sum @@ -16,7 +16,6 @@ github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -26,8 +25,10 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9 github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= -github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4= -github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= +github.com/eclipse/paho.golang v0.12.0 h1:EXQFJbJklDnUqW6lyAknMWRhM2NgpHxwrrL8riUmp3Q= +github.com/eclipse/paho.golang v0.12.0/go.mod h1:TSDCUivu9JnoR9Hl+H7sQMcHkejWH2/xKK1NJGtLbIE= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= @@ -49,14 +50,14 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= @@ -120,16 +121,17 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= @@ -142,16 +144,13 @@ golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -168,7 +167,6 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=