From 551cbd839a87ec3566d9eb8d1b9aec5cdb521c2f Mon Sep 17 00:00:00 2001 From: joyz Date: Mon, 24 Dec 2018 19:43:04 +0800 Subject: [PATCH] modify --- broker/broker.go | 4 +- broker/client.go | 4 +- broker/retain.go | 121 -------- broker/sublist.go | 317 -------------------- conf/hmq.config | 1 + {sessions => lib/sessions}/memprovider.go | 0 {sessions => lib/sessions}/redisprovider.go | 0 {sessions => lib/sessions}/session.go | 0 {sessions => lib/sessions}/sessions.go | 0 {topics => lib/topics}/memtopics.go | 0 {topics => lib/topics}/topics.go | 0 11 files changed, 5 insertions(+), 442 deletions(-) delete mode 100644 broker/retain.go delete mode 100644 broker/sublist.go rename {sessions => lib/sessions}/memprovider.go (100%) rename {sessions => lib/sessions}/redisprovider.go (100%) rename {sessions => lib/sessions}/session.go (100%) rename {sessions => lib/sessions}/sessions.go (100%) rename {topics => lib/topics}/memtopics.go (100%) rename {topics => lib/topics}/topics.go (100%) diff --git a/broker/broker.go b/broker/broker.go index ffae754..fd543e2 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -13,9 +13,9 @@ import ( "github.com/eclipse/paho.mqtt.golang/packets" "github.com/fhmq/hmq/lib/acl" + "github.com/fhmq/hmq/lib/sessions" + "github.com/fhmq/hmq/lib/topics" "github.com/fhmq/hmq/pool" - "github.com/fhmq/hmq/sessions" - "github.com/fhmq/hmq/topics" "github.com/shirou/gopsutil/mem" "go.uber.org/zap" "golang.org/x/net/websocket" diff --git a/broker/client.go b/broker/client.go index 50eb426..237e505 100644 --- a/broker/client.go +++ b/broker/client.go @@ -12,8 +12,8 @@ import ( "time" "github.com/eclipse/paho.mqtt.golang/packets" - "github.com/fhmq/hmq/sessions" - "github.com/fhmq/hmq/topics" + "github.com/fhmq/hmq/lib/sessions" + "github.com/fhmq/hmq/lib/topics" "go.uber.org/zap" ) diff --git a/broker/retain.go b/broker/retain.go deleted file mode 100644 index 1c5cf12..0000000 --- a/broker/retain.go +++ /dev/null @@ -1,121 +0,0 @@ -package broker - -import ( - "github.com/eclipse/paho.mqtt.golang/packets" - "sync" -) - -type RetainList struct { - sync.RWMutex - root *rlevel -} -type rlevel struct { - nodes map[string]*rnode -} -type rnode struct { - next *rlevel - msg *packets.PublishPacket -} -type RetainResult struct { - msg []*packets.PublishPacket -} - -func newRNode() *rnode { - return &rnode{} -} - -func newRLevel() *rlevel { - return &rlevel{nodes: make(map[string]*rnode)} -} - -func NewRetainList() *RetainList { - return &RetainList{root: newRLevel()} -} - -func (r *RetainList) Insert(topic string, buf *packets.PublishPacket) error { - - tokens, err := PublishTopicCheckAndSpilt(topic) - if err != nil { - return err - } - // log.Info("insert tokens:", tokens) - r.Lock() - - l := r.root - var n *rnode - for _, t := range tokens { - n = l.nodes[t] - if n == nil { - n = newRNode() - l.nodes[t] = n - } - if n.next == nil { - n.next = newRLevel() - } - l = n.next - } - n.msg = buf - r.Unlock() - return nil -} - -func (r *RetainList) Match(topic string) []*packets.PublishPacket { - - tokens, err := SubscribeTopicCheckAndSpilt(topic) - if err != nil { - return nil - } - results := &RetainResult{} - - r.Lock() - l := r.root - matchRLevel(l, tokens, results) - r.Unlock() - // log.Info("results: ", results) - return results.msg - -} -func matchRLevel(l *rlevel, toks []string, results *RetainResult) { - var n *rnode - for i, t := range toks { - if l == nil { - return - } - // log.Info("l info :", l.nodes) - if t == "#" { - for _, n := range l.nodes { - n.GetAll(results) - } - } - if t == "+" { - for _, n := range l.nodes { - if len(t[i+1:]) == 0 { - results.msg = append(results.msg, n.msg) - } else { - matchRLevel(n.next, toks[i+1:], results) - } - } - } - - n = l.nodes[t] - if n != nil { - l = n.next - } else { - l = nil - } - } - if n != nil { - results.msg = append(results.msg, n.msg) - } -} - -func (r *rnode) GetAll(results *RetainResult) { - // log.Info("node 's message: ", string(r.msg)) - if r.msg != nil { - results.msg = append(results.msg, r.msg) - } - l := r.next - for _, n := range l.nodes { - n.GetAll(results) - } -} diff --git a/broker/sublist.go b/broker/sublist.go deleted file mode 100644 index e1d8ecc..0000000 --- a/broker/sublist.go +++ /dev/null @@ -1,317 +0,0 @@ -/* Copyright (c) 2018, joy.zhou - */ -package broker - -import ( - "errors" - "go.uber.org/zap" - "sync" -) - -// A result structure better optimized for queue subs. -type SublistResult struct { - psubs []*subscription - qsubs []*subscription // don't make this a map, too expensive to iterate -} - -// A Sublist stores and efficiently retrieves subscriptions. -type Sublist struct { - sync.RWMutex - cache map[string]*SublistResult - root *level -} - -// A node contains subscriptions and a pointer to the next level. -type node struct { - next *level - psubs []*subscription - qsubs []*subscription -} - -// A level represents a group of nodes and special pointers to -// wildcard nodes. -type level struct { - nodes map[string]*node -} - -// Create a new default node. -func newNode() *node { - return &node{psubs: make([]*subscription, 0, 4), qsubs: make([]*subscription, 0, 4)} -} - -// Create a new default level. We use FNV1A as the hash -// algortihm for the tokens, which should be short. -func newLevel() *level { - return &level{nodes: make(map[string]*node)} -} - -// New will create a default sublist -func NewSublist() *Sublist { - return &Sublist{root: newLevel(), cache: make(map[string]*SublistResult)} -} - -// Insert adds a subscription into the sublist -func (s *Sublist) Insert(sub *subscription) error { - - tokens, err := SubscribeTopicCheckAndSpilt(sub.topic) - if err != nil { - return err - } - s.Lock() - - l := s.root - var n *node - for _, t := range tokens { - n = l.nodes[t] - if n == nil { - n = newNode() - l.nodes[t] = n - } - if n.next == nil { - n.next = newLevel() - } - l = n.next - } - if sub.queue { - //check qsub is already exist - for i := range n.qsubs { - if equal(n.qsubs[i], sub) { - n.qsubs[i] = sub - return nil - } - } - n.qsubs = append(n.qsubs, sub) - } else { - //check psub is already exist - for i := range n.psubs { - if equal(n.psubs[i], sub) { - n.psubs[i] = sub - return nil - } - } - n.psubs = append(n.psubs, sub) - } - - topic := string(sub.topic) - s.addToCache(topic, sub) - s.Unlock() - return nil -} - -func (s *Sublist) addToCache(topic string, sub *subscription) { - for k, r := range s.cache { - if matchLiteral(k, topic) { - // Copy since others may have a reference. - nr := copyResult(r) - if sub.queue == false { - nr.psubs = append(nr.psubs, sub) - } else { - nr.qsubs = append(nr.qsubs, sub) - } - s.cache[k] = nr - } - } -} - -func (s *Sublist) removeFromCache(topic string, sub *subscription) { - for k := range s.cache { - if !matchLiteral(k, topic) { - continue - } - // Since someone else may be referecing, can't modify the list - // safely, just let it re-populate. - delete(s.cache, k) - } -} - -func matchLiteral(literal, topic string) bool { - tok, _ := SubscribeTopicCheckAndSpilt(topic) - li, _ := PublishTopicCheckAndSpilt(literal) - - for i := 0; i < len(tok); i++ { - b := tok[i] - switch b { - case "+": - - case "#": - return true - default: - if b != li[i] { - return false - } - } - } - return true -} - -// Deep copy -func copyResult(r *SublistResult) *SublistResult { - nr := &SublistResult{} - nr.psubs = append([]*subscription(nil), r.psubs...) - nr.qsubs = append([]*subscription(nil), r.qsubs...) - return nr -} - -func (s *Sublist) Remove(sub *subscription) error { - tokens, err := SubscribeTopicCheckAndSpilt(sub.topic) - if err != nil { - return err - } - s.Lock() - defer s.Unlock() - - l := s.root - var n *node - - for _, t := range tokens { - if l == nil { - return errors.New("No Matches subscription Found") - } - n = l.nodes[t] - if n != nil { - l = n.next - } else { - l = nil - } - } - if !s.removeFromNode(n, sub) { - return errors.New("No Matches subscription Found") - } - topic := string(sub.topic) - s.removeFromCache(topic, sub) - return nil - -} - -func (s *Sublist) removeFromNode(n *node, sub *subscription) (found bool) { - if n == nil { - return false - } - - if sub.queue { - n.qsubs, found = removeSubFromList(sub, n.qsubs) - return found - } else { - n.psubs, found = removeSubFromList(sub, n.psubs) - return found - } - - return false -} - -func (s *Sublist) Match(topic string) *SublistResult { - s.RLock() - rc, ok := s.cache[topic] - s.RUnlock() - - if ok { - return rc - } - - tokens, err := PublishTopicCheckAndSpilt(topic) - if err != nil { - log.Error("\tserver/sublist.go: ", zap.Error(err)) - return nil - } - - result := &SublistResult{} - - s.Lock() - l := s.root - if len(tokens) > 0 { - if tokens[0] == "/" { - if _, exist := l.nodes["#"]; exist { - addNodeToResults(l.nodes["#"], result) - } - if _, exist := l.nodes["+"]; exist { - matchLevel(l.nodes["/"].next, tokens[1:], result) - } - if _, exist := l.nodes["/"]; exist { - matchLevel(l.nodes["/"].next, tokens[1:], result) - } - } else { - matchLevel(s.root, tokens, result) - } - } - s.cache[topic] = result - if len(s.cache) > 1024 { - for k := range s.cache { - delete(s.cache, k) - break - } - } - - s.Unlock() - return result -} - -func matchLevel(l *level, toks []string, results *SublistResult) { - var swc, n *node - exist := false - for i, t := range toks { - if l == nil { - return - } - - if _, exist = l.nodes["#"]; exist { - addNodeToResults(l.nodes["#"], results) - } - if t != "/" { - if swc, exist = l.nodes["+"]; exist { - matchLevel(l.nodes["+"].next, toks[i+1:], results) - } - } else { - if _, exist = l.nodes["+"]; exist { - addNodeToResults(l.nodes["+"], results) - } - } - - n = l.nodes[t] - if n != nil { - l = n.next - } else { - l = nil - } - } - if n != nil { - addNodeToResults(n, results) - } - if swc != nil { - addNodeToResults(n, results) - } -} - -// This will add in a node's results to the total results. -func addNodeToResults(n *node, results *SublistResult) { - results.psubs = append(results.psubs, n.psubs...) - results.qsubs = append(results.qsubs, n.qsubs...) -} - -func removeSubFromList(sub *subscription, sl []*subscription) ([]*subscription, bool) { - for i := 0; i < len(sl); i++ { - if sl[i] == sub { - last := len(sl) - 1 - sl[i] = sl[last] - sl[last] = nil - sl = sl[:last] - return shrinkAsNeeded(sl), true - } - } - return sl, false -} - -// Checks if we need to do a resize. This is for very large growth then -// subsequent return to a more normal size from unsubscribe. -func shrinkAsNeeded(sl []*subscription) []*subscription { - lsl := len(sl) - csl := cap(sl) - // Don't bother if list not too big - if csl <= 8 { - return sl - } - pFree := float32(csl-lsl) / float32(csl) - if pFree > 0.50 { - return append([]*subscription(nil), sl...) - } - return sl -} diff --git a/conf/hmq.config b/conf/hmq.config index 6c8711c..5a97dbb 100644 --- a/conf/hmq.config +++ b/conf/hmq.config @@ -1,4 +1,5 @@ { + "ID": "node-0", "workerNum": 4096, "port": "1883", "host": "0.0.0.0", diff --git a/sessions/memprovider.go b/lib/sessions/memprovider.go similarity index 100% rename from sessions/memprovider.go rename to lib/sessions/memprovider.go diff --git a/sessions/redisprovider.go b/lib/sessions/redisprovider.go similarity index 100% rename from sessions/redisprovider.go rename to lib/sessions/redisprovider.go diff --git a/sessions/session.go b/lib/sessions/session.go similarity index 100% rename from sessions/session.go rename to lib/sessions/session.go diff --git a/sessions/sessions.go b/lib/sessions/sessions.go similarity index 100% rename from sessions/sessions.go rename to lib/sessions/sessions.go diff --git a/topics/memtopics.go b/lib/topics/memtopics.go similarity index 100% rename from topics/memtopics.go rename to lib/topics/memtopics.go diff --git a/topics/topics.go b/lib/topics/topics.go similarity index 100% rename from topics/topics.go rename to lib/topics/topics.go