diff --git a/broker/broker.go b/broker/broker.go index cf66dc0..d75679f 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -73,14 +73,17 @@ func (b *Broker) handleConnection(conn net.Conn, idx int) { keepalive: connMsg.KeepAlive(), willMsg: willmsg, } - pool := MSGPool[idx%MessagePoolNum].GetPool() + c := &client{ - broker: b, - conn: conn, - info: info, - msgPool: pool, + broker: b, + conn: conn, + info: info, } c.init() - c.readLoop() - pool.Reduce() + c.woker = Worker{ + WorkerPool: MyDispatcher, + MsgChannel: make(chan *Message), + quit: make(chan bool), + } + c.readLoop(idx) } diff --git a/broker/client.go b/broker/client.go index 747d0cb..9f882cb 100644 --- a/broker/client.go +++ b/broker/client.go @@ -14,7 +14,6 @@ type client struct { info info localIP string remoteIP string - msgPool *MessagePool woker Worker } @@ -26,33 +25,26 @@ type info struct { willMsg *message.PublishMessage } -type Worker struct { - WorkerPool chan chan Message - MsgChannel chan Message - quit chan bool -} - func (c *client) init() { c.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0] c.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] } -func (c *client) readLoop() { +func (c *client) readLoop(idx int) { nc := c.conn - msgPool := c.msgPool + msgPool := MSGPool[idx%MessagePoolNum].GetPool() if nc == nil || msgPool == nil { return } - cid := c.info.clientID for { - msg := msgPool.Pop() buf, err := ReadPacket(nc) if err != nil { log.Error("read packet error: ", err) return } - msg.user = cid + msg.client = c msg.buf = buf - + msgPool.Push(msg) } + msgPool.Reduce() } diff --git a/broker/dispatcher.go b/broker/dispatcher.go new file mode 100644 index 0000000..5811ec3 --- /dev/null +++ b/broker/dispatcher.go @@ -0,0 +1,43 @@ +package broker + +const ( + WorkPoolNum = 1024 + MaxUser = 1024 * 1024 + MessagePoolNum = 1024 + MessagePoolUser = MaxUser / MessagePoolNum + MessagePoolMessageNum = MaxUser / MessagePoolNum * 4 + + // MessageBoxNum = 256 + // MessageBoxUserNum = MaxUser / MessageBoxNum + // MessageBoxMessageLength = MessageBoxUserNum +) + +var ( + MyDispatcher Dispatcher +) + +type Dispatcher struct { + WorkerPool chan chan *Message +} + +func init() { + workerPool = make(chan chan *Message, WorkPoolNum) + MyDispatcher = &Dispatcher{WorkerPool: workerPool} +} + +func (d *Dispatcher) dispatch() { + for i := 0; i < MessagePoolNum; i++ { + go func(idx int) { + for { + select { + case msg := <-MSGPool[idx].Pop(): + go func(msg *Message) { + msgChannel := <-d.WorkerPool + msgChannel <- msg + }(msg) + } + } + }(i) + } + +} diff --git a/broker/msgpool.go b/broker/msgpool.go index 79118ef..8a7b848 100644 --- a/broker/msgpool.go +++ b/broker/msgpool.go @@ -5,9 +5,13 @@ import "sync" type Message struct { client *client msg []byte - pool *MessagePool + // pool *MessagePool } +var ( + MSGPool []MessagePool +) + type MessagePool struct { l sync.Mutex maxuser int @@ -15,14 +19,17 @@ type MessagePool struct { queue chan *Message } +func init() []MessagePool { + MSGPool = make([]MessagePool, (MessagePoolNum + 2)) + for i := 0; i < (MessagePoolNum + 2); i++ { + MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum) + } + return MSGPool +} + func (p *MessagePool) Init(num int, maxusernum int) { p.maxuser = maxusernum p.queue = make(chan *Message, num) - for k := 0; k < num; k++ { - m := &Message{} - m.Pool = p - p.Push(m) - } } func (p *MessagePool) GetPool() *MessagePool { diff --git a/broker/worker.go b/broker/worker.go new file mode 100644 index 0000000..f21d246 --- /dev/null +++ b/broker/worker.go @@ -0,0 +1,30 @@ +package broker + +type Worker struct { + WorkerPool chan chan *Message + MsgChannel chan *Message + quit chan bool +} + +func (w Worker) Start() { + go func() { + for { + // register the current worker into the worker queue. + w.WorkerPool <- w.MsgChannel + select { + case msg := <-w.MsgChannel: + // we have received a work request. + case <-w.quit: + // we have received a signal to stop + return + } + } + }() +} + +// Stop signals the worker to stop listening for work requests. +func (w Worker) Stop() { + go func() { + w.quit <- true + }() +} diff --git a/main.go b/main.go index f46eb1a..36972bc 100644 --- a/main.go +++ b/main.go @@ -8,36 +8,6 @@ import ( log "github.com/cihub/seelog" ) -const ( - MaxUser = 1024 * 1024 - MessagePoolNum = 1024 - MessagePoolUser = MaxUser / MessagePoolNum - MessagePoolMessageNum = MaxUser / MessagePoolNum * 4 - - MessageBoxNum = 256 - MessageBoxUserNum = MaxUser / MessageBoxNum - MessageBoxMessageLength = MessageBoxUserNum -) - -var ( - MSGPool []MessagePool - // Messagebox []*Message -) - -func init() { - MSGPool = make([]MessagePool, (MessagePoolNum + 2)) - for i := 0; i < (MessagePoolNum + 2); i++ { - MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum) - } - // Messagebox = make([]MessageProcess, MailBoxNum) - // for i := 0; i < MailBoxNum; i++ { - // gMailbox[i].Init(MailBoxMessageLength, HongBaoBoxMessageLength) - // } - // for i := 0; i < MailBoxNum; i++ { - // go ProcessRequest(i) - // } - -} func main() { broker := broker.NewBroker() broker.StartListening()