First Commit (#130)

Co-authored-by: Gary Barnett <gary.barnett@airsensa.com>
This commit is contained in:
Gary Barnett
2021-11-05 02:28:58 +00:00
committed by GitHub
parent 1d6979189a
commit 538bf70f5b
6 changed files with 511 additions and 5 deletions

1
.gitignore vendored
View File

@@ -4,3 +4,4 @@ log/*
*.test
.vscode/settings.json
.pre-commit-config.yaml
hmq.exe

View File

@@ -2,13 +2,11 @@
"workerNum": 4096,
"port": "1883",
"host": "0.0.0.0",
"debug": true,
"cluster": {
"host": "0.0.0.0",
"port": "1993"
},
"httpPort": "8080",
"router": "127.0.0.1:9888",
"tlsPort": "8883",
"tlsHost": "0.0.0.0",
"wsPort": "1888",
@@ -21,7 +19,7 @@
"keyFile": "ssl/server/key.pem"
},
"plugins": {
"auth": "authhttp",
"bridge": "kafka"
"auth": "mock",
"bridge": "csvlog"
}
}

50
plugins/bridge/CSVLog.md Normal file
View File

@@ -0,0 +1,50 @@
# CSVLog Plugin For HMQ
This is a bridge implementation for HMQ that allows messages to be logged to a CSV file at runtime.
It can be used for debugging/monitoring purposes, for integration with other systems/platforms, or as an audit trail of messages.
The plugin allows you to define 0, 1, or more filters which determine which messages get bridged. Where no filters are defined the plugin bridges every message. Where one or more filters exist, the plugin applies the filter/s and only brdiges messages that match the filter spec.
The plugin allows you provide a filename for the output file, and also supports three special filenames {LOG},{STDOUT}, and {NULL}. {LOG} results in messages being bridged to the log, {STDOUT} bridges them to Std out, and {NULL} simply skips and returns without an error.
## Configuration
The configiration settings for CSVLog are defined by the struct csvBridgeConfig.
```
type csvBridgeConfig struct {
FileName string `json:"fileName"`
LogFileMaxSizeMB int64 `json:"logFileMaxSizeMB"`
LogFileMaxFiles int64 `json:"logFileMaxFiles"`
WriteIntervalSecs int64 `json:"writeIntervalSecs"`
CommandTopic string `json:"commandTopic"`
Filters []string `json:"filters"`
}
```
| Setting | Description |
| ----------- | ----------- |
| FileName | A complete filename for the output file, or {LOG} to send bridged messages to the log, {STDOUT} to send bridged messages to STDOUT, or {NULL} to not bridge anything at all |
| LogFileMaxSizeMB | The size in megabytes at which the log file is rotated |
| LogFileMaxFiles | The maximum number of rotated logfiles to retain before they're deleted |
| WriteIntervalSecs | The delay before flushing any pending writes to the file |
| CommandTopic | The name of a topic to which commands relating to CSVLog will be sent eg "bridge/CSVLOG/command" |
| Filters | An array of filter specifications which are used to determine which messages are bridged, if there are no filters specified the filter is assumed to be "#" which bridges everything. Filters are specified the same way that topic acls are described|
## Filters
Filters use the same syntax as for ACL permissions.
So a filter can name a specific topic..
"animals/cats" will bridge messages sent to the "animals/cats" topic.
A filter can use the + or # wildcards so
"animals/cats/+" will bridge messages sent to "animals/cats/breeds", "animals/cats/colours" but not "animals/cats/breeds/longhair"
"animals/cats/#" will bridge messages sent to "animals/cats/breeds", "animals/cats/colours", "animals/cats/breeds/longhair", etc
## Commands
Currently two commands can be sent to the CSVLog bridge:
ROTATEFILE - Triggers an immediate rotation of the log file
REALOADCONFIG - Triggers a reload of the CSVLog config file

View File

@@ -32,7 +32,8 @@ type Elements struct {
const (
//Kafka plugin name
Kafka = "kafka"
Kafka = "kafka"
CSVLog = "csvlog"
)
type BridgeMQ interface {
@@ -43,6 +44,8 @@ func NewBridgeMQ(name string) BridgeMQ {
switch name {
case Kafka:
return InitKafka()
case CSVLog:
return InitCSVLog()
default:
return &mockMQ{}
}

418
plugins/bridge/csvlog.go Normal file
View File

@@ -0,0 +1,418 @@
package bridge
/*
Copyright (c) 2021, Gary Barnett @thinkovation. Released under the Apache 2 License
CSVLog is a bridge plugin for HMQ that implements CSV logging of messages. See CSVLog.md for more information
*/
import (
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"go.uber.org/zap"
)
type csvBridgeConfig struct {
FileName string `json:"fileName"`
LogFileMaxSizeMB int64 `json:"logFileMaxSizeMB"`
LogFileMaxFiles int64 `json:"logFileMaxFiles"`
WriteIntervalSecs int64 `json:"writeIntervalSecs"`
CommandTopic string `json:"commandTopic"`
Filters []string `json:"filters"`
}
type csvLog struct {
config csvBridgeConfig
buffer []string
msgchan chan (*Elements)
sync.RWMutex
}
// rotateLog performs a log rotation - copying the current logfile to the base file name plus a timestamp
func (c *csvLog) rotateLog(withPrune bool) error {
c.Lock()
filename := c.config.FileName
c.Unlock()
basename := strings.TrimSuffix(filename, filepath.Ext(filename))
newpath := basename + time.Now().Format("-20060102T150405") + filepath.Ext(filename)
renameError := os.Rename(filename, newpath)
if renameError != nil {
return renameError
}
outfile, _ := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
outfile.Close()
// Whenever we rotate a logfile we prune
if withPrune {
c.logFilePrune()
}
return nil
}
// writeToLog takes an array of elements and writes them to the logfile (or to log or stdout) spefified in
// the configuration
func (c *csvLog) writeToLog(els []Elements) error {
c.RLock()
fname := c.config.FileName
c.RUnlock()
if fname == "" {
fname = "CSVLOG.CSV"
}
if fname == "{LOG}" {
for _, value := range els {
t := time.Unix(value.Timestamp, 0)
log.Info(t.Format("2006-01-02T15:04:05") + " " + value.ClientID + " " + value.Username + " " + value.Action + " " + value.Topic + " " + value.Payload)
}
return nil
}
if fname == "{STDOUT}" {
for _, value := range els {
t := time.Unix(value.Timestamp, 0)
fmt.Println(t.Format("2006-01-02T15:04:05") + " " + value.ClientID + " " + value.Username + " " + value.Action + " " + value.Topic + " " + value.Payload)
}
return nil
}
var mbsize int64
fileStat, fileStatErr := os.Stat(fname)
if fileStatErr != nil {
log.Warn("Could not get CSVLog info. Received Err " + fileStatErr.Error())
mbsize = 0
} else {
mbsize = fileStat.Size() / 1024 / 1024
}
if mbsize > c.config.LogFileMaxSizeMB && c.config.LogFileMaxSizeMB != 0 {
rotateErr := c.rotateLog(true)
if rotateErr != nil {
log.Warn("Unable to rotate outputfile")
}
}
outfile, outfileOpenError := os.OpenFile(fname, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
defer outfile.Close()
if outfileOpenError != nil {
log.Warn("Could not open CSV Log file to write")
return errors.New("Could not write to CSV Log File")
}
writer := csv.NewWriter(outfile)
defer writer.Flush()
for _, value := range els {
t := time.Unix(value.Timestamp, 0)
var outrow = []string{t.Format("2006-01-02T15:04:05"), value.ClientID, value.Username, value.Action, value.Topic, value.Payload}
writeOutRowError := writer.Write(outrow)
if writeOutRowError != nil {
log.Warn("Could not write msg to CSV Log")
}
}
return nil
}
// Worker should be invoked as a goroutine - It listens on the csvlog message channel for incoming messages
// for performance we batch messages into an outqueue and write them in bulk when a timer expires
func (c *csvLog) Worker() {
log.Info("Running CSVLog worker")
run := true
var outqueue []Elements
for run == true {
c.RLock()
waitInterval := c.config.WriteIntervalSecs
c.RUnlock()
timer := time.NewTimer(time.Second * time.Duration(waitInterval))
select {
case p := <-c.msgchan:
c.RLock()
oktopublish := false
// Check to see if any filters are defined. If there are none we assume we're logging everything
if len(c.config.Filters) != 0 {
// We pick up a Read lock here to parse the c.config.Filters string array
// as it's a read lock, and write locks will be rare
// it feels as if this will be fine.
// If there is contention, it _might_ make sense to quickly lock c, get
// the filters and release the lock, then process the filters with no locks
// but I think it's unlikely
for _, filt := range c.config.Filters {
if topicMatch(p.Topic, filt) {
oktopublish = true
break
}
}
} else {
oktopublish = true
}
if oktopublish {
var el Elements
el.Action = p.Action
el.ClientID = p.ClientID
el.Payload = p.Payload
el.Size = p.Size
el.Timestamp = p.Timestamp
el.Topic = p.Topic
el.Username = p.Username
outqueue = append(outqueue, el)
}
c.RUnlock()
break
case <-timer.C:
if len(outqueue) > 0 {
writeResult := c.writeToLog(outqueue)
if writeResult != nil {
log.Warn("Trouble writing to CSV Log")
}
outqueue = nil
}
break
}
if run != true {
log.Info("Closing CSV Bridge worker")
break
}
}
}
// LoadCSVLogConfig loads the configuration file - it currently looks in
// "./plugins/csvlog/csvlogconfig.json" (following the example of the default location of the kafka plugin config file)
// if it doesn't find it there it looks in two further places - the current directory and
// an "assets" folder under the current directory (This is for compatibility with a couple of deployed)
// implementations.
func LoadCSVLogConfig() csvBridgeConfig {
// Check to see if the CSVLOGCONFFILE environment variable is set and if so
// check that it does actually point to a file
csvLogConfigFile := os.Getenv("CSVLOGCONFFILE")
if csvLogConfigFile != "" {
if _, err := os.Stat(csvLogConfigFile); os.IsNotExist(err) {
csvLogConfigFile = ""
}
}
// If csvLogConfigFile is blank look in the plugins directory,
// then the current directory for the csvLogConfigFile. If it's still not found we use a default config
// If the file does not exist, we use default parameters
if csvLogConfigFile == "" {
csvLogConfigFile = "./plugins/csvlog/csvlogconfig.json"
}
if _, err := os.Stat(csvLogConfigFile); os.IsNotExist(err) {
if _, err := os.Stat("csvlogconfig.json"); os.IsNotExist(err) {
csvLogConfigFile = ""
} else {
csvLogConfigFile = "csvlogconfig.json"
}
}
var configUnmarshalErr error
var config csvBridgeConfig
if csvLogConfigFile != "" {
log.Info("Trying to load config file from " + csvLogConfigFile)
content, err := ioutil.ReadFile(csvLogConfigFile)
if err != nil {
log.Info("Read config file error: ", zap.Error(err))
}
configUnmarshalErr = json.Unmarshal(content, &config)
}
if configUnmarshalErr != nil || config.FileName == "" {
log.Warn("Unable to load csvlog config file, so using default settings")
config.FileName = "/var/log/csvlog.log"
config.CommandTopic = "CSVLOG/command"
config.WriteIntervalSecs = 10
config.LogFileMaxSizeMB = 1
config.LogFileMaxFiles = 4
}
return config
}
// InitCSVLog initialises a CSVLOG plugin
// It does this by loading a config file if one can be found. The default filename follows the same
// convention as the kafka plugin - ie it's in "./plugins/csvlog/csvlogconfig.json" but an
// environment var - CSVLOGCONFFILE - can be set to provide a different location.
//
// Once the config is set the worker is started
func InitCSVLog() *csvLog {
log.Info("Trying to init CSVLOG")
c := &csvLog{config: LoadCSVLogConfig()}
c.msgchan = make(chan *Elements, 200)
//Start the csvlog worker
go c.Worker()
return c
}
// topicMatch accepts a topic name and a filter string, it then evaluates the
// topic against the filter string and returns true if there is a match.
//
// The CSV bridge can be configured with 0, 1 or more filters - Where there are no
// filters specified, every message will be re-published. Where there are filters, any message
// that passes any of the filter tests will be re-published.
func topicMatch(topic string, filter string) bool {
if topic == filter || filter == "#" {
return true
}
topicComponents := strings.Split(topic, "/")
filterComponents := strings.Split(filter, "/")
currentpos := 0
filterComponentsLength := len(filterComponents)
currentFilterComponent := ""
if filterComponentsLength > 0 {
currentFilterComponent = filterComponents[currentpos]
}
for _, topicVal := range topicComponents {
if currentFilterComponent == "" {
return false
}
if currentFilterComponent == "#" {
return true
}
if currentFilterComponent != "+" && currentFilterComponent != topicVal {
return false
}
currentpos++
if filterComponentsLength > currentpos {
currentFilterComponent = filterComponents[currentpos]
} else {
currentFilterComponent = ""
}
}
return true
}
// logFilePrune checks the number of rotated logfiles and prunes them
func (c *csvLog) logFilePrune() error {
// List the rotated files
c.RLock()
filename := c.config.FileName
maxfiles := c.config.LogFileMaxFiles
c.RUnlock()
if maxfiles == 0 {
return nil
}
fileExt := filepath.Ext(filename)
fileDir := filepath.Dir(filename)
baseFileName := strings.TrimSuffix(filepath.Base(filename), fileExt)
files, err := ioutil.ReadDir(fileDir)
if err != nil {
return err
}
var foundFiles []string
for _, file := range files {
if strings.HasPrefix(file.Name(), baseFileName+"-") {
foundFiles = append(foundFiles, file.Name())
}
}
if len(foundFiles) >= int(maxfiles) {
fmt.Println("Found ", len(foundFiles), " files")
sort.Strings(foundFiles)
for i := 0; i < len(foundFiles)-int(maxfiles); i++ {
fileDeleteError := os.Remove(fileDir + "//" + foundFiles[i])
log.Info("Pruning logfile " + fileDir + "//" + foundFiles[i])
if fileDeleteError != nil {
log.Warn("Could not delete file " + fileDir + "//" + foundFiles[i])
}
}
}
return nil
}
// Publish implements the bridge interface - it accepts an Element then checks to see if that element is a
// message published to the admin topic for the plugin
//
func (c *csvLog) Publish(e *Elements) error {
// A short-lived lock on c allows us to
// get the Command topic then release the lock
// This then allows us to process the command - which may
// take its a write lock on c (to update values) and then
// return here where we'll pick up a
// read lock to iterate over the c.config.filters
// We're trying to minimise the time spent in this function
// and to limit the overall time spent in any write locks.
c.RLock()
//CSVLOG allows you to configure a CommandTopic which is a topic to which commands affecting the behaviour of CSVLog can be sent
//The simplest would be a message with a payload of "RELOAD" which will reload the configuration allowing configuration changes to be
//made at runtime without restarting the broker
CommandTopic := c.config.CommandTopic
OutFile := c.config.FileName
c.RUnlock()
// If the outfile is set to "{NULL}" we don't do anything with the message - we just return nil
// This feature is here to allow CSVLOG to be enabled/disabled at runtime
if OutFile == "{NULL}" {
return nil
}
if e.Topic == CommandTopic {
log.Info("CSVLOG Command Received")
// Process Command
// These are going to be rare ocurrences, so in this implementation
// we will process the command here - but if we _really_ want to
// squeeze delays out, we could have a worker sitting on a
// command channel processing any commands.
if e.Payload == "RELOADCONFIG" {
newConfig := LoadCSVLogConfig()
c.Lock()
c.config = newConfig
c.Unlock()
}
if e.Payload == "ROTATEFILE" {
c.rotateLog(true)
}
if e.Payload == "ROTATEFILENOPRUNE" {
c.rotateLog(false)
}
// We could return without doing anything more here, but
// for now we move ahead with the filter processing on the
// basis that unless we either filter for "all" (with #) or
// filter for the CommandTopic, they won't be logged - but we
// may have a reason for wanting to track commands too
}
// Push the message into the channel and return
// the channel is buffered and is read by a goroutine so this should block for the shortest possible time
c.msgchan <- e
return nil
}

View File

@@ -0,0 +1,36 @@
package bridge
import (
"fmt"
"testing"
)
//Test_topicMatch is here to double check the topic matching logic
func Test_topicMatch(t *testing.T) {
tests := []struct {
name string
topic string
filter string
want bool
}{
// Some sample test cases
{name: "Simple", topic: "test", filter: "test", want: true},
{name: "Simple", topic: "test/cat", filter: "test/+", want: true},
{name: "Simple", topic: "test/cat/breed", filter: "test/+", want: false},
{name: "Simple", topic: "test/cat", filter: "test/#", want: true},
{name: "Simple", topic: "test/cat/banana", filter: "test/#", want: true},
{name: "Simple", topic: "test/cat/banana", filter: "test/+", want: false},
{name: "Simple", topic: "test/dog/banana", filter: "test/cat/+", want: false},
{name: "Simple", topic: "test/cat/banana", filter: "test/+/banana", want: true},
}
for _, tt := range tests {
fmt.Println(tt)
t.Run(tt.name, func(t *testing.T) {
if got := topicMatch(tt.topic, tt.filter); got != tt.want {
t.Errorf("topicMatch() = %v, want %v", got, tt.want)
}
})
}
}