mirror of
https://github.com/fhmq/hmq.git
synced 2026-04-26 11:38:33 +00:00
Pool (#16)
* add pool * elastic workerpool * del buf * modify usage * modify readme
This commit is contained in:
@@ -4,6 +4,7 @@ package broker
|
||||
|
||||
import (
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -38,7 +39,6 @@ type client struct {
|
||||
status int
|
||||
closed chan int
|
||||
smu sync.RWMutex
|
||||
mp *MessagePool
|
||||
subs map[string]*subscription
|
||||
rsubs map[string]*subInfo
|
||||
}
|
||||
@@ -86,11 +86,10 @@ func (c *client) init() {
|
||||
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
|
||||
}
|
||||
|
||||
func (c *client) keepAlive(ch chan int) {
|
||||
func (c *client) keepAlive(ch chan int, mpool chan *Message) {
|
||||
defer close(ch)
|
||||
keepalive := time.Duration(c.info.keepalive*3/2) * time.Second
|
||||
timer := time.NewTimer(keepalive)
|
||||
msgPool := c.mp
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -101,9 +100,9 @@ func (c *client) keepAlive(ch chan int) {
|
||||
timer.Reset(keepalive)
|
||||
continue
|
||||
}
|
||||
log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
|
||||
brokerLog.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
|
||||
msg := &Message{client: c, packet: DisconnectdPacket}
|
||||
msgPool.queue <- msg
|
||||
mpool <- msg
|
||||
timer.Stop()
|
||||
return
|
||||
case _, ok := <-c.closed:
|
||||
@@ -114,34 +113,33 @@ func (c *client) keepAlive(ch chan int) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) readLoop() {
|
||||
func (c *client) readLoop(mpool chan *Message) {
|
||||
nc := c.conn
|
||||
msgPool := c.mp
|
||||
if nc == nil || msgPool == nil {
|
||||
if nc == nil || mpool == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ch := make(chan int, 1000)
|
||||
go c.keepAlive(ch)
|
||||
go c.keepAlive(ch, mpool)
|
||||
|
||||
for {
|
||||
packet, err := packets.ReadPacket(nc)
|
||||
if err != nil {
|
||||
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
break
|
||||
}
|
||||
|
||||
// keepalive channel
|
||||
ch <- 1
|
||||
|
||||
msg := &Message{
|
||||
client: c,
|
||||
packet: packet,
|
||||
}
|
||||
msgPool.queue <- msg
|
||||
mpool <- msg
|
||||
}
|
||||
|
||||
msg := &Message{client: c, packet: DisconnectdPacket}
|
||||
msgPool.queue <- msg
|
||||
msgPool.Reduce()
|
||||
mpool <- msg
|
||||
}
|
||||
|
||||
func ProcessMessage(msg *Message) {
|
||||
@@ -150,10 +148,10 @@ func ProcessMessage(msg *Message) {
|
||||
if ca == nil {
|
||||
return
|
||||
}
|
||||
log.Debug("Recv message from client,", zap.String("ClientID", c.info.clientID))
|
||||
|
||||
brokerLog.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID))
|
||||
switch ca.(type) {
|
||||
case *packets.ConnackPacket:
|
||||
|
||||
case *packets.ConnectPacket:
|
||||
case *packets.PublishPacket:
|
||||
packet := ca.(*packets.PublishPacket)
|
||||
@@ -176,7 +174,7 @@ func ProcessMessage(msg *Message) {
|
||||
case *packets.DisconnectPacket:
|
||||
c.Close()
|
||||
default:
|
||||
log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -192,7 +190,7 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
|
||||
}
|
||||
|
||||
if !c.CheckTopicAuth(PUB, topic) {
|
||||
log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -203,21 +201,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
|
||||
puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
|
||||
puback.MessageID = packet.MessageID
|
||||
if err := c.WriterPacket(puback); err != nil {
|
||||
log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
return
|
||||
}
|
||||
c.ProcessPublishMessage(packet)
|
||||
case QosExactlyOnce:
|
||||
return
|
||||
default:
|
||||
log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
|
||||
return
|
||||
}
|
||||
if packet.Retain {
|
||||
if b := c.broker; b != nil {
|
||||
err := b.rl.Insert(topic, packet)
|
||||
if err != nil {
|
||||
log.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -237,7 +235,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
|
||||
topic := packet.TopicName
|
||||
|
||||
r := b.sl.Match(topic)
|
||||
// log.Info("psubs num: ", len(r.psubs))
|
||||
// brokerLog.Info("psubs num: ", len(r.psubs))
|
||||
if len(r.qsubs) == 0 && len(r.psubs) == 0 {
|
||||
return
|
||||
}
|
||||
@@ -251,7 +249,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
|
||||
if sub != nil {
|
||||
err := sub.client.WriterPacket(packet)
|
||||
if err != nil {
|
||||
log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -261,7 +259,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
|
||||
t := "$queue/" + topic
|
||||
cnt, exist := b.queues[t]
|
||||
if exist {
|
||||
// log.Info("queue index : ", cnt)
|
||||
// brokerLog.Info("queue index : ", cnt)
|
||||
for _, sub := range r.qsubs {
|
||||
if sub.client.typ == ROUTER {
|
||||
if typ != CLIENT {
|
||||
@@ -277,7 +275,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
|
||||
if sub != nil {
|
||||
err := sub.client.WriterPacket(packet)
|
||||
if err != nil {
|
||||
log.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -331,7 +329,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
|
||||
t := topic
|
||||
//check topic auth for client
|
||||
if !c.CheckTopicAuth(SUB, topic) {
|
||||
log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
|
||||
retcodes = append(retcodes, QosFailure)
|
||||
continue
|
||||
}
|
||||
@@ -378,7 +376,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
|
||||
}
|
||||
err := b.sl.Insert(sub)
|
||||
if err != nil {
|
||||
log.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
retcodes = append(retcodes, QosFailure)
|
||||
} else {
|
||||
retcodes = append(retcodes, qoss[i])
|
||||
@@ -388,7 +386,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
|
||||
|
||||
err := c.WriterPacket(suback)
|
||||
if err != nil {
|
||||
log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
return
|
||||
}
|
||||
//broadcast subscribe message
|
||||
@@ -400,7 +398,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
|
||||
for _, t := range topics {
|
||||
packets := b.rl.Match(t)
|
||||
for _, packet := range packets {
|
||||
log.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID))
|
||||
if packet != nil {
|
||||
c.WriterPacket(packet)
|
||||
}
|
||||
@@ -447,7 +445,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
|
||||
|
||||
err := c.WriterPacket(unsuback)
|
||||
if err != nil {
|
||||
log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
return
|
||||
}
|
||||
// //process ubsubscribe message
|
||||
@@ -476,7 +474,7 @@ func (c *client) ProcessPing() {
|
||||
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
|
||||
err := c.WriterPacket(resp)
|
||||
if err != nil {
|
||||
log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -507,7 +505,7 @@ func (c *client) Close() {
|
||||
for _, sub := range subs {
|
||||
err := b.sl.Remove(sub)
|
||||
if err != nil {
|
||||
log.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
brokerLog.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
|
||||
}
|
||||
}
|
||||
if c.typ == CLIENT {
|
||||
|
||||
Reference in New Issue
Block a user