This commit is contained in:
joy.zhou
2019-07-31 14:34:56 +08:00
parent 913bff4ed5
commit 3ff0f170c6
7 changed files with 137 additions and 88 deletions

View File

@@ -14,6 +14,7 @@ import (
"github.com/fhmq/hmq/broker/lib/sessions" "github.com/fhmq/hmq/broker/lib/sessions"
"github.com/fhmq/hmq/broker/lib/topics" "github.com/fhmq/hmq/broker/lib/topics"
pb "github.com/fhmq/hmq/grpc"
"github.com/fhmq/hmq/plugins" "github.com/fhmq/hmq/plugins"
"github.com/eclipse/paho.mqtt.golang/packets" "github.com/eclipse/paho.mqtt.golang/packets"
@@ -42,11 +43,11 @@ type Broker struct {
tlsConfig *tls.Config tlsConfig *tls.Config
wpool *pool.WorkerPool wpool *pool.WorkerPool
clients sync.Map clients sync.Map
remotes sync.Map
nodes map[string]interface{} nodes map[string]interface{}
clusterPool chan *Message clusterPool chan *Message
topicsMgr *topics.Manager topicsMgr *topics.Manager
sessionMgr *sessions.Manager sessionMgr *sessions.Manager
rpcClient map[string]pb.HMQServiceClient
pluginAuthHTTP bool pluginAuthHTTP bool
pluginKafka bool pluginKafka bool
} }
@@ -130,7 +131,7 @@ func (b *Broker) Start() {
return return
} }
go initRPCService() go b.initRPCService()
go InitHTTPMoniter(b) go InitHTTPMoniter(b)
@@ -369,6 +370,8 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) {
if ok { if ok {
ol.Close() ol.Close()
} }
} else {
b.QueryConnect(cid)
} }
b.clients.Store(cid, c) b.clients.Store(cid, c)
b.OnlineOfflineNotification(cid, true) b.OnlineOfflineNotification(cid, true)
@@ -426,6 +429,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) {
publish(sub, packet) publish(sub, packet)
} }
b.DeliverMessage(packet)
} }
func (b *Broker) OnlineOfflineNotification(clientID string, online bool) { func (b *Broker) OnlineOfflineNotification(clientID string, online bool) {

View File

@@ -17,26 +17,20 @@ import (
) )
type Config struct { type Config struct {
Worker int `json:"workerNum"` Worker int `json:"workerNum"`
Host string `json:"host"` Host string `json:"host"`
Port string `json:"port"` Port string `json:"port"`
Cluster RouteInfo `json:"cluster"` Router string `json:"router"`
Router string `json:"router"` TlsHost string `json:"tlsHost"`
TlsHost string `json:"tlsHost"` TlsPort string `json:"tlsPort"`
TlsPort string `json:"tlsPort"` WsPath string `json:"wsPath"`
WsPath string `json:"wsPath"` WsPort string `json:"wsPort"`
WsPort string `json:"wsPort"` WsTLS bool `json:"wsTLS"`
WsTLS bool `json:"wsTLS"` TlsInfo TLSInfo `json:"tlsInfo"`
TlsInfo TLSInfo `json:"tlsInfo"` Acl bool `json:"acl"`
Acl bool `json:"acl"` AclConf string `json:"aclConf"`
AclConf string `json:"aclConf"` Debug bool `json:"debug"`
Debug bool `json:"debug"` Plugins []string `json:"plugins"`
Plugins []string `json:"plugins"`
}
type RouteInfo struct {
Host string `json:"host"`
Port string `json:"port"`
} }
type TLSInfo struct { type TLSInfo struct {
@@ -78,8 +72,6 @@ func ConfigureConfig(args []string) (*Config, error) {
fs.StringVar(&config.Port, "port", "1883", "Port to listen on.") fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
fs.StringVar(&config.Port, "p", "1883", "Port to listen on.") fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on") fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info") fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info")
fs.StringVar(&config.Router, "router", "", "Router who maintenance cluster info") fs.StringVar(&config.Router, "router", "", "Router who maintenance cluster info")
fs.StringVar(&config.WsPort, "ws", "", "port for ws to listen on") fs.StringVar(&config.WsPort, "ws", "", "port for ws to listen on")
@@ -161,17 +153,6 @@ func (config *Config) check() error {
} }
} }
if config.Cluster.Port != "" {
if config.Cluster.Host == "" {
config.Cluster.Host = "0.0.0.0"
}
}
if config.Router != "" {
if config.Cluster.Port == "" {
return errors.New("cluster port is null")
}
}
if config.TlsPort != "" { if config.TlsPort != "" {
if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" {
log.Error("tls config error, no cert or key file.") log.Error("tls config error, no cert or key file.")

View File

@@ -15,7 +15,7 @@ func (c *client) SendInfo() {
if c.status == Disconnected { if c.status == Disconnected {
return return
} }
url := c.info.localIP + ":" + c.broker.config.Cluster.Port url := c.info.localIP + ":10011"
infoMsg := NewInfo(c.broker.id, url, false) infoMsg := NewInfo(c.broker.id, url, false)
err := c.WriterPacket(infoMsg) err := c.WriterPacket(infoMsg)
@@ -102,9 +102,9 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) {
url, ok := rurl.(string) url, ok := rurl.(string)
if ok { if ok {
exist := b.CheckRemoteExist(rid, url) //todo new rpc client
if !exist { if _, exist := b.rpcClient[rid]; !exist {
//todo new rpc client b.initRPCClient(rid, url)
} }
} }

View File

@@ -88,19 +88,3 @@ func (b *Broker) checkNodeExist(id, url string) bool {
} }
return false return false
} }
func (b *Broker) CheckRemoteExist(remoteID, url string) bool {
exist := false
b.remotes.Range(func(key, value interface{}) bool {
v, ok := value.(*client)
if ok {
if v.route.remoteUrl == url {
v.route.remoteID = remoteID
exist = true
return false
}
}
return true
})
return exist
}

