mirror of
https://github.com/fhmq/hmq.git
synced 2026-04-26 19:48:34 +00:00
'tls'
This commit is contained in:
@@ -1,26 +1,33 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fhmq/lib/message"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
|
||||
log "github.com/cihub/seelog"
|
||||
)
|
||||
|
||||
type Broker struct {
|
||||
id string
|
||||
config *Config
|
||||
clients cMap
|
||||
routes cMap
|
||||
remotes cMap
|
||||
sl *Sublist
|
||||
rl *RetainList
|
||||
queues map[string]int
|
||||
id string
|
||||
cid unit64
|
||||
config *Config
|
||||
tlsConfig *tls.Config
|
||||
clients cMap
|
||||
routes cMap
|
||||
remotes cMap
|
||||
sl *Sublist
|
||||
rl *RetainList
|
||||
queues map[string]int
|
||||
}
|
||||
|
||||
func NewBroker(config *Config) *Broker {
|
||||
return &Broker{
|
||||
b := &Broker{
|
||||
id: GenUniqueId(),
|
||||
config: config,
|
||||
sl: NewSublist(),
|
||||
@@ -30,13 +37,79 @@ func NewBroker(config *Config) *Broker {
|
||||
routes: NewClientMap(),
|
||||
remotes: NewClientMap(),
|
||||
}
|
||||
if b.config.TlsPort != "" {
|
||||
tlsconfig, err := NewTLSConfig(b.config.TlsInfo)
|
||||
if err != nil {
|
||||
log.Error("new tlsConfig error: ", err)
|
||||
return nil, err
|
||||
}
|
||||
b.tlsConfig = tlsconfig
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *Broker) Start() {
|
||||
go b.StartListening(CLIENT)
|
||||
if b.config.Port != "" {
|
||||
go b.StartListening(CLIENT)
|
||||
}
|
||||
if b.config.Cluster.Port != "" {
|
||||
go b.StartListening(ROUTER)
|
||||
}
|
||||
if b.config.WsPort != "" {
|
||||
go b.StartWebsocketListening()
|
||||
}
|
||||
if b.config.TlsPort != "" {
|
||||
go b.StartTLSListening()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Broker) StartWebsocketListening() {
|
||||
path := "/" + b.config.WsPath
|
||||
hp := ":" + b.config.WsPort
|
||||
log.Info("Start Webscoker Listening on ", hp, path)
|
||||
http.Handle(path, websocket.Handler(b.wsHandler))
|
||||
err := http.ListenAndServe(hp, nil)
|
||||
if err != nil {
|
||||
log.Error("ListenAndServe: " + err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Broker) wsHandler(ws *websocket.Conn) {
|
||||
atomic.AddUint64(&b.cid, 1)
|
||||
go b.handleConnection(CLIENT, conn, b.cid)
|
||||
}
|
||||
|
||||
func (b *Broker) StartTLSListening() {
|
||||
hp := b.config.TlsHost + ":" + b.config.TlsPort
|
||||
log.Info("Start TLS Listening client on ", hp)
|
||||
|
||||
l, e := tls.Listen("tcp", hp, b.tlsConfig)
|
||||
if e != nil {
|
||||
log.Error("Error listening on ", e)
|
||||
return
|
||||
}
|
||||
tmpDelay := 10 * ACCEPT_MIN_SLEEP
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||
log.Error("Temporary Client Accept Error(%v), sleeping %dms",
|
||||
ne, tmpDelay/time.Millisecond)
|
||||
time.Sleep(tmpDelay)
|
||||
tmpDelay *= 2
|
||||
if tmpDelay > ACCEPT_MAX_SLEEP {
|
||||
tmpDelay = ACCEPT_MAX_SLEEP
|
||||
}
|
||||
} else {
|
||||
log.Error("Accept error: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
tmpDelay = ACCEPT_MIN_SLEEP
|
||||
atomic.AddUint64(&b.cid, 1)
|
||||
go b.handleConnection(CLIENT, conn, b.cid)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Broker) StartListening(typ int) {
|
||||
@@ -56,7 +129,6 @@ func (b *Broker) StartListening(typ int) {
|
||||
}
|
||||
|
||||
tmpDelay := 10 * ACCEPT_MIN_SLEEP
|
||||
num := 0
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
@@ -74,8 +146,8 @@ func (b *Broker) StartListening(typ int) {
|
||||
continue
|
||||
}
|
||||
tmpDelay = ACCEPT_MIN_SLEEP
|
||||
num += 1
|
||||
go b.handleConnection(typ, conn, num)
|
||||
atomic.AddUint64(&b.cid, 1)
|
||||
go b.handleConnection(typ, conn, b.cid)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user