diff --git a/README.md b/README.md index 8a282f7..6c5fc27 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,44 @@ -# fhmq -Free and high performance mqtt broker +Free and High Performance MQTT Broker +============ + +## About +Golang MQTT Broker, Version 3.1.1, and Compatible +for [eclipse paho client](https://github.com/eclipse?utf8=%E2%9C%93&q=mqtt&type=&language=) + +## RUNNING +```bash +$ git clone https://github.com/fhmq/fhmq.git +$ cd fhmq +$ go run main.go +``` + +### broker.config +~~~ +{ + "port": "1883", + "host": "0.0.0.0", + "cluster": { + "host": "0.0.0.0", + "port": "1993", + "routers": ["192.168.10.11:1993","192.168.10.12:1993"] + } +} +~~~ + +### Features and Future + +* Supports QOS 0 + +* Cluster Support + +* Supports retained messages + +* Supports will messages + +* Queue subscribe + +### QUEUE SUBSCRIBE + +| Prefix | Examples | +| ------------- |---------------------------------| +| $queue/ | mosquitto_sub -t ‘$queue/topic’ | diff --git a/broker/broker.go b/broker/broker.go index aa34122..fd26549 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -11,8 +11,9 @@ import ( type Broker struct { id string config *Config - clients ClientMap - routes ClientMap + clients cMap + routes cMap + remotes cMap sl *Sublist rl *RetainList queues map[string]int @@ -20,12 +21,14 @@ type Broker struct { func NewBroker(config *Config) *Broker { return &Broker{ + id: GenUniqueId(), config: config, sl: NewSublist(), rl: NewRetainList(), queues: make(map[string]int), clients: NewClientMap(), routes: NewClientMap(), + remotes: NewClientMap(), } } @@ -117,25 +120,29 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx int) { } c := &client{ + typ: typ, broker: b, conn: conn, info: info, } c.init() + var msgPool *MessagePool var exist bool var old *client cid := string(c.info.clientID) if typ == CLIENT { old, exist = b.clients.Update(cid, c) + msgPool = MSGPool[idx%MessagePoolNum].GetPool() } else if typ == ROUTER { old, exist = b.routes.Update(cid, c) + msgPool = MSGPool[MessagePoolNum].GetPool() } if exist { log.Warn("client or routers exists, close old...") old.Close() } - c.readLoop(idx) + c.readLoop(msgPool) } func (b *Broker) ConnectToRouters() { @@ -160,7 +167,121 @@ func (b *Broker) connectRouter(url, remoteID string) { remoteID: remoteID, remoteUrl: url, } + cid := GenUniqueId() + info := info{ + clientID: []byte(cid), + } + c := &client{ + typ: REMOTE, + conn: conn, + route: route, + info: info, + } + b.remotes.Set(cid, c) + c.SendConnect() + c.SendInfo() // s.createRemote(conn, route) - return + msgPool := MSGPool[(MessagePoolNum + 1)].GetPool() + c.readLoop(msgPool) + } +} + +func (b *Broker) CheckRemoteExist(remoteID, url string) bool { + exist := false + remotes := b.remotes.Items() + for _, v := range remotes { + if v.route.remoteUrl == url { + // if v.route.remoteID == "" || v.route.remoteID != remoteID { + v.route.remoteID = remoteID + // } + exist = true + break + } + } + return exist +} + +func (b *Broker) SendLocalSubsToRouter(c *client) { + clients := b.clients.Items() + subMsg := message.NewSubscribeMessage() + for _, client := range clients { + subs := client.subs + for _, sub := range subs { + subMsg.AddTopic(sub.topic, sub.qos) + } + } + err := c.writeMessage(subMsg) + if err != nil { + log.Error("Send localsubs To Router error :", err) + } +} + +func (b *Broker) BroadcastInfoMessage(remoteID string, msg message.Message) { + remotes := b.remotes.Items() + for _, r := range remotes { + if r.route.remoteID == remoteID { + continue + } + r.writeMessage(msg) + } + // log.Info("BroadcastInfoMessage success ") +} + +func (b *Broker) BroadcastSubOrUnsubMessage(buf []byte) { + remotes := b.remotes.Items() + for _, r := range remotes { + r.writeBuffer(buf) + } + // log.Info("BroadcastSubscribeMessage remotes: ", s.remotes) +} + +func (b *Broker) removeClient(c *client) { + clientId := string(c.info.clientID) + typ := c.typ + switch typ { + case CLIENT: + b.clients.Remove(clientId) + case ROUTER: + b.routes.Remove(clientId) + case REMOTE: + b.remotes.Remove(clientId) + } + // log.Info("delete client ,", clientId) +} + +func (b *Broker) ProcessPublishMessage(msg *message.PublishMessage) { + if b == nil { + return + } + topic := string(msg.Topic()) + + r := b.sl.Match(topic) + // log.Info("psubs num: ", len(r.psubs)) + if len(r.qsubs) == 0 && len(r.psubs) == 0 { + return + } + + for _, sub := range r.psubs { + if sub != nil { + err := sub.client.writeMessage(msg) + if err != nil { + log.Error("process message for psub error, ", err) + } + } + } + + for i, sub := range r.qsubs { + // s.qmu.Lock() + if cnt, exist := b.queues[string(sub.topic)]; exist && i == cnt { + if sub != nil { + err := sub.client.writeMessage(msg) + if err != nil { + log.Error("process will message for qsub error, ", err) + } + } + b.queues[topic] = (b.queues[topic] + 1) % len(r.qsubs) + break + } + // s.qmu.Unlock() } } diff --git a/broker/client.go b/broker/client.go index b3c08a1..645d7a2 100644 --- a/broker/client.go +++ b/broker/client.go @@ -59,9 +59,8 @@ func (c *client) init() { c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] } -func (c *client) readLoop(idx int) { +func (c *client) readLoop(msgPool *MessagePool) { nc := c.conn - msgPool := MSGPool[idx%MessagePoolNum].GetPool() if nc == nil || msgPool == nil { return } @@ -144,6 +143,7 @@ func (c *client) ProcessPublish(buf []byte) { msg, err := DecodePublishMessage(buf) if err != nil { log.Error("Decode Publish Message error: ", err) + c.Close() return } c.ProcessPublishMessage(buf, msg) @@ -158,12 +158,14 @@ func (c *client) ProcessPublish(buf []byte) { } } + func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage) { b := c.broker if b == nil { return } + typ := c.typ topic := string(msg.Topic()) r := b.sl.Match(topic) @@ -173,11 +175,11 @@ func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage) } for _, sub := range r.psubs { - // if sub.client.typ == ROUTER { - // if typ == ROUTER { - // continue - // } - // } + if sub.client.typ == ROUTER { + if typ == ROUTER { + continue + } + } if sub != nil { err := sub.client.writeBuffer(buf) if err != nil { @@ -187,11 +189,11 @@ func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage) } for i, sub := range r.qsubs { - // if sub.client.typ == ROUTER { - // if typ == ROUTER { - // continue - // } - // } + if sub.client.typ == ROUTER { + if typ == ROUTER { + continue + } + } // s.qmu.Lock() if cnt, exist := b.queues[string(sub.topic)]; exist && i == cnt { if sub != nil { @@ -224,8 +226,8 @@ func (c *client) ProcessPubComp(buf []byte) { } func (c *client) ProcessSubscribe(buf []byte) { - srv := c.broker - if srv == nil { + b := c.broker + if b == nil { return } msg, err := DecodeSubscribeMessage(buf) @@ -256,11 +258,11 @@ func (c *client) ProcessSubscribe(buf []byte) { if len(t) > 7 { t = t[7:] queue = true - // srv.qmu.Lock() - if _, exists := srv.queues[topic]; !exists { - srv.queues[topic] = 0 + // b.qmu.Lock() + if _, exists := b.queues[topic]; !exists { + b.queues[topic] = 0 } - // srv.qmu.Unlock() + // b.qmu.Unlock() } else { retcodes = append(retcodes, message.QosFailure) continue @@ -273,11 +275,11 @@ func (c *client) ProcessSubscribe(buf []byte) { queue: queue, } - // c.mu.Lock() + c.mu.Lock() c.subs[topic] = sub - // c.mu.Unlock() + c.mu.Unlock() - err := srv.sl.Insert(sub) + err := b.sl.Insert(sub) if err != nil { log.Error("Insert subscription error: ", err) retcodes = append(retcodes, message.QosFailure) @@ -305,15 +307,13 @@ func (c *client) ProcessSubscribe(buf []byte) { return } //broadcast subscribe message - // if typ == CLIENT { - // srv.startGoRoutine(func() { - // srv.BroadcastSubscribeMessage(buf) - // }) - // } + if c.typ == CLIENT { + go b.BroadcastSubOrUnsubMessage(buf) + } //process retain message for _, t := range topics { - bufs := srv.rl.Match(t) + bufs := b.rl.Match(t) for _, buf := range bufs { log.Info("process retain message: ", string(buf)) if buf != nil && string(buf) != "" { @@ -324,8 +324,8 @@ func (c *client) ProcessSubscribe(buf []byte) { } func (c *client) ProcessUnSubscribe(buf []byte) { - srv := c.broker - if srv == nil { + b := c.broker + if b == nil { return } @@ -356,9 +356,9 @@ func (c *client) ProcessUnSubscribe(buf []byte) { return } // //process ubsubscribe message - // if typ == CLIENT { - // c.srv.BroadcastUnSubscribeMessage(msg) - // } + if c.typ == CLIENT { + b.BroadcastSubOrUnsubMessage(buf) + } } func (c *client) unsubscribe(sub *subscription) { @@ -389,15 +389,19 @@ func (c *client) ProcessPing(buf []byte) { } func (c *client) Close() { - srv := c.broker + b := c.broker subs := c.subs - if srv != nil { + if b != nil { + b.removeClient(c) for _, sub := range subs { - err := srv.sl.Remove(sub) + err := b.sl.Remove(sub) if err != nil { log.Error("closed client but remove sublist error, ", err) } } + if c.info.willMsg != nil { + b.ProcessPublishMessage(c.info.willMsg) + } } if c.conn != nil { c.conn.Close() diff --git a/broker/clientmap.go b/broker/clientmap.go index f0266c0..e35445a 100644 --- a/broker/clientmap.go +++ b/broker/clientmap.go @@ -2,7 +2,7 @@ package broker import "sync" -type ClientMap interface { +type cMap interface { Set(key string, val *client) Get(key string) (*client, bool) Items() map[string]*client @@ -17,7 +17,7 @@ type clientMap struct { mu sync.RWMutex } -func NewClientMap() ClientMap { +func NewClientMap() cMap { smap := &clientMap{ items: make(map[string]*client), } diff --git a/broker/comm.go b/broker/comm.go index ed1e86d..208feac 100644 --- a/broker/comm.go +++ b/broker/comm.go @@ -2,7 +2,12 @@ package broker import ( "bytes" + "crypto/md5" + "crypto/rand" + "encoding/base64" + "encoding/hex" "errors" + "io" "reflect" "strings" "time" @@ -128,3 +133,14 @@ func equal(k1, k2 interface{}) bool { } return false } + +func GenUniqueId() string { + b := make([]byte, 48) + if _, err := io.ReadFull(rand.Reader, b); err != nil { + return "" + } + h := md5.New() + h.Write([]byte(base64.URLEncoding.EncodeToString(b))) + return hex.EncodeToString(h.Sum(nil)) + // return GetMd5String() +} diff --git a/broker/config.go b/broker/config.go index 693eff7..58bfca4 100644 --- a/broker/config.go +++ b/broker/config.go @@ -5,7 +5,7 @@ import ( "errors" "io/ioutil" - "github.com/prometheus/common/log" + log "github.com/cihub/seelog" ) const ( diff --git a/broker/info.go b/broker/info.go new file mode 100644 index 0000000..2d81878 --- /dev/null +++ b/broker/info.go @@ -0,0 +1,113 @@ +package broker + +import ( + "fhmq/lib/message" + "fmt" + "time" + + simplejson "github.com/bitly/go-simplejson" + log "github.com/cihub/seelog" +) + +func (c *client) SendInfo() { + url := c.info.localIP + ":" + c.broker.config.Cluster.Port + + infoMsg := NewInfo(c.broker.id, url, false) + err := c.writeMessage(infoMsg) + if err != nil { + log.Error("send info message error, ", err) + return + } + // log.Info("send info success") +} + +func (c *client) StartPing() { + timeTicker := time.NewTicker(time.Second * 30) + ping := message.NewPingreqMessage() + for { + select { + case <-timeTicker.C: + err := c.writeMessage(ping) + if err != nil { + log.Error("ping error: ", err) + } + } + } +} + +func (c *client) SendConnect() { + + clientID := c.info.clientID + connMsg := message.NewConnectMessage() + connMsg.SetClientId(clientID) + connMsg.SetVersion(0x04) + err := c.writeMessage(connMsg) + if err != nil { + log.Error("send connect message error, ", err) + return + } + // log.Info("send connet success") +} + +func NewInfo(sid, url string, isforword bool) *message.PublishMessage { + infoMsg := message.NewPublishMessage() + infoMsg.SetTopic([]byte(BrokerInfoTopic)) + info := fmt.Sprintf(`{"remoteID":"%s","url":"%s","isForward":%t}`, sid, url, isforword) + // log.Info("new info", string(info)) + infoMsg.SetPayload([]byte(info)) + infoMsg.SetQoS(0) + infoMsg.SetRetain(false) + return infoMsg +} + +func (c *client) ProcessInfo(msg *message.PublishMessage) { + nc := c.conn + b := c.broker + if nc == nil { + return + } + + log.Info("recv remoteInfo: ", string(msg.Payload())) + + js, e := simplejson.NewJson(msg.Payload()) + if e != nil { + log.Warn("parse info message err", e) + return + } + + rid := js.Get("remoteID").MustString() + rurl := js.Get("url").MustString() + isForward := js.Get("isForward").MustBool() + + if rid == "" { + log.Error("receive info message error with remoteID is null") + return + } + + if rid == b.id { + if !isForward { + c.Close() //close connet self + } + return + } + + exist := b.CheckRemoteExist(rid, rurl) + if !exist { + go b.connectRouter(rurl, rid) + } + // log.Info("isforword: ", isForward) + if !isForward { + route := &route{ + remoteUrl: rurl, + remoteID: rid, + } + c.route = route + + go b.SendLocalSubsToRouter(c) + // log.Info("BroadcastInfoMessage starting... ") + infoMsg := NewInfo(rid, rurl, true) + b.BroadcastInfoMessage(rid, infoMsg) + } + + return +} diff --git a/broker/msgpool.go b/broker/msgpool.go index 8655ea1..c672236 100644 --- a/broker/msgpool.go +++ b/broker/msgpool.go @@ -26,8 +26,8 @@ type MessagePool struct { } func InitMessagePool() { - MSGPool = make([]MessagePool, MessagePoolNum) - for i := 0; i < MessagePoolNum; i++ { + MSGPool = make([]MessagePool, (MessagePoolNum + 2)) + for i := 0; i < (MessagePoolNum + 2); i++ { MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum) } }