diff --git a/broker/broker.go b/broker/broker.go index 7171328..73cf4c1 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -1,6 +1,7 @@ package broker import ( + "fhmq/lib/message" "net" "time" @@ -20,6 +21,7 @@ func (b *Broker) StartListening() { return } tmpDelay := 10 * ACCEPT_MIN_SLEEP + num := 0 for { conn, err := l.Accept() if err != nil { @@ -37,11 +39,12 @@ func (b *Broker) StartListening() { continue } tmpDelay = ACCEPT_MIN_SLEEP - go handleConnection(conn) + num += 1 + go b.handleConnection(conn, num) } } -func handleConnection(conn net.Conn) { +func (b *Broker) handleConnection(conn net.Conn, idx int) { //process connect packet buf, err := ReadPacket(conn) @@ -54,8 +57,29 @@ func handleConnection(conn net.Conn) { log.Error(err) return } -} - -func (b *Broker) NewClient() { - + willmsg := message.NewPublishMessage() + if connMsg.WillFlag() { + willmsg.SetQoS(connMsg.WillQos()) + willmsg.SetPayload(connMsg.WillMessage()) + willmsg.SetRetain(connMsg.WillRetain()) + willmsg.SetTopic(connMsg.WillTopic()) + willmsg.SetDup(false) + } else { + willmsg = nil + } + info := info{ + clientID: connMsg.ClientId(), + username: connMsg.Username(), + password: connMsg.Password(), + keepalive: connMsg.KeepAlive(), + willMsg: willmsg, + } + c := &client{ + broker: b, + conn: conn, + info: info, + msgPool: MSGPool[idx%MessagePoolNum].GetPool(), + } + c.init() + c.readLoop() } diff --git a/broker/client.go b/broker/client.go index ad995bd..747d0cb 100644 --- a/broker/client.go +++ b/broker/client.go @@ -4,25 +4,55 @@ import ( "fhmq/lib/message" "net" "strings" + + "github.com/prometheus/common/log" ) type client struct { - broker Broker + broker *Broker conn net.Conn info info localIP string remoteIP string + msgPool *MessagePool + woker Worker } type info struct { - clientID []byte - username []byte - password []byte - keeplive uint16 - willMsg *message.PublishMessage + clientID []byte + username []byte + password []byte + keepalive uint16 + willMsg *message.PublishMessage +} + +type Worker struct { + WorkerPool chan chan Message + MsgChannel chan Message + quit chan bool } func (c *client) init() { c.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0] c.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0] } + +func (c *client) readLoop() { + nc := c.conn + msgPool := c.msgPool + if nc == nil || msgPool == nil { + return + } + cid := c.info.clientID + for { + msg := msgPool.Pop() + buf, err := ReadPacket(nc) + if err != nil { + log.Error("read packet error: ", err) + return + } + msg.user = cid + msg.buf = buf + + } +} diff --git a/broker/msgpool.go b/broker/msgpool.go new file mode 100644 index 0000000..79118ef --- /dev/null +++ b/broker/msgpool.go @@ -0,0 +1,57 @@ +package broker + +import "sync" + +type Message struct { + client *client + msg []byte + pool *MessagePool +} + +type MessagePool struct { + l sync.Mutex + maxuser int + user int + queue chan *Message +} + +func (p *MessagePool) Init(num int, maxusernum int) { + p.maxuser = maxusernum + p.queue = make(chan *Message, num) + for k := 0; k < num; k++ { + m := &Message{} + m.Pool = p + p.Push(m) + } +} + +func (p *MessagePool) GetPool() *MessagePool { + p.l.Lock() + if p.user+1 < p.maxuser { + p.user += 1 + p.l.Unlock() + return p + } else { + p.l.Unlock() + return nil + } + +} + +func (p *MessagePool) Reduce() { + p.l.Lock() + p.user -= 1 + p.l.Unlock() + +} + +func (p *MessagePool) Pop() *Message { + + p2 := <-p.queue + return p2 +} + +func (p *MessagePool) Push(pmessage *Message) { + + p.queue <- pmessage +} diff --git a/lib/messagepool.go b/lib/messagepool.go deleted file mode 100644 index b8b11b5..0000000 --- a/lib/messagepool.go +++ /dev/null @@ -1,11 +0,0 @@ -package lib - -// type MessagePool struct { -// sync.Mutex -// queue chan *Message -// } - -// func (p *MessagePool) Init(len int, maxusernum int) { -// p.maxuser = maxusernum -// p.queue = make(chan *Message, len) -// } diff --git a/main.go b/main.go index 4c5f2e2..f23a6ba 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,23 @@ import ( log "github.com/cihub/seelog" ) -//https://github.com/xiaojiaqi/10billionhongbaos +const ( + MaxUser = 1024 * 1024 + MessagePoolNum = 1024 + MessagePoolUser = MaxUser / MessagePoolNum + MessagePoolMessageNum = MaxUser / MessagePoolNum * 4 +) + +var ( + MSGPool []MessagePool +) + +func init() { + MSGPool = make([]MessagePool, (MessagePoolNum + 2)) + for i := 0; i < (MessagePoolNum + 2); i++ { + MSGPool[i].Init(MessagePoolUser, MessagePoolMessageNum) + } +} func main() { broker := broker.NewBroker() broker.StartListening() @@ -16,6 +32,7 @@ func main() { s := waitForSignal() log.Infof("signal got: %v ,broker closed.", s) } + func waitForSignal() os.Signal { signalChan := make(chan os.Signal, 1) defer close(signalChan)