This commit is contained in:
zhouyuyan
2017-08-25 15:28:41 +08:00
parent 0bc226d071
commit 0ea7da1dc0
8 changed files with 91 additions and 25 deletions

4
broker.config Normal file
View File

@@ -0,0 +1,4 @@
{
"port": "1883",
"host": "0.0.0.0"
}

View File

@@ -9,24 +9,32 @@ import (
)
type Broker struct {
id string
config *Config
remote map[string]*client
sl *Sublist
rl *RetainList
queues map[string]int
}
func NewBroker() *Broker {
func NewBroker(config *Config) *Broker {
return &Broker{
config: config,
sl: NewSublist(),
rl: NewRetainList(),
queues: make(map[string]int),
remote: make(map[string]*client),
}
}
func (b *Broker) StartListening() {
l, e := net.Listen("tcp", "0.0.0.0:1883")
hp := b.config.Host + ":" + b.config.Port
l, e := net.Listen("tcp", hp)
if e != nil {
log.Error("Error listening on ", e)
return
}
log.Info("Start Listening client on ", hp)
tmpDelay := 10 * ACCEPT_MIN_SLEEP
num := 0
for {

View File

@@ -7,7 +7,7 @@ import (
"strings"
"sync"
"github.com/prometheus/common/log"
log "github.com/cihub/seelog"
)
type client struct {
@@ -130,6 +130,15 @@ func (c *client) ProcessPublish(buf []byte) {
}
c.ProcessPublishMessage(buf, msg)
if msg.Retain() {
if b := c.broker; b != nil {
err := b.rl.Insert(msg.Topic(), buf)
if err != nil {
log.Error("Insert Retain Message error: ", err)
}
}
}
}
func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage) {
@@ -283,18 +292,17 @@ func (c *client) ProcessSubscribe(buf []byte) {
// srv.BroadcastSubscribeMessage(buf)
// })
// }
//process retain message
// for _, t := range topics {
// srv.startGoRoutine(func() {
// bufs := srv.rl.Match(t)
// for _, buf := range bufs {
// log.Info("process retain message: ", string(buf))
// if buf != nil && string(buf) != "" {
// c.writeBuffer(buf)
// }
// }
// })
// }
for _, t := range topics {
bufs := srv.rl.Match(t)
for _, buf := range bufs {
log.Info("process retain message: ", string(buf))
if buf != nil && string(buf) != "" {
c.writeBuffer(buf)
}
}
}
}
func (c *client) ProcessUnSubscribe(buf []byte) {
@@ -366,16 +374,11 @@ func (c *client) Close() {
srv := c.broker
subs := c.subs
if srv != nil {
// srv.removeClient(c)
for _, sub := range subs {
// log.Info("remove Sub")
err := srv.sl.Remove(sub)
if err != nil {
log.Error("closed client but remove sublist error, ", err)
}
// if c.typ == CLIENT {
// srv.BroadcastUnSubscribe(sub)
// }
}
}
if c.conn != nil {

42
broker/config.go Normal file
View File

@@ -0,0 +1,42 @@
package broker
import (
"encoding/json"
"errors"
"io/ioutil"
"github.com/prometheus/common/log"
)
const (
CONFIGFILE = "broker.config"
)
type Config struct {
Host string `json:"host"`
Port string `json:"port"`
}
func LoadConfig() (*Config, error) {
content, err := ioutil.ReadFile(CONFIGFILE)
if err != nil {
log.Error("Read config file error: ", err)
return nil, err
}
var info Config
err = json.Unmarshal(content, &info)
if err != nil {
log.Error("Unmarshal config file error: ", err)
return nil, err
}
if info.Port != "" {
if info.Host == "" {
info.Host = "0.0.0.0"
}
} else {
return nil, errors.New("Listen port nil")
}
return &info, nil
}

View File

@@ -1,11 +1,7 @@
package broker
const (
WorkNum = 2048
MaxUser = 1024 * 1024
MessagePoolNum = 1024
MessagePoolUser = MaxUser / MessagePoolNum
MessagePoolMessageNum = MaxUser / MessagePoolNum * 4
WorkNum = 2048
)
type Dispatcher struct {

View File

@@ -2,6 +2,13 @@ package broker
import "sync"
const (
MaxUser = 1024 * 1024
MessagePoolNum = 1024
MessagePoolUser = MaxUser / MessagePoolNum
MessagePoolMessageNum = MaxUser / MessagePoolNum * 4
)
type Message struct {
client *client
msg []byte

BIN
fhmq Executable file

Binary file not shown.

View File

@@ -9,7 +9,13 @@ import (
)
func main() {
broker := broker.NewBroker()
config, er := broker.LoadConfig()
if er != nil {
log.Error("Load Config file error: ", er)
return
}
broker := broker.NewBroker(config)
broker.StartListening()
s := waitForSignal()