This commit is contained in:
zhouyuyan
2017-09-01 19:29:33 +08:00
parent 65ac09cf50
commit 8bf6ccaa25
3 changed files with 18 additions and 93 deletions

View File

@@ -218,28 +218,26 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
cid := c.info.clientID
var msgPool *MessagePool
var loaded bool
var old *client
var exist bool
var old interface{}
switch typ {
case CLIENT:
msgPool = MSGPool[idx%MessagePoolNum].GetPool()
old, loaded = b.clients.LoadOrStore(cid)
if !loaded {
b.clients.Store(cid, c)
old.(*client).Close()
}
old, exist = b.clients.Load(cid)
b.clients.Store(cid, c)
case ROUTER:
msgPool = MSGPool[MessagePoolNum].GetPool()
old, loaded = b.routes.LoadOrStore(cid)
if !loaded {
b.routes.Store(cid, c)
}
old, exist = b.routes.Load(cid)
b.routes.Store(cid, c)
}
if !loaded {
log.Warn("client or routers exists, close old...")
old.(*client).Close()
if exist {
log.Warn("client or routers exist, close old...")
ol, ok := old.(*client)
if ok {
ol.Close()
}
}
c.readLoop(msgPool)
}
@@ -287,7 +285,7 @@ func (b *Broker) connectRouter(url, remoteID string) {
func (b *Broker) CheckRemoteExist(remoteID, url string) bool {
exist := false
remotes := b.remotes.Range(func(key, value interface{}) bool {
b.remotes.Range(func(key, value interface{}) bool {
v, ok := value.(*client)
if ok {
if v.route.remoteUrl == url {
@@ -306,7 +304,7 @@ func (b *Broker) CheckRemoteExist(remoteID, url string) bool {
func (b *Broker) SendLocalSubsToRouter(c *client) {
subInfo := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
b.clients.Range(func(key, value interface{}) bool {
client, ok := value.(*clients)
client, ok := value.(*client)
if ok {
subs := client.subs
for _, sub := range subs {
@@ -341,7 +339,7 @@ func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
b.remotes.Range(func(key, value interface{}) bool {
r, ok := value.(*client)
if ok {
r.WriterPacket(msg)
r.WriterPacket(packet)
}
return true
})
@@ -353,11 +351,11 @@ func (b *Broker) removeClient(c *client) {
typ := c.typ
switch typ {
case CLIENT:
b.clients.Remove(clientId)
b.clients.Delete(clientId)
case ROUTER:
b.routes.Remove(clientId)
b.routes.Delete(clientId)
case REMOTE:
b.remotes.Remove(clientId)
b.remotes.Delete(clientId)
}
// log.Info("delete client ,", clientId)
}

View File

@@ -1,73 +0,0 @@
package broker
import "sync"
type cMap interface {
Set(key string, val *client)
Get(key string) (*client, bool)
Items() map[string]*client
Exist(key string) bool
Update(key string, val *client) (*client, bool)
Count() int
Remove(key string)
}
type clientMap struct {
items map[string]*client
mu sync.RWMutex
}
func NewClientMap() cMap {
smap := &clientMap{
items: make(map[string]*client),
}
return smap
}
func (s *clientMap) Set(key string, val *client) {
s.mu.Lock()
s.items[key] = val
s.mu.Unlock()
}
func (s *clientMap) Get(key string) (*client, bool) {
s.mu.RLock()
val, ok := s.items[key]
s.mu.RUnlock()
return val, ok
}
func (s *clientMap) Exist(key string) bool {
s.mu.RLock()
_, ok := s.items[key]
s.mu.RUnlock()
return ok
}
func (s *clientMap) Update(key string, val *client) (*client, bool) {
s.mu.Lock()
old, ok := s.items[key]
s.items[key] = val
s.mu.Unlock()
return old, ok
}
func (s *clientMap) Count() int {
s.mu.RLock()
len := len(s.items)
s.mu.RUnlock()
return len
}
func (s *clientMap) Remove(key string) {
s.mu.Lock()
delete(s.items, key)
s.mu.Unlock()
}
func (s *clientMap) Items() map[string]*client {
s.mu.RLock()
items := s.items
s.mu.RUnlock()
return items
}

BIN
hmq

Binary file not shown.