From 393dfaa1c89973ed7b605ab29ba1753dc2366f3f Mon Sep 17 00:00:00 2001 From: chowyu08 Date: Sat, 26 Aug 2017 21:08:25 +0800 Subject: [PATCH] 'acl' --- README.md | 2 + broker/auth.go | 29 +++++++++ broker/broker.go | 14 +++++ broker/client.go | 20 +++--- broker/retain.go | 4 +- conf/acl.conf | 12 ++++ lib/acl/acl.go | 114 ++++++++++++++++++++++++++++++++++ lib/acl/aclcheck.go | 148 ++++++++++++++++++++++++++++++++++++++++++++ lib/acl/spilt.go | 52 ++++++++++++++++ 9 files changed, 387 insertions(+), 8 deletions(-) create mode 100644 broker/auth.go create mode 100644 conf/acl.conf create mode 100644 lib/acl/acl.go create mode 100644 lib/acl/aclcheck.go create mode 100644 lib/acl/spilt.go diff --git a/README.md b/README.md index 9c49c32..3a13d66 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,8 @@ $ go run main.go * TLS/SSL Support +* Flexible ACL + ### QUEUE SUBSCRIBE ~~~ | Prefix | Examples | diff --git a/broker/auth.go b/broker/auth.go new file mode 100644 index 0000000..a24ed73 --- /dev/null +++ b/broker/auth.go @@ -0,0 +1,29 @@ +package broker + +import ( + "hmq/lib/acl" + "strings" +) + +const ( + PUB = 1 + SUB = 2 +) + +func (c *client) CheckTopicAuth(typ int, topic string) bool { + if !c.broker.config.Acl { + return true + } + if strings.HasPrefix(topic, "$queue/") { + topic = string([]byte(topic)[7:]) + if topic == "" { + return false + } + } + ip := c.info.remoteIP + username := string(c.info.username) + clientid := string(c.info.clientID) + aclInfo := c.broker.AclConfig + return acl.CheckTopicAuth(aclInfo, typ, ip, username, clientid, topic) + +} diff --git a/broker/broker.go b/broker/broker.go index 57b7539..6268b88 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -2,6 +2,7 @@ package broker import ( "crypto/tls" + "hmq/lib/acl" "hmq/lib/message" "net" "net/http" @@ -18,6 +19,7 @@ type Broker struct { cid uint64 config *Config tlsConfig *tls.Config + AclConfig *acl.ACLConfig clients cMap routes cMap remotes cMap @@ -45,10 +47,22 @@ func NewBroker(config *Config) *Broker { } b.tlsConfig = tlsconfig } + if b.config.Acl { + aclconfig, err := acl.AclConfigLoad(b.config.AclConf) + if err != nil { + log.Error("Load acl conf error: ", err) + return nil + } + b.AclConfig = aclconfig + } return b } func (b *Broker) Start() { + if b == nil { + log.Error("broker is null") + return + } if b.config.Port != "" { go b.StartListening(CLIENT) } diff --git a/broker/client.go b/broker/client.go index 4345733..0436941 100644 --- a/broker/client.go +++ b/broker/client.go @@ -146,11 +146,16 @@ func (c *client) ProcessPublish(buf []byte) { c.Close() return } + topic := msg.Topic() + + if c.typ != CLIENT || !c.CheckTopicAuth(PUB, string(topic)) { + return + } c.ProcessPublishMessage(buf, msg) if msg.Retain() { if b := c.broker; b != nil { - err := b.rl.Insert(msg.Topic(), buf) + err := b.rl.Insert(topic, buf) if err != nil { log.Error("Insert Retain Message error: ", err) } @@ -246,12 +251,13 @@ func (c *client) ProcessSubscribe(buf []byte) { for i, t := range topics { topic := string(t) //check topic auth for client - // if !c.CheckTopicAuth(topic, SUB) { - // log.Error("CheckSubAuth failed") - // retcodes = append(retcodes, message.QosFailure) - // continue - // } - + if c.typ == CLIENT { + if !c.CheckTopicAuth(SUB, topic) { + log.Error("CheckSubAuth failed") + retcodes = append(retcodes, message.QosFailure) + continue + } + } if _, exist := c.subs[topic]; !exist { queue := false if strings.HasPrefix(topic, "$queue/") { diff --git a/broker/retain.go b/broker/retain.go index 8752ec9..1f5b32f 100644 --- a/broker/retain.go +++ b/broker/retain.go @@ -1,6 +1,8 @@ package broker -import "sync" +import ( + "sync" +) type RetainList struct { sync.RWMutex diff --git a/conf/acl.conf b/conf/acl.conf new file mode 100644 index 0000000..c659321 --- /dev/null +++ b/conf/acl.conf @@ -0,0 +1,12 @@ +## pub 1 , sub 2, pubsub 3 +## %c is clientid , %s is username +##auth type value pub/sub topic +allow ip 127.0.0.1 2 $SYS/# +allow clientid 0001 3 # +deny username admin 3 # +allow username joy 3 /test,hello/world +allow clientid * 1 toCloud/%c +allow username * 1 toCloud/%u +allow clientid * 2 toDevice/%c +allow username * 2 toDevice/%u +deny clientid * 3 # diff --git a/lib/acl/acl.go b/lib/acl/acl.go new file mode 100644 index 0000000..9cfe14a --- /dev/null +++ b/lib/acl/acl.go @@ -0,0 +1,114 @@ +package acl + +import ( + "bufio" + "errors" + "io" + "os" + "strconv" + "strings" +) + +const ( + PUB = 1 + SUB = 2 + PUBSUB = 3 + CLIENTID = "clientid" + USERNAME = "username" + IP = "ip" + ALLOW = "allow" + DENY = "deny" +) + +type AuthInfo struct { + Auth string + Typ string + Val string + PubSub int + Topics []string +} + +type ACLConfig struct { + File string + Info []*AuthInfo +} + +func AclConfigLoad(file string) (*ACLConfig, error) { + if file == "" { + file = "./conf/acl.conf" + } + aclconifg := &ACLConfig{ + File: file, + Info: make([]*AuthInfo, 0, 4), + } + err := aclconifg.Prase() + if err != nil { + return nil, err + } + return aclconifg, err +} + +func (c *ACLConfig) Prase() error { + f, err := os.Open(c.File) + defer f.Close() + if err != nil { + return err + } + buf := bufio.NewReader(f) + var parseErr error + for { + line, err := buf.ReadString('\n') + line = strings.TrimSpace(line) + if isCommentOut(line) { + continue + } + if line == "" { + return parseErr + } + // fmt.Println(line) + tmpArr := strings.Fields(line) + if len(tmpArr) != 5 { + parseErr = errors.New("\"" + line + "\" format is error") + break + } + if tmpArr[0] != ALLOW && tmpArr[0] != DENY { + parseErr = errors.New("\"" + line + "\" format is error") + break + } + if tmpArr[1] != CLIENTID && tmpArr[1] != USERNAME && tmpArr[1] != IP { + parseErr = errors.New("\"" + line + "\" format is error") + break + } + var pubsub int + pubsub, err = strconv.Atoi(tmpArr[3]) + if err != nil { + parseErr = errors.New("\"" + line + "\" format is error") + break + } + topicStr := strings.Replace(tmpArr[4], " ", "", -1) + topicStr = strings.Replace(topicStr, "\n", "", -1) + topics := strings.Split(topicStr, ",") + tmpAuth := &AuthInfo{ + Auth: tmpArr[0], + Typ: tmpArr[1], + Val: tmpArr[2], + Topics: topics, + PubSub: pubsub, + } + c.Info = append(c.Info, tmpAuth) + if err != nil { + if err != io.EOF { + parseErr = err + } + break + } + } + return parseErr +} +func isCommentOut(line string) bool { + if strings.HasPrefix(line, "#") || strings.HasPrefix(line, ";") || strings.HasPrefix(line, "//") || strings.HasPrefix(line, "*") { + return true + } else { + return false + } +} diff --git a/lib/acl/aclcheck.go b/lib/acl/aclcheck.go new file mode 100644 index 0000000..e896fcc --- /dev/null +++ b/lib/acl/aclcheck.go @@ -0,0 +1,148 @@ +package acl + +import "strings" + +func CheckTopicAuth(ACLInfo *ACLConfig, typ int, ip, username, clientid, topic string) bool { + for _, info := range ACLInfo.Info { + ctyp := info.Typ + switch ctyp { + case CLIENTID: + if match, auth := info.checkWithClientID(typ, clientid, topic); match { + return auth + } + case USERNAME: + if match, auth := info.checkWithUsername(typ, username, topic); match { + return auth + } + case IP: + if match, auth := info.checkWithIP(typ, ip, topic); match { + return auth + } + } + } + return false +} + +func (a *AuthInfo) checkWithClientID(typ int, clientid, topic string) (bool, bool) { + auth := false + match := false + if a.Val == "*" || a.Val == clientid { + for _, tp := range a.Topics { + des := strings.Replace(tp, "%c", clientid, -1) + if typ == PUB { + if pubTopicMatch(topic, des) { + match = true + auth = a.checkAuth(PUB) + } + } else if typ == SUB { + if subTopicMatch(topic, des) { + match = true + auth = a.checkAuth(SUB) + } + } + } + } + return match, auth +} + +func (a *AuthInfo) checkWithUsername(typ int, username, topic string) (bool, bool) { + auth := false + match := false + if a.Val == "*" || a.Val == username { + for _, tp := range a.Topics { + des := strings.Replace(tp, "%u", username, -1) + if typ == PUB { + if pubTopicMatch(topic, des) { + match = true + auth = a.checkAuth(PUB) + } + } else if typ == SUB { + if subTopicMatch(topic, des) { + match = true + auth = a.checkAuth(SUB) + } + } + } + } + return match, auth +} + +func (a *AuthInfo) checkWithIP(typ int, ip, topic string) (bool, bool) { + auth := false + match := false + if a.Val == "*" || a.Val == ip { + for _, tp := range a.Topics { + des := tp + if typ == PUB { + if pubTopicMatch(topic, des) { + auth = a.checkAuth(PUB) + match = true + } + } else if typ == SUB { + if subTopicMatch(topic, des) { + auth = a.checkAuth(SUB) + match = true + } + } + } + } + return match, auth +} + +func (a *AuthInfo) checkAuth(typ int) bool { + auth := false + if typ == PUB { + if a.Auth == ALLOW && (a.PubSub == PUB || a.PubSub == PUBSUB) { + auth = true + } else if a.Auth == DENY && a.PubSub == SUB { + auth = true + } + } else if typ == SUB { + if a.Auth == ALLOW && (a.PubSub == SUB || a.PubSub == PUBSUB) { + auth = true + } else if a.Auth == DENY && a.PubSub == PUB { + auth = true + } + } + return auth +} + +func pubTopicMatch(pub, des string) bool { + dest, _ := SubscribeTopicSpilt(des) + topic, _ := PublishTopicSpilt(pub) + for i, t := range dest { + if i > len(topic)-1 { + return false + } + if t == "#" { + return true + } + if t == "+" || t == topic[i] { + continue + } + if t != topic[i] { + return false + } + } + return true +} + +func subTopicMatch(pub, des string) bool { + dest, _ := SubscribeTopicSpilt(des) + topic, _ := SubscribeTopicSpilt(pub) + for i, t := range dest { + if i > len(topic)-1 { + return false + } + if t == "#" { + return true + } + if t == "+" || "+" == topic[i] || t == topic[i] { + continue + } + if t != topic[i] { + return false + } + } + return true +} diff --git a/lib/acl/spilt.go b/lib/acl/spilt.go new file mode 100644 index 0000000..3df0c9d --- /dev/null +++ b/lib/acl/spilt.go @@ -0,0 +1,52 @@ +package acl + +import ( + "bytes" + "errors" + "strings" +) + +func SubscribeTopicSpilt(topic string) ([]string, error) { + subject := []byte(topic) + if bytes.IndexByte(subject, '#') != -1 { + if bytes.IndexByte(subject, '#') != len(subject)-1 { + return nil, errors.New("Topic format error with index of #") + } + } + re := strings.Split(topic, "/") + for i, v := range re { + if i != 0 && i != (len(re)-1) { + if v == "" { + return nil, errors.New("Topic format error with index of //") + } + if strings.Contains(v, "+") && v != "+" { + return nil, errors.New("Topic format error with index of +") + } + } else { + if v == "" { + re[i] = "/" + } + } + } + return re, nil + +} + +func PublishTopicSpilt(topic string) ([]string, error) { + subject := []byte(topic) + if bytes.IndexByte(subject, '#') != -1 || bytes.IndexByte(subject, '+') != -1 { + return nil, errors.New("Publish Topic format error with + and #") + } + re := strings.Split(topic, "/") + for i, v := range re { + if v == "" { + if i != 0 && i != (len(re)-1) { + return nil, errors.New("Topic format error with index of //") + } else { + re[i] = "/" + } + } + + } + return re, nil +}