10 Commits

Author SHA1 Message Date
Eric Pearson
c46080726d Merge pull request #202 from xinkonglili/weilili
fix(broker):Added windows pipe socket communication, compatible with other systems #199
2024-04-19 17:04:13 +08:00
wei_lilitw
9063c6069c feat(broker):Added pipe socket conditions for different systems 2024-04-19 15:24:14 +08:00
wei_lilitw
d50464571e feat(hmq):add windows pipe socket.Use the open source project npipe, which nicely encapsulates the operations of windows pipe and returns the connection type net.Conn. 2024-04-18 17:31:35 +08:00
joy,zhou
2ceb61a027 update x/net 2024-04-17 14:44:26 +08:00
dependabot[bot]
c75470f5de Bump google.golang.org/protobuf from 1.30.0 to 1.33.0 (#196)
Bumps google.golang.org/protobuf from 1.30.0 to 1.33.0.

---
updated-dependencies:
- dependency-name: google.golang.org/protobuf
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-04-15 19:03:36 +08:00
Lijin
8ddca9bdc3 feat(hmq):Provide handler StartUnixSocketClientListening used to handle the Unix communications (#198)
Co-authored-by: wei_lilitw <wei_lilitw@topsec.com.cn>
2024-04-15 19:03:24 +08:00
Eric Pearson
6c75361f88 Update windows.yml 2024-03-27 17:34:13 +08:00
Eric Pearson
7a603e1a34 Update ubuntu.yml 2024-03-27 17:34:00 +08:00
Eric Pearson
fef923d10a Update go.yml 2024-03-27 17:33:48 +08:00
Eric Pearson
4a85fcb615 Update macos.yml 2024-03-27 17:33:34 +08:00
11 changed files with 196 additions and 38 deletions

View File

@@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: 1.21
- name: Build
run: go build -v ./...

View File

@@ -12,7 +12,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.18
go-version: 1.21
- name: Build
run: go build -v ./...
run: go build -v ./...

View File

@@ -12,7 +12,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.18
go-version: 1.21
- name: Build
run: go build -v ./...

View File

@@ -12,7 +12,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.18
go-version: 1.21
- name: Build
run: go build -v ./...

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"net"
"net/http"
"os"
"sync"
"time"
@@ -160,6 +161,15 @@ func (b *Broker) Start() {
go b.StartClientListening(false)
}
//listen client over unix
if b.config.Port == "" && b.config.UnixFilePath != "" {
go b.StartUnixSocketClientListening(b.config.UnixFilePath, true)
}
//listen client over windows pipe
if b.config.Port == "" && b.config.UnixFilePath == "" && b.config.WindowsPipeName != "" {
go b.StartPipeSocketListening(b.config.WindowsPipeName, true)
}
//listen for cluster
if b.config.Cluster.Port != "" {
go b.StartClusterListening()
@@ -268,6 +278,60 @@ func (b *Broker) StartClientListening(Tls bool) {
}
}
func (b *Broker) StartUnixSocketClientListening(socketPath string, unixSocket bool) {
var err error
var l net.Listener
for {
if unixSocket {
if FileExist(socketPath) {
if err != nil {
log.Error("Remove Unix socketPath ", zap.Error(err))
}
}
conn, _ := net.ResolveUnixAddr("unix", socketPath)
l, err = net.ListenUnix("unix", conn)
log.Info("Start Listening client on Unix socket", zap.String("socketPath", socketPath))
}
if err == nil {
break // successfully listening
}
log.Error("Error listening on ", zap.Error(err))
time.Sleep(1 * time.Second)
}
tmpDelay := 10 * ACCEPT_MIN_SLEEP
for {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error(
"Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne),
zap.Duration("sleeping", tmpDelay/time.Millisecond),
)
time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error", zap.Error(err))
}
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
go func() {
err := b.handleConnection(CLIENT, conn)
if err != nil {
conn.Close()
}
}()
}
}
func (b *Broker) StartClusterListening() {
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
log.Info("Start Listening cluster on ", zap.String("hp", hp))
@@ -743,3 +807,15 @@ func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64
b.PublishMessage(packet)
}
func FileExist(name string) bool {
_, err := os.Stat(name)
if err == nil {
return true
} else if os.IsNotExist(err) {
return false
} else {
panic(err)
}
}

View File

@@ -19,20 +19,22 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Config struct {
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
Worker int `json:"workerNum"`
HTTPPort string `json:"httpPort"`
Host string `json:"host"`
Port string `json:"port"`
Cluster RouteInfo `json:"cluster"`
Router string `json:"router"`
TlsHost string `json:"tlsHost"`
TlsPort string `json:"tlsPort"`
WsPath string `json:"wsPath"`
WsPort string `json:"wsPort"`
WsTLS bool `json:"wsTLS"`
TlsInfo TLSInfo `json:"tlsInfo"`
Debug bool `json:"debug"`
Plugin Plugins `json:"plugins"`
UnixFilePath string `json:"unixFilePath"`
WindowsPipeName string `json:"windowsPipeName"`
}
type Plugins struct {
@@ -87,8 +89,9 @@ func ConfigureConfig(args []string) (*Config, error) {
fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
fs.StringVar(&config.HTTPPort, "httpport", "8080", "Port to listen on.")
fs.StringVar(&config.HTTPPort, "hp", "8080", "Port to listen on.")
fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
fs.StringVar(&config.Port, "port", "", "Port to listen on.")
fs.StringVar(&config.Port, "p", "", "Port to listen on.")
fs.StringVar(&config.UnixFilePath, "unixfilepath", "", "unix sock to listen on.")
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")

View File

@@ -0,0 +1,11 @@
package broker
import (
"fmt"
)
// StartPipeSocketListening We use the open source npipe library
// to jump over pipe communication in mac
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
fmt.Println("macos system")
}

View File

@@ -0,0 +1,6 @@
package broker
// StartPipeSocketListening We use the open source npipe library to
// jump over pipe communication in linux
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
}

View File

@@ -0,0 +1,61 @@
package broker
import (
"fmt"
"github.com/natefinch/npipe"
"go.uber.org/zap"
"net"
"time"
)
// StartPipeSocketListening We use the open source npipe library to support pipe communication in windows
func (b *Broker) StartPipeSocketListening(pipeName string, usePipe bool) {
var err error
var ln *npipe.PipeListener
for {
if usePipe {
fmt.Println(pipeName)
ln, err = npipe.Listen(pipeName)
log.Info("Start Listening client on ", zap.String("pipeName", pipeName))
}
if err == nil {
break // successfully listening
}
log.Error("Error listening on ", zap.Error(err))
time.Sleep(1 * time.Second)
}
tmpDelay := 10 * ACCEPT_MIN_SLEEP
for {
conn, err := ln.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
log.Error(
"Temporary Client Accept Error(%v), sleeping %dms",
zap.Error(ne),
zap.Duration("sleeping", tmpDelay/time.Millisecond),
)
time.Sleep(tmpDelay)
tmpDelay *= 2
if tmpDelay > ACCEPT_MAX_SLEEP {
tmpDelay = ACCEPT_MAX_SLEEP
}
} else {
log.Error("Accept error", zap.Error(err))
}
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
go func() {
err := b.handleConnection(CLIENT, conn)
fmt.Println("handleConnection,", err)
if err != nil {
conn.Close()
}
}()
}
}

11
go.mod
View File

@@ -14,7 +14,7 @@ require (
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.8.3
go.uber.org/zap v1.24.0
golang.org/x/net v0.17.0
golang.org/x/net v0.23.0
)
require (
@@ -46,6 +46,7 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -56,9 +57,9 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

24
go.sum
View File

@@ -46,7 +46,6 @@ github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
@@ -96,6 +95,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce h1:TqjP/BTDrwN7zP9xyXVuLsMBXYMt6LLYi55PlrIcq8U=
github.com/natefinch/npipe v0.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:ifHPsLndGGzvgzcaXUvzmt6LxKT4pJ+uzEhtnMt+f7A=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
@@ -139,14 +140,14 @@ golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -158,21 +159,20 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=