mirror of
https://github.com/fhmq/hmq.git
synced 2026-05-09 08:58:24 +00:00
modify
This commit is contained in:
+2
-2
@@ -13,9 +13,9 @@ import (
|
|||||||
|
|
||||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||||
"github.com/fhmq/hmq/lib/acl"
|
"github.com/fhmq/hmq/lib/acl"
|
||||||
|
"github.com/fhmq/hmq/lib/sessions"
|
||||||
|
"github.com/fhmq/hmq/lib/topics"
|
||||||
"github.com/fhmq/hmq/pool"
|
"github.com/fhmq/hmq/pool"
|
||||||
"github.com/fhmq/hmq/sessions"
|
|
||||||
"github.com/fhmq/hmq/topics"
|
|
||||||
"github.com/shirou/gopsutil/mem"
|
"github.com/shirou/gopsutil/mem"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
|
|||||||
+2
-2
@@ -12,8 +12,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||||
"github.com/fhmq/hmq/sessions"
|
"github.com/fhmq/hmq/lib/sessions"
|
||||||
"github.com/fhmq/hmq/topics"
|
"github.com/fhmq/hmq/lib/topics"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -1,121 +0,0 @@
|
|||||||
package broker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
type RetainList struct {
|
|
||||||
sync.RWMutex
|
|
||||||
root *rlevel
|
|
||||||
}
|
|
||||||
type rlevel struct {
|
|
||||||
nodes map[string]*rnode
|
|
||||||
}
|
|
||||||
type rnode struct {
|
|
||||||
next *rlevel
|
|
||||||
msg *packets.PublishPacket
|
|
||||||
}
|
|
||||||
type RetainResult struct {
|
|
||||||
msg []*packets.PublishPacket
|
|
||||||
}
|
|
||||||
|
|
||||||
func newRNode() *rnode {
|
|
||||||
return &rnode{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newRLevel() *rlevel {
|
|
||||||
return &rlevel{nodes: make(map[string]*rnode)}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRetainList() *RetainList {
|
|
||||||
return &RetainList{root: newRLevel()}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RetainList) Insert(topic string, buf *packets.PublishPacket) error {
|
|
||||||
|
|
||||||
tokens, err := PublishTopicCheckAndSpilt(topic)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// log.Info("insert tokens:", tokens)
|
|
||||||
r.Lock()
|
|
||||||
|
|
||||||
l := r.root
|
|
||||||
var n *rnode
|
|
||||||
for _, t := range tokens {
|
|
||||||
n = l.nodes[t]
|
|
||||||
if n == nil {
|
|
||||||
n = newRNode()
|
|
||||||
l.nodes[t] = n
|
|
||||||
}
|
|
||||||
if n.next == nil {
|
|
||||||
n.next = newRLevel()
|
|
||||||
}
|
|
||||||
l = n.next
|
|
||||||
}
|
|
||||||
n.msg = buf
|
|
||||||
r.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RetainList) Match(topic string) []*packets.PublishPacket {
|
|
||||||
|
|
||||||
tokens, err := SubscribeTopicCheckAndSpilt(topic)
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
results := &RetainResult{}
|
|
||||||
|
|
||||||
r.Lock()
|
|
||||||
l := r.root
|
|
||||||
matchRLevel(l, tokens, results)
|
|
||||||
r.Unlock()
|
|
||||||
// log.Info("results: ", results)
|
|
||||||
return results.msg
|
|
||||||
|
|
||||||
}
|
|
||||||
func matchRLevel(l *rlevel, toks []string, results *RetainResult) {
|
|
||||||
var n *rnode
|
|
||||||
for i, t := range toks {
|
|
||||||
if l == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// log.Info("l info :", l.nodes)
|
|
||||||
if t == "#" {
|
|
||||||
for _, n := range l.nodes {
|
|
||||||
n.GetAll(results)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if t == "+" {
|
|
||||||
for _, n := range l.nodes {
|
|
||||||
if len(t[i+1:]) == 0 {
|
|
||||||
results.msg = append(results.msg, n.msg)
|
|
||||||
} else {
|
|
||||||
matchRLevel(n.next, toks[i+1:], results)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n = l.nodes[t]
|
|
||||||
if n != nil {
|
|
||||||
l = n.next
|
|
||||||
} else {
|
|
||||||
l = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if n != nil {
|
|
||||||
results.msg = append(results.msg, n.msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *rnode) GetAll(results *RetainResult) {
|
|
||||||
// log.Info("node 's message: ", string(r.msg))
|
|
||||||
if r.msg != nil {
|
|
||||||
results.msg = append(results.msg, r.msg)
|
|
||||||
}
|
|
||||||
l := r.next
|
|
||||||
for _, n := range l.nodes {
|
|
||||||
n.GetAll(results)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,317 +0,0 @@
|
|||||||
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
|
|
||||||
*/
|
|
||||||
package broker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
// A result structure better optimized for queue subs.
|
|
||||||
type SublistResult struct {
|
|
||||||
psubs []*subscription
|
|
||||||
qsubs []*subscription // don't make this a map, too expensive to iterate
|
|
||||||
}
|
|
||||||
|
|
||||||
// A Sublist stores and efficiently retrieves subscriptions.
|
|
||||||
type Sublist struct {
|
|
||||||
sync.RWMutex
|
|
||||||
cache map[string]*SublistResult
|
|
||||||
root *level
|
|
||||||
}
|
|
||||||
|
|
||||||
// A node contains subscriptions and a pointer to the next level.
|
|
||||||
type node struct {
|
|
||||||
next *level
|
|
||||||
psubs []*subscription
|
|
||||||
qsubs []*subscription
|
|
||||||
}
|
|
||||||
|
|
||||||
// A level represents a group of nodes and special pointers to
|
|
||||||
// wildcard nodes.
|
|
||||||
type level struct {
|
|
||||||
nodes map[string]*node
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new default node.
|
|
||||||
func newNode() *node {
|
|
||||||
return &node{psubs: make([]*subscription, 0, 4), qsubs: make([]*subscription, 0, 4)}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new default level. We use FNV1A as the hash
|
|
||||||
// algortihm for the tokens, which should be short.
|
|
||||||
func newLevel() *level {
|
|
||||||
return &level{nodes: make(map[string]*node)}
|
|
||||||
}
|
|
||||||
|
|
||||||
// New will create a default sublist
|
|
||||||
func NewSublist() *Sublist {
|
|
||||||
return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert adds a subscription into the sublist
|
|
||||||
func (s *Sublist) Insert(sub *subscription) error {
|
|
||||||
|
|
||||||
tokens, err := SubscribeTopicCheckAndSpilt(sub.topic)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
s.Lock()
|
|
||||||
|
|
||||||
l := s.root
|
|
||||||
var n *node
|
|
||||||
for _, t := range tokens {
|
|
||||||
n = l.nodes[t]
|
|
||||||
if n == nil {
|
|
||||||
n = newNode()
|
|
||||||
l.nodes[t] = n
|
|
||||||
}
|
|
||||||
if n.next == nil {
|
|
||||||
n.next = newLevel()
|
|
||||||
}
|
|
||||||
l = n.next
|
|
||||||
}
|
|
||||||
if sub.queue {
|
|
||||||
//check qsub is already exist
|
|
||||||
for i := range n.qsubs {
|
|
||||||
if equal(n.qsubs[i], sub) {
|
|
||||||
n.qsubs[i] = sub
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
n.qsubs = append(n.qsubs, sub)
|
|
||||||
} else {
|
|
||||||
//check psub is already exist
|
|
||||||
for i := range n.psubs {
|
|
||||||
if equal(n.psubs[i], sub) {
|
|
||||||
n.psubs[i] = sub
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
n.psubs = append(n.psubs, sub)
|
|
||||||
}
|
|
||||||
|
|
||||||
topic := string(sub.topic)
|
|
||||||
s.addToCache(topic, sub)
|
|
||||||
s.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Sublist) addToCache(topic string, sub *subscription) {
|
|
||||||
for k, r := range s.cache {
|
|
||||||
if matchLiteral(k, topic) {
|
|
||||||
// Copy since others may have a reference.
|
|
||||||
nr := copyResult(r)
|
|
||||||
if sub.queue == false {
|
|
||||||
nr.psubs = append(nr.psubs, sub)
|
|
||||||
} else {
|
|
||||||
nr.qsubs = append(nr.qsubs, sub)
|
|
||||||
}
|
|
||||||
s.cache[k] = nr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Sublist) removeFromCache(topic string, sub *subscription) {
|
|
||||||
for k := range s.cache {
|
|
||||||
if !matchLiteral(k, topic) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Since someone else may be referecing, can't modify the list
|
|
||||||
// safely, just let it re-populate.
|
|
||||||
delete(s.cache, k)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func matchLiteral(literal, topic string) bool {
|
|
||||||
tok, _ := SubscribeTopicCheckAndSpilt(topic)
|
|
||||||
li, _ := PublishTopicCheckAndSpilt(literal)
|
|
||||||
|
|
||||||
for i := 0; i < len(tok); i++ {
|
|
||||||
b := tok[i]
|
|
||||||
switch b {
|
|
||||||
case "+":
|
|
||||||
|
|
||||||
case "#":
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
if b != li[i] {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deep copy
|
|
||||||
func copyResult(r *SublistResult) *SublistResult {
|
|
||||||
nr := &SublistResult{}
|
|
||||||
nr.psubs = append([]*subscription(nil), r.psubs...)
|
|
||||||
nr.qsubs = append([]*subscription(nil), r.qsubs...)
|
|
||||||
return nr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Sublist) Remove(sub *subscription) error {
|
|
||||||
tokens, err := SubscribeTopicCheckAndSpilt(sub.topic)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
s.Lock()
|
|
||||||
defer s.Unlock()
|
|
||||||
|
|
||||||
l := s.root
|
|
||||||
var n *node
|
|
||||||
|
|
||||||
for _, t := range tokens {
|
|
||||||
if l == nil {
|
|
||||||
return errors.New("No Matches subscription Found")
|
|
||||||
}
|
|
||||||
n = l.nodes[t]
|
|
||||||
if n != nil {
|
|
||||||
l = n.next
|
|
||||||
} else {
|
|
||||||
l = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !s.removeFromNode(n, sub) {
|
|
||||||
return errors.New("No Matches subscription Found")
|
|
||||||
}
|
|
||||||
topic := string(sub.topic)
|
|
||||||
s.removeFromCache(topic, sub)
|
|
||||||
return nil
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) {
|
|
||||||
if n == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if sub.queue {
|
|
||||||
n.qsubs, found = removeSubFromList(sub, n.qsubs)
|
|
||||||
return found
|
|
||||||
} else {
|
|
||||||
n.psubs, found = removeSubFromList(sub, n.psubs)
|
|
||||||
return found
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Sublist) Match(topic string) *SublistResult {
|
|
||||||
s.RLock()
|
|
||||||
rc, ok := s.cache[topic]
|
|
||||||
s.RUnlock()
|
|
||||||
|
|
||||||
if ok {
|
|
||||||
return rc
|
|
||||||
}
|
|
||||||
|
|
||||||
tokens, err := PublishTopicCheckAndSpilt(topic)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("\tserver/sublist.go: ", zap.Error(err))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
result := &SublistResult{}
|
|
||||||
|
|
||||||
s.Lock()
|
|
||||||
l := s.root
|
|
||||||
if len(tokens) > 0 {
|
|
||||||
if tokens[0] == "/" {
|
|
||||||
if _, exist := l.nodes["#"]; exist {
|
|
||||||
addNodeToResults(l.nodes["#"], result)
|
|
||||||
}
|
|
||||||
if _, exist := l.nodes["+"]; exist {
|
|
||||||
matchLevel(l.nodes["/"].next, tokens[1:], result)
|
|
||||||
}
|
|
||||||
if _, exist := l.nodes["/"]; exist {
|
|
||||||
matchLevel(l.nodes["/"].next, tokens[1:], result)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
matchLevel(s.root, tokens, result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
s.cache[topic] = result
|
|
||||||
if len(s.cache) > 1024 {
|
|
||||||
for k := range s.cache {
|
|
||||||
delete(s.cache, k)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.Unlock()
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func matchLevel(l *level, toks []string, results *SublistResult) {
|
|
||||||
var swc, n *node
|
|
||||||
exist := false
|
|
||||||
for i, t := range toks {
|
|
||||||
if l == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, exist = l.nodes["#"]; exist {
|
|
||||||
addNodeToResults(l.nodes["#"], results)
|
|
||||||
}
|
|
||||||
if t != "/" {
|
|
||||||
if swc, exist = l.nodes["+"]; exist {
|
|
||||||
matchLevel(l.nodes["+"].next, toks[i+1:], results)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if _, exist = l.nodes["+"]; exist {
|
|
||||||
addNodeToResults(l.nodes["+"], results)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n = l.nodes[t]
|
|
||||||
if n != nil {
|
|
||||||
l = n.next
|
|
||||||
} else {
|
|
||||||
l = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if n != nil {
|
|
||||||
addNodeToResults(n, results)
|
|
||||||
}
|
|
||||||
if swc != nil {
|
|
||||||
addNodeToResults(n, results)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This will add in a node's results to the total results.
|
|
||||||
func addNodeToResults(n *node, results *SublistResult) {
|
|
||||||
results.psubs = append(results.psubs, n.psubs...)
|
|
||||||
results.qsubs = append(results.qsubs, n.qsubs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func removeSubFromList(sub *subscription, sl []*subscription) ([]*subscription, bool) {
|
|
||||||
for i := 0; i < len(sl); i++ {
|
|
||||||
if sl[i] == sub {
|
|
||||||
last := len(sl) - 1
|
|
||||||
sl[i] = sl[last]
|
|
||||||
sl[last] = nil
|
|
||||||
sl = sl[:last]
|
|
||||||
return shrinkAsNeeded(sl), true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return sl, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Checks if we need to do a resize. This is for very large growth then
|
|
||||||
// subsequent return to a more normal size from unsubscribe.
|
|
||||||
func shrinkAsNeeded(sl []*subscription) []*subscription {
|
|
||||||
lsl := len(sl)
|
|
||||||
csl := cap(sl)
|
|
||||||
// Don't bother if list not too big
|
|
||||||
if csl <= 8 {
|
|
||||||
return sl
|
|
||||||
}
|
|
||||||
pFree := float32(csl-lsl) / float32(csl)
|
|
||||||
if pFree > 0.50 {
|
|
||||||
return append([]*subscription(nil), sl...)
|
|
||||||
}
|
|
||||||
return sl
|
|
||||||
}
|
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
{
|
{
|
||||||
|
"ID": "node-0",
|
||||||
"workerNum": 4096,
|
"workerNum": 4096,
|
||||||
"port": "1883",
|
"port": "1883",
|
||||||
"host": "0.0.0.0",
|
"host": "0.0.0.0",
|
||||||
|
|||||||
Reference in New Issue
Block a user