force handshark

This commit is contained in:
zhouyuyan
2017-09-06 09:18:43 +08:00
parent 43a6bb8c5d
commit 417a12174c

View File

@@ -63,16 +63,16 @@ func (b *Broker) Start() {
return
}
if b.config.Port != "" {
go b.StartListening(CLIENT)
go b.StartClientListening(false)
}
if b.config.Cluster.Port != "" {
go b.StartListening(ROUTER)
go b.StartClusterListening()
}
if b.config.WsPort != "" {
go b.StartWebsocketListening()
}
if b.config.TlsPort != "" {
go b.StartTLSListening()
go b.StartClientListening(true)
}
if len(b.config.Cluster.Routes) > 0 {
b.ConnectToRouters()
@@ -96,11 +96,17 @@ func (b *Broker) wsHandler(ws *websocket.Conn) {
go b.handleConnection(CLIENT, ws, b.cid)
}
func (b *Broker) StartTLSListening() {
hp := b.config.TlsHost + ":" + b.config.TlsPort
log.Info("Start TLS Listening client on ", hp)
func (b *Broker) StartClientListening(tls bool) {
var hp string
if tls {
hp = b.config.TlsHost + ":" + b.config.TlsPort
log.Info("Start TLS Listening client on ", hp)
} else {
hp := b.config.Host + ":" + b.config.Port
log.Info("Start Listening client on ", hp)
}
l, e := tls.Listen("tcp", hp, b.tlsConfig)
l, e := net.Listen("tcp", hp)
if e != nil {
log.Error("Error listening on ", e)
return
@@ -123,20 +129,49 @@ func (b *Broker) StartTLSListening() {
continue
}
tmpDelay = ACCEPT_MIN_SLEEP
if tls {
if !b.Handshake(conn) {
return
}
}
atomic.AddUint64(&b.cid, 1)
go b.handleConnection(CLIENT, conn, b.cid)
}
}
func (b *Broker) StartListening(typ int) {
var hp string
if typ == CLIENT {
hp = b.config.Host + ":" + b.config.Port
log.Info("Start Listening client on ", hp)
} else if typ == ROUTER {
hp = b.config.Cluster.Host + ":" + b.config.Cluster.Port
log.Info("Start Listening cluster on ", hp)
func (b *Broker) Handshake(conn net.Conn) bool {
nc := tls.Server(conn, b.tlsConfig)
time.AfterFunc(DEFAULT_TLS_TIMEOUT, func() { TlsTimeout(nc) })
nc.SetReadDeadline(time.Now().Add(DEFAULT_TLS_TIMEOUT))
// Force handshake
if err := nc.Handshake(); err != nil {
log.Error("TLS handshake error, ", err)
return false
}
nc.SetReadDeadline(time.Time{})
return true
}
func TlsTimeout(conn *tls.Conn) {
nc := conn
// Check if already closed
if nc == nil {
return
}
cs := nc.ConnectionState()
if !cs.HandshakeComplete {
log.Error("TLS handshake timeout")
nc.Close()
}
}
func (b *Broker) StartClusterListening() {
var hp string = b.config.Cluster.Host + ":" + b.config.Cluster.Port
log.Info("Start Listening cluster on ", hp)
l, e := net.Listen("tcp", hp)
if e != nil {
@@ -164,16 +199,11 @@ func (b *Broker) StartListening(typ int) {
}
tmpDelay = ACCEPT_MIN_SLEEP
if typ == CLIENT {
atomic.AddUint64(&b.cid, 1)
go b.handleConnection(typ, conn, b.cid)
go b.handleConnection(ROUTER, conn, idx)
if idx == 1 {
idx = 0
} else {
go b.handleConnection(typ, conn, idx)
if idx == 1 {
idx = 0
} else {
idx = idx + 1
}
idx = idx + 1
}
}