From 3ff0f170c6d241ec000c9fad62c5c8f0d113674a Mon Sep 17 00:00:00 2001 From: "joy.zhou" Date: Wed, 31 Jul 2019 14:34:56 +0800 Subject: [PATCH] update --- broker/broker.go | 8 +++-- broker/config.go | 47 ++++++++------------------ broker/info.go | 8 ++--- broker/router.go | 16 --------- broker/rpc.go | 85 +++++++++++++++++++++++++++++++++++++++++------- grpc/hmq.pb.go | 55 ++++++++++++++++++++----------- grpc/hmq.proto | 6 ++-- 7 files changed, 137 insertions(+), 88 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index e6a760a..8effdd2 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -14,6 +14,7 @@ import ( "github.com/fhmq/hmq/broker/lib/sessions" "github.com/fhmq/hmq/broker/lib/topics" + pb "github.com/fhmq/hmq/grpc" "github.com/fhmq/hmq/plugins" "github.com/eclipse/paho.mqtt.golang/packets" @@ -42,11 +43,11 @@ type Broker struct { tlsConfig *tls.Config wpool *pool.WorkerPool clients sync.Map - remotes sync.Map nodes map[string]interface{} clusterPool chan *Message topicsMgr *topics.Manager sessionMgr *sessions.Manager + rpcClient map[string]pb.HMQServiceClient pluginAuthHTTP bool pluginKafka bool } @@ -130,7 +131,7 @@ func (b *Broker) Start() { return } - go initRPCService() + go b.initRPCService() go InitHTTPMoniter(b) @@ -369,6 +370,8 @@ func (b *Broker) handleConnection(typ int, conn net.Conn) { if ok { ol.Close() } + } else { + b.QueryConnect(cid) } b.clients.Store(cid, c) b.OnlineOfflineNotification(cid, true) @@ -426,6 +429,7 @@ func (b *Broker) PublishMessage(packet *packets.PublishPacket) { publish(sub, packet) } + b.DeliverMessage(packet) } func (b *Broker) OnlineOfflineNotification(clientID string, online bool) { diff --git a/broker/config.go b/broker/config.go index dca2c86..af7255b 100644 --- a/broker/config.go +++ b/broker/config.go @@ -17,26 +17,20 @@ import ( ) type Config struct { - Worker int `json:"workerNum"` - Host string `json:"host"` - Port string `json:"port"` - Cluster RouteInfo `json:"cluster"` - Router string `json:"router"` - TlsHost string `json:"tlsHost"` - TlsPort string `json:"tlsPort"` - WsPath string `json:"wsPath"` - WsPort string `json:"wsPort"` - WsTLS bool `json:"wsTLS"` - TlsInfo TLSInfo `json:"tlsInfo"` - Acl bool `json:"acl"` - AclConf string `json:"aclConf"` - Debug bool `json:"debug"` - Plugins []string `json:"plugins"` -} - -type RouteInfo struct { - Host string `json:"host"` - Port string `json:"port"` + Worker int `json:"workerNum"` + Host string `json:"host"` + Port string `json:"port"` + Router string `json:"router"` + TlsHost string `json:"tlsHost"` + TlsPort string `json:"tlsPort"` + WsPath string `json:"wsPath"` + WsPort string `json:"wsPort"` + WsTLS bool `json:"wsTLS"` + TlsInfo TLSInfo `json:"tlsInfo"` + Acl bool `json:"acl"` + AclConf string `json:"aclConf"` + Debug bool `json:"debug"` + Plugins []string `json:"plugins"` } type TLSInfo struct { @@ -78,8 +72,6 @@ func ConfigureConfig(args []string) (*Config, error) { fs.StringVar(&config.Port, "port", "1883", "Port to listen on.") fs.StringVar(&config.Port, "p", "1883", "Port to listen on.") fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on") - fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.") - fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.") fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info") fs.StringVar(&config.Router, "router", "", "Router who maintenance cluster info") fs.StringVar(&config.WsPort, "ws", "", "port for ws to listen on") @@ -161,17 +153,6 @@ func (config *Config) check() error { } } - if config.Cluster.Port != "" { - if config.Cluster.Host == "" { - config.Cluster.Host = "0.0.0.0" - } - } - if config.Router != "" { - if config.Cluster.Port == "" { - return errors.New("cluster port is null") - } - } - if config.TlsPort != "" { if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { log.Error("tls config error, no cert or key file.") diff --git a/broker/info.go b/broker/info.go index 8338280..0c8f541 100644 --- a/broker/info.go +++ b/broker/info.go @@ -15,7 +15,7 @@ func (c *client) SendInfo() { if c.status == Disconnected { return } - url := c.info.localIP + ":" + c.broker.config.Cluster.Port + url := c.info.localIP + ":10011" infoMsg := NewInfo(c.broker.id, url, false) err := c.WriterPacket(infoMsg) @@ -102,9 +102,9 @@ func (c *client) ProcessInfo(packet *packets.PublishPacket) { url, ok := rurl.(string) if ok { - exist := b.CheckRemoteExist(rid, url) - if !exist { - //todo new rpc client + //todo new rpc client + if _, exist := b.rpcClient[rid]; !exist { + b.initRPCClient(rid, url) } } diff --git a/broker/router.go b/broker/router.go index d80ac6d..7718546 100644 --- a/broker/router.go +++ b/broker/router.go @@ -88,19 +88,3 @@ func (b *Broker) checkNodeExist(id, url string) bool { } return false } - -func (b *Broker) CheckRemoteExist(remoteID, url string) bool { - exist := false - b.remotes.Range(func(key, value interface{}) bool { - v, ok := value.(*client) - if ok { - if v.route.remoteUrl == url { - v.route.remoteID = remoteID - exist = true - return false - } - } - return true - }) - return exist -} diff --git a/broker/rpc.go b/broker/rpc.go index 98b24e1..9f86381 100644 --- a/broker/rpc.go +++ b/broker/rpc.go @@ -13,11 +13,7 @@ import ( "google.golang.org/grpc/reflection" ) -var ( - rpcClient = make(map[string]pb.HMQServiceClient) -) - -func initRPCService() { +func (b *Broker) initRPCService() { lis, err := net.Listen("tcp", ":10011") if err != nil { log.Error("failed to listen: ", zap.Error(err)) @@ -27,17 +23,17 @@ func initRPCService() { s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{ Time: 30 * time.Minute, })) - pb.RegisterHMQServiceServer(s, &HMQ{}) + pb.RegisterHMQServiceServer(s, &HMQ{b: b}) reflection.Register(s) if err := s.Serve(lis); err != nil { log.Error("failed to serve: ", zap.Error(err)) } } -func initRPCClient(url string) { +func (b *Broker) initRPCClient(id, url string) { conn, err := grpc.Dial(url, grpc.WithInsecure(), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ // avoid 'code = Unavailable desc = transport is closing' error + grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 30 * time.Minute, })) if err != nil { @@ -45,24 +41,89 @@ func initRPCClient(url string) { } cli := pb.NewHMQServiceClient(conn) - rpcClient[url] = cli + b.rpcClient[id] = cli } type HMQ struct { + b *Broker } func (h *HMQ) QuerySubscribe(ctx context.Context, in *pb.QuerySubscribeRequest) (*pb.Response, error) { - return nil, nil + resp := &pb.Response{ + RetCode: 0, + } + topic := in.Topic + qos := in.Qos + if qos > 1 { + resp.RetCode = 404 + return resp, nil + } + + b := h.b + var subs []interface{} + var qoss []byte + err := b.topicsMgr.Subscribers([]byte(topic), byte(qos), &subs, &qoss) + if err != nil { + log.Error("search sub client error, ", zap.Error(err)) + resp.RetCode = 404 + } + + if len(subs) == 0 { + resp.RetCode = 404 + } + + return resp, nil } func (h *HMQ) QueryConnect(ctx context.Context, in *pb.QueryConnectRequest) (*pb.Response, error) { - return nil, nil + resp := &pb.Response{ + RetCode: 0, + } + + b := h.b + cli, exist := b.clients.Load(in.ClientID) + if exist { + client := cli.(*client) + client.Close() + } + + return resp, nil } func (h *HMQ) DeliverMessage(ctx context.Context, in *pb.DeliverMessageRequest) (*pb.Response, error) { - return nil, nil + b := h.b + p := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket) + p.TopicName = in.Topic + p.Payload = in.Payload + p.Retain = false + b.PublishMessage(p) + + resp := &pb.Response{ + RetCode: 0, + } + return resp, nil } func (b *Broker) DeliverMessage(packet *packets.PublishPacket) { //TODO Query and Deliver Message + for _, client := range b.rpcClient { + + resp, err := client.QuerySubscribe(context.Background(), &pb.QuerySubscribeRequest{Topic: packet.TopicName, Qos: int32(packet.Qos)}) + if err != nil { + log.Error("rpc request error:", zap.Error(err)) + continue + } + + if resp.RetCode == 0 { + client.DeliverMessage(context.Background(), &pb.DeliverMessageRequest{Topic: packet.TopicName, Payload: packet.Payload}) + } + + } +} + +func (b *Broker) QueryConnect(clientID string) { + //TODO Query and Deliver Message + for _, client := range b.rpcClient { + client.QueryConnect(context.Background(), &pb.QueryConnectRequest{ClientID: clientID}) + } } diff --git a/grpc/hmq.pb.go b/grpc/hmq.pb.go index 1a7cdca..d282a12 100644 --- a/grpc/hmq.pb.go +++ b/grpc/hmq.pb.go @@ -26,6 +26,7 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type QuerySubscribeRequest struct { Topic string `protobuf:"bytes,1,opt,name=Topic,proto3" json:"Topic,omitempty"` + Qos int32 `protobuf:"varint,2,opt,name=Qos,proto3" json:"Qos,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -63,6 +64,13 @@ func (m *QuerySubscribeRequest) GetTopic() string { return "" } +func (m *QuerySubscribeRequest) GetQos() int32 { + if m != nil { + return m.Qos + } + return 0 +} + type QueryConnectRequest struct { ClientID string `protobuf:"bytes,1,opt,name=ClientID,proto3" json:"ClientID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -105,6 +113,7 @@ func (m *QueryConnectRequest) GetClientID() string { type DeliverMessageRequest struct { Topic string `protobuf:"bytes,1,opt,name=Topic,proto3" json:"Topic,omitempty"` Payload []byte `protobuf:"bytes,2,opt,name=Payload,proto3" json:"Payload,omitempty"` + Qos int32 `protobuf:"varint,3,opt,name=Qos,proto3" json:"Qos,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -149,9 +158,16 @@ func (m *DeliverMessageRequest) GetPayload() []byte { return nil } +func (m *DeliverMessageRequest) GetQos() int32 { + if m != nil { + return m.Qos + } + return 0 +} + type Response struct { - RetCode int32 `protobuf:"varint,2,opt,name=RetCode,proto3" json:"RetCode,omitempty"` - Message string `protobuf:"bytes,3,opt,name=Message,proto3" json:"Message,omitempty"` + RetCode int32 `protobuf:"varint,1,opt,name=RetCode,proto3" json:"RetCode,omitempty"` + Message string `protobuf:"bytes,2,opt,name=Message,proto3" json:"Message,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -206,23 +222,24 @@ func init() { func init() { proto.RegisterFile("hmq.proto", fileDescriptor_935f0990d4a84183) } var fileDescriptor_935f0990d4a84183 = []byte{ - // 247 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xc1, 0x4a, 0x03, 0x31, - 0x10, 0x40, 0x5d, 0xa5, 0xda, 0x1d, 0x4a, 0x0f, 0xb1, 0x2d, 0xa1, 0xa7, 0x92, 0x53, 0x2f, 0x06, - 0xaa, 0xf4, 0xea, 0x65, 0x0b, 0xea, 0xa1, 0x60, 0x53, 0x7f, 0x60, 0x37, 0x1d, 0x34, 0xb0, 0x26, - 0xdb, 0x24, 0x5b, 0xe8, 0x47, 0xf9, 0x8f, 0xd2, 0xd8, 0x88, 0x81, 0xc5, 0xe3, 0x23, 0x6f, 0x32, - 0xe4, 0x05, 0xf2, 0x8f, 0xcf, 0x3d, 0x6f, 0xac, 0xf1, 0x86, 0xdd, 0xc1, 0x78, 0xd3, 0xa2, 0x3d, - 0x6e, 0xdb, 0xca, 0x49, 0xab, 0x2a, 0x14, 0xb8, 0x6f, 0xd1, 0x79, 0x32, 0x82, 0xde, 0x9b, 0x69, - 0x94, 0xa4, 0xd9, 0x2c, 0x9b, 0xe7, 0xe2, 0x07, 0xd8, 0x02, 0x6e, 0x83, 0x5e, 0x18, 0xad, 0x51, - 0xfa, 0x28, 0x4f, 0xa1, 0x5f, 0xd4, 0x0a, 0xb5, 0x7f, 0x59, 0x9d, 0xfd, 0x5f, 0x66, 0x4f, 0x30, - 0x5e, 0x61, 0xad, 0x0e, 0x68, 0xd7, 0xe8, 0x5c, 0xf9, 0xfe, 0xff, 0x06, 0x42, 0xe1, 0xe6, 0xb5, - 0x3c, 0xd6, 0xa6, 0xdc, 0xd1, 0xcb, 0x59, 0x36, 0x1f, 0x88, 0x88, 0xec, 0x11, 0xfa, 0x02, 0x5d, - 0x63, 0xb4, 0xc3, 0x93, 0x25, 0xd0, 0x17, 0x66, 0x87, 0xc1, 0xea, 0x89, 0x88, 0xa7, 0x93, 0xf3, - 0x1e, 0x7a, 0x15, 0xee, 0x8d, 0x78, 0xff, 0x95, 0x01, 0x3c, 0xaf, 0x37, 0x5b, 0xb4, 0x07, 0x25, - 0x91, 0x2c, 0x61, 0x98, 0xbe, 0x9c, 0x4c, 0x78, 0x67, 0x8a, 0x69, 0xce, 0xe3, 0x5e, 0x76, 0x41, - 0x16, 0x30, 0xf8, 0x5b, 0x80, 0x8c, 0x78, 0x47, 0x90, 0x74, 0x64, 0x09, 0xc3, 0xb4, 0x00, 0x99, - 0xf0, 0xce, 0x24, 0xc9, 0x58, 0x75, 0x1d, 0x7e, 0xe8, 0xe1, 0x3b, 0x00, 0x00, 0xff, 0xff, 0x0f, - 0x76, 0x58, 0x73, 0xae, 0x01, 0x00, 0x00, + // 263 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x91, 0x41, 0x4b, 0xc3, 0x40, + 0x10, 0x85, 0x8d, 0x25, 0xda, 0x0c, 0xa5, 0xc8, 0xda, 0x96, 0xd0, 0x53, 0xc9, 0xa9, 0xa7, 0x85, + 0x2a, 0xbd, 0xea, 0x21, 0x3d, 0xe8, 0xa1, 0x60, 0xb6, 0x5e, 0x3c, 0x26, 0xe9, 0xa0, 0x0b, 0x31, + 0x93, 0xee, 0x6e, 0x0a, 0xfd, 0x51, 0xfe, 0x47, 0xc9, 0x9a, 0x55, 0x03, 0x81, 0xde, 0xf2, 0x11, + 0xde, 0x7b, 0xc3, 0xb7, 0x10, 0x7c, 0x7c, 0x1e, 0x78, 0xa5, 0xc8, 0x50, 0xf4, 0x08, 0xd3, 0xa4, + 0x46, 0x75, 0xda, 0xd5, 0x99, 0xce, 0x95, 0xcc, 0x50, 0xe0, 0xa1, 0x46, 0x6d, 0xd8, 0x04, 0xfc, + 0x57, 0xaa, 0x64, 0x1e, 0x7a, 0x0b, 0x6f, 0x19, 0x88, 0x1f, 0x60, 0x37, 0x30, 0x48, 0x48, 0x87, + 0x97, 0x0b, 0x6f, 0xe9, 0x8b, 0xe6, 0x33, 0x5a, 0xc1, 0xad, 0x2d, 0x88, 0xa9, 0x2c, 0x31, 0x37, + 0x2e, 0x3e, 0x87, 0x61, 0x5c, 0x48, 0x2c, 0xcd, 0xf3, 0xa6, 0x6d, 0xf8, 0xe5, 0xe8, 0x0d, 0xa6, + 0x1b, 0x2c, 0xe4, 0x11, 0xd5, 0x16, 0xb5, 0x4e, 0xdf, 0xcf, 0x6c, 0x86, 0x70, 0xfd, 0x92, 0x9e, + 0x0a, 0x4a, 0xf7, 0x76, 0x77, 0x24, 0x1c, 0xba, 0x6b, 0x06, 0x7f, 0xd7, 0x3c, 0xc0, 0x50, 0xa0, + 0xae, 0xa8, 0xd4, 0xd8, 0xe4, 0x04, 0x9a, 0x98, 0xf6, 0x68, 0xfb, 0x7c, 0xe1, 0xb0, 0xf9, 0xd3, + 0x2e, 0xdb, 0xc6, 0x40, 0x38, 0xbc, 0xfb, 0xf2, 0x00, 0x9e, 0xb6, 0xc9, 0x0e, 0xd5, 0x51, 0xe6, + 0xc8, 0xd6, 0x30, 0xee, 0xda, 0x61, 0x33, 0xde, 0xab, 0x6b, 0x1e, 0x70, 0xb7, 0x1b, 0x5d, 0xb0, + 0x15, 0x8c, 0xfe, 0x3b, 0x61, 0x13, 0xde, 0xa3, 0xa8, 0x1b, 0x59, 0xc3, 0xb8, 0xeb, 0x84, 0xcd, + 0x78, 0xaf, 0xa4, 0x4e, 0x2c, 0xbb, 0xb2, 0xaf, 0x78, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, 0xa0, + 0x89, 0x96, 0x30, 0xd2, 0x01, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/grpc/hmq.proto b/grpc/hmq.proto index c3fca05..d3cf84f 100644 --- a/grpc/hmq.proto +++ b/grpc/hmq.proto @@ -2,6 +2,7 @@ syntax = "proto3"; message QuerySubscribeRequest { string Topic = 1; + int32 Qos = 2; } message QueryConnectRequest { @@ -11,11 +12,12 @@ message QueryConnectRequest { message DeliverMessageRequest { string Topic = 1; bytes Payload = 2; + int32 Qos = 3; } message Response { - int32 RetCode = 2; - string Message = 3; + int32 RetCode = 1; + string Message = 2; } //Service define