9 Commits

Author SHA1 Message Date
Eric Pearson
5d8c4bee66 Merge pull request #206 from xinkonglili/weilili
fix(hmq):Increased the log level of error #205
2024-08-30 18:04:50 +08:00
wei_lilitw
7f2cbaaf20 fix(hmq):Set a higher debug level for logs to avoid printing too much log information and interfering with debugging 2024-05-27 11:03:48 +08:00
Eric Pearson
a92de4c9e8 Merge pull request #204 from xinkonglili/weilili
fix(broker):Added file removal for unix sock communication #203
2024-05-24 09:38:39 +08:00
wei_lilitw
646f13199d fix(broker):fix unix sock file not be removed 2024-05-23 17:22:29 +08:00
wei_lilitw
67594b07ef fix(broker):fix unix sock file not be removed 2024-05-23 17:21:21 +08:00
Eric Pearson
c46080726d Merge pull request #202 from xinkonglili/weilili
fix(broker):Added windows pipe socket communication, compatible with other systems #199
2024-04-19 17:04:13 +08:00
wei_lilitw
9063c6069c feat(broker):Added pipe socket conditions for different systems 2024-04-19 15:24:14 +08:00
wei_lilitw
d50464571e feat(hmq):add windows pipe socket.Use the open source project npipe, which nicely encapsulates the operations of windows pipe and returns the connection type net.Conn. 2024-04-18 17:31:35 +08:00
joy,zhou
2ceb61a027 update x/net 2024-04-17 14:44:26 +08:00
8 changed files with 150 additions and 38 deletions

View File

