mirror of
https://github.com/fhmq/hmq.git
synced 2026-06-15 17:51:33 +00:00
update some logic
This commit is contained in:
+8
-56
@@ -7,7 +7,6 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -20,7 +19,6 @@ import (
|
||||
|
||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||
"github.com/fhmq/hmq/pool"
|
||||
"github.com/shirou/gopsutil/mem"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
@@ -153,22 +151,6 @@ func (b *Broker) Start() {
|
||||
b.ConnectToDiscovery()
|
||||
}
|
||||
|
||||
//system monitor
|
||||
go StateMonitor()
|
||||
|
||||
}
|
||||
|
||||
func StateMonitor() {
|
||||
v, _ := mem.VirtualMemory()
|
||||
timeSticker := time.NewTicker(time.Second * 30)
|
||||
for {
|
||||
select {
|
||||
case <-timeSticker.C:
|
||||
if v.UsedPercent > 75 {
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Broker) StartWebsocketListening() {
|
||||
@@ -234,35 +216,6 @@ func (b *Broker) StartClientListening(Tls bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Broker) Handshake(conn net.Conn) bool {
|
||||
|
||||
nc := tls.Server(conn, b.tlsConfig)
|
||||
time.AfterFunc(DEFAULT_TLS_TIMEOUT, func() { TlsTimeout(nc) })
|
||||
nc.SetReadDeadline(time.Now().Add(DEFAULT_TLS_TIMEOUT))
|
||||
|
||||
// Force handshake
|
||||
if err := nc.Handshake(); err != nil {
|
||||
log.Error("TLS handshake error, ", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
nc.SetReadDeadline(time.Time{})
|
||||
return true
|
||||
|
||||
}
|
||||
|
||||
func TlsTimeout(conn *tls.Conn) {
|
||||
nc := conn
|
||||
// Check if already closed
|
||||
if nc == nil {
|
||||
return
|
||||
}
|
||||
cs := nc.ConnectionState()
|
||||
if !cs.HandshakeComplete {
|
||||
log.Error("TLS handshake timeout")
|
||||
nc.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Broker) StartClusterListening() {
|
||||
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
|
||||
log.Info("Start Listening cluster on ", zap.String("hp", hp))
|
||||
@@ -344,15 +297,6 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
|
||||
return
|
||||
}
|
||||
|
||||
if typ == CLIENT {
|
||||
b.Publish(&bridge.Elements{
|
||||
ClientID: string(msg.ClientIdentifier),
|
||||
Username: string(msg.Username),
|
||||
Action: bridge.Connect,
|
||||
Timestamp: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
|
||||
willmsg := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
|
||||
if msg.WillFlag {
|
||||
willmsg.Qos = msg.WillQos
|
||||
@@ -404,6 +348,14 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
|
||||
b.clients.Store(cid, c)
|
||||
|
||||
b.OnlineOfflineNotification(cid, true)
|
||||
{
|
||||
b.Publish(&bridge.Elements{
|
||||
ClientID: string(msg.ClientIdentifier),
|
||||
Username: string(msg.Username),
|
||||
Action: bridge.Connect,
|
||||
Timestamp: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
case ROUTER:
|
||||
old, exist = b.routes.Load(cid)
|
||||
if exist {
|
||||
|
||||
@@ -384,6 +384,11 @@ func (c *client) processClientSubscribe(packet *packets.SubscribePacket) {
|
||||
topic = substr[2]
|
||||
}
|
||||
|
||||
if oldSub, exist := c.subMap[t]; exist {
|
||||
c.topicsMgr.Unsubscribe([]byte(oldSub.topic), oldSub)
|
||||
delete(c.subMap, t)
|
||||
}
|
||||
|
||||
sub := &subscription{
|
||||
topic: topic,
|
||||
qos: qoss[i],
|
||||
|
||||
Reference in New Issue
Block a user