* pool

* pool

* wpool
This commit is contained in:
joy.zhou
2018-04-04 13:49:52 +08:00
committed by GitHub
parent c0fea6a5ba
commit 5ed4728575
2 changed files with 41 additions and 38 deletions

View File

@@ -4,19 +4,19 @@ package broker
import ( import (
"crypto/tls" "crypto/tls"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/fhmq/hmq/lib/acl"
"github.com/fhmq/hmq/pool"
"github.com/segmentio/fasthash/fnv1a"
"github.com/shirou/gopsutil/mem"
"go.uber.org/zap"
"golang.org/x/net/websocket"
"net" "net"
"net/http" "net/http"
"runtime/debug" "runtime/debug"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/fhmq/hmq/lib/acl"
"github.com/fhmq/hmq/pool"
"github.com/shirou/gopsutil/mem"
"go.uber.org/zap"
"golang.org/x/net/websocket"
) )
const ( const (
@@ -42,10 +42,10 @@ type Broker struct {
remotes sync.Map remotes sync.Map
nodes map[string]interface{} nodes map[string]interface{}
clusterPool chan *Message clusterPool chan *Message
messagePool []chan *Message
sl *Sublist sl *Sublist
rl *RetainList rl *RetainList
queues map[string]int queues map[string]int
// messagePool []chan *Message
} }
func newMessagePool() []chan *Message { func newMessagePool() []chan *Message {
@@ -67,7 +67,7 @@ func NewBroker(config *Config) (*Broker, error) {
nodes: make(map[string]interface{}), nodes: make(map[string]interface{}),
queues: make(map[string]int), queues: make(map[string]int),
clusterPool: make(chan *Message), clusterPool: make(chan *Message),
messagePool: newMessagePool(), // messagePool: newMessagePool(),
} }
if b.config.TlsPort != "" { if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo) tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
@@ -89,20 +89,17 @@ func NewBroker(config *Config) (*Broker, error) {
return b, nil return b, nil
} }
func (b *Broker) StartDispatcher() { func (b *Broker) SubmitWork(msg *Message) {
for _, mpool := range b.messagePool { if b.wpool == nil {
go func(ch chan *Message) { b.wpool = pool.New(b.config.Worker)
for { }
msg, ok := <-ch
if !ok { if msg.client.typ == CLUSTER {
log.Error("read message from client channel error") b.clusterPool <- msg
return } else {
} b.wpool.Submit(func() {
b.wpool.Submit(func() { ProcessMessage(msg)
ProcessMessage(msg) })
})
}
}(mpool)
} }
} }
@@ -112,7 +109,6 @@ func (b *Broker) Start() {
log.Error("broker is null") log.Error("broker is null")
return return
} }
go b.StartDispatcher()
//listen clinet over tcp //listen clinet over tcp
if b.config.Port != "" { if b.config.Port != "" {
@@ -365,9 +361,9 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
b.routes.Store(cid, c) b.routes.Store(cid, c)
} }
mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum] // mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
c.readLoop(mpool) c.readLoop()
} }
func (b *Broker) ConnectToDiscovery() { func (b *Broker) ConnectToDiscovery() {
@@ -414,7 +410,7 @@ func (b *Broker) ConnectToDiscovery() {
c.SendConnect() c.SendConnect()
c.SendInfo() c.SendInfo()
go c.readLoop(b.clusterPool) go c.readLoop()
go c.StartPing() go c.StartPing()
} }
@@ -490,8 +486,8 @@ func (b *Broker) connectRouter(id, addr string) {
c.SendConnect() c.SendConnect()
mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum] // mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
go c.readLoop(mpool) go c.readLoop()
go c.StartPing() go c.StartPing()
} }

View File

@@ -3,13 +3,14 @@
package broker package broker
import ( import (
"github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap"
"net" "net"
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap"
) )
const ( const (
@@ -85,8 +86,11 @@ func (c *client) init() {
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
} }
func (c *client) keepAlive(ch chan int, mpool chan *Message) { func (c *client) keepAlive(ch chan int) {
defer close(ch) defer close(ch)
b := c.broker
keepalive := time.Duration(c.info.keepalive*3/2) * time.Second keepalive := time.Duration(c.info.keepalive*3/2) * time.Second
timer := time.NewTimer(keepalive) timer := time.NewTimer(keepalive)
@@ -100,8 +104,10 @@ func (c *client) keepAlive(ch chan int, mpool chan *Message) {
continue continue
} }
log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive)) log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
msg := &Message{client: c, packet: DisconnectdPacket} msg := &Message{client: c, packet: DisconnectdPacket}
mpool <- msg b.SubmitWork(msg)
timer.Stop() timer.Stop()
return return
case _, ok := <-c.closed: case _, ok := <-c.closed:
@@ -112,14 +118,15 @@ func (c *client) keepAlive(ch chan int, mpool chan *Message) {
} }
} }
func (c *client) readLoop(mpool chan *Message) { func (c *client) readLoop() {
nc := c.conn nc := c.conn
if nc == nil || mpool == nil { b := c.broker
if nc == nil || b == nil {
return return
} }
ch := make(chan int, 1000) ch := make(chan int, 1000)
go c.keepAlive(ch, mpool) go c.keepAlive(ch)
for { for {
packet, err := packets.ReadPacket(nc) packet, err := packets.ReadPacket(nc)
@@ -134,11 +141,11 @@ func (c *client) readLoop(mpool chan *Message) {
client: c, client: c,
packet: packet, packet: packet,
} }
mpool <- msg b.SubmitWork(msg)
} }
msg := &Message{client: c, packet: DisconnectdPacket} msg := &Message{client: c, packet: DisconnectdPacket}
mpool <- msg b.SubmitWork(msg)
} }
func ProcessMessage(msg *Message) { func ProcessMessage(msg *Message) {