@@ -165,6 +165,10 @@ func (b *Broker) Start() {
if b.config.Port == "" && b.config.UnixFilePath != "" {
go b.StartUnixSocketClientListening(b.config.UnixFilePath, true)
}
//listen client over windows pipe
if b.config.Port == "" && b.config.UnixFilePath == "" && b.config.WindowsPipeName != "" {
go b.StartPipeSocketListening(b.config.WindowsPipeName, true)
}
//listen for cluster
if b.config.Cluster.Port != "" {
@@ -274,12 +278,13 @@ func (b *Broker) StartClientListening(Tls bool) {
}
}
func (b *Broker) StartUnixSocketClientListening(socketPath string, UnixSocket bool) {
func (b *Broker) StartUnixSocketClientListening(socketPath string, unixSocket bool) {
var err error
var l net.Listener
for {
if UnixSocket {
if unixSocket {
if FileExist(socketPath) {
err = os.Remove(socketPath)
if err != nil {
log.Error("Remove Unix socketPath ", zap.Error(err))
}

View File

@@ -19,21 +19,22 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Config struct {
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
UnixFilePath string `json:"unixFilePath"`
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug string `json:"debug"`
Plugin Plugins `json:"plugins"`
UnixFilePath string `json:"unixFilePath"`
WindowsPipeName string `json:"windowsPipeName"`
}
type Plugins struct {
@@ -86,12 +87,12 @@ func ConfigureConfig(args []string) (*Config, error) {
fs.BoolVar(&help, "help", false, "Show this message.")
fs.IntVar(&config.Worker, "w", 1024, "worker num to process message, perfer (client num)/10.")
fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
fs.StringVar(&config.HTTPPort, "httpport", "8080", "Port to listen on.")
fs.StringVar(&config.HTTPPort, "hp", "8080", "Port to listen on.")
fs.StringVar(&config.Port, "port", "", "Port to listen on.")
fs.StringVar(&config.Port, "p", "", "Port to listen on.")
fs.StringVar(&config.HTTPPort, "httpport", "", "Port to listen on.")
fs.StringVar(&config.HTTPPort, "hp", "", "Port to listen on.")
fs.StringVar(&config.Port, "port", "8090", "Port to listen on.")
fs.StringVar(&config.Port, "p", "8090", "Port to listen on.")
fs.StringVar(&config.UnixFilePath, "unixfilepath", "", "unix sock to listen on.")
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
fs.StringVar(&config.Host, "host", "127.0.0.1", "Network host to listen on")
fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
fs.StringVar(&config.Router, "r", "", "Router who maintenance cluster info")
@@ -102,8 +103,8 @@ func ConfigureConfig(args []string) (*Config, error) {
fs.StringVar(&config.WsPath, "wspath", "", "path for ws to listen on")
fs.StringVar(&configFile, "config", "", "config file for hmq")
fs.StringVar(&configFile, "c", "", "config file for hmq")
fs.BoolVar(&config.Debug, "debug", false, "enable Debug logging.")
fs.BoolVar(&config.Debug, "d", false, "enable Debug logging.")
fs.StringVar(&config.Debug, "debug", "info", "enable Debug logging.")
fs.StringVar(&config.Debug, "d", "info", "enable Debug logging.")
fs.Bool("D", true, "enable Debug logging.")
@@ -119,7 +120,7 @@ func ConfigureConfig(args []string) (*Config, error) {
fs.Visit(func(f *flag.Flag) {
switch f.Name {
case "D":
config.Debug = true
config.Debug = "debug"
}
})
@@ -132,7 +133,15 @@ func ConfigureConfig(args []string) (*Config, error) {
}
}
if config.Debug {
//Set the debug level of logs
switch config.Debug {
case "debug":
log = logger.Debug().Named("broker")
case "info":
log = logger.Prod().Named("broker")
case "release":
log = logger.Release().Named("broker")
default:
log = logger.Debug().Named("broker")
}

View File

@@ -0,0 +1,11 @@
package broker
import (
"fmt"
)
// StartPipeSocketListening We use the open source npipe library
// to jump over pipe communication in mac
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
fmt.Println("macos system")
}

View File

@@ -0,0 +1,6 @@
package broker
// StartPipeSocketListening We use the open source npipe library to
// jump over pipe communication in linux
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
}

View File

@@ -0,0 +1,61 @@
package broker
import (
"fmt"
"github.com/natefinch/npipe"
"go.uber.org/zap"
"net"
"time"
)
// StartPipeSocketListening We use the open source npipe library to support pipe communication in windows
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
var err error
var ln *npipe.PipeListener
for {
if usePipe {
fmt.Println(pipeName)
ln, err = npipe.Listen(pipeName)
log.Info("Start Listening client on ", zap.String("pipeName", pipeName))
}
if err == nil {
break // successfully listening
}
log.Error("Error listening on ", zap.Error(err))
time.Sleep(1 * time.Second)
}
tmpDelay := 10 * ACCEPT_MIN_SLEEP
for {
conn, err := ln.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error(
"Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne),
zap.Duration("sleeping", tmpDelay/time.Millisecond),
)
time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error", zap.Error(err))
}
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
go func() {
err := b.handleConnection(CLIENT, conn)
fmt.Println("handleConnection,", err)
if err != nil {
conn.Close()
}
}()
}
}

9
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.8.3
go.uber.org/zap v1.24.0
golang.org/x/net v0.17.0
golang.org/x/net v0.23.0
)
require (
@@ -46,6 +46,7 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -56,9 +57,9 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

18
go.sum
View File

@@ -95,6 +95,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce h1:TqjP/BTDrwN7zP9xyXVuLsMBXYMt6LLYi55PlrIcq8U=
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:ifHPsLndGGzvgzcaXUvzmt6LxKT4pJ+uzEhtnMt+f7A=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
@@ -138,14 +140,14 @@ golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -157,15 +159,15 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@@ -38,6 +38,15 @@ func NewProdLogger() (*zap.Logger, error) {
return logCfg.Build()
}
// NewReleaseLogger return a logger for production builds
func NewReleaseLogger() (*zap.Logger, error) {
logCfg := zap.NewProductionConfig()
logCfg.DisableStacktrace = true
logCfg.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
logCfg.EncoderConfig = encoderCfg
return logCfg.Build()
}
func Prod() *zap.Logger {
l, _ := NewProdLogger()
@@ -54,6 +63,14 @@ func Debug() *zap.Logger {
return instance
}
func Release() *zap.Logger {
l, _ := NewReleaseLogger()
instance = l
return instance
}
func Get() *zap.Logger {
if instance == nil {
l, _ := NewProdLogger()