diff --git a/README.md b/README.md index 543e7c3..068b153 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,20 @@ $ go run main.go "host": "0.0.0.0", "port": "1993", "routers": ["10.10.0.11:1993","10.10.0.12:1993"] - } + }, + "wsPort": "1888", + "wsPath": "/ws", + "wsTLS": true, + "tlsPort": "8883", + "tlsHost": "0.0.0.0", + "tlsInfo": { + "verify": true, + "caFile": "tls/ca/cacert.pem", + "certFile": "tls/server/cert.pem", + "keyFile": "tls/server/key.pem" + }, + "acl":true, + "aclConf":"conf/acl.conf" } ~~~ @@ -37,6 +50,10 @@ $ go run main.go * Queue subscribe +* Websocket Support + +* TLS/SSL Support + ### QUEUE SUBSCRIBE | Prefix | Examples | diff --git a/broker.config b/broker.config index dd95de3..1632315 100644 --- a/broker.config +++ b/broker.config @@ -5,5 +5,18 @@ "host": "0.0.0.0", "port": "1993", "routes": [] - } + }, + "tlsPort": "", + "tlsHost": "0.0.0.0", + "wsPort": "1888", + "wsPath": "/ws", + "wsTLS": false, + "tlsInfo": { + "verify": true, + "caFile": "tls/ca/cacert.pem", + "certFile": "tls/server/cert.pem", + "keyFile": "tls/server/key.pem" + }, + "acl": true, + "aclConf": "conf/acl.conf" } \ No newline at end of file diff --git a/broker/broker.go b/broker/broker.go index fd26549..0642437 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -1,26 +1,33 @@ package broker import ( + "crypto/tls" "fhmq/lib/message" "net" + "net/http" + "sync/atomic" "time" + "golang.org/x/net/websocket" + log "github.com/cihub/seelog" ) type Broker struct { - id string - config *Config - clients cMap - routes cMap - remotes cMap - sl *Sublist - rl *RetainList - queues map[string]int + id string + cid unit64 + config *Config + tlsConfig *tls.Config + clients cMap + routes cMap + remotes cMap + sl *Sublist + rl *RetainList + queues map[string]int } func NewBroker(config *Config) *Broker { - return &Broker{ + b := &Broker{ id: GenUniqueId(), config: config, sl: NewSublist(), @@ -30,13 +37,79 @@ func NewBroker(config *Config) *Broker { routes: NewClientMap(), remotes: NewClientMap(), } + if b.config.TlsPort != "" { + tlsconfig, err := NewTLSConfig(b.config.TlsInfo) + if err != nil { + log.Error("new tlsConfig error: ", err) + return nil, err + } + b.tlsConfig = tlsconfig + } + return b } func (b *Broker) Start() { - go b.StartListening(CLIENT) + if b.config.Port != "" { + go b.StartListening(CLIENT) + } if b.config.Cluster.Port != "" { go b.StartListening(ROUTER) } + if b.config.WsPort != "" { + go b.StartWebsocketListening() + } + if b.config.TlsPort != "" { + go b.StartTLSListening() + } +} + +func (b *Broker) StartWebsocketListening() { + path := "/" + b.config.WsPath + hp := ":" + b.config.WsPort + log.Info("Start Webscoker Listening on ", hp, path) + http.Handle(path, websocket.Handler(b.wsHandler)) + err := http.ListenAndServe(hp, nil) + if err != nil { + log.Error("ListenAndServe: " + err.Error()) + return + } +} + +func (b *Broker) wsHandler(ws *websocket.Conn) { + atomic.AddUint64(&b.cid, 1) + go b.handleConnection(CLIENT, conn, b.cid) +} + +func (b *Broker) StartTLSListening() { + hp := b.config.TlsHost + ":" + b.config.TlsPort + log.Info("Start TLS Listening client on ", hp) + + l, e := tls.Listen("tcp", hp, b.tlsConfig) + if e != nil { + log.Error("Error listening on ", e) + return + } + tmpDelay := 10 * ACCEPT_MIN_SLEEP + for { + conn, err := l.Accept() + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Temporary() { + log.Error("Temporary Client Accept Error(%v), sleeping %dms", + ne, tmpDelay/time.Millisecond) + time.Sleep(tmpDelay) + tmpDelay *= 2 + if tmpDelay > ACCEPT_MAX_SLEEP { + tmpDelay = ACCEPT_MAX_SLEEP + } + } else { + log.Error("Accept error: %v", err) + } + continue + } + tmpDelay = ACCEPT_MIN_SLEEP + atomic.AddUint64(&b.cid, 1) + go b.handleConnection(CLIENT, conn, b.cid) + } } func (b *Broker) StartListening(typ int) { @@ -56,7 +129,6 @@ func (b *Broker) StartListening(typ int) { } tmpDelay := 10 * ACCEPT_MIN_SLEEP - num := 0 for { conn, err := l.Accept() if err != nil { @@ -74,8 +146,8 @@ func (b *Broker) StartListening(typ int) { continue } tmpDelay = ACCEPT_MIN_SLEEP - num += 1 - go b.handleConnection(typ, conn, num) + atomic.AddUint64(&b.cid, 1) + go b.handleConnection(typ, conn, b.cid) } } diff --git a/broker/config.go b/broker/config.go index 58bfca4..a96bb16 100644 --- a/broker/config.go +++ b/broker/config.go @@ -1,8 +1,10 @@ package broker import ( + "crypto/tls" + "crypto/x509" "encoding/json" - "errors" + "fmt" "io/ioutil" log "github.com/cihub/seelog" @@ -16,6 +18,14 @@ type Config struct { Host string `json:"host"` Port string `json:"port"` Cluster RouteInfo `json:"cluster"` + TlsHost string `json:"tlsHost"` + TlsPort string `json:"tlsPort"` + WsPath string `json:"wsPath"` + WsPort string `json:"wsPort"` + WsTLS bool `json:"wsTLS"` + TlsInfo TLSInfo `json:"tlsInfo"` + Acl bool `json:"acl"` + AclConf string `json:"aclConf"` } type RouteInfo struct { @@ -24,12 +34,23 @@ type RouteInfo struct { Routes []string `json:"routes"` } +type TLSInfo struct { + Verify bool `json:"verify"` + CaFile string `json:"caFile"` + CertFile string `json:"certFile"` + KeyFile string `json:"keyFile"` +} + func LoadConfig() (*Config, error) { + content, err := ioutil.ReadFile(CONFIGFILE) if err != nil { log.Error("Read config file error: ", err) return nil, err } + + log.Info(string(content)) + var config Config err = json.Unmarshal(content, &config) if err != nil { @@ -41,8 +62,6 @@ func LoadConfig() (*Config, error) { if config.Host == "" { config.Host = "0.0.0.0" } - } else { - return nil, errors.New("Listen port nil") } if config.Cluster.Port != "" { @@ -51,5 +70,54 @@ func LoadConfig() (*Config, error) { } } + if config.TlsPort != "" { + if config.TlsInfo.CertFile == "" || config.TlsInfo.KeyFile == "" { + log.Error("tls config error, no cert or key file.") + return nil, err + } + if config.TlsHost == "" { + config.TlsHost = "0.0.0.0" + } + } + + return &config, nil +} + +func NewTLSConfig(tlsInfo TLSInfo) (*tls.Config, error) { + + cert, err := tls.LoadX509KeyPair(tlsInfo.CertFile, tlsInfo.KeyFile) + if err != nil { + return nil, fmt.Errorf("error parsing X509 certificate/key pair: %v", err) + } + cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0]) + if err != nil { + return nil, fmt.Errorf("error parsing certificate: %v", err) + } + + // Create TLSConfig + // We will determine the cipher suites that we prefer. + config := tls.Config{ + Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS12, + } + + // Require client certificates as needed + if tlsInfo.Verify { + config.ClientAuth = tls.RequireAndVerifyClientCert + } + // Add in CAs if applicable. + if tlsInfo.CaFile != "" { + rootPEM, err := ioutil.ReadFile(tlsInfo.CaFile) + if err != nil || rootPEM == nil { + return nil, err + } + pool := x509.NewCertPool() + ok := pool.AppendCertsFromPEM([]byte(rootPEM)) + if !ok { + return nil, fmt.Errorf("failed to parse root ca certificate") + } + config.ClientCAs = pool + } + return &config, nil } diff --git a/fhmq b/fhmq deleted file mode 100755 index fe2a52c..0000000 Binary files a/fhmq and /dev/null differ diff --git a/hmq.exe b/hmq.exe new file mode 100644 index 0000000..8f4bff0 Binary files /dev/null and b/hmq.exe differ