View File

@@ -13,11 +13,7 @@ import (
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
) )
var ( func (b *Broker) initRPCService() {
rpcClient = make(map[string]pb.HMQServiceClient)
)
func initRPCService() {
lis, err := net.Listen("tcp", ":10011") lis, err := net.Listen("tcp", ":10011")
if err != nil { if err != nil {
log.Error("failed to listen: ", zap.Error(err)) log.Error("failed to listen: ", zap.Error(err))
@@ -27,17 +23,17 @@ func initRPCService() {
s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{ s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 30 * time.Minute, Time: 30 * time.Minute,
})) }))
pb.RegisterHMQServiceServer(s, &HMQ{}) pb.RegisterHMQServiceServer(s, &HMQ{b: b})
reflection.Register(s) reflection.Register(s)
if err := s.Serve(lis); err != nil { if err := s.Serve(lis); err != nil {
log.Error("failed to serve: ", zap.Error(err)) log.Error("failed to serve: ", zap.Error(err))
} }
} }
func initRPCClient(url string) { func (b *Broker) initRPCClient(id, url string) {
conn, err := grpc.Dial(url, conn, err := grpc.Dial(url,
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{ // avoid 'code = Unavailable desc = transport is closing' error grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Minute, Time: 30 * time.Minute,
})) }))
if err != nil { if err != nil {
@@ -45,24 +41,89 @@ func initRPCClient(url string) {
} }
cli := pb.NewHMQServiceClient(conn) cli := pb.NewHMQServiceClient(conn)
rpcClient[url] = cli b.rpcClient[id] = cli
} }
type HMQ struct { type HMQ struct {
b *Broker
} }
func (h *HMQ) QuerySubscribe(ctx context.Context, in *pb.QuerySubscribeRequest) (*pb.Response, error) { func (h *HMQ) QuerySubscribe(ctx context.Context, in *pb.QuerySubscribeRequest) (*pb.Response, error) {
return nil, nil resp := &pb.Response{
RetCode: 0,
}
topic := in.Topic
qos := in.Qos
if qos > 1 {
resp.RetCode = 404
return resp, nil
}
b := h.b
var subs []interface{}
var qoss []byte
err := b.topicsMgr.Subscribers([]byte(topic), byte(qos), &subs, &qoss)
if err != nil {
log.Error("search sub client error, ", zap.Error(err))
resp.RetCode = 404
}
if len(subs) == 0 {
resp.RetCode = 404
}
return resp, nil
} }
func (h *HMQ) QueryConnect(ctx context.Context, in *pb.QueryConnectRequest) (*pb.Response, error) { func (h *HMQ) QueryConnect(ctx context.Context, in *pb.QueryConnectRequest) (*pb.Response, error) {
return nil, nil resp := &pb.Response{
RetCode: 0,
}
b := h.b
cli, exist := b.clients.Load(in.ClientID)
if exist {
client := cli.(*client)
client.Close()
}
return resp, nil
} }
func (h *HMQ) DeliverMessage(ctx context.Context, in *pb.DeliverMessageRequest) (*pb.Response, error) { func (h *HMQ) DeliverMessage(ctx context.Context, in *pb.DeliverMessageRequest) (*pb.Response, error) {
return nil, nil b := h.b
p := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
p.TopicName = in.Topic
p.Payload = in.Payload
p.Retain = false
b.PublishMessage(p)
resp := &pb.Response{
RetCode: 0,
}
return resp, nil
} }
func (b *Broker) DeliverMessage(packet *packets.PublishPacket) { func (b *Broker) DeliverMessage(packet *packets.PublishPacket) {
//TODO Query and Deliver Message //TODO Query and Deliver Message
for _, client := range b.rpcClient {
resp, err := client.QuerySubscribe(context.Background(), &pb.QuerySubscribeRequest{Topic: packet.TopicName, Qos: int32(packet.Qos)})
if err != nil {
log.Error("rpc request error:", zap.Error(err))
continue
}
if resp.RetCode == 0 {
client.DeliverMessage(context.Background(), &pb.DeliverMessageRequest{Topic: packet.TopicName, Payload: packet.Payload})
}
}
}
func (b *Broker) QueryConnect(clientID string) {
//TODO Query and Deliver Message
for _, client := range b.rpcClient {
client.QueryConnect(context.Background(), &pb.QueryConnectRequest{ClientID: clientID})
}
} }

