modify cluster

This commit is contained in:
zhouyuyan
2018-01-19 13:41:17 +08:00
parent 0cb51bd37a
commit 114e6f901e
7 changed files with 130 additions and 65 deletions

View File

@@ -28,6 +28,7 @@ type Broker struct {
clients sync.Map clients sync.Map
routes sync.Map routes sync.Map
remotes sync.Map remotes sync.Map
nodes map[string]interface{}
sl *Sublist sl *Sublist
rl *RetainList rl *RetainList
queues map[string]int queues map[string]int
@@ -39,6 +40,7 @@ func NewBroker(config *Config) (*Broker, error) {
config: config, config: config,
sl: NewSublist(), sl: NewSublist(),
rl: NewRetainList(), rl: NewRetainList(),
nodes: make(map[string]interface{}),
queues: make(map[string]int), queues: make(map[string]int),
} }
if b.config.TlsPort != "" { if b.config.TlsPort != "" {
@@ -89,8 +91,8 @@ func (b *Broker) Start() {
} }
//connect on other node in cluster //connect on other node in cluster
if len(b.config.Cluster.Routes) > 0 { if b.config.Router != "" {
b.ConnectToRouters() b.ConnectToDiscovery()
} }
//system montior //system montior
@@ -327,26 +329,71 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
} }
go c.readLoop() go c.readLoop()
if typ == ROUTER {
c.SendInfo()
c.StartPing()
}
} }
func (b *Broker) ConnectToRouters() { func (b *Broker) ConnectToDiscovery() {
for _, v := range b.config.Cluster.Routes { var conn net.Conn
go b.connectRouter(v, "") var err error
var tempDelay time.Duration = 0
for {
conn, err = net.Dial("tcp", b.config.Router)
if err != nil {
log.Error("Error trying to connect to route: ", err)
log.Debug("Connect to route timeout ,retry...")
if 0 == tempDelay {
tempDelay = 1 * time.Second
} else {
tempDelay *= 2
}
if max := 20 * time.Second; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
continue
}
break
} }
log.Debug("connect to router success :", b.config.Router)
cid := b.id
info := info{
clientID: cid,
keepalive: 60,
}
c := &client{
typ: CLUSTER,
broker: b,
conn: conn,
info: info,
}
c.init()
c.SendConnect()
c.SendInfo()
c.mp = &MSGPool[(MessagePoolNum + 2)]
go c.readLoop()
go c.StartPing()
} }
func (b *Broker) connectRouter(url, remoteID string) { func (b *Broker) connectRouter(id, addr string) {
var conn net.Conn var conn net.Conn
var err error var err error
var timeDelay time.Duration = 0 var timeDelay time.Duration = 0
retryTimes := 0 retryTimes := 0
max := 32 * time.Second max := 32 * time.Second
for { for {
conn, err = net.Dial("tcp", url)
if !b.checkNodeExist(id, addr) {
return
}
conn, err = net.Dial("tcp", addr)
if err != nil { if err != nil {
log.Error("Error trying to connect to route: ", err) log.Error("Error trying to connect to route: ", err)
@@ -372,8 +419,8 @@ func (b *Broker) connectRouter(url, remoteID string) {
break break
} }
route := route{ route := route{
remoteID: remoteID, remoteID: id,
remoteUrl: conn.RemoteAddr().String(), remoteUrl: addr,
} }
cid := GenUniqueId() cid := GenUniqueId()
@@ -395,13 +442,31 @@ func (b *Broker) connectRouter(url, remoteID string) {
c.mp = MSGPool[(MessagePoolNum + 1)].GetPool() c.mp = MSGPool[(MessagePoolNum + 1)].GetPool()
c.SendConnect() c.SendConnect()
c.SendInfo() // c.SendInfo()
go c.readLoop() go c.readLoop()
go c.StartPing() go c.StartPing()
} }
func (b *Broker) checkNodeExist(id, url string) bool {
for k, v := range b.nodes {
if k == id {
return true
}
//skip
l, ok := v.(string)
if ok {
if url == l {
return true
}
}
}
return false
}
func (b *Broker) CheckRemoteExist(remoteID, url string) bool { func (b *Broker) CheckRemoteExist(remoteID, url string) bool {
exist := false exist := false
b.remotes.Range(func(key, value interface{}) bool { b.remotes.Range(func(key, value interface{}) bool {

View File

@@ -13,13 +13,14 @@ import (
const ( const (
// special pub topic for cluster info BrokerInfoTopic // special pub topic for cluster info BrokerInfoTopic
BrokerInfoTopic = "broker001info/brokerinfo" BrokerInfoTopic = "broker000100101info"
// CLIENT is an end user. // CLIENT is an end user.
CLIENT = 0 CLIENT = 0
// ROUTER is another router in the cluster. // ROUTER is another router in the cluster.
ROUTER = 1 ROUTER = 1
//REMOTE is the router connect to other cluster //REMOTE is the router connect to other cluster
REMOTE = 2 REMOTE = 2
CLUSTER = 3
) )
const ( const (
Connected = 1 Connected = 1
@@ -95,6 +96,10 @@ func (c *client) keepAlive(ch chan int) {
case <-ch: case <-ch:
timer.Reset(keepalive) timer.Reset(keepalive)
case <-timer.C: case <-timer.C:
if c.typ == REMOTE || c.typ == CLUSTER {
timer.Reset(keepalive)
continue
}
log.Error("Client exceeded timeout, disconnecting. clientID = ", c.info.clientID, " keepalive = ", c.info.keepalive) log.Error("Client exceeded timeout, disconnecting. clientID = ", c.info.clientID, " keepalive = ", c.info.keepalive)
msg := &Message{client: c, packet: DisconnectdPacket} msg := &Message{client: c, packet: DisconnectdPacket}
msgPool.queue <- msg msgPool.queue <- msg
@@ -180,7 +185,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
} }
topic := packet.TopicName topic := packet.TopicName
if topic == BrokerInfoTopic && c.typ != CLIENT { if topic == BrokerInfoTopic && c.typ == CLUSTER {
c.ProcessInfo(packet) c.ProcessInfo(packet)
return return
} }
@@ -237,8 +242,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
} }
for _, sub := range r.psubs { for _, sub := range r.psubs {
if sub.client.typ == REMOTE { if sub.client.typ == ROUTER {
if typ == REMOTE { if typ != CLIENT {
continue continue
} }
} }
@@ -257,8 +262,8 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if exist { if exist {
// log.Info("queue index : ", cnt) // log.Info("queue index : ", cnt)
for _, sub := range r.qsubs { for _, sub := range r.qsubs {
if sub.client.typ == REMOTE { if sub.client.typ == ROUTER {
if c.typ == REMOTE { if typ != CLIENT {
continue continue
} }
} }
@@ -359,7 +364,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
retcodes = append(retcodes, qoss[i]) retcodes = append(retcodes, qoss[i])
continue continue
} }
case REMOTE: case ROUTER:
if subinfo, exist := c.rsubs[topic]; !exist { if subinfo, exist := c.rsubs[topic]; !exist {
sinfo := &subInfo{sub: sub, num: 1} sinfo := &subInfo{sub: sub, num: 1}
c.rsubs[topic] = sinfo c.rsubs[topic] = sinfo
@@ -421,7 +426,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
if ok { if ok {
c.unsubscribe(sub) c.unsubscribe(sub)
} }
case REMOTE: case ROUTER:
subinfo, ok := c.rsubs[t] subinfo, ok := c.rsubs[t]
if ok { if ok {
subinfo.num = subinfo.num - 1 subinfo.num = subinfo.num - 1
@@ -511,11 +516,15 @@ func (c *client) Close() {
b.PublishMessage(c.info.willMsg) b.PublishMessage(c.info.willMsg)
} }
if c.typ == CLUSTER {
b.ConnectToDiscovery()
}
//do reconnect //do reconnect
if c.typ == REMOTE { if c.typ == REMOTE {
localUrl := c.info.localIP + ":" + c.broker.config.Cluster.Port localUrl := c.info.localIP + ":" + c.broker.config.Cluster.Port
if c.route.remoteUrl != localUrl { if c.route.remoteUrl != localUrl {
b.connectRouter(c.route.remoteUrl, "") go b.connectRouter(c.route.remoteID, c.route.remoteUrl)
} }
} }
} }

View File

@@ -4,6 +4,7 @@ import (
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@@ -19,6 +20,7 @@ type Config struct {
Host string `json:"host"` Host string `json:"host"`
Port string `json:"port"` Port string `json:"port"`
Cluster RouteInfo `json:"cluster"` Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"` TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"` TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"` WsPath string `json:"wsPath"`
@@ -30,9 +32,8 @@ type Config struct {
} }
type RouteInfo struct { type RouteInfo struct {
Host string `json:"host"` Host string `json:"host"`
Port string `json:"port"` Port string `json:"port"`
Routes []string `json:"routes"`
} }
type TLSInfo struct { type TLSInfo struct {
@@ -75,6 +76,11 @@ func LoadConfig() (*Config, error) {
config.Cluster.Host = "0.0.0.0" config.Cluster.Host = "0.0.0.0"
} }
} }
if config.Router != "" {
if config.Cluster.Port == "" {
return nil, errors.New("cluster port is null")
}
}
if config.TlsPort != "" { if config.TlsPort != "" {
if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" {

View File

@@ -27,7 +27,7 @@ func NewDispatcher() *Dispatcher {
} }
func (d *Dispatcher) dispatch() { func (d *Dispatcher) dispatch() {
for i := 0; i < (MessagePoolNum + 2); i++ { for i := 0; i < (MessagePoolNum + 3); i++ {
go func(idx int) { go func(idx int) {
for { for {
select { select {

View File

@@ -25,7 +25,7 @@ func (c *client) SendInfo() {
} }
func (c *client) StartPing() { func (c *client) StartPing() {
timeTicker := time.NewTicker(time.Second * 30) timeTicker := time.NewTicker(time.Second * 50)
ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket) ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
for { for {
select { select {
@@ -66,7 +66,7 @@ func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
pub.Qos = 0 pub.Qos = 0
pub.TopicName = BrokerInfoTopic pub.TopicName = BrokerInfoTopic
pub.Retain = false pub.Retain = false
info := fmt.Sprintf(`{"remoteID":"%s","url":"%s","isForward":%t}`, sid, url, isforword) info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url)
// log.Info("new info", string(info)) // log.Info("new info", string(info))
pub.Payload = []byte(info) pub.Payload = []byte(info)
return pub return pub
@@ -87,43 +87,28 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) {
return return
} }
rid := js.Get("remoteID").MustString() routes, err := js.Get("data").Map()
rurl := js.Get("url").MustString() if routes == nil {
isForward := js.Get("isForward").MustBool() log.Error("receive info message error, ", err)
if rid == "" {
log.Error("receive info message error with remoteID is null")
return return
} }
if rid == b.id { b.nodes = routes
if !isForward {
c.Close() //close connet self
}
return
}
b.mu.Lock() b.mu.Lock()
exist := b.CheckRemoteExist(rid, rurl) for rid, rurl := range routes {
if !exist { if rid == b.id {
b.connectRouter(rurl, rid) continue
}
b.mu.Unlock()
if !isForward {
if c.typ == ROUTER {
route := route{
remoteUrl: rurl,
remoteID: rid,
}
c.route = route
} }
go b.SendLocalSubsToRouter(c) url, ok := rurl.(string)
// log.Info("BroadcastInfoMessage starting... ") if ok {
infoMsg := NewInfo(rid, rurl, true) exist := b.CheckRemoteExist(rid, url)
b.BroadcastInfoMessage(rid, infoMsg) if !exist {
} b.connectRouter(rid, url)
}
}
return }
b.mu.Unlock()
} }

View File

@@ -30,8 +30,8 @@ type MessagePool struct {
} }
func InitMessagePool() { func InitMessagePool() {
MSGPool = make([]MessagePool, (MessagePoolNum + 2)) MSGPool = make([]MessagePool, (MessagePoolNum + 3))
for i := 0; i < (MessagePoolNum + 2); i++ { for i := 0; i < (MessagePoolNum + 3); i++ {
MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum) MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum)
} }
} }

View File

@@ -4,9 +4,9 @@
"host": "0.0.0.0", "host": "0.0.0.0",
"cluster": { "cluster": {
"host": "0.0.0.0", "host": "0.0.0.0",
"port": "1993", "port": "1993"
"routes": []
}, },
"router": "127.0.0.1:9888",
"tlsPort": "8883", "tlsPort": "8883",
"tlsHost": "0.0.0.0", "tlsHost": "0.0.0.0",
"wsPort": "1888", "wsPort": "1888",