This commit is contained in:
zhouyuyan
2017-08-25 16:55:49 +08:00
parent 0ea7da1dc0
commit 5c6e136c2b
6 changed files with 201 additions and 35 deletions

View File

@@ -9,32 +9,49 @@ import (
)
type Broker struct {
id string
config *Config
remote map[string]*client
sl *Sublist
rl *RetainList
queues map[string]int
id string
config *Config
clients ClientMap
routes ClientMap
sl *Sublist
rl *RetainList
queues map[string]int
}
func NewBroker(config *Config) *Broker {
return &Broker{
config: config,
sl: NewSublist(),
rl: NewRetainList(),
queues: make(map[string]int),
remote: make(map[string]*client),
config: config,
sl: NewSublist(),
rl: NewRetainList(),
queues: make(map[string]int),
clients: NewClientMap(),
routes: NewClientMap(),
}
}
func (b *Broker) StartListening() {
hp := b.config.Host + ":" + b.config.Port
func (b *Broker) Start() {
go b.StartListening(CLIENT)
if b.config.Cluster.Port != "" {
go b.StartListening(ROUTER)
}
}
func (b *Broker) StartListening(typ int) {
var hp string
if typ == CLIENT {
hp = b.config.Host + ":" + b.config.Port
log.Info("Start Listening client on ", hp)
} else if typ == ROUTER {
hp = b.config.Cluster.Host + ":" + b.config.Cluster.Port
log.Info("Start Listening cluster on ", hp)
}
l, e := net.Listen("tcp", hp)
if e != nil {
log.Error("Error listening on ", e)
return
}
log.Info("Start Listening client on ", hp)
tmpDelay := 10 * ACCEPT_MIN_SLEEP
num := 0
for {
@@ -55,11 +72,11 @@ func (b *Broker) StartListening() {
}
tmpDelay = ACCEPT_MIN_SLEEP
num += 1
go b.handleConnection(conn, num)
go b.handleConnection(typ, conn, num)
}
}
func (b *Broker) handleConnection(conn net.Conn, idx int) {
func (b *Broker) handleConnection(typ int, conn net.Conn, idx int) {
//process connect packet
buf, err := ReadPacket(conn)
if err != nil {
@@ -105,5 +122,45 @@ func (b *Broker) handleConnection(conn net.Conn, idx int) {
info: info,
}
c.init()
var exist bool
var old *client
cid := string(c.info.clientID)
if typ == CLIENT {
old, exist = b.clients.Update(cid, c)
} else if typ == ROUTER {
old, exist = b.routes.Update(cid, c)
}
if exist {
log.Warn("client or routers exists, close old...")
old.Close()
}
c.readLoop(idx)
}
func (b *Broker) ConnectToRouters() {
for i := 0; i < len(b.config.Cluster.Routes); i++ {
url := b.config.Cluster.Routes[i]
go b.connectRouter(url, "")
}
}
func (b *Broker) connectRouter(url, remoteID string) {
for {
conn, err := net.Dial("tcp", url)
if err != nil {
log.Error("Error trying to connect to route: ", err)
select {
case <-time.After(DEFAULT_ROUTE_CONNECT):
log.Debug("Connect to route timeout ,retry...")
continue
}
}
route := &route{
remoteID: remoteID,
remoteUrl: url,
}
// s.createRemote(conn, route)
return
}
}