View File

@@ -26,6 +26,7 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type QuerySubscribeRequest struct { type QuerySubscribeRequest struct {
Topic string `protobuf:"bytes,1,opt,name=Topic,proto3" json:"Topic,omitempty"` Topic string `protobuf:"bytes,1,opt,name=Topic,proto3" json:"Topic,omitempty"`
Qos int32 `protobuf:"varint,2,opt,name=Qos,proto3" json:"Qos,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@@ -63,6 +64,13 @@ func (m *QuerySubscribeRequest) GetTopic() string {
return "" return ""
} }
func (m *QuerySubscribeRequest) GetQos() int32 {
if m != nil {
return m.Qos
}
return 0
}
type QueryConnectRequest struct { type QueryConnectRequest struct {
ClientID string `protobuf:"bytes,1,opt,name=ClientID,proto3" json:"ClientID,omitempty"` ClientID string `protobuf:"bytes,1,opt,name=ClientID,proto3" json:"ClientID,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
@@ -105,6 +113,7 @@ func (m *QueryConnectRequest) GetClientID() string {
type DeliverMessageRequest struct { type DeliverMessageRequest struct {
Topic string `protobuf:"bytes,1,opt,name=Topic,proto3" json:"Topic,omitempty"` Topic string `protobuf:"bytes,1,opt,name=Topic,proto3" json:"Topic,omitempty"`
Payload []byte `protobuf:"bytes,2,opt,name=Payload,proto3" json:"Payload,omitempty"` Payload []byte `protobuf:"bytes,2,opt,name=Payload,proto3" json:"Payload,omitempty"`
Qos int32 `protobuf:"varint,3,opt,name=Qos,proto3" json:"Qos,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@@ -149,9 +158,16 @@ func (m *DeliverMessageRequest) GetPayload() []byte {
return nil return nil
} }
func (m *DeliverMessageRequest) GetQos() int32 {
if m != nil {
return m.Qos
}
return 0
}
type Response struct { type Response struct {
RetCode int32 `protobuf:"varint,2,opt,name=RetCode,proto3" json:"RetCode,omitempty"` RetCode int32 `protobuf:"varint,1,opt,name=RetCode,proto3" json:"RetCode,omitempty"`
Message string `protobuf:"bytes,3,opt,name=Message,proto3" json:"Message,omitempty"` Message string `protobuf:"bytes,2,opt,name=Message,proto3" json:"Message,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@@ -206,23 +222,24 @@ func init() {
func init() { proto.RegisterFile("hmq.proto", fileDescriptor_935f0990d4a84183) } func init() { proto.RegisterFile("hmq.proto", fileDescriptor_935f0990d4a84183) }
var fileDescriptor_935f0990d4a84183 = []byte{ var fileDescriptor_935f0990d4a84183 = []byte{
// 247 bytes of a gzipped FileDescriptorProto // 263 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xc1, 0x4a, 0x03, 0x31, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0x41, 0x4b, 0xc3, 0x40,
0x10, 0x40, 0x5d, 0xa5, 0xda, 0x1d, 0x4a, 0x0f, 0xb1, 0x2d, 0xa1, 0xa7, 0x92, 0x53, 0x2f, 0x06, 0x10, 0x85, 0x8d, 0x25, 0xda, 0x0c, 0xa5, 0xc8, 0xda, 0x96, 0xd0, 0x53, 0xc9, 0xa9, 0xa7, 0x85,
0xaa, 0xf4, 0xea, 0x65, 0x0b, 0xea, 0xa1, 0x60, 0x53, 0x7f, 0x60, 0x37, 0x1d, 0x34, 0xb0, 0x26, 0x2a, 0xbd, 0xea, 0x21, 0x3d, 0xe8, 0xa1, 0x60, 0xb6, 0x5e, 0x3c, 0x26, 0xe9, 0xa0, 0x0b, 0x31,
0xdb, 0x24, 0x5b, 0xe8, 0x47, 0xf9, 0x8f, 0xd2, 0xd8, 0x88, 0x81, 0xc5, 0xe3, 0x23, 0x6f, 0x32, 0x93, 0xee, 0x6e, 0x0a, 0xfd, 0x51, 0xfe, 0x47, 0xc9, 0x9a, 0x55, 0x03, 0x81, 0xde, 0xf2, 0x11,
0xe4, 0x05, 0xf2, 0x8f, 0xcf, 0x3d, 0x6f, 0xac, 0xf1, 0x86, 0xdd, 0xc1, 0x78, 0xd3, 0xa2, 0x3d, 0xde, 0x7b, 0xc3, 0xb7, 0x10, 0x7c, 0x7c, 0x1e, 0x78, 0xa5, 0xc8, 0x50, 0xf4, 0x08, 0xd3, 0xa4,
0x6e, 0xdb, 0xca, 0x49, 0xab, 0x2a, 0x14, 0xb8, 0x6f, 0xd1, 0x79, 0x32, 0x82, 0xde, 0x9b, 0x69, 0x46, 0x75, 0xda, 0xd5, 0x99, 0xce, 0x95, 0xcc, 0x50, 0xe0, 0xa1, 0x46, 0x6d, 0xd8, 0x04, 0xfc,
0x94, 0xa4, 0xd9, 0x2c, 0x9b, 0xe7, 0xe2, 0x07, 0xd8, 0x02, 0x6e, 0x83, 0x5e, 0x18, 0xad, 0x51, 0x57, 0xaa, 0x64, 0x1e, 0x7a, 0x0b, 0x6f, 0x19, 0x88, 0x1f, 0x60, 0x37, 0x30, 0x48, 0x48, 0x87,
0xfa, 0x28, 0x4f, 0xa1, 0x5f, 0xd4, 0x0a, 0xb5, 0x7f, 0x59, 0x9d, 0xfd, 0x5f, 0x66, 0x4f, 0x30, 0x97, 0x0b, 0x6f, 0xe9, 0x8b, 0xe6, 0x33, 0x5a, 0xc1, 0xad, 0x2d, 0x88, 0xa9, 0x2c, 0x31, 0x37,
0x5e, 0x61, 0xad, 0x0e, 0x68, 0xd7, 0xe8, 0x5c, 0xf9, 0xfe, 0xff, 0x06, 0x42, 0xe1, 0xe6, 0xb5, 0x2e, 0x3e, 0x87, 0x61, 0x5c, 0x48, 0x2c, 0xcd, 0xf3, 0xa6, 0x6d, 0xf8, 0xe5, 0xe8, 0x0d, 0xa6,
0x3c, 0xd6, 0xa6, 0xdc, 0xd1, 0xcb, 0x59, 0x36, 0x1f, 0x88, 0x88, 0xec, 0x11, 0xfa, 0x02, 0x5d, 0x1b, 0x2c, 0xe4, 0x11, 0xd5, 0x16, 0xb5, 0x4e, 0xdf, 0xcf, 0x6c, 0x86, 0x70, 0xfd, 0x92, 0x9e,
0x63, 0xb4, 0xc3, 0x93, 0x25, 0xd0, 0x17, 0x66, 0x87, 0xc1, 0xea, 0x89, 0x88, 0xa7, 0x93, 0xf3, 0x0a, 0x4a, 0xf7, 0x76, 0x77, 0x24, 0x1c, 0xba, 0x6b, 0x06, 0x7f, 0xd7, 0x3c, 0xc0, 0x50, 0xa0,
0x1e, 0x7a, 0x15, 0xee, 0x8d, 0x78, 0xff, 0x95, 0x01, 0x3c, 0xaf, 0x37, 0x5b, 0xb4, 0x07, 0x25, 0xae, 0xa8, 0xd4, 0xd8, 0xe4, 0x04, 0x9a, 0x98, 0xf6, 0x68, 0xfb, 0x7c, 0xe1, 0xb0, 0xf9, 0xd3,
0x91, 0x2c, 0x61, 0x98, 0xbe, 0x9c, 0x4c, 0x78, 0x67, 0x8a, 0x69, 0xce, 0xe3, 0x5e, 0x76, 0x41, 0x2e, 0xdb, 0xc6, 0x40, 0x38, 0xbc, 0xfb, 0xf2, 0x00, 0x9e, 0xb6, 0xc9, 0x0e, 0xd5, 0x51, 0xe6,
0x16, 0x30, 0xf8, 0x5b, 0x80, 0x8c, 0x78, 0x47, 0x90, 0x74, 0x64, 0x09, 0xc3, 0xb4, 0x00, 0x99, 0xc8, 0xd6, 0x30, 0xee, 0xda, 0x61, 0x33, 0xde, 0xab, 0x6b, 0x1e, 0x70, 0xb7, 0x1b, 0x5d, 0xb0,
0xf0, 0xce, 0x24, 0xc9, 0x58, 0x75, 0x1d, 0x7e, 0xe8, 0xe1, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x0f, 0x15, 0x8c, 0xfe, 0x3b, 0x61, 0x13, 0xde, 0xa3, 0xa8, 0x1b, 0x59, 0xc3, 0xb8, 0xeb, 0x84, 0xcd,
0x76, 0x58, 0x73, 0xae, 0x01, 0x00, 0x00, 0x78, 0xaf, 0xa4, 0x4e, 0x2c, 0xbb, 0xb2, 0xaf, 0x78, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, 0xa0,
0x89, 0x96, 0x30, 0xd2, 0x01, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.

View File

@@ -2,6 +2,7 @@ syntax = "proto3";
message QuerySubscribeRequest { message QuerySubscribeRequest {
string Topic = 1; string Topic = 1;
int32 Qos = 2;
} }
message QueryConnectRequest { message QueryConnectRequest {
@@ -11,11 +12,12 @@ message QueryConnectRequest {
message DeliverMessageRequest { message DeliverMessageRequest {
string Topic = 1; string Topic = 1;
bytes Payload = 2; bytes Payload = 2;
int32 Qos = 3;
} }
message Response { message Response {
int32 RetCode = 2; int32 RetCode = 1;
string Message = 3; string Message = 2;
} }
//Service define //Service define