This commit is contained in:
zhouyuyan
2017-08-23 21:10:38 +08:00
parent 922a10fd66
commit e73776d670
6 changed files with 101 additions and 56 deletions

View File

@@ -73,14 +73,17 @@ func (b *Broker) handleConnection(conn net.Conn, idx int) {
keepalive: connMsg.KeepAlive(),
willMsg: willmsg,
}
pool := MSGPool[idx%MessagePoolNum].GetPool()
c := &client{
broker: b,
conn: conn,
info: info,
msgPool: pool,
broker: b,
conn: conn,
info: info,
}
c.init()
c.readLoop()
pool.Reduce()
c.woker = Worker{
WorkerPool: MyDispatcher,
MsgChannel: make(chan *Message),
quit: make(chan bool),
}
c.readLoop(idx)
}

View File

@@ -14,7 +14,6 @@ type client struct {
info info
localIP string
remoteIP string
msgPool *MessagePool
woker Worker
}
@@ -26,33 +25,26 @@ type info struct {
willMsg *message.PublishMessage
}
type Worker struct {
WorkerPool chan chan Message
MsgChannel chan Message
quit chan bool
}
func (c *client) init() {
c.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
}
func (c *client) readLoop() {
func (c *client) readLoop(idx int) {
nc := c.conn
msgPool := c.msgPool
msgPool := MSGPool[idx%MessagePoolNum].GetPool()
if nc == nil || msgPool == nil {
return
}
cid := c.info.clientID
for {
msg := msgPool.Pop()
buf, err := ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", err)
return
}
msg.user = cid
msg.client = c
msg.buf = buf
msgPool.Push(msg)
}
msgPool.Reduce()
}

43
broker/dispatcher.go Normal file
View File

@@ -0,0 +1,43 @@
package broker
const (
WorkPoolNum = 1024
MaxUser = 1024 * 1024
MessagePoolNum = 1024
MessagePoolUser = MaxUser / MessagePoolNum
MessagePoolMessageNum = MaxUser / MessagePoolNum * 4
// MessageBoxNum = 256
// MessageBoxUserNum = MaxUser / MessageBoxNum
// MessageBoxMessageLength = MessageBoxUserNum
)
var (
MyDispatcher Dispatcher
)
type Dispatcher struct {
WorkerPool chan chan *Message
}
func init() {
workerPool = make(chan chan *Message, WorkPoolNum)
MyDispatcher = &Dispatcher{WorkerPool: workerPool}
}
func (d *Dispatcher) dispatch() {
for i := 0; i < MessagePoolNum; i++ {
go func(idx int) {
for {
select {
case msg := <-MSGPool[idx].Pop():
go func(msg *Message) {
msgChannel := <-d.WorkerPool
msgChannel <- msg
}(msg)
}
}
}(i)
}
}

View File

@@ -5,9 +5,13 @@ import "sync"
type Message struct {
client *client
msg []byte
pool *MessagePool
// pool *MessagePool
}
var (
MSGPool []MessagePool
)
type MessagePool struct {
l sync.Mutex
maxuser int
@@ -15,14 +19,17 @@ type MessagePool struct {
queue chan *Message
}
func init() []MessagePool {
MSGPool = make([]MessagePool, (MessagePoolNum + 2))
for i := 0; i < (MessagePoolNum + 2); i++ {
MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum)
}
return MSGPool
}
func (p *MessagePool) Init(num int, maxusernum int) {
p.maxuser = maxusernum
p.queue = make(chan *Message, num)
for k := 0; k < num; k++ {
m := &Message{}
m.Pool = p
p.Push(m)
}
}
func (p *MessagePool) GetPool() *MessagePool {

30
broker/worker.go Normal file
View File

@@ -0,0 +1,30 @@
package broker
type Worker struct {
WorkerPool chan chan *Message
MsgChannel chan *Message
quit chan bool
}
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.MsgChannel
select {
case msg := <-w.MsgChannel:
// we have received a work request.
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}

30
main.go
View File

@@ -8,36 +8,6 @@ import (
log "github.com/cihub/seelog"
)
const (
MaxUser = 1024 * 1024
MessagePoolNum = 1024
MessagePoolUser = MaxUser / MessagePoolNum
MessagePoolMessageNum = MaxUser / MessagePoolNum * 4
MessageBoxNum = 256
MessageBoxUserNum = MaxUser / MessageBoxNum
MessageBoxMessageLength = MessageBoxUserNum
)
var (
MSGPool []MessagePool
// Messagebox []*Message
)
func init() {
MSGPool = make([]MessagePool, (MessagePoolNum + 2))
for i := 0; i < (MessagePoolNum + 2); i++ {
MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum)
}
// Messagebox = make([]MessageProcess, MailBoxNum)
// for i := 0; i < MailBoxNum; i++ {
// gMailbox[i].Init(MailBoxMessageLength, HongBaoBoxMessageLength)
// }
// for i := 0; i < MailBoxNum; i++ {
// go ProcessRequest(i)
// }
}
func main() {
broker := broker.NewBroker()
broker.StartListening()