42 Commits
1.01 ... wpool

Author SHA1 Message Date
zhouyuyan
026cfbe123 wpool 2018-03-02 13:29:18 +08:00
zhouyuyan
6f26df7a9a pool 2018-03-02 12:52:23 +08:00
zhouyuyan
fd2adb7a26 pool 2018-03-02 12:42:04 +08:00
zhouyuyan
c0fea6a5ba modify_message_pool 2018-02-24 13:19:43 +08:00
zhouyuyan
47500910e1 fix broker out painc 2018-02-06 11:01:06 +08:00
joy.zhou
0ff20b6ee2 Update README.md 2018-02-03 13:11:53 +08:00
joy.zhou
7155667f6c Pool (#16)
* add pool

* elastic workerpool

* del buf

* modify usage

* modify readme
2018-02-03 12:42:25 +08:00
zhouyuyan
83db82cdcc Merge branch 'master' of https://github.com/fhmq/hmq 2018-01-31 11:00:29 +08:00
zhouyuyan
b3653bcfb1 fix #14 2018-01-31 10:59:59 +08:00
joy.zhou
221d00480e update read.me 2018-01-26 16:29:14 +08:00
zhouyuyan
91733bf91e modify debug log 2018-01-26 15:47:34 +08:00
Marc Magnin
ef252550dc fhmq/hmq#5 added zap logger (#11) 2018-01-26 13:51:36 +08:00
joy.zhou
1058256235 update readme 2018-01-25 19:34:37 +08:00
joy.zhou
5a569f14a3 del debug info
delete debug message body
2018-01-25 19:31:47 +08:00
zhouyuyan
93b21777ff add lisence 2018-01-25 13:47:50 +08:00
zhouyuyan
dcf2934e1b add flag for hmq 2018-01-25 13:11:45 +08:00
joy.zhou
d9e6e216b0 Merge pull request #4 from MarcMagnin/master
fhmq/hmq#2 added full package ref
2018-01-24 18:14:13 +08:00
Marc Magnin
ca3951769a fhmq/hmq#2 added full package ref 2018-01-23 15:29:16 +01:00
zhouyuyan
0439e7ce90 fxi ws conn 2018-01-22 09:30:08 +08:00
zhouyuyan
dc0f2185ab skip self 2018-01-19 13:53:47 +08:00
zhouyuyan
7462afcfb5 modify readme 2018-01-19 13:49:53 +08:00
zhouyuyan
114e6f901e modify cluster 2018-01-19 13:41:17 +08:00
zhouyuyan
0cb51bd37a Merge branch 'master' of https://github.com/fhmq/hmq 2018-01-18 09:18:38 +08:00
zhouyuyan
819b4725f2 modify route 2018-01-18 09:17:48 +08:00
joy.zhou
85bdeccbfc release link
addd down link
2018-01-17 21:39:31 +08:00
zhouyuyan
1339a04b28 modify Dockerfile 2018-01-17 10:11:36 +08:00
zhouyuyan
957329d85c modify Dockerfile 2018-01-17 10:10:04 +08:00
zhouyuyan
7db7edaa17 cluster fix 2018-01-17 09:39:07 +08:00
zhouyuyan
1d6f6a4a71 add cluster 2018-01-16 16:50:10 +08:00
zhouyuyan
123bb7210f move dispatcher 2018-01-02 10:55:28 +08:00
zhouyuyan
9ad6590e83 modify timer 2017-12-28 09:13:20 +08:00
zhouyuyan
516db49db5 modify keep alive 2017-12-27 16:42:38 +08:00
zhouyuyan
a260057bfe modify time close 2017-12-08 13:25:05 +08:00
zhouyuyan
bdd802ebfb modify log 2017-12-07 16:30:48 +08:00
zhouyuyan
5786e69b01 modify cluster logic 2017-11-21 14:05:06 +08:00
zhouyuyan
6a89b627d4 add clientID in log for debug 2017-11-02 15:31:57 +08:00
zhouyuyan
208a7cf0a8 wait for message when close connection 2017-10-26 16:11:01 +08:00
zhouyuyan
a7fb7f1912 modify close old connect connection logic 2017-10-26 15:57:19 +08:00
zhouyuyan
eeab0c6b7d modify readloop 2017-09-22 12:08:11 +08:00
zhouyuyan
4646042b7f disconnect 2017-09-12 15:37:05 +08:00
zhouyuyan
49385e52fd log format 2017-09-12 09:50:54 +08:00
zhouyuyan
3ed8625bb9 log formar 2017-09-12 09:49:50 +08:00
25 changed files with 1081 additions and 421 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
hmq
log
log/*
*.test

View File

@@ -1,6 +1,5 @@
FROM alpine
COPY hmq /
COPY hmq.config /
COPY ssl /ssl
COPY conf /conf

201
LICENSE Normal file
View File

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -3,16 +3,42 @@ Free and High Performance MQTT Broker
## About
Golang MQTT Broker, Version 3.1.1, and Compatible
for [eclipse paho client](https://github.com/eclipse?utf8=%E2%9C%93&q=mqtt&type=&language=)
for [eclipse paho client](https://github.com/eclipse?utf8=%E2%9C%93&q=mqtt&type=&language=) and mosquitto-client
Download: [click here](https://github.com/fhmq/hmq/releases)
## RUNNING
```bash
$ git clone https://github.com/fhmq/hmq.git
$ cd hmq
$ go get github.com/fhmq/hmq
$ cd $GOPATH/github.com/fhmq/hmq
$ go run main.go
```
### broker.config
## Usage of hmq:
~~~
Usage: hmq [options]
Broker Options:
-w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
-p, --port <port> Use port for clients (default: 1883)
--host <host> Network host to listen on. (default "0.0.0.0")
-ws, --wsport <port> Use port for websocket monitoring
-wsp,--wspath <path> Use path for websocket monitoring
-c, --config <file> Configuration file
Logging Options:
-d, --debug <bool> Enable debugging output (default false)
-D Debug enabled
Cluster Options:
-r, --router <rurl> Router who maintenance cluster info
-cp, --clusterport <cluster-port> Cluster listen port for others
Common Options:
-h, --help Show this message
~~~
### hmq.config
~~~
{
"workerNum": 4096,
@@ -20,9 +46,9 @@ $ go run main.go
"host": "0.0.0.0",
"cluster": {
"host": "0.0.0.0",
"port": "1993",
"routers": ["10.10.0.11:1993","10.10.0.12:1993"]
"port": "1993"
},
"router": "127.0.0.1:9888",
"wsPort": "1888",
"wsPath": "/ws",
"wsTLS": true,
@@ -59,6 +85,16 @@ $ go run main.go
* Flexible ACL
### Cluster
```bash
1, start router for hmq (https://github.com/fhmq/router.git)
$ go get github.com/fhmq/router
$ cd $GOPATH/github.com/fhmq/router
$ go run main.go
2, config router in hmq.config ("router": "127.0.0.1:9888")
```
### QUEUE SUBSCRIBE
~~~
| Prefix | Examples |
@@ -129,4 +165,4 @@ Client -> | Rule1 | --nomatch--> | Rule2 | --nomatch--> | Rule3 | -->
## License
* Apache License Version 2.0
* Apache License Version 2.0

View File

@@ -1 +0,0 @@
theme: jekyll-theme-slate

View File

@@ -1,11 +1,12 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import (
"hmq/lib/acl"
"strings"
log "github.com/cihub/seelog"
"github.com/fhmq/hmq/lib/acl"
"github.com/fsnotify/fsnotify"
"go.uber.org/zap"
"strings"
)
const (
@@ -40,10 +41,10 @@ func (b *Broker) handleFsEvent(event fsnotify.Event) error {
case b.config.AclConf:
if event.Op&fsnotify.Write == fsnotify.Write ||
event.Op&fsnotify.Create == fsnotify.Create {
log.Info("text:handling acl config change event:", event)
log.Info("text:handling acl config change event:", zap.String("filename", event.Name))
aclconfig, err := acl.AclConfigLoad(event.Name)
if err != nil {
log.Error("aclconfig change failed, load acl conf error: ", err)
log.Error("aclconfig change failed, load acl conf error: ", zap.Error(err))
return err
}
b.AclConfig = aclconfig
@@ -56,14 +57,14 @@ func (b *Broker) StartAclWatcher() {
go func() {
wch, e := fsnotify.NewWatcher()
if e != nil {
log.Error("start monitor acl config file error,", e)
log.Error("start monitor acl config file error,", zap.Error(e))
return
}
defer wch.Close()
for _, i := range watchList {
if err := wch.Add(i); err != nil {
log.Error("start monitor acl config file error,", err)
log.Error("start monitor acl config file error,", zap.Error(err))
return
}
}
@@ -73,7 +74,7 @@ func (b *Broker) StartAclWatcher() {
case evt := <-wch.Events:
b.handleFsEvent(evt)
case err := <-wch.Errors:
log.Error("error:", err.Error())
log.Error("error:", zap.Error(err))
}
}
}()

View File

@@ -1,8 +1,9 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import (
"crypto/tls"
"hmq/lib/acl"
"net"
"net/http"
"runtime/debug"
@@ -11,39 +12,67 @@ import (
"time"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/fhmq/hmq/lib/acl"
"github.com/fhmq/hmq/pool"
"github.com/shirou/gopsutil/mem"
"go.uber.org/zap"
"golang.org/x/net/websocket"
log "github.com/cihub/seelog"
)
const (
MessagePoolNum = 1024
MessagePoolMessageNum = 1024
)
type Message struct {
client *client
packet packets.ControlPacket
}
type Broker struct {
id string
cid uint64
config *Config
tlsConfig *tls.Config
AclConfig *acl.ACLConfig
clients sync.Map
routes sync.Map
remotes sync.Map
sl *Sublist
rl *RetainList
queues map[string]int
id string
cid uint64
mu sync.Mutex
config *Config
tlsConfig *tls.Config
AclConfig *acl.ACLConfig
wpool *pool.WorkerPool
clients sync.Map
routes sync.Map
remotes sync.Map
nodes map[string]interface{}
clusterPool chan *Message
sl *Sublist
rl *RetainList
queues map[string]int
// messagePool []chan *Message
}
func newMessagePool() []chan *Message {
pool := make([]chan *Message, 0)
for i := 0; i < MessagePoolNum; i++ {
ch := make(chan *Message, MessagePoolMessageNum)
pool = append(pool, ch)
}
return pool
}
func NewBroker(config *Config) (*Broker, error) {
b := &Broker{
id: GenUniqueId(),
config: config,
sl: NewSublist(),
rl: NewRetainList(),
queues: make(map[string]int),
id: GenUniqueId(),
config: config,
wpool: pool.New(config.Worker),
sl: NewSublist(),
rl: NewRetainList(),
nodes: make(map[string]interface{}),
queues: make(map[string]int),
clusterPool: make(chan *Message),
// messagePool: newMessagePool(),
}
if b.config.TlsPort != "" {
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
if err != nil {
log.Error("new tlsConfig error: ", err)
log.Error("new tlsConfig error", zap.Error(err))
return nil, err
}
b.tlsConfig = tlsconfig
@@ -51,7 +80,7 @@ func NewBroker(config *Config) (*Broker, error) {
if b.config.Acl {
aclconfig, err := acl.AclConfigLoad(b.config.AclConf)
if err != nil {
log.Error("Load acl conf error: ", err)
log.Error("Load acl conf error", zap.Error(err))
return nil, err
}
b.AclConfig = aclconfig
@@ -60,40 +89,67 @@ func NewBroker(config *Config) (*Broker, error) {
return b, nil
}
func (b *Broker) SubmitWork(msg *Message) {
if b.wpool == nil {
b.wpool = pool.New(b.config.Worker)
}
if msg.client.typ == CLUSTER {
b.clusterPool <- msg
} else {
b.wpool.Submit(func() {
ProcessMessage(msg)
})
}
}
func (b *Broker) Start() {
if b == nil {
log.Error("broker is null")
return
}
//listen clinet over tcp
if b.config.Port != "" {
go b.StartClientListening(false)
}
//listen for cluster
if b.config.Cluster.Port != "" {
go b.StartClusterListening()
}
//listen for websocket
if b.config.WsPort != "" {
go b.StartWebsocketListening()
}
//listen client over tls
if b.config.TlsPort != "" {
go b.StartClientListening(true)
}
if len(b.config.Cluster.Routes) > 0 {
b.ConnectToRouters()
//connect on other node in cluster
if b.config.Router != "" {
go b.processClusterInfo()
b.ConnectToDiscovery()
}
//system monitor
go StateMonitor()
}
func StateMonitor() {
v, _ := mem.VirtualMemory()
timeSticker := time.NewTicker(time.Second * 5)
timeSticker := time.NewTicker(time.Second * 30)
for {
select {
case <-timeSticker.C:
if v.UsedPercent > 0.8 {
if v.UsedPercent > 75 {
debug.FreeOSMemory()
}
// fmt.Printf("Total: %v, Free:%v, UsedPercent:%f%%\n", v.Total, v.Free, v.UsedPercent)
}
}
}
@@ -101,7 +157,7 @@ func StateMonitor() {
func (b *Broker) StartWebsocketListening() {
path := b.config.WsPath
hp := ":" + b.config.WsPort
log.Info("Start Webscoker Listening on ", hp, path)
log.Info("Start Websocket Listener on:", zap.String("hp", hp), zap.String("path", path))
http.Handle(path, websocket.Handler(b.wsHandler))
var err error
if b.config.WsTLS {
@@ -110,7 +166,7 @@ func (b *Broker) StartWebsocketListening() {
err = http.ListenAndServe(hp, nil)
}
if err != nil {
log.Error("ListenAndServe: " + err.Error())
log.Error("ListenAndServe:" + err.Error())
return
}
}
@@ -119,7 +175,7 @@ func (b *Broker) wsHandler(ws *websocket.Conn) {
// io.Copy(ws, ws)
atomic.AddUint64(&b.cid, 1)
ws.PayloadType = websocket.BinaryFrame
b.handleConnection(CLIENT, ws, b.cid)
b.handleConnection(CLIENT, ws)
}
func (b *Broker) StartClientListening(Tls bool) {
@@ -129,14 +185,14 @@ func (b *Broker) StartClientListening(Tls bool) {
if Tls {
hp = b.config.TlsHost + ":" + b.config.TlsPort
l, err = tls.Listen("tcp", hp, b.tlsConfig)
log.Info("Start TLS Listening client on ", hp)
log.Info("Start TLS Listening client on ", zap.String("hp", hp))
} else {
hp := b.config.Host + ":" + b.config.Port
l, err = net.Listen("tcp", hp)
log.Info("Start Listening client on ", hp)
log.Info("Start Listening client on ", zap.String("hp", hp))
}
if err != nil {
log.Error("Error listening on ", err)
log.Error("Error listening on ", zap.Error(err))
return
}
tmpDelay := 10 * ACCEPT_MIN_SLEEP
@@ -145,20 +201,20 @@ func (b *Broker) StartClientListening(Tls bool) {
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error("Temporary Client Accept Error(%v), sleeping %dms",
ne, tmpDelay/time.Millisecond)
zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond))
time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error: %v", err)
log.Error("Accept error: %v", zap.Error(err))
}
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
atomic.AddUint64(&b.cid, 1)
go b.handleConnection(CLIENT, conn, b.cid)
go b.handleConnection(CLIENT, conn)
}
}
@@ -171,7 +227,7 @@ func (b *Broker) Handshake(conn net.Conn) bool {
// Force handshake
if err := nc.Handshake(); err != nil {
log.Error("TLS handshake error, ", err)
log.Error("TLS handshake error, ", zap.Error(err))
return false
}
nc.SetReadDeadline(time.Time{})
@@ -194,49 +250,42 @@ func TlsTimeout(conn *tls.Conn) {
func (b *Broker) StartClusterListening() {
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
log.Info("Start Listening cluster on ", hp)
log.Info("Start Listening cluster on ", zap.String("hp", hp))
l, e := net.Listen("tcp", hp)
if e != nil {
log.Error("Error listening on ", e)
log.Error("Error listening on ", zap.Error(e))
return
}
var idx uint64 = 0
tmpDelay := 10 * ACCEPT_MIN_SLEEP
for {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error("Temporary Client Accept Error(%v), sleeping %dms",
ne, tmpDelay/time.Millisecond)
zap.Error(ne), zap.Duration("sleeping", tmpDelay/time.Millisecond))
time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error: %v", err)
log.Error("Accept error: %v", zap.Error(err))
}
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
go b.handleConnection(ROUTER, conn, idx)
if idx == 1 {
idx = 0
} else {
idx = idx + 1
}
go b.handleConnection(ROUTER, conn)
}
}
func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
func (b *Broker) handleConnection(typ int, conn net.Conn) {
//process connect packet
packet, err := packets.ReadPacket(conn)
if err != nil {
log.Error("read connect packet error: ", err)
log.Error("read connect packet error: ", zap.Error(err))
return
}
if packet == nil {
@@ -253,7 +302,7 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
connack.SessionPresent = msg.CleanSession
err = connack.Write(conn)
if err != nil {
log.Error("send connack error, ", err)
log.Error("send connack error, ", zap.Error(err), zap.String("clientID", msg.ClientIdentifier))
return
}
@@ -281,75 +330,188 @@ func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
conn: conn,
info: info,
}
c.init()
cid := c.info.clientID
var msgPool *MessagePool
var exist bool
var old interface{}
switch typ {
case CLIENT:
msgPool = MSGPool[idx%MessagePoolNum].GetPool()
old, exist = b.clients.Load(cid)
if exist {
log.Warn("client exist, close old...", zap.String("clientID", c.info.clientID))
ol, ok := old.(*client)
if ok {
ol.Close()
}
}
b.clients.Store(cid, c)
case ROUTER:
msgPool = MSGPool[(MessagePoolNum + idx)].GetPool()
old, exist = b.routes.Load(cid)
if exist {
log.Warn("router exist, close old...")
ol, ok := old.(*client)
if ok {
ol.Close()
}
}
b.routes.Store(cid, c)
}
if exist {
log.Warn("client or routers exist, close old...")
ol, ok := old.(*client)
if ok {
ol.Close()
}
}
c.readLoop(msgPool)
// mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
c.readLoop()
}
func (b *Broker) ConnectToRouters() {
for i := 0; i < len(b.config.Cluster.Routes); i++ {
url := b.config.Cluster.Routes[i]
go b.connectRouter(url, "")
}
}
func (b *Broker) connectRouter(url, remoteID string) {
func (b *Broker) ConnectToDiscovery() {
var conn net.Conn
var err error
var tempDelay time.Duration = 0
for {
conn, err = net.Dial("tcp", url)
conn, err = net.Dial("tcp", b.config.Router)
if err != nil {
log.Error("Error trying to connect to route: ", err)
select {
case <-time.After(DEFAULT_ROUTE_CONNECT):
log.Debug("Connect to route timeout ,retry...")
continue
log.Error("Error trying to connect to route: ", zap.Error(err))
log.Debug("Connect to route timeout ,retry...")
if 0 == tempDelay {
tempDelay = 1 * time.Second
} else {
tempDelay *= 2
}
if max := 20 * time.Second; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
continue
}
break
}
route := &route{
remoteID: remoteID,
remoteUrl: url,
log.Debug("connect to router success :", zap.String("Router", b.config.Router))
cid := b.id
info := info{
clientID: cid,
keepalive: 60,
}
c := &client{
typ: CLUSTER,
broker: b,
conn: conn,
info: info,
}
c.init()
c.SendConnect()
c.SendInfo()
go c.readLoop()
go c.StartPing()
}
func (b *Broker) processClusterInfo() {
for {
msg, ok := <-b.clusterPool
if !ok {
log.Error("read message from cluster channel error")
return
}
ProcessMessage(msg)
}
}
func (b *Broker) connectRouter(id, addr string) {
var conn net.Conn
var err error
var timeDelay time.Duration = 0
retryTimes := 0
max := 32 * time.Second
for {
if !b.checkNodeExist(id, addr) {
return
}
conn, err = net.Dial("tcp", addr)
if err != nil {
log.Error("Error trying to connect to route: ", zap.Error(err))
if retryTimes > 50 {
return
}
log.Debug("Connect to route timeout ,retry...")
if 0 == timeDelay {
timeDelay = 1 * time.Second
} else {
timeDelay *= 2
}
if timeDelay > max {
timeDelay = max
}
time.Sleep(timeDelay)
retryTimes++
continue
}
break
}
route := route{
remoteID: id,
remoteUrl: addr,
}
cid := GenUniqueId()
info := info{
clientID: cid,
clientID: cid,
keepalive: 60,
}
c := &client{
typ: REMOTE,
conn: conn,
route: route,
info: info,
broker: b,
typ: REMOTE,
conn: conn,
route: route,
info: info,
}
c.init()
b.remotes.Store(cid, c)
c.SendConnect()
c.SendInfo()
c.StartPing()
// mpool := b.messagePool[fnv1a.HashString64(cid)%MessagePoolNum]
go c.readLoop()
go c.StartPing()
}
func (b *Broker) checkNodeExist(id, url string) bool {
if id == b.id {
return false
}
for k, v := range b.nodes {
if k == id {
return true
}
//skip
l, ok := v.(string)
if ok {
if url == l {
return true
}
}
}
return false
}
func (b *Broker) CheckRemoteExist(remoteID, url string) bool {
@@ -358,9 +520,7 @@ func (b *Broker) CheckRemoteExist(remoteID, url string) bool {
v, ok := value.(*client)
if ok {
if v.route.remoteUrl == url {
// if v.route.remoteID == "" || v.route.remoteID != remoteID {
v.route.remoteID = remoteID
// }
exist = true
return false
}
@@ -383,14 +543,16 @@ func (b *Broker) SendLocalSubsToRouter(c *client) {
}
return true
})
err := c.WriterPacket(subInfo)
if err != nil {
log.Error("Send localsubs To Router error :", err)
if len(subInfo.Topics) > 0 {
err := c.WriterPacket(subInfo)
if err != nil {
log.Error("Send localsubs To Router error :", zap.Error(err))
}
}
}
func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacket) {
b.remotes.Range(func(key, value interface{}) bool {
b.routes.Range(func(key, value interface{}) bool {
r, ok := value.(*client)
if ok {
if r.route.remoteID == remoteID {
@@ -405,7 +567,8 @@ func (b *Broker) BroadcastInfoMessage(remoteID string, msg *packets.PublishPacke
}
func (b *Broker) BroadcastSubOrUnsubMessage(packet packets.ControlPacket) {
b.remotes.Range(func(key, value interface{}) bool {
b.routes.Range(func(key, value interface{}) bool {
r, ok := value.(*client)
if ok {
r.WriterPacket(packet)
@@ -432,7 +595,6 @@ func (b *Broker) removeClient(c *client) {
func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
topic := packet.TopicName
r := b.sl.Match(topic)
// log.Info("psubs num: ", len(r.psubs))
if len(r.psubs) == 0 {
return
}
@@ -441,7 +603,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", err)
log.Error("process message for psub error, ", zap.Error(err))
}
}
}
@@ -449,14 +611,12 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
func (b *Broker) BroadcastUnSubscribe(subs map[string]*subscription) {
ubsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
unsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
for topic, _ := range subs {
// topic := sub.topic
// if sub.queue {
// topic = "$queue/" + sub.topic
// }
ubsub.Topics = append(ubsub.Topics, topic)
unsub.Topics = append(unsub.Topics, topic)
}
b.BroadcastSubOrUnsubMessage(ubsub)
if len(unsub.Topics) > 0 {
b.BroadcastSubOrUnsubMessage(unsub)
}
}

View File

@@ -1,25 +1,28 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import (
"net"
"reflect"
"strings"
"sync"
"time"
"github.com/eclipse/paho.mqtt.golang/packets"
log "github.com/cihub/seelog"
"go.uber.org/zap"
)
const (
// special pub topic for cluster info BrokerInfoTopic
BrokerInfoTopic = "broker001info/brokerinfo"
BrokerInfoTopic = "broker000100101info"
// CLIENT is an end user.
CLIENT = 0
// ROUTER is another router in the cluster.
ROUTER = 1
//REMOTE is the router connect to other cluster
REMOTE = 2
REMOTE = 2
CLUSTER = 3
)
const (
Connected = 1
@@ -32,8 +35,9 @@ type client struct {
broker *Broker
conn net.Conn
info info
route *route
route route
status int
closed chan int
smu sync.RWMutex
subs map[string]*subscription
rsubs map[string]*subInfo
@@ -74,48 +78,74 @@ func (c *client) init() {
c.smu.Lock()
defer c.smu.Unlock()
c.status = Connected
typ := c.typ
if typ == ROUTER {
c.rsubs = make(map[string]*subInfo)
} else if typ == CLIENT {
c.subs = make(map[string]*subscription, 10)
}
c.closed = make(chan int, 1)
c.rsubs = make(map[string]*subInfo)
c.subs = make(map[string]*subscription, 10)
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
}
func (c *client) readLoop(msgPool *MessagePool) {
func (c *client) keepAlive(ch chan int) {
defer close(ch)
b := c.broker
keepalive := time.Duration(c.info.keepalive*3/2) * time.Second
timer := time.NewTimer(keepalive)
for {
select {
case <-ch:
timer.Reset(keepalive)
case <-timer.C:
if c.typ == REMOTE || c.typ == CLUSTER {
timer.Reset(keepalive)
continue
}
log.Error("Client exceeded timeout, disconnecting. ", zap.String("ClientID", c.info.clientID), zap.Uint16("keepalive", c.info.keepalive))
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
timer.Stop()
return
case _, ok := <-c.closed:
if !ok {
return
}
}
}
}
func (c *client) readLoop() {
nc := c.conn
if nc == nil || msgPool == nil {
b := c.broker
if nc == nil || b == nil {
return
}
lastIn := uint16(time.Now().Unix())
var nowTime uint16
ch := make(chan int, 1000)
go c.keepAlive(ch)
for {
nowTime = uint16(time.Now().Unix())
if 0 != c.info.keepalive && nowTime-lastIn > c.info.keepalive*3/2 {
log.Errorf("Client %s has exceeded timeout, disconnecting.\n", c.info.clientID)
msg := &Message{client: c, packet: DisconnectdPacket}
msgPool.queue <- msg
return
}
packet, err := packets.ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", err)
msg := &Message{client: c, packet: DisconnectdPacket}
msgPool.queue <- msg
return
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
break
}
// log.Info("recv buf: ", packet)
lastIn = uint16(time.Now().Unix())
// keepalive channel
ch <- 1
msg := &Message{
client: c,
packet: packet,
}
msgPool.queue <- msg
b.SubmitWork(msg)
}
msgPool.Reduce()
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
}
func ProcessMessage(msg *Message) {
@@ -124,45 +154,33 @@ func ProcessMessage(msg *Message) {
if ca == nil {
return
}
log.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID))
switch ca.(type) {
case *packets.ConnackPacket:
// log.Info("Recv conack message..........")
case *packets.ConnectPacket:
// log.Info("Recv connect message..........")
case *packets.PublishPacket:
// log.Info("Recv publish message..........")
packet := ca.(*packets.PublishPacket)
c.ProcessPublish(packet)
case *packets.PubackPacket:
//log.Info("Recv publish ack message..........")
case *packets.PubrecPacket:
//log.Info("Recv publish rec message..........")
case *packets.PubrelPacket:
//log.Info("Recv publish rel message..........")
case *packets.PubcompPacket:
//log.Info("Recv publish ack message..........")
case *packets.SubscribePacket:
// log.Info("Recv subscribe message.....")
packet := ca.(*packets.SubscribePacket)
c.ProcessSubscribe(packet)
case *packets.SubackPacket:
// log.Info("Recv suback message.....")
case *packets.UnsubscribePacket:
// log.Info("Recv unsubscribe message.....")
packet := ca.(*packets.UnsubscribePacket)
c.ProcessUnSubscribe(packet)
case *packets.UnsubackPacket:
//log.Info("Recv unsuback message.....")
case *packets.PingreqPacket:
// log.Info("Recv PINGREQ message..........")
c.ProcessPing()
case *packets.PingrespPacket:
//log.Info("Recv PINGRESP message..........")
case *packets.DisconnectPacket:
// log.Info("Recv DISCONNECT message.......")
c.Close()
default:
log.Info("Recv Unknow message.......")
log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID))
}
}
@@ -172,8 +190,13 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
}
topic := packet.TopicName
if topic == BrokerInfoTopic && c.typ == CLUSTER {
c.ProcessInfo(packet)
return
}
if !c.CheckTopicAuth(PUB, topic) {
log.Error("Pub Topics Auth failed, ", topic)
log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
return
}
@@ -184,21 +207,21 @@ func (c *client) ProcessPublish(packet *packets.PublishPacket) {
puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
puback.MessageID = packet.MessageID
if err := c.WriterPacket(puback); err != nil {
log.Error("send puback error, ", err)
log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
c.ProcessPublishMessage(packet)
case QosExactlyOnce:
return
default:
log.Error("publish with unknown qos")
log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
return
}
if packet.Retain {
if b := c.broker; b != nil {
err := b.rl.Insert(topic, packet)
if err != nil {
log.Error("Insert Retain Message error: ", err)
log.Error("Insert Retain Message error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
}
@@ -225,14 +248,14 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
for _, sub := range r.psubs {
if sub.client.typ == ROUTER {
if typ == ROUTER {
if typ != CLIENT {
continue
}
}
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", err)
log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
}
@@ -245,7 +268,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
// log.Info("queue index : ", cnt)
for _, sub := range r.qsubs {
if sub.client.typ == ROUTER {
if c.typ == ROUTER {
if typ != CLIENT {
continue
}
}
@@ -258,7 +281,7 @@ func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if sub != nil {
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("send publish error, ", err)
log.Error("send publish error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
@@ -312,7 +335,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
t := topic
//check topic auth for client
if !c.CheckTopicAuth(SUB, topic) {
log.Error("Sub topic Auth failed: ", topic)
log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure)
continue
}
@@ -359,7 +382,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
}
err := b.sl.Insert(sub)
if err != nil {
log.Error("Insert subscription error: ", err)
log.Error("Insert subscription error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure)
} else {
retcodes = append(retcodes, qoss[i])
@@ -369,7 +392,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
err := c.WriterPacket(suback)
if err != nil {
log.Error("send suback error, ", err)
log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
//broadcast subscribe message
@@ -381,7 +404,7 @@ func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
for _, t := range topics {
packets := b.rl.Match(t)
for _, packet := range packets {
log.Info("process retain message: ", packet)
log.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID))
if packet != nil {
c.WriterPacket(packet)
}
@@ -401,29 +424,25 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
topics := packet.Topics
for _, t := range topics {
var sub *subscription
ok := false
switch typ {
case CLIENT:
sub, ok = c.subs[t]
sub, ok := c.subs[t]
if ok {
c.unsubscribe(sub)
}
case ROUTER:
subinfo, ok := c.rsubs[t]
if ok {
subinfo.num = subinfo.num - 1
if subinfo.num < 1 {
sub = subinfo.sub
delete(c.rsubs, t)
c.unsubscribe(subinfo.sub)
} else {
c.rsubs[t] = subinfo
sub = nil
}
} else {
return
}
}
if ok {
go c.unsubscribe(sub)
}
}
@@ -432,7 +451,7 @@ func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
err := c.WriterPacket(unsuback)
if err != nil {
log.Error("send unsuback error, ", err)
log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
// //process ubsubscribe message
@@ -461,19 +480,30 @@ func (c *client) ProcessPing() {
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
err := c.WriterPacket(resp)
if err != nil {
log.Error("send PingResponse error, ", err)
log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
}
func (c *client) Close() {
c.smu.Lock()
if c.status == Disconnected {
c.smu.Unlock()
return
}
//wait for message complete
time.Sleep(1 * time.Second)
c.status = Disconnected
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.smu.Unlock()
close(c.closed)
b := c.broker
subs := c.subs
if b != nil {
@@ -481,7 +511,7 @@ func (c *client) Close() {
for _, sub := range subs {
err := b.sl.Remove(sub)
if err != nil {
log.Error("closed client but remove sublist error, ", err)
log.Error("closed client but remove sublist error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
if c.typ == CLIENT {
@@ -490,10 +520,23 @@ func (c *client) Close() {
if c.info.willMsg != nil {
b.PublishMessage(c.info.willMsg)
}
if c.typ == CLUSTER {
b.ConnectToDiscovery()
}
//do reconnect
if c.typ == REMOTE {
go b.connectRouter(c.route.remoteID, c.route.remoteUrl)
}
}
}
func (c *client) WriterPacket(packet packets.ControlPacket) error {
if c == nil || packet == nil {
return nil
}
c.mu.Lock()
err := packet.Write(c.conn)
c.mu.Unlock()

View File

@@ -1,3 +1,5 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import (

View File

@@ -1,17 +1,18 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"flag"
"fmt"
"github.com/fhmq/hmq/logger"
"go.uber.org/zap"
"io/ioutil"
log "github.com/cihub/seelog"
)
const (
CONFIGFILE = "hmq.config"
"os"
)
type Config struct {
@@ -19,6 +20,7 @@ type Config struct {
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
@@ -27,12 +29,12 @@ type Config struct {
TlsInfo TLSInfo `json:"tlsInfo"`
Acl bool `json:"acl"`
AclConf string `json:"aclConf"`
Debug bool `json:"-"`
}
type RouteInfo struct {
Host string `json:"host"`
Port string `json:"port"`
Routes []string `json:"routes"`
Host string `json:"host"`
Port string `json:"port"`
}
type TLSInfo struct {
@@ -42,11 +44,94 @@ type TLSInfo struct {
KeyFile string `json:"keyFile"`
}
func LoadConfig() (*Config, error) {
var DefaultConfig *Config = &Config{
Worker: 4096,
Host: "0.0.0.0",
Port: "1883",
Acl: false,
}
content, err := ioutil.ReadFile(CONFIGFILE)
var (
log *zap.Logger
)
func showHelp() {
fmt.Printf("%s\n", usageStr)
os.Exit(0)
}
func ConfigureConfig(args []string) (*Config, error) {
config := &Config{}
var (
help bool
configFile string
)
fs := flag.NewFlagSet("hmq-broker", flag.ExitOnError)
fs.Usage = showHelp
fs.BoolVar(&help, "h", false, "Show this message.")
fs.BoolVar(&help, "help", false, "Show this message.")
fs.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.")
fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info")
fs.StringVar(&config.Router, "router", "", "Router who maintenance cluster info")
fs.StringVar(&config.WsPort, "ws", "", "port for ws to listen on")
fs.StringVar(&config.WsPort, "wsport", "", "port for ws to listen on")
fs.StringVar(&config.WsPath, "wsp", "", "path for ws to listen on")
fs.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on")
fs.StringVar(&configFile, "config", "", "config file for hmq")
fs.StringVar(&configFile, "c", "", "config file for hmq")
fs.BoolVar(&config.Debug, "debug", false, "enable Debug logging.")
fs.BoolVar(&config.Debug, "d", false, "enable Debug logging.")
fs.Bool("D", true, "enable Debug logging.")
if err := fs.Parse(args); err != nil {
return nil, err
}
if help {
showHelp()
return nil, nil
}
fs.Visit(func(f *flag.Flag) {
switch f.Name {
case "D":
config.Debug = true
}
})
logger.InitLogger(config.Debug)
log = logger.Get().Named("Broker")
if configFile != "" {
tmpConfig, e := LoadConfig(configFile)
if e != nil {
return nil, e
} else {
config = tmpConfig
}
}
if err := config.check(); err != nil {
return nil, err
}
return config, nil
}
func LoadConfig(filename string) (*Config, error) {
content, err := ioutil.ReadFile(filename)
if err != nil {
log.Error("Read config file error: ", err)
log.Error("Read config file error: ", zap.Error(err))
return nil, err
}
// log.Info(string(content))
@@ -54,16 +139,19 @@ func LoadConfig() (*Config, error) {
var config Config
err = json.Unmarshal(content, &config)
if err != nil {
log.Error("Unmarshal config file error: ", err)
log.Error("Unmarshal config file error: ", zap.Error(err))
return nil, err
}
return &config, nil
}
func (config *Config) check() error {
if config.Worker == 0 {
config.Worker = 1024
}
WorkNum = config.Worker
if config.Port != "" {
if config.Host == "" {
config.Host = "0.0.0.0"
@@ -75,29 +163,33 @@ func LoadConfig() (*Config, error) {
config.Cluster.Host = "0.0.0.0"
}
}
if config.Router != "" {
if config.Cluster.Port == "" {
return errors.New("cluster port is null")
}
}
if config.TlsPort != "" {
if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" {
log.Error("tls config error, no cert or key file.")
return nil, err
return errors.New("tls config error, no cert or key file.")
}
if config.TlsHost == "" {
config.TlsHost = "0.0.0.0"
}
}
return &config, nil
return nil
}
func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(tlsInfo.CertFile, tlsInfo.KeyFile)
if err != nil {
return nil, fmt.Errorf("error parsing X509 certificate/key pair: %v", err)
return nil, fmt.Errorf("error parsing X509 certificate/key pair: %v", zap.Error(err))
}
cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
if err != nil {
return nil, fmt.Errorf("error parsing certificate: %v", err)
return nil, fmt.Errorf("error parsing certificate: %v", zap.Error(err))
}
// Create TLSConfig

View File

@@ -1,48 +0,0 @@
package broker
// const (
// WorkNum = 4096
// )
var WorkNum int
type Dispatcher struct {
WorkerPool chan chan *Message
}
func StartDispatcher() {
InitMessagePool()
dispatcher := NewDispatcher()
dispatcher.Run()
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < WorkNum; i++ {
worker := NewWorker(d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
func NewDispatcher() *Dispatcher {
pool := make(chan chan *Message, WorkNum)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) dispatch() {
for i := 0; i < MessagePoolNum; i++ {
go func(idx int) {
for {
select {
case msg := <-MSGPool[idx].queue:
go func(msg *Message) {
msgChannel := <-d.WorkerPool
msgChannel <- msg
}(msg)
}
}
}(i)
}
}

View File

@@ -1,13 +1,13 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import (
"fmt"
"time"
"github.com/eclipse/paho.mqtt.golang/packets"
simplejson "github.com/bitly/go-simplejson"
log "github.com/cihub/seelog"
"github.com/eclipse/paho.mqtt.golang/packets"
"go.uber.org/zap"
"time"
)
func (c *client) SendInfo() {
@@ -19,28 +19,33 @@ func (c *client) SendInfo() {
infoMsg := NewInfo(c.broker.id, url, false)
err := c.WriterPacket(infoMsg)
if err != nil {
log.Error("send info message error, ", err)
log.Error("send info message error, ", zap.Error(err))
return
}
// log.Info("send info success")
}
func (c *client) StartPing() {
timeTicker := time.NewTicker(time.Second * 30)
timeTicker := time.NewTicker(time.Second * 50)
ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
for {
select {
case <-timeTicker.C:
err := c.WriterPacket(ping)
if err != nil {
log.Error("ping error: ", err)
log.Error("ping error: ", zap.Error(err))
c.Close()
}
case _, ok := <-c.closed:
if !ok {
return
}
}
}
}
func (c *client) SendConnect() {
if c.status == Disconnected {
if c.status != Connected {
return
}
m := packets.NewControlPacket(packets.Connect).(*packets.ConnectPacket)
@@ -50,10 +55,10 @@ func (c *client) SendConnect() {
m.Keepalive = uint16(60)
err := c.WriterPacket(m)
if err != nil {
log.Error("send connect message error, ", err)
log.Error("send connect message error, ", zap.Error(err))
return
}
// log.Info("send connet success")
log.Info("send connect success")
}
func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
@@ -61,7 +66,7 @@ func NewInfo(sid, url string, isforword bool) *packets.PublishPacket {
pub.Qos = 0
pub.TopicName = BrokerInfoTopic
pub.Retain = false
info := fmt.Sprintf(`{"remoteID":"%s","url":"%s","isForward":%t}`, sid, url, isforword)
info := fmt.Sprintf(`{"brokerID":"%s","brokerUrl":"%s"}`, sid, url)
// log.Info("new info", string(info))
pub.Payload = []byte(info)
return pub
@@ -74,47 +79,36 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) {
return
}
log.Info("recv remoteInfo: ", string(packet.Payload))
log.Info("recv remoteInfo: ", zap.String("payload", string(packet.Payload)))
js, e := simplejson.NewJson(packet.Payload)
if e != nil {
log.Warn("parse info message err", e)
js, err := simplejson.NewJson(packet.Payload)
if err != nil {
log.Warn("parse info message err", zap.Error(err))
return
}
rid := js.Get("remoteID").MustString()
rurl := js.Get("url").MustString()
isForward := js.Get("isForward").MustBool()
if rid == "" {
log.Error("receive info message error with remoteID is null")
routes, err := js.Get("data").Map()
if routes == nil {
log.Error("receive info message error, ", zap.Error(err))
return
}
if rid == b.id {
if !isForward {
c.Close() //close connet self
b.nodes = routes
b.mu.Lock()
for rid, rurl := range routes {
if rid == b.id {
continue
}
return
}
exist := b.CheckRemoteExist(rid, rurl)
if !exist {
go b.connectRouter(rurl, rid)
}
// log.Info("isforword: ", isForward)
if !isForward {
route := &route{
remoteUrl: rurl,
remoteID: rid,
url, ok := rurl.(string)
if ok {
exist := b.CheckRemoteExist(rid, url)
if !exist {
b.connectRouter(rid, url)
}
}
c.route = route
go b.SendLocalSubsToRouter(c)
// log.Info("BroadcastInfoMessage starting... ")
infoMsg := NewInfo(rid, rurl, true)
b.BroadcastInfoMessage(rid, infoMsg)
}
return
b.mu.Unlock()
}

View File

@@ -1,62 +0,0 @@
package broker
import (
"sync"
"github.com/eclipse/paho.mqtt.golang/packets"
)
const (
MaxUser = 1024 * 1024
MessagePoolNum = 1024
MessagePoolUser = MaxUser / MessagePoolNum
MessagePoolMessageNum = MaxUser / MessagePoolNum * 4
)
type Message struct {
client *client
packet packets.ControlPacket
}
var (
MSGPool []MessagePool
)
type MessagePool struct {
l sync.Mutex
maxuser int
user int
queue chan *Message
}
func InitMessagePool() {
MSGPool = make([]MessagePool, (MessagePoolNum + 2))
for i := 0; i < (MessagePoolNum + 2); i++ {
MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum)
}
}
func (p *MessagePool) Init(num int, maxusernum int) {
p.maxuser = maxusernum
p.queue = make(chan *Message, num)
}
func (p *MessagePool) GetPool() *MessagePool {
p.l.Lock()
if p.user+1 < p.maxuser {
p.user += 1
p.l.Unlock()
return p
} else {
p.l.Unlock()
return nil
}
}
func (p *MessagePool) Reduce() {
p.l.Lock()
p.user -= 1
p.l.Unlock()
}

View File

@@ -1,9 +1,8 @@
package broker
import (
"sync"
"github.com/eclipse/paho.mqtt.golang/packets"
"sync"
)
type RetainList struct {

View File

@@ -1,10 +1,11 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import (
"errors"
"go.uber.org/zap"
"sync"
log "github.com/cihub/seelog"
)
// A result structure better optimized for queue subs.
@@ -209,7 +210,7 @@ func (s *Sublist) Match(topic string) *SublistResult {
tokens, err := PublishTopicCheckAndSpilt(topic)
if err != nil {
log.Error("\tserver/sublist.go: ", err)
log.Error("\tserver/sublist.go: ", zap.Error(err))
return nil
}
@@ -241,7 +242,6 @@ func (s *Sublist) Match(topic string) *SublistResult {
}
s.Unlock()
// log.Info("SublistResult: ", result)
return result
}
@@ -294,7 +294,6 @@ func removeSubFromList(sub *subscription, sl []*subscription) ([]*subscription,
sl[i] = sl[last]
sl[last] = nil
sl = sl[:last]
// log.Info("removeSubFromList success")
return shrinkAsNeeded(sl), true
}
}

24
broker/usage.go Normal file
View File

@@ -0,0 +1,24 @@
package broker
var usageStr = `
Usage: hmq [options]
Broker Options:
-w, --worker <number> Worker num to process message, perfer (client num)/10. (default 1024)
-p, --port <port> Use port for clients (default: 1883)
--host <host> Network host to listen on. (default "0.0.0.0")
-ws, --wsport <port> Use port for websocket monitoring
-wsp,--wspath <path> Use path for websocket monitoring
-c, --config <file> Configuration file
Logging Options:
-d, --debug <bool> Enable debugging output (default false)
-D Debug and trace
Cluster Options:
-r, --router <rurl> Router who maintenance cluster info
-cp, --clusterport <cluster-port> Cluster listen port for others
Common Options:
-h, --help Show this message
`

View File

@@ -1,37 +0,0 @@
package broker
type Worker struct {
WorkerPool chan chan *Message
MsgChannel chan *Message
quit chan bool
}
func NewWorker(workerPool chan chan *Message) Worker {
return Worker{
WorkerPool: workerPool,
MsgChannel: make(chan *Message),
quit: make(chan bool)}
}
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.MsgChannel
select {
case msg := <-w.MsgChannel:
// we have received a work request.
ProcessMessage(msg)
case <-w.quit:
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}

View File

@@ -4,9 +4,9 @@
"host": "0.0.0.0",
"cluster": {
"host": "0.0.0.0",
"port": "1993",
"routes": []
"port": "1993"
},
"router": "127.0.0.1:9888",
"tlsPort": "8883",
"tlsHost": "0.0.0.0",
"wsPort": "1888",

View File

@@ -1,3 +1,5 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package acl
import (

View File

@@ -1,3 +1,4 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>*/
package acl
import "strings"

View File

@@ -1,3 +1,5 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package acl
import (

50
logger/logger.go Normal file
View File

@@ -0,0 +1,50 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package logger
import (
"go.uber.org/zap"
)
var (
// env can be setup at build time with Go Linker. Value could be prod or whatever else for dev env
instance *zap.Logger
logCfg zap.Config
)
// NewDevLogger return a logger for dev builds
func NewDevLogger() (*zap.Logger, error) {
logCfg := zap.NewDevelopmentConfig()
return logCfg.Build()
}
// NewProdLogger return a logger for production builds
func NewProdLogger() (*zap.Logger, error) {
logCfg := zap.NewProductionConfig()
logCfg.DisableStacktrace = true
logCfg.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
return logCfg.Build()
}
func InitLogger(debug bool) {
var err error
var log *zap.Logger
if debug {
log, err = NewDevLogger()
} else {
log, err = NewProdLogger()
}
if err != nil {
panic("Unable to create a logger.")
}
defer log.Sync()
log.Debug("Logger initialization succeeded")
instance = log.Named("hmq")
}
// Get return a *zap.Logger instance
func Get() *zap.Logger {
return instance
}

33
logger/logger_test.go Normal file
View File

@@ -0,0 +1,33 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package logger
import (
"testing"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestGet(t *testing.T) {
var l *zap.Logger
logger := Get()
assert.NotNil(t, logger)
assert.IsType(t, l, logger)
}
func TestNewDevLogger(t *testing.T) {
logger, err := NewDevLogger()
assert.Nil(t, err)
assert.True(t, logger.Core().Enabled(zap.DebugLevel))
}
func TestNewProdLogger(t *testing.T) {
logger, err := NewProdLogger()
assert.Nil(t, err)
assert.False(t, logger.Core().Enabled(zap.DebugLevel))
}

23
main.go
View File

@@ -1,33 +1,36 @@
/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
*/
package main
import (
"hmq/broker"
"fmt"
"github.com/fhmq/hmq/broker"
"os"
"os/signal"
"runtime"
log "github.com/cihub/seelog"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
runtime.GC()
config, er := broker.LoadConfig()
if er != nil {
log.Error("Load Config file error: ", er)
config, err := broker.ConfigureConfig(os.Args[1:])
if err != nil {
fmt.Println("configure broker config error: ", err)
return
}
broker.StartDispatcher()
b, err := broker.NewBroker(config)
if err != nil {
log.Error("New Broker error: ", er)
fmt.Println("New Broker error: ", err)
return
}
b.Start()
s := waitForSignal()
log.Infof("signal got: %v ,broker closed.", s)
fmt.Println("signal received, broker closed.", s)
}
func waitForSignal() os.Signal {

166
pool/pool.go Normal file
View File

@@ -0,0 +1,166 @@
package pool
import "time"
const (
// This value is the size of the queue that workers register their
// availability to the dispatcher. There may be hundreds of workers, but
// only a small channel is needed to register some of the workers.
readyQueueSize = 16
// If worker pool receives no new work for this period of time, then stop
// a worker goroutine.
idleTimeoutSec = 5
)
type WorkerPool struct {
maxWorkers int
timeout time.Duration
taskQueue chan func()
readyWorkers chan chan func()
stoppedChan chan struct{}
}
func New(maxWorkers int) *WorkerPool {
// There must be at least one worker.
if maxWorkers < 1 {
maxWorkers = 1
}
// taskQueue is unbuffered since items are always removed immediately.
pool := &WorkerPool{
taskQueue: make(chan func()),
maxWorkers: maxWorkers,
readyWorkers: make(chan chan func(), readyQueueSize),
timeout: time.Second * idleTimeoutSec,
stoppedChan: make(chan struct{}),
}
// Start the task dispatcher.
go pool.dispatch()
return pool
}
func (p *WorkerPool) Stop() {
if p.Stopped() {
return
}
close(p.taskQueue)
<-p.stoppedChan
}
func (p *WorkerPool) Stopped() bool {
select {
case <-p.stoppedChan:
return true
default:
}
return false
}
func (p *WorkerPool) Submit(task func()) {
if task != nil {
p.taskQueue <- task
}
}
func (p *WorkerPool) SubmitWait(task func()) {
if task == nil {
return
}
doneChan := make(chan struct{})
p.taskQueue <- func() {
task()
close(doneChan)
}
<-doneChan
}
func (p *WorkerPool) dispatch() {
defer close(p.stoppedChan)
timeout := time.NewTimer(p.timeout)
var workerCount int
var task func()
var ok bool
var workerTaskChan chan func()
startReady := make(chan chan func())
Loop:
for {
timeout.Reset(p.timeout)
select {
case task, ok = <-p.taskQueue:
if !ok {
break Loop
}
// Got a task to do.
select {
case workerTaskChan = <-p.readyWorkers:
// A worker is ready, so give task to worker.
workerTaskChan <- task
default:
// No workers ready.
// Create a new worker, if not at max.
if workerCount < p.maxWorkers {
workerCount++
go func(t func()) {
startWorker(startReady, p.readyWorkers)
// Submit the task when the new worker.
taskChan := <-startReady
taskChan <- t
}(task)
} else {
// Start a goroutine to submit the task when an existing
// worker is ready.
go func(t func()) {
taskChan := <-p.readyWorkers
taskChan <- t
}(task)
}
}
case <-timeout.C:
// Timed out waiting for work to arrive. Kill a ready worker.
if workerCount > 0 {
select {
case workerTaskChan = <-p.readyWorkers:
// A worker is ready, so kill.
close(workerTaskChan)
workerCount--
default:
// No work, but no ready workers. All workers are busy.
}
}
}
}
// Stop all remaining workers as they become ready.
for workerCount > 0 {
workerTaskChan = <-p.readyWorkers
close(workerTaskChan)
workerCount--
}
}
func startWorker(startReady, readyWorkers chan chan func()) {
go func() {
taskChan := make(chan func())
var task func()
var ok bool
// Register availability on starReady channel.
startReady <- taskChan
for {
// Read task from dispatcher.
task, ok = <-taskChan
if !ok {
// Dispatcher has told worker to stop.
break
}
// Execute the task.
task()
// Register availability on readyWorkers channel.
readyWorkers <- taskChan
}
}()
}