diff --git a/broker/broker.go b/broker/broker.go index ffda81c..7c4a1a1 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -28,6 +28,7 @@ type Broker struct { clients sync.Map routes sync.Map remotes sync.Map + nodes map[string]interface{} sl *Sublist rl *RetainList queues map[string]int @@ -39,6 +40,7 @@ func NewBroker(config *Config) (*Broker, error) { config: config, sl: NewSublist(), rl: NewRetainList(), + nodes: make(map[string]interface{}), queues: make(map[string]int), } if b.config.TlsPort != "" { @@ -89,8 +91,8 @@ func (b *Broker) Start() { } //connect on other node in cluster - if len(b.config.Cluster.Routes) > 0 { - b.ConnectToRouters() + if b.config.Router != "" { + b.ConnectToDiscovery() } //system montior @@ -327,26 +329,71 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) { } go c.readLoop() - if typ == ROUTER { - c.SendInfo() - c.StartPing() - } } -func (b *Broker) ConnectToRouters() { - for _, v := range b.config.Cluster.Routes { - go b.connectRouter(v, "") +func (b *Broker) ConnectToDiscovery() { + var conn net.Conn + var err error + var tempDelay time.Duration = 0 + for { + conn, err = net.Dial("tcp", b.config.Router) + if err != nil { + log.Error("Error trying to connect to route: ", err) + log.Debug("Connect to route timeout ,retry...") + + if 0 == tempDelay { + tempDelay = 1 * time.Second + } else { + tempDelay *= 2 + } + + if max := 20 * time.Second; tempDelay > max { + tempDelay = max + } + time.Sleep(tempDelay) + continue + } + break } + + log.Debug("connect to router success :", b.config.Router) + + cid := b.id + info := info{ + clientID: cid, + keepalive: 60, + } + + c := &client{ + typ: CLUSTER, + broker: b, + conn: conn, + info: info, + } + + c.init() + + c.SendConnect() + c.SendInfo() + + c.mp = &MSGPool[(MessagePoolNum + 2)] + go c.readLoop() + go c.StartPing() } -func (b *Broker) connectRouter(url, remoteID string) { +func (b *Broker) connectRouter(id, addr string) { var conn net.Conn var err error var timeDelay time.Duration = 0 retryTimes := 0 max := 32 * time.Second for { - conn, err = net.Dial("tcp", url) + + if !b.checkNodeExist(id, addr) { + return + } + + conn, err = net.Dial("tcp", addr) if err != nil { log.Error("Error trying to connect to route: ", err) @@ -372,8 +419,8 @@ func (b *Broker) connectRouter(url, remoteID string) { break } route := route{ - remoteID: remoteID, - remoteUrl: conn.RemoteAddr().String(), + remoteID: id, + remoteUrl: addr, } cid := GenUniqueId() @@ -395,13 +442,31 @@ func (b *Broker) connectRouter(url, remoteID string) { c.mp = MSGPool[(MessagePoolNum + 1)].GetPool() c.SendConnect() - c.SendInfo() + // c.SendInfo() go c.readLoop() go c.StartPing() } +func (b *Broker) checkNodeExist(id, url string) bool { + for k, v := range b.nodes { + if k == id { + return true + } + + //skip + l, ok := v.(string) + if ok { + if url == l { + return true + } + } + + } + return false +} + func (b *Broker) CheckRemoteExist(remoteID, url string) bool { exist := false b.remotes.Range(func(key, value interface{}) bool { diff --git a/broker/client.go b/broker/client.go index 2f3f38d..852507b 100644 --- a/broker/client.go +++ b/broker/client.go @@ -13,13 +13,14 @@ import ( const ( // special pub topic for cluster info BrokerInfoTopic - BrokerInfoTopic = "broker001info/brokerinfo" + BrokerInfoTopic = "broker000100101info" // CLIENT is an end user. CLIENT = 0 // ROUTER is another router in the cluster. ROUTER = 1 //REMOTE is the router connect to other cluster - REMOTE = 2 + REMOTE = 2 + CLUSTER = 3 ) const ( Connected = 1 @@ -95,6 +96,10 @@ func (c *client) keepAlive(ch chan int) { case <-ch: timer.Reset(keepalive) case <-timer.C: + if c.typ == REMOTE || c.typ == CLUSTER { + timer.Reset(keepalive) + continue + } log.Error("Client exceeded timeout, disconnecting. clientID = ", c.info.clientID, " keepalive = ", c.info.keepalive) msg := &Message{client: c, packet: DisconnectdPacket} msgPool.queue <- msg @@ -180,7 +185,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) { } topic := packet.TopicName - if topic == BrokerInfoTopic && c.typ != CLIENT { + if topic == BrokerInfoTopic && c.typ == CLUSTER { c.ProcessInfo(packet) return } @@ -237,8 +242,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { } for _, sub := range r.psubs { - if sub.client.typ == REMOTE { - if typ == REMOTE { + if sub.client.typ == ROUTER { + if typ != CLIENT { continue } } @@ -257,8 +262,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) { if exist { // log.Info("queue index : ", cnt) for _, sub := range r.qsubs { - if sub.client.typ == REMOTE { - if c.typ == REMOTE { + if sub.client.typ == ROUTER { + if typ != CLIENT { continue } } @@ -359,7 +364,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) { retcodes = append(retcodes, qoss[i]) continue } - case REMOTE: + case ROUTER: if subinfo, exist := c.rsubs[topic]; !exist { sinfo := &subInfo{sub: sub, num: 1} c.rsubs[topic] = sinfo @@ -421,7 +426,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) { if ok { c.unsubscribe(sub) } - case REMOTE: + case ROUTER: subinfo, ok := c.rsubs[t] if ok { subinfo.num = subinfo.num - 1 @@ -511,11 +516,15 @@ func (c *client) Close() { b.PublishMessage(c.info.willMsg) } + if c.typ == CLUSTER { + b.ConnectToDiscovery() + } + //do reconnect if c.typ == REMOTE { localUrl := c.info.localIP + ":" + c.broker.config.Cluster.Port if c.route.remoteUrl != localUrl { - b.connectRouter(c.route.remoteUrl, "") + go b.connectRouter(c.route.remoteID, c.route.remoteUrl) } } } diff --git a/broker/config.go b/broker/config.go index 08c7245..e73f61c 100644 --- a/broker/config.go +++ b/broker/config.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "errors" "fmt" "io/ioutil" @@ -19,6 +20,7 @@ type Config struct { Host string `json:"host"` Port string `json:"port"` Cluster RouteInfo `json:"cluster"` + Router string `json:"router"` TlsHost string `json:"tlsHost"` TlsPort string `json:"tlsPort"` WsPath string `json:"wsPath"` @@ -30,9 +32,8 @@ type Config struct { } type RouteInfo struct { - Host string `json:"host"` - Port string `json:"port"` - Routes []string `json:"routes"` + Host string `json:"host"` + Port string `json:"port"` } type TLSInfo struct { @@ -75,6 +76,11 @@ func LoadConfig() (*Config, error) { config.Cluster.Host = "0.0.0.0" } } + if config.Router != "" { + if config.Cluster.Port == "" { + return nil, errors.New("cluster port is null") + } + } if config.TlsPort != "" { if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { diff --git a/broker/dispatcher.go b/broker/dispatcher.go index df6d9c9..0b7cba3 100644 --- a/broker/dispatcher.go +++ b/broker/dispatcher.go @@ -27,7 +27,7 @@ func NewDispatcher() *Dispatcher { } func (d *Dispatcher) dispatch() { - for i := 0; i < (MessagePoolNum + 2); i++ { + for i := 0; i < (MessagePoolNum + 3); i++ { go func(idx int) { for { select { diff --git a/broker/info.go b/broker/info.go index b3ac604..7581c61 100644 --- a/broker/info.go +++ b/broker/info.go @@ -25,7 +25,7 @@ func (c *client) SendInfo() { } func (c *client) StartPing() { - timeTicker := time.NewTicker(time.Second * 30) + timeTicker := time.NewTicker(time.Second * 50) ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket) for { select { @@ -66,7 +66,7 @@ func NewInfo(sid, url string, isforword bool) *packets.PublishPacket { pub.Qos = 0 pub.TopicName = BrokerInfoTopic pub.Retain = false - info := fmt.Sprintf(`{"remoteID":"%s","url":"%s","isForward":%t}`, sid, url, isforword) + info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url) // log.Info("new info", string(info)) pub.Payload = []byte(info) return pub @@ -87,43 +87,28 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) { return } - rid := js.Get("remoteID").MustString() - rurl := js.Get("url").MustString() - isForward := js.Get("isForward").MustBool() - - if rid == "" { - log.Error("receive info message error with remoteID is null") + routes, err := js.Get("data").Map() + if routes == nil { + log.Error("receive info message error, ", err) return } - if rid == b.id { - if !isForward { - c.Close() //close connet self - } - return - } + b.nodes = routes b.mu.Lock() - exist := b.CheckRemoteExist(rid, rurl) - if !exist { - b.connectRouter(rurl, rid) - } - b.mu.Unlock() - - if !isForward { - if c.typ == ROUTER { - route := route{ - remoteUrl: rurl, - remoteID: rid, - } - c.route = route + for rid, rurl := range routes { + if rid == b.id { + continue } - go b.SendLocalSubsToRouter(c) - // log.Info("BroadcastInfoMessage starting... ") - infoMsg := NewInfo(rid, rurl, true) - b.BroadcastInfoMessage(rid, infoMsg) - } + url, ok := rurl.(string) + if ok { + exist := b.CheckRemoteExist(rid, url) + if !exist { + b.connectRouter(rid, url) + } + } - return + } + b.mu.Unlock() } diff --git a/broker/msgpool.go b/broker/msgpool.go index 9118573..e058424 100644 --- a/broker/msgpool.go +++ b/broker/msgpool.go @@ -30,8 +30,8 @@ type MessagePool struct { } func InitMessagePool() { - MSGPool = make([]MessagePool, (MessagePoolNum + 2)) - for i := 0; i < (MessagePoolNum + 2); i++ { + MSGPool = make([]MessagePool, (MessagePoolNum + 3)) + for i := 0; i < (MessagePoolNum + 3); i++ { MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum) } } diff --git a/conf/hmq.config b/conf/hmq.config index 3258bfb..6c8711c 100644 --- a/conf/hmq.config +++ b/conf/hmq.config @@ -4,9 +4,9 @@ "host": "0.0.0.0", "cluster": { "host": "0.0.0.0", - "port": "1993", - "routes": [] + "port": "1993" }, + "router": "127.0.0.1:9888", "tlsPort": "8883", "tlsHost": "0.0.0.0", "wsPort": "1888",