This commit is contained in:
chowyu08
2017-08-31 22:04:00 +08:00
parent 0a6456f1e3
commit c732d395e1
20 changed files with 1294 additions and 51 deletions

View File

@@ -4,6 +4,7 @@ import (
"crypto/tls"
"hmq/lib/acl"
"hmq/lib/message"
"hmq/packets"
"net"
"net/http"
"sync/atomic"
@@ -168,41 +169,44 @@ func (b *Broker) StartListening(typ int) {
func (b *Broker) handleConnection(typ int, conn net.Conn, idx uint64) {
//process connect packet
buf, err := ReadPacket(conn)
packet, err := packets.ReadPacket(conn)
if err != nil {
log.Error("read connect packet error: ", err)
return
}
connMsg, err := DecodeConnectMessage(buf)
if packet == nil {
log.Error("received nil packet")
return
}
msg, ok := packet.(*packets.ConnectPacket)
if !ok {
log.Error("received msg that was not Connect")
return
}
connack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
connack.ReturnCode = packets.Accepted
connack.SessionPresent = msg.CleanSession
err = connack.Write(conn)
if err != nil {
log.Error(err)
log.Error("send connack error, ", err)
return
}
connack := message.NewConnackMessage()
connack.SetReturnCode(message.ConnectionAccepted)
ack, _ := EncodeMessage(connack)
err1 := WriteBuffer(conn, ack)
if err1 != nil {
log.Error("send connack error, ", err1)
return
}
willmsg := message.NewPublishMessage()
if connMsg.WillFlag() {
willmsg.SetQoS(connMsg.WillQos())
willmsg.SetPayload(connMsg.WillMessage())
willmsg.SetRetain(connMsg.WillRetain())
willmsg.SetTopic(connMsg.WillTopic())
willmsg.SetDup(false)
willmsg := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
if msg.WillFlag {
willmsg.Qos = msg.WillQos
willmsg.TopicName = msg.WillTopic
willmsg.Retain = msg.WillRetain
willmsg.Payload = msg.WillMessage
willmsg.Dup = msg.Dup
} else {
willmsg = nil
}
info := info{
clientID: connMsg.ClientId(),
username: connMsg.Username(),
password: connMsg.Password(),
keepalive: connMsg.KeepAlive(),
clientID: msg.ClientIdentifier,
username: msg.Username,
password: msg.Password,
keepalive: msg.Keepalive,
willMsg: willmsg,
}
@@ -256,7 +260,7 @@ func (b *Broker) connectRouter(url, remoteID string) {
}
cid := GenUniqueId()
info := info{
clientID: []byte(cid),
clientID: cid,
}
c := &client{
typ: REMOTE,

View File

@@ -3,6 +3,7 @@ package broker
import (
"errors"
"hmq/lib/message"
"hmq/packets"
"net"
"strings"
"sync"
@@ -39,11 +40,11 @@ type subscription struct {
}
type info struct {
clientID []byte
username []byte
clientID string
username string
password []byte
keepalive uint16
willMsg *message.PublishMessage
willMsg packets.ControlPacket
localIP string
remoteIP string
}
@@ -66,14 +67,15 @@ func (c *client) readLoop(msgPool *MessagePool) {
}
msg := &Message{}
for {
buf, err := ReadPacket(nc)
packet, err := packets.ReadPacket(conn)
// buf, err := ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", err)
c.Close()
return
}
msg.client = c
msg.msg = buf
msg.msg = packet
msgPool.queue <- msg
}
msgPool.Reduce()
@@ -85,45 +87,44 @@ func ProcessMessage(msg *Message) {
if c == nil || buf == nil {
return
}
msgType := uint8(buf[0] & 0xF0 >> 4)
switch msgType {
case CONNACK:
switch m := msg.(type) {
case *packets.Connack:
// log.Info("Recv conack message..........")
c.ProcessConnAck(buf)
case CONNECT:
case *packets.Connect:
// log.Info("Recv connect message..........")
c.ProcessConnect(buf)
case PUBLISH:
case *packets.Publish:
// log.Info("Recv publish message..........")
c.ProcessPublish(buf)
case PUBACK:
case *packets.Puback:
//log.Info("Recv publish ack message..........")
c.ProcessPubAck(buf)
case PUBCOMP:
//log.Info("Recv publish ack message..........")
c.ProcessPubComp(buf)
case PUBREC:
case *packets.Pubrec::
//log.Info("Recv publish rec message..........")
c.ProcessPubREC(buf)
case PUBREL:
case *packets.Pubrel:
//log.Info("Recv publish rel message..........")
c.ProcessPubREL(buf)
case SUBSCRIBE:
case *packets.Pubcomp:
//log.Info("Recv publish ack message..........")
c.ProcessPubComp(buf)
case *packets.Subscribe:
// log.Info("Recv subscribe message.....")
c.ProcessSubscribe(buf)
case SUBACK:
case *packets.Suback:
// log.Info("Recv suback message.....")
case UNSUBSCRIBE:
case *packets.Unsubscribe:
// log.Info("Recv unsubscribe message.....")
c.ProcessUnSubscribe(buf)
case UNSUBACK:
case *packets.Unsuback:
//log.Info("Recv unsuback message.....")
case PINGREQ:
case *packets.Pingreq:
// log.Info("Recv PINGREQ message..........")
c.ProcessPing(buf)
case PINGRESP:
case *packets.PingrespPacket:
//log.Info("Recv PINGRESP message..........")
case DISCONNECT:
case *packets.Disconnect:
// log.Info("Recv DISCONNECT message.......")
c.Close()
default:
@@ -406,7 +407,7 @@ func (c *client) Close() {
}
}
if c.info.willMsg != nil {
b.ProcessPublishMessage(c.info.willMsg)
// b.ProcessPublishMessage(c.info.willMsg)
}
}
if c.conn != nil {

View File

@@ -39,7 +39,7 @@ func (c *client) SendConnect() {
clientID := c.info.clientID
connMsg := message.NewConnectMessage()
connMsg.SetClientId(clientID)
connMsg.SetClientId([]byte(clientID))
connMsg.SetVersion(0x04)
err := c.writeMessage(connMsg)
if err != nil {

View File

@@ -1,6 +1,9 @@
package broker
import "sync"
import (
"hmq/packets"
"sync"
)
const (
MaxUser = 1024 * 1024
@@ -11,7 +14,7 @@ const (
type Message struct {
client *client
msg []byte
msg packets.ControlPacket
}
var (