Files
hmq/broker/client.go
joy.zhou 7547ad3bdc Restruct (#34)
* modify

* remove

* modify

* modify

* remove no use

* add online/offline notification

* modify

* format log

* add reference
2018-12-26 14:51:13 +08:00

422 lines
9.0 KiB
Go

/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import (
"context"
"errors"
"net"
"reflect"
"strings"
"sync"
"time"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/fhmq/hmq/lib/sessions"
"github.com/fhmq/hmq/lib/topics"
"go.uber.org/zap"
)
const (
// special pub topic for cluster info BrokerInfoTopic
BrokerInfoTopic = "broker000100101info"
// CLIENT is an end user.
CLIENT = 0
// ROUTER is another router in the cluster.
ROUTER = 1
//REMOTE is the router connect to other cluster
REMOTE = 2
CLUSTER = 3
)
const (
Connected = 1
Disconnected = 2
)
type client struct {
typ int
mu sync.Mutex
broker *Broker
conn net.Conn
info info
route route
status int
ctx context.Context
cancelFunc context.CancelFunc
session *sessions.Session
subMap map[string]*subscription
topicsMgr *topics.Manager
subs []interface{}
qoss []byte
rmsgs []*packets.PublishPacket
}
type subInfo struct {
sub *subscription
num int
}
type subscription struct {
client *client
topic string
qos byte
queue bool
}
type info struct {
clientID string
username string
password []byte
keepalive uint16
willMsg *packets.PublishPacket
localIP string
remoteIP string
}
type route struct {
remoteID string
remoteUrl string
}
var (
DisconnectdPacket = packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
)
func (c *client) init() {
c.status = Connected
c.info.localIP = strings.Split(c.conn.LocalAddr().String(), ":")[0]
c.info.remoteIP = strings.Split(c.conn.RemoteAddr().String(), ":")[0]
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
c.subMap = make(map[string]*subscription)
c.topicsMgr = c.broker.topicsMgr
}
func (c *client) readLoop() {
nc := c.conn
b := c.broker
if nc == nil || b == nil {
return
}
keepAlive := time.Second * time.Duration(c.info.keepalive)
timeOut := keepAlive + (keepAlive / 2)
for {
select {
case <-c.ctx.Done():
return
default:
//add read timeout
if err := nc.SetReadDeadline(time.Now().Add(timeOut)); err != nil {
log.Error("set read timeout error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
packet, err := packets.ReadPacket(nc)
if err != nil {
log.Error("read packet error: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
msg := &Message{client: c, packet: DisconnectdPacket}
b.SubmitWork(msg)
return
}
msg := &Message{
client: c,
packet: packet,
}
b.SubmitWork(msg)
}
}
}
func ProcessMessage(msg *Message) {
c := msg.client
ca := msg.packet
if ca == nil {
return
}
log.Debug("Recv message:", zap.String("message type", reflect.TypeOf(msg.packet).String()[9:]), zap.String("ClientID", c.info.clientID))
switch ca.(type) {
case *packets.ConnackPacket:
case *packets.ConnectPacket:
case *packets.PublishPacket:
packet := ca.(*packets.PublishPacket)
c.ProcessPublish(packet)
case *packets.PubackPacket:
case *packets.PubrecPacket:
case *packets.PubrelPacket:
case *packets.PubcompPacket:
case *packets.SubscribePacket:
packet := ca.(*packets.SubscribePacket)
c.ProcessSubscribe(packet)
case *packets.SubackPacket:
case *packets.UnsubscribePacket:
packet := ca.(*packets.UnsubscribePacket)
c.ProcessUnSubscribe(packet)
case *packets.UnsubackPacket:
case *packets.PingreqPacket:
c.ProcessPing()
case *packets.PingrespPacket:
case *packets.DisconnectPacket:
c.Close()
default:
log.Info("Recv Unknow message.......", zap.String("ClientID", c.info.clientID))
}
}
func (c *client) ProcessPublish(packet *packets.PublishPacket) {
if c.status == Disconnected {
return
}
topic := packet.TopicName
if topic == BrokerInfoTopic && c.typ == CLUSTER {
c.ProcessInfo(packet)
return
}
if !c.CheckTopicAuth(PUB, topic) {
log.Error("Pub Topics Auth failed, ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
return
}
switch packet.Qos {
case QosAtMostOnce:
c.ProcessPublishMessage(packet)
case QosAtLeastOnce:
puback := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
puback.MessageID = packet.MessageID
if err := c.WriterPacket(puback); err != nil {
log.Error("send puback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
c.ProcessPublishMessage(packet)
case QosExactlyOnce:
return
default:
log.Error("publish with unknown qos", zap.String("ClientID", c.info.clientID))
return
}
}
func (c *client) ProcessPublishMessage(packet *packets.PublishPacket) {
if c.status == Disconnected {
return
}
b := c.broker
if b == nil {
return
}
typ := c.typ
if packet.Retain {
if err := c.topicsMgr.Retain(packet); err != nil {
log.Error("Error retaining message: ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
err := c.topicsMgr.Subscribers([]byte(packet.TopicName), packet.Qos, &c.subs, &c.qoss)
if err != nil {
log.Error("Error retrieving subscribers list: ", zap.String("ClientID", c.info.clientID))
return
}
// log.Info("psubs num: ", len(r.psubs))
if len(c.subs) == 0 {
return
}
for _, sub := range c.subs {
s, ok := sub.(*subscription)
if ok {
if s.client.typ == ROUTER {
if typ != CLIENT {
continue
}
}
err := s.client.WriterPacket(packet)
if err != nil {
log.Error("process message for psub error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
}
}
}
}
func (c *client) ProcessSubscribe(packet *packets.SubscribePacket) {
if c.status == Disconnected {
return
}
b := c.broker
if b == nil {
return
}
topics := packet.Topics
qoss := packet.Qoss
suback := packets.NewControlPacket(packets.Suback).(*packets.SubackPacket)
suback.MessageID = packet.MessageID
var retcodes []byte
for i, topic := range topics {
t := topic
//check topic auth for client
if !c.CheckTopicAuth(SUB, topic) {
log.Error("Sub topic Auth failed: ", zap.String("topic", topic), zap.String("ClientID", c.info.clientID))
retcodes = append(retcodes, QosFailure)
continue
}
sub := &subscription{
topic: t,
qos: qoss[i],
client: c,
}
rqos, err := c.topicsMgr.Subscribe([]byte(topic), qoss[i], sub)
if err != nil {
return
}
c.subMap[topic] = sub
c.session.AddTopic(topic, qoss[i])
retcodes = append(retcodes, rqos)
c.topicsMgr.Retained([]byte(topic), &c.rmsgs)
}
suback.ReturnCodes = retcodes
err := c.WriterPacket(suback)
if err != nil {
log.Error("send suback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
//broadcast subscribe message
if c.typ == CLIENT {
go b.BroadcastSubOrUnsubMessage(packet)
}
//process retain message
for _, rm := range c.rmsgs {
if err := c.WriterPacket(rm); err != nil {
log.Error("Error publishing retained message:", zap.Any("err", err), zap.String("ClientID", c.info.clientID))
} else {
log.Info("process retain message: ", zap.Any("packet", packet), zap.String("ClientID", c.info.clientID))
}
}
}
func (c *client) ProcessUnSubscribe(packet *packets.UnsubscribePacket) {
if c.status == Disconnected {
return
}
b := c.broker
if b == nil {
return
}
topics := packet.Topics
for _, topic := range topics {
t := []byte(topic)
sub, exist := c.subMap[topic]
if exist {
c.topicsMgr.Unsubscribe(t, sub)
c.session.RemoveTopic(topic)
delete(c.subMap, topic)
}
}
unsuback := packets.NewControlPacket(packets.Unsuback).(*packets.UnsubackPacket)
unsuback.MessageID = packet.MessageID
err := c.WriterPacket(unsuback)
if err != nil {
log.Error("send unsuback error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
// //process ubsubscribe message
if c.typ == CLIENT {
b.BroadcastSubOrUnsubMessage(packet)
}
}
func (c *client) ProcessPing() {
if c.status == Disconnected {
return
}
resp := packets.NewControlPacket(packets.Pingresp).(*packets.PingrespPacket)
err := c.WriterPacket(resp)
if err != nil {
log.Error("send PingResponse error, ", zap.Error(err), zap.String("ClientID", c.info.clientID))
return
}
}
func (c *client) Close() {
if c.status == Disconnected {
return
}
c.cancelFunc()
c.status = Disconnected
//wait for message complete
time.Sleep(1 * time.Second)
// c.status = Disconnected
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
b := c.broker
subs := c.subMap
if b != nil {
b.removeClient(c)
if c.typ == CLIENT {
b.BroadcastUnSubscribe(subs)
//offline notification
b.OnlineOfflineNotification(c.info.clientID, false)
}
if c.info.willMsg != nil {
b.PublishMessage(c.info.willMsg)
}
if c.typ == CLUSTER {
b.ConnectToDiscovery()
}
//do reconnect
if c.typ == REMOTE {
go b.connectRouter(c.route.remoteID, c.route.remoteUrl)
}
}
}
func (c *client) WriterPacket(packet packets.ControlPacket) error {
if c.status == Disconnected {
return nil
}
if packet == nil {
return nil
}
if c.conn == nil {
c.Close()
return errors.New("connect lost ....")
}
c.mu.Lock()
err := packet.Write(c.conn)
c.mu.Unlock()
return err
}