mirror of
https://github.com/fhmq/hmq.git
synced 2026-05-02 14:28:34 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c75470f5de | ||
|
|
8ddca9bdc3 | ||
|
|
6c75361f88 | ||
|
|
7a603e1a34 | ||
|
|
fef923d10a | ||
|
|
4a85fcb615 |
2
.github/workflows/go.yml
vendored
2
.github/workflows/go.yml
vendored
@@ -19,7 +19,7 @@ jobs:
|
|||||||
- name: Set up Go
|
- name: Set up Go
|
||||||
uses: actions/setup-go@v3
|
uses: actions/setup-go@v3
|
||||||
with:
|
with:
|
||||||
go-version: 1.19
|
go-version: 1.21
|
||||||
|
|
||||||
- name: Build
|
- name: Build
|
||||||
run: go build -v ./...
|
run: go build -v ./...
|
||||||
|
|||||||
4
.github/workflows/macos.yml
vendored
4
.github/workflows/macos.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
|||||||
- name: Set up Go
|
- name: Set up Go
|
||||||
uses: actions/setup-go@v2
|
uses: actions/setup-go@v2
|
||||||
with:
|
with:
|
||||||
go-version: 1.18
|
go-version: 1.21
|
||||||
|
|
||||||
- name: Build
|
- name: Build
|
||||||
run: go build -v ./...
|
run: go build -v ./...
|
||||||
|
|||||||
2
.github/workflows/ubuntu.yml
vendored
2
.github/workflows/ubuntu.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
|||||||
- name: Set up Go
|
- name: Set up Go
|
||||||
uses: actions/setup-go@v2
|
uses: actions/setup-go@v2
|
||||||
with:
|
with:
|
||||||
go-version: 1.18
|
go-version: 1.21
|
||||||
|
|
||||||
- name: Build
|
- name: Build
|
||||||
run: go build -v ./...
|
run: go build -v ./...
|
||||||
|
|||||||
2
.github/workflows/windows.yml
vendored
2
.github/workflows/windows.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
|||||||
- name: Set up Go
|
- name: Set up Go
|
||||||
uses: actions/setup-go@v2
|
uses: actions/setup-go@v2
|
||||||
with:
|
with:
|
||||||
go-version: 1.18
|
go-version: 1.21
|
||||||
|
|
||||||
- name: Build
|
- name: Build
|
||||||
run: go build -v ./...
|
run: go build -v ./...
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -160,6 +161,11 @@ func (b *Broker) Start() {
|
|||||||
go b.StartClientListening(false)
|
go b.StartClientListening(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//listen client over unix
|
||||||
|
if b.config.Port == "" && b.config.UnixFilePath != "" {
|
||||||
|
go b.StartUnixSocketClientListening(b.config.UnixFilePath, true)
|
||||||
|
}
|
||||||
|
|
||||||
//listen for cluster
|
//listen for cluster
|
||||||
if b.config.Cluster.Port != "" {
|
if b.config.Cluster.Port != "" {
|
||||||
go b.StartClusterListening()
|
go b.StartClusterListening()
|
||||||
@@ -268,6 +274,60 @@ func (b *Broker) StartClientListening(Tls bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Broker) StartUnixSocketClientListening(socketPath string, UnixSocket bool) {
|
||||||
|
var err error
|
||||||
|
var l net.Listener
|
||||||
|
for {
|
||||||
|
if UnixSocket {
|
||||||
|
if FileExist(socketPath) {
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Remove Unix socketPath ", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
conn, _ := net.ResolveUnixAddr("unix", socketPath)
|
||||||
|
l, err = net.ListenUnix("unix", conn)
|
||||||
|
log.Info("Start Listening client on Unix socket", zap.String("socketPath", socketPath))
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
break // successfully listening
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Error("Error listening on ", zap.Error(err))
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpDelay := 10 * ACCEPT_MIN_SLEEP
|
||||||
|
for {
|
||||||
|
conn, err := l.Accept()
|
||||||
|
if err != nil {
|
||||||
|
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||||
|
log.Error(
|
||||||
|
"Temporary Client Accept Error(%v), sleeping %dms",
|
||||||
|
zap.Error(ne),
|
||||||
|
zap.Duration("sleeping", tmpDelay/time.Millisecond),
|
||||||
|
)
|
||||||
|
|
||||||
|
time.Sleep(tmpDelay)
|
||||||
|
tmpDelay *= 2
|
||||||
|
if tmpDelay > ACCEPT_MAX_SLEEP {
|
||||||
|
tmpDelay = ACCEPT_MAX_SLEEP
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Error("Accept error", zap.Error(err))
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
tmpDelay = ACCEPT_MIN_SLEEP
|
||||||
|
go func() {
|
||||||
|
err := b.handleConnection(CLIENT, conn)
|
||||||
|
if err != nil {
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Broker) StartClusterListening() {
|
func (b *Broker) StartClusterListening() {
|
||||||
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
|
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
|
||||||
log.Info("Start Listening cluster on ", zap.String("hp", hp))
|
log.Info("Start Listening cluster on ", zap.String("hp", hp))
|
||||||
@@ -743,3 +803,15 @@ func (b *Broker) OnlineOfflineNotification(info Info, online bool, lastMsg int64
|
|||||||
|
|
||||||
b.PublishMessage(packet)
|
b.PublishMessage(packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func FileExist(name string) bool {
|
||||||
|
_, err := os.Stat(name)
|
||||||
|
if err == nil {
|
||||||
|
return true
|
||||||
|
} else if os.IsNotExist(err) {
|
||||||
|
return false
|
||||||
|
} else {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|||||||
@@ -19,20 +19,21 @@ import (
|
|||||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Worker int `json:"workerNum"`
|
Worker int `json:"workerNum"`
|
||||||
HTTPPort string `json:"httpPort"`
|
HTTPPort string `json:"httpPort"`
|
||||||
Host string `json:"host"`
|
Host string `json:"host"`
|
||||||
Port string `json:"port"`
|
Port string `json:"port"`
|
||||||
Cluster RouteInfo `json:"cluster"`
|
Cluster RouteInfo `json:"cluster"`
|
||||||
Router string `json:"router"`
|
Router string `json:"router"`
|
||||||
TlsHost string `json:"tlsHost"`
|
TlsHost string `json:"tlsHost"`
|
||||||
TlsPort string `json:"tlsPort"`
|
TlsPort string `json:"tlsPort"`
|
||||||
WsPath string `json:"wsPath"`
|
WsPath string `json:"wsPath"`
|
||||||
WsPort string `json:"wsPort"`
|
WsPort string `json:"wsPort"`
|
||||||
WsTLS bool `json:"wsTLS"`
|
WsTLS bool `json:"wsTLS"`
|
||||||
TlsInfo TLSInfo `json:"tlsInfo"`
|
TlsInfo TLSInfo `json:"tlsInfo"`
|
||||||
Debug bool `json:"debug"`
|
Debug bool `json:"debug"`
|
||||||
Plugin Plugins `json:"plugins"`
|
Plugin Plugins `json:"plugins"`
|
||||||
|
UnixFilePath string `json:"unixFilePath"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Plugins struct {
|
type Plugins struct {
|
||||||
@@ -87,8 +88,9 @@ func ConfigureConfig(args []string) (*Config, error) {
|
|||||||
fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
|
fs.IntVar(&config.Worker, "worker", 1024, "worker num to process message, perfer (client num)/10.")
|
||||||
fs.StringVar(&config.HTTPPort, "httpport", "8080", "Port to listen on.")
|
fs.StringVar(&config.HTTPPort, "httpport", "8080", "Port to listen on.")
|
||||||
fs.StringVar(&config.HTTPPort, "hp", "8080", "Port to listen on.")
|
fs.StringVar(&config.HTTPPort, "hp", "8080", "Port to listen on.")
|
||||||
fs.StringVar(&config.Port, "port", "1883", "Port to listen on.")
|
fs.StringVar(&config.Port, "port", "", "Port to listen on.")
|
||||||
fs.StringVar(&config.Port, "p", "1883", "Port to listen on.")
|
fs.StringVar(&config.Port, "p", "", "Port to listen on.")
|
||||||
|
fs.StringVar(&config.UnixFilePath, "unixfilepath", "", "unix sock to listen on.")
|
||||||
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
|
fs.StringVar(&config.Host, "host", "0.0.0.0", "Network host to listen on")
|
||||||
fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
|
fs.StringVar(&config.Cluster.Port, "cp", "", "Cluster port from which members can connect.")
|
||||||
fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
|
fs.StringVar(&config.Cluster.Port, "clusterport", "", "Cluster port from which members can connect.")
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -59,6 +59,6 @@ require (
|
|||||||
golang.org/x/crypto v0.14.0 // indirect
|
golang.org/x/crypto v0.14.0 // indirect
|
||||||
golang.org/x/sys v0.13.0 // indirect
|
golang.org/x/sys v0.13.0 // indirect
|
||||||
golang.org/x/text v0.13.0 // indirect
|
golang.org/x/text v0.13.0 // indirect
|
||||||
google.golang.org/protobuf v1.30.0 // indirect
|
google.golang.org/protobuf v1.33.0 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -46,7 +46,6 @@ github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg
|
|||||||
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
|
github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
|
||||||
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
|
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
|
||||||
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
|
||||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||||
@@ -170,9 +169,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
|||||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
|
||||||
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
|
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||||
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
|
|||||||
Reference in New Issue
Block a user