Files
hmq/broker/auth.go
joy.zhou 7155667f6c Pool (#16)
* add pool

* elastic workerpool

* del buf

* modify usage

* modify readme
2018-02-03 12:42:25 +08:00

85 lines
1.8 KiB
Go

/* Copyright (c) 2018, joy.zhou <chowyu08@gmail.com>
*/
package broker
import (
"strings"
"github.com/fhmq/hmq/lib/acl"
"go.uber.org/zap"
"github.com/fsnotify/fsnotify"
)
const (
PUB = 1
SUB = 2
)
func (c *client) CheckTopicAuth(typ int, topic string) bool {
if c.typ != CLIENT || !c.broker.config.Acl {
return true
}
if strings.HasPrefix(topic, "$queue/") {
topic = string([]byte(topic)[7:])
if topic == "" {
return false
}
}
ip := c.info.remoteIP
username := string(c.info.username)
clientid := string(c.info.clientID)
aclInfo := c.broker.AclConfig
return acl.CheckTopicAuth(aclInfo, typ, ip, username, clientid, topic)
}
var (
watchList = []string{"./conf"}
)
func (b *Broker) handleFsEvent(event fsnotify.Event) error {
switch event.Name {
case b.config.AclConf:
if event.Op&fsnotify.Write == fsnotify.Write ||
event.Op&fsnotify.Create == fsnotify.Create {
brokerLog.Info("text:handling acl config change event:", zap.String("filename", event.Name))
aclconfig, err := acl.AclConfigLoad(event.Name)
if err != nil {
brokerLog.Error("aclconfig change failed, load acl conf error: ", zap.Error(err))
return err
}
b.AclConfig = aclconfig
}
}
return nil
}
func (b *Broker) StartAclWatcher() {
go func() {
wch, e := fsnotify.NewWatcher()
if e != nil {
brokerLog.Error("start monitor acl config file error,", zap.Error(e))
return
}
defer wch.Close()
for _, i := range watchList {
if err := wch.Add(i); err != nil {
brokerLog.Error("start monitor acl config file error,", zap.Error(err))
return
}
}
brokerLog.Info("watching acl config file change...")
for {
select {
case evt := <-wch.Events:
b.handleFsEvent(evt)
case err := <-wch.Errors:
brokerLog.Error("error:", zap.Error(err))
}
}
}()
}