This commit is contained in:
zhouyuyan
2017-09-01 11:08:28 +08:00
parent c732d395e1
commit a45cccaa7a
7 changed files with 152 additions and 165 deletions

View File

@@ -2,7 +2,6 @@ package broker
import (
"errors"
"hmq/lib/message"
"hmq/packets"
"net"
"strings"
@@ -44,7 +43,7 @@ type info struct {
username string
password []byte
keepalive uint16
willMsg packets.ControlPacket
willMsg *packets.PublishPacket
localIP string
remoteIP string
}
@@ -67,7 +66,7 @@ func (c *client) readLoop(msgPool *MessagePool) {
}
msg := &Message{}
for {
packet, err := packets.ReadPacket(conn)
packet, err := packets.ReadPacket(nc)
// buf, err := ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", err)
@@ -75,56 +74,65 @@ func (c *client) readLoop(msgPool *MessagePool) {
return
}
msg.client = c
msg.msg = packet
msg.packet = packet
msgPool.queue <- msg
}
msgPool.Reduce()
}
func ProcessMessage(msg *Message) {
buf := msg.msg
c := msg.client
if c == nil || buf == nil {
ca := msg.packet
if c == nil || ca == nil {
return
}
switch m := msg.(type) {
case *packets.Connack:
switch ca.(type) {
case *packets.ConnackPacket:
// log.Info("Recv conack message..........")
c.ProcessConnAck(buf)
case *packets.Connect:
packet := ca.(*packets.ConnackPacket)
c.ProcessConnAck(packet)
case *packets.ConnectPacket:
// log.Info("Recv connect message..........")
c.ProcessConnect(buf)
case *packets.Publish:
packet := ca.(*packets.ConnectPacket)
c.ProcessConnect(packet)
case *packets.PublishPacket:
// log.Info("Recv publish message..........")
c.ProcessPublish(buf)
case *packets.Puback:
packet := ca.(*packets.PublishPacket)
c.ProcessPublish(packet)
case *packets.PubackPacket:
//log.Info("Recv publish ack message..........")
c.ProcessPubAck(buf)
case *packets.Pubrec::
packet := ca.(*packets.PubackPacket)
c.ProcessPubAck(packet)
case *packets.PubrecPacket:
//log.Info("Recv publish rec message..........")
c.ProcessPubREC(buf)
case *packets.Pubrel:
packet := ca.(*packets.PubrecPacket)
c.ProcessPubREC(packet)
case *packets.PubrelPacket:
//log.Info("Recv publish rel message..........")
c.ProcessPubREL(buf)
case *packets.Pubcomp:
packet := ca.(*packets.PubrelPacket)
c.ProcessPubREL(packet)
case *packets.PubcompPacket:
//log.Info("Recv publish ack message..........")
c.ProcessPubComp(buf)
case *packets.Subscribe:
packet := ca.(*packets.PubcompPacket)
c.ProcessPubComp(packet)
case *packets.SubscribePacket:
// log.Info("Recv subscribe message.....")
c.ProcessSubscribe(buf)
case *packets.Suback:
packet := ca.(*packets.SubscribePacket)
c.ProcessSubscribe(packet)
case *packets.SubackPacket:
// log.Info("Recv suback message.....")
case *packets.Unsubscribe:
case *packets.UnsubscribePacket:
// log.Info("Recv unsubscribe message.....")
c.ProcessUnSubscribe(buf)
case *packets.Unsuback:
packet := ca.(*packets.UnsubscribePacket)
c.ProcessUnSubscribe(packet)
case *packets.UnsubackPacket:
//log.Info("Recv unsuback message.....")
case *packets.Pingreq:
case *packets.PingreqPacket:
// log.Info("Recv PINGREQ message..........")
c.ProcessPing(buf)
c.ProcessPing()
case *packets.PingrespPacket:
//log.Info("Recv PINGRESP message..........")
case *packets.Disconnect:
case *packets.DisconnectPacket:
// log.Info("Recv DISCONNECT message.......")
c.Close()
default:
@@ -132,31 +140,25 @@ func ProcessMessage(msg *Message) {
}
}
func (c *client) ProcessConnect(buf []byte) {
func (c *client) ProcessConnect(packet *packets.ConnectPacket) {
}
func (c *client) ProcessConnAck(buf []byte) {
func (c *client) ProcessConnAck(packet *packets.ConnackPacket) {
}
func (c *client) ProcessPublish(buf []byte) {
msg, err := DecodePublishMessage(buf)
if err != nil {
log.Error("Decode Publish Message error: ", err)
c.Close()
func (c *client) ProcessPublish(packet *packets.PublishPacket) {
topic := packet.TopicName
if c.typ != CLIENT || !c.CheckTopicAuth(PUB, topic) {
return
}
topic := msg.Topic()
c.ProcessPublishMessage(packet)
if c.typ != CLIENT || !c.CheckTopicAuth(PUB, string(topic)) {
return
}
c.ProcessPublishMessage(buf, msg)
if msg.Retain() {
if packet.Retain {
if b := c.broker; b != nil {
err := b.rl.Insert(topic, buf)
err := b.rl.Insert([]byte(topic), packet)
if err != nil {
log.Error("Insert Retain Message error: ", err)
}
@@ -165,14 +167,14 @@ func (c *client) ProcessPublish(buf []byte) {
}
func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage) {
func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
b := c.broker
if b == nil {
return
}
typ := c.typ
topic := string(msg.Topic())
topic := packet.TopicName
r := b.sl.Match(topic)
// log.Info("psubs num: ", len(r.psubs))
@@ -187,7 +189,7 @@ func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage)
}
}
if sub != nil {
err := sub.client.writeBuffer(buf)
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", err)
}
@@ -203,7 +205,7 @@ func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage)
// s.qmu.Lock()
if cnt, exist := b.queues[string(sub.topic)]; exist && i == cnt {
if sub != nil {
err := sub.client.writeBuffer(buf)
err := sub.client.WriterPacket(packet)
if err != nil {
log.Error("process will message for qsub error, ", err)
}
@@ -215,47 +217,41 @@ func (c *client) ProcessPublishMessage(buf []byte, msg *message.PublishMessage)
}
}
func (c *client) ProcessPubAck(buf []byte) {
func (c *client) ProcessPubAck(packet *packets.PubackPacket) {
}
func (c *client) ProcessPubREC(buf []byte) {
func (c *client) ProcessPubREC(packet *packets.PubrecPacket) {
}
func (c *client) ProcessPubREL(buf []byte) {
func (c *client) ProcessPubREL(packet *packets.PubrelPacket) {
}
func (c *client) ProcessPubComp(buf []byte) {
func (c *client) ProcessPubComp(packet *packets.PubcompPacket) {
}
func (c *client) ProcessSubscribe(buf []byte) {
func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
b := c.broker
if b == nil {
return
}
msg, err := DecodeSubscribeMessage(buf)
if err != nil {
log.Error("Decode Subscribe Message error: ", err)
c.Close()
return
}
topics := msg.Topics()
qos := msg.Qos()
topics := packet.Topics
qoss := packet.Qoss
suback := message.NewSubackMessage()
suback.SetPacketId(msg.PacketId())
suback := packets.NewControlPacket(packets.Suback).(*packets.SubackPacket)
suback.MessageID = packet.MessageID
var retcodes []byte
for i, t := range topics {
topic := string(t)
for i, topic := range topics {
t := []byte(topic)
//check topic auth for client
if c.typ == CLIENT {
if !c.CheckTopicAuth(SUB, topic) {
log.Error("CheckSubAuth failed")
retcodes = append(retcodes, message.QosFailure)
retcodes = append(retcodes, QosFailure)
continue
}
}
@@ -271,13 +267,13 @@ func (c *client) ProcessSubscribe(buf []byte) {
}
// b.qmu.Unlock()
} else {
retcodes = append(retcodes, message.QosFailure)
retcodes = append(retcodes, QosFailure)
continue
}
}
sub := &subscription{
topic: t,
qos: qos[i],
qos: qoss[i],
client: c,
queue: queue,
}
@@ -289,82 +285,69 @@ func (c *client) ProcessSubscribe(buf []byte) {
err := b.sl.Insert(sub)
if err != nil {
log.Error("Insert subscription error: ", err)
retcodes = append(retcodes, message.QosFailure)
retcodes = append(retcodes, QosFailure)
}
retcodes = append(retcodes, qos[i])
retcodes = append(retcodes, qoss[i])
} else {
//if exist ,check whether qos change
c.subs[topic].qos = qos[i]
retcodes = append(retcodes, qos[i])
c.subs[topic].qos = qoss[i]
retcodes = append(retcodes, qoss[i])
}
}
if err := suback.AddReturnCodes(retcodes); err != nil {
log.Error("add return suback code error, ", err)
// if typ == CLIENT {
c.Close()
// }
return
}
suback.ReturnCodes = retcodes
err1 := c.writeMessage(suback)
if err1 != nil {
log.Error("send suback error, ", err1)
err := c.WriterPacket(suback)
if err != nil {
log.Error("send suback error, ", err)
return
}
//broadcast subscribe message
if c.typ == CLIENT {
go b.BroadcastSubOrUnsubMessage(buf)
go b.BroadcastSubOrUnsubMessage(packet)
}
//process retain message
for _, t := range topics {
bufs := b.rl.Match(t)
for _, buf := range bufs {
log.Info("process retain message: ", string(buf))
if buf != nil && string(buf) != "" {
c.writeBuffer(buf)
packets := b.rl.Match([]byte(t))
for _, packet := range packets {
log.Info("process retain message: ", packet)
if packet != nil {
c.WriterPacket(packet)
}
}
}
}
func (c *client) ProcessUnSubscribe(buf []byte) {
func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
b := c.broker
if b == nil {
return
}
unsub, err := DecodeUnsubscribeMessage(buf)
if err != nil {
log.Error("Decode UnSubscribe Message error: ", err)
c.Close()
return
}
topics := unsub.Topics()
topics := packet.Topics
for _, t := range topics {
var sub *subscription
ok := false
if sub, ok = c.subs[string(t)]; ok {
if sub, ok = c.subs[t]; ok {
go c.unsubscribe(sub)
}
}
resp := message.NewUnsubackMessage()
resp.SetPacketId(unsub.PacketId())
unsuback := packets.NewControlPacket(packets.Unsuback).(*packets.UnsubackPacket)
unsuback.MessageID = packet.MessageID
err1 := c.writeMessage(resp)
if err1 != nil {
log.Error("send ubsuback error, ", err1)
err := c.WriterPacket(unsuback)
if err != nil {
log.Error("send unsuback error, ", err)
return
}
// //process ubsubscribe message
if c.typ == CLIENT {
b.BroadcastSubOrUnsubMessage(buf)
b.BroadcastSubOrUnsubMessage(packet)
}
}
@@ -379,16 +362,9 @@ func (c *client) unsubscribe(sub *subscription) {
}
}
func (c *client) ProcessPing(buf []byte) {
_, err := DecodePingreqMessage(buf)
if err != nil {
log.Error("Decode PingRequest Message error: ", err)
c.Close()
return
}
pingRspMsg := message.NewPingrespMessage()
err = c.writeMessage(pingRspMsg)
func (c *client) ProcessPing() {
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
err := c.WriterPacket(resp)
if err != nil {
log.Error("send PingResponse error, ", err)
return
@@ -407,7 +383,7 @@ func (c *client) Close() {
}
}
if c.info.willMsg != nil {
// b.ProcessPublishMessage(c.info.willMsg)
b.ProcessPublishMessage(c.info.willMsg)
}
}
if c.conn != nil {
@@ -416,6 +392,13 @@ func (c *client) Close() {
}
}
func (c *client) WriterPacket(packet packets.ControlPacket) error {
c.mu.Lock()
err := packet.Write(c.conn)
c.mu.Unlock()
return err
}
func WriteBuffer(conn net.Conn, buf []byte) error {
if conn == nil {
return errors.New("conn is nul")
@@ -430,10 +413,10 @@ func (c *client) writeBuffer(buf []byte) error {
return err
}
func (c *client) writeMessage(msg message.Message) error {
buf, err := EncodeMessage(msg)
if err != nil {
return err
}
return c.writeBuffer(buf)
}
// func (c *client) writeMessage(msg message.Message) error {
// buf, err := EncodeMessage(msg)
// if err != nil {
// return err
// }
// return c.writeBuffer(buf)
// }