mirror of
https://github.com/fhmq/hmq.git
synced 2026-04-24 10:38:34 +00:00
mqp
This commit is contained in:
114
broker/broker.go
114
broker/broker.go
@@ -6,6 +6,7 @@ import (
|
||||
"hmq/packets"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -20,9 +21,9 @@ type Broker struct {
|
||||
config *Config
|
||||
tlsConfig *tls.Config
|
||||
AclConfig *acl.ACLConfig
|
||||
clients cMap
|
||||
routes cMap
|
||||
remotes cMap
|
||||
clients sync.Map
|
||||
routes sync.Map
|
||||
remotes sync.Map
|
||||
sl *Sublist
|
||||
rl *RetainList
|
||||
queues map[string]int
|
||||
@@ -30,14 +31,11 @@ type Broker struct {
|
||||
|
||||
func NewBroker(config *Config) (*Broker, error) {
|
||||
b := &Broker{
|
||||
id: GenUniqueId(),
|
||||
config: config,
|
||||
sl: NewSublist(),
|
||||
rl: NewRetainList(),
|
||||
queues: make(map[string]int),
|
||||
clients: NewClientMap(),
|
||||
routes: NewClientMap(),
|
||||
remotes: NewClientMap(),
|
||||
id: GenUniqueId(),
|
||||
config: config,
|
||||
sl: NewSublist(),
|
||||
rl: NewRetainList(),
|
||||
queues: make(map[string]int),
|
||||
}
|
||||
if b.config.TlsPort != "" {
|
||||
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
|
||||
@@ -217,20 +215,31 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
|
||||
}
|
||||
c.init()
|
||||
|
||||
cid := c.info.clientID
|
||||
|
||||
var msgPool *MessagePool
|
||||
var exist bool
|
||||
var loaded bool
|
||||
var old *client
|
||||
cid := string(c.info.clientID)
|
||||
if typ == CLIENT {
|
||||
old, exist = b.clients.Update(cid, c)
|
||||
|
||||
switch typ {
|
||||
case CLIENT:
|
||||
msgPool = MSGPool[idx%MessagePoolNum].GetPool()
|
||||
} else if typ == ROUTER {
|
||||
old, exist = b.routes.Update(cid, c)
|
||||
old, loaded = b.clients.LoadOrStore(cid)
|
||||
if !loaded {
|
||||
b.clients.Store(cid, c)
|
||||
old.(*client).Close()
|
||||
}
|
||||
case ROUTER:
|
||||
msgPool = MSGPool[MessagePoolNum].GetPool()
|
||||
old, loaded = b.routes.LoadOrStore(cid)
|
||||
if !loaded {
|
||||
b.routes.Store(cid, c)
|
||||
}
|
||||
}
|
||||
if exist {
|
||||
|
||||
if !loaded {
|
||||
log.Warn("client or routers exists, close old...")
|
||||
old.Close()
|
||||
old.(*client).Close()
|
||||
}
|
||||
c.readLoop(msgPool)
|
||||
}
|
||||
@@ -267,7 +276,7 @@ func (b *Broker) connectRouter(url, remoteID string) {
|
||||
route: route,
|
||||
info: info,
|
||||
}
|
||||
b.remotes.Set(cid, c)
|
||||
b.remotes.Store(cid, c)
|
||||
c.SendConnect()
|
||||
c.SendInfo()
|
||||
// s.createRemote(conn, route)
|
||||
@@ -278,29 +287,35 @@ func (b *Broker) connectRouter(url, remoteID string) {
|
||||
|
||||
func (b *Broker) CheckRemoteExist(remoteID, url string) bool {
|
||||
exist := false
|
||||
remotes := b.remotes.Items()
|
||||
for _, v := range remotes {
|
||||
if v.route.remoteUrl == url {
|
||||
// if v.route.remoteID == "" || v.route.remoteID != remoteID {
|
||||
v.route.remoteID = remoteID
|
||||
// }
|
||||
exist = true
|
||||
break
|
||||
remotes := b.remotes.Range(func(key, value interface{}) bool {
|
||||
v, ok := value.(*client)
|
||||
if ok {
|
||||
if v.route.remoteUrl == url {
|
||||
// if v.route.remoteID == "" || v.route.remoteID != remoteID {
|
||||
v.route.remoteID = remoteID
|
||||
// }
|
||||
exist = true
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
return exist
|
||||
}
|
||||
|
||||
func (b *Broker) SendLocalSubsToRouter(c *client) {
|
||||
clients := b.clients.Items()
|
||||
subInfo := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
|
||||
for _, client := range clients {
|
||||
subs := client.subs
|
||||
for _, sub := range subs {
|
||||
subInfo.Topics = append(subInfo.Topics, string(sub.topic))
|
||||
subInfo.Qoss = append(subInfo.Qoss, sub.qos)
|
||||
b.clients.Range(func(key, value interface{}) bool {
|
||||
client, ok := value.(*clients)
|
||||
if ok {
|
||||
subs := client.subs
|
||||
for _, sub := range subs {
|
||||
subInfo.Topics = append(subInfo.Topics, string(sub.topic))
|
||||
subInfo.Qoss = append(subInfo.Qoss, sub.qos)
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
err := c.WriterPacket(subInfo)
|
||||
if err != nil {
|
||||
log.Error("Send localsubs To Router error :", err)
|
||||
@@ -308,21 +323,28 @@ func (b *Broker) SendLocalSubsToRouter(c *client) {
|
||||
}
|
||||
|
||||
func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacket) {
|
||||
remotes := b.remotes.Items()
|
||||
for _, r := range remotes {
|
||||
if r.route.remoteID == remoteID {
|
||||
continue
|
||||
b.remotes.Range(func(key, value interface{}) bool {
|
||||
r, ok := value.(*client)
|
||||
if ok {
|
||||
if r.route.remoteID == remoteID {
|
||||
return true
|
||||
}
|
||||
r.WriterPacket(msg)
|
||||
}
|
||||
r.WriterPacket(msg)
|
||||
}
|
||||
return true
|
||||
|
||||
})
|
||||
// log.Info("BroadcastInfoMessage success ")
|
||||
}
|
||||
|
||||
func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
|
||||
remotes := b.remotes.Items()
|
||||
for _, r := range remotes {
|
||||
r.WriterPacket(packet)
|
||||
}
|
||||
b.remotes.Range(func(key, value interface{}) bool {
|
||||
r, ok := value.(*client)
|
||||
if ok {
|
||||
r.WriterPacket(msg)
|
||||
}
|
||||
return true
|
||||
})
|
||||
// log.Info("BroadcastSubscribeMessage remotes: ", s.remotes)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user