This commit is contained in:
zhouyuyan
2017-08-23 19:24:27 +08:00
parent fe936d0e37
commit 00abbac00f
5 changed files with 141 additions and 24 deletions

View File

@@ -1,6 +1,7 @@
package broker
import (
"fhmq/lib/message"
"net"
"time"
@@ -20,6 +21,7 @@ func (b *Broker) StartListening() {
return
}
tmpDelay := 10 * ACCEPT_MIN_SLEEP
num := 0
for {
conn, err := l.Accept()
if err != nil {
@@ -37,11 +39,12 @@ func (b *Broker) StartListening() {
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
go handleConnection(conn)
num += 1
go b.handleConnection(conn, num)
}
}
func handleConnection(conn net.Conn) {
func (b *Broker) handleConnection(conn net.Conn, idx int) {
//process connect packet
buf, err := ReadPacket(conn)
@@ -54,8 +57,29 @@ func handleConnection(conn net.Conn) {
log.Error(err)
return
}
}
func (b *Broker) NewClient() {
willmsg := message.NewPublishMessage()
if connMsg.WillFlag() {
willmsg.SetQoS(connMsg.WillQos())
willmsg.SetPayload(connMsg.WillMessage())
willmsg.SetRetain(connMsg.WillRetain())
willmsg.SetTopic(connMsg.WillTopic())
willmsg.SetDup(false)
} else {
willmsg = nil
}
info := info{
clientID: connMsg.ClientId(),
username: connMsg.Username(),
password: connMsg.Password(),
keepalive: connMsg.KeepAlive(),
willMsg: willmsg,
}
c := &client{
broker: b,
conn: conn,
info: info,
msgPool: MSGPool[idx%MessagePoolNum].GetPool(),
}
c.init()
c.readLoop()
}

View File

@@ -4,25 +4,55 @@ import (
"fhmq/lib/message"
"net"
"strings"
"github.com/prometheus/common/log"
)
type client struct {
broker Broker
broker *Broker
conn net.Conn
info info
localIP string
remoteIP string
msgPool *MessagePool
woker Worker
}
type info struct {
clientID []byte
username []byte
password []byte
keeplive uint16
willMsg *message.PublishMessage
clientID []byte
username []byte
password []byte
keepalive uint16
willMsg *message.PublishMessage
}
type Worker struct {
WorkerPool chan chan Message
MsgChannel chan Message
quit chan bool
}
func (c *client) init() {
c.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
}
func (c *client) readLoop() {
nc := c.conn
msgPool := c.msgPool
if nc == nil || msgPool == nil {
return
}
cid := c.info.clientID
for {
msg := msgPool.Pop()
buf, err := ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", err)
return
}
msg.user = cid
msg.buf = buf
}
}

57
broker/msgpool.go Normal file
View File

@@ -0,0 +1,57 @@
package broker
import "sync"
type Message struct {
client *client
msg []byte
pool *MessagePool
}
type MessagePool struct {
l sync.Mutex
maxuser int
user int
queue chan *Message
}
func (p *MessagePool) Init(num int, maxusernum int) {
p.maxuser = maxusernum
p.queue = make(chan *Message, num)
for k := 0; k < num; k++ {
m := &Message{}
m.Pool = p
p.Push(m)
}
}
func (p *MessagePool) GetPool() *MessagePool {
p.l.Lock()
if p.user+1 < p.maxuser {
p.user += 1
p.l.Unlock()
return p
} else {
p.l.Unlock()
return nil
}
}
func (p *MessagePool) Reduce() {
p.l.Lock()
p.user -= 1
p.l.Unlock()
}
func (p *MessagePool) Pop() *Message {
p2 := <-p.queue
return p2
}
func (p *MessagePool) Push(pmessage *Message) {
p.queue <- pmessage
}

View File

@@ -1,11 +0,0 @@
package lib
// type MessagePool struct {
// sync.Mutex
// queue chan *Message
// }
// func (p *MessagePool) Init(len int, maxusernum int) {
// p.maxuser = maxusernum
// p.queue = make(chan *Message, len)
// }

19
main.go
View File

@@ -8,7 +8,23 @@ import (
log "github.com/cihub/seelog"
)
//https://github.com/xiaojiaqi/10billionhongbaos
const (
MaxUser = 1024 * 1024
MessagePoolNum = 1024
MessagePoolUser = MaxUser / MessagePoolNum
MessagePoolMessageNum = MaxUser / MessagePoolNum * 4
)
var (
MSGPool []MessagePool
)
func init() {
MSGPool = make([]MessagePool, (MessagePoolNum + 2))
for i := 0; i < (MessagePoolNum + 2); i++ {
MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum)
}
}
func main() {
broker := broker.NewBroker()
broker.StartListening()
@@ -16,6 +32,7 @@ func main() {
s := waitForSignal()
log.Infof("signal got: %v ,broker closed.", s)
}
func waitForSignal() os.Signal {
signalChan := make(chan os.Signal, 1)
defer close(signalChan)