mirror of
https://github.com/fhmq/hmq.git
synced 2026-04-24 10:38:34 +00:00
d
This commit is contained in:
53
broker/broker.go
Normal file
53
broker/broker.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
log "github.com/cihub/seelog"
|
||||
)
|
||||
|
||||
type Broker struct {
|
||||
}
|
||||
|
||||
func NewBroker() *Broker {
|
||||
return &Broker{}
|
||||
}
|
||||
func (b *Broker) StartListening() {
|
||||
l, e := net.Listen("tcp", "0.0.0.0:1883")
|
||||
if e != nil {
|
||||
log.Error("Error listening on ", e)
|
||||
return
|
||||
}
|
||||
tmpDelay := 10 * ACCEPT_MIN_SLEEP
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
if ne, ok := err.(net.Error); ok && ne.Temporary() {
|
||||
log.Error("Temporary Client Accept Error(%v), sleeping %dms",
|
||||
ne, tmpDelay/time.Millisecond)
|
||||
time.Sleep(tmpDelay)
|
||||
tmpDelay *= 2
|
||||
if tmpDelay > ACCEPT_MAX_SLEEP {
|
||||
tmpDelay = ACCEPT_MAX_SLEEP
|
||||
}
|
||||
} else {
|
||||
log.Error("Accept error: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
tmpDelay = ACCEPT_MIN_SLEEP
|
||||
go handleConnection(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func handleConnection(conn net.Conn) {
|
||||
|
||||
//process connect packet
|
||||
connMsg, err := ReadPacket(conn)
|
||||
if err != nil {
|
||||
log.Error("read connect packet error: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
31
broker/const.go
Normal file
31
broker/const.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package broker
|
||||
|
||||
import "time"
|
||||
|
||||
const (
|
||||
// ACCEPT_MIN_SLEEP is the minimum acceptable sleep times on temporary errors.
|
||||
ACCEPT_MIN_SLEEP = 100 * time.Millisecond
|
||||
// ACCEPT_MAX_SLEEP is the maximum acceptable sleep times on temporary errors
|
||||
ACCEPT_MAX_SLEEP = 10 * time.Second
|
||||
// DEFAULT_ROUTE_CONNECT Route solicitation intervals.
|
||||
DEFAULT_ROUTE_CONNECT = 5 * time.Second
|
||||
// DEFAULT_TLS_TIMEOUT
|
||||
DEFAULT_TLS_TIMEOUT = 5 * time.Second
|
||||
)
|
||||
|
||||
const (
|
||||
CONNECT = uint8(iota + 1)
|
||||
CONNACK
|
||||
PUBLISH
|
||||
PUBACK
|
||||
PUBREC
|
||||
PUBREL
|
||||
PUBCOMP
|
||||
SUBSCRIBE
|
||||
SUBACK
|
||||
UNSUBSCRIBE
|
||||
UNSUBACK
|
||||
PINGREQ
|
||||
PINGRESP
|
||||
DISCONNECT
|
||||
)
|
||||
60
broker/packet.go
Normal file
60
broker/packet.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
log "github.com/cihub/seelog"
|
||||
)
|
||||
|
||||
func checkError(desc string, err error) {
|
||||
if err != nil {
|
||||
log.Error(desc, " : ", err)
|
||||
}
|
||||
}
|
||||
func ReadPacket(conn net.Conn) ([]byte, error) {
|
||||
if conn == nil {
|
||||
return nil, errors.New("conn is null")
|
||||
}
|
||||
// conn.SetReadDeadline(t)
|
||||
var buf []byte
|
||||
// read fix header
|
||||
b := make([]byte, 1)
|
||||
_, err := io.ReadFull(conn, b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buf = append(buf, b...)
|
||||
// read rem msg length
|
||||
rembuf, remlen := decodeLength(conn)
|
||||
buf = append(buf, rembuf...)
|
||||
// read rem msg
|
||||
packetBytes := make([]byte, remlen)
|
||||
_, err = io.ReadFull(conn, packetBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buf = append(buf, packetBytes...)
|
||||
// log.Info("len buf: ", len(buf))
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func decodeLength(r io.Reader) ([]byte, int) {
|
||||
var rLength uint32
|
||||
var multiplier uint32
|
||||
var buf []byte
|
||||
b := make([]byte, 1)
|
||||
for {
|
||||
io.ReadFull(r, b)
|
||||
digit := b[0]
|
||||
buf = append(buf, b[0])
|
||||
rLength |= uint32(digit&127) << multiplier
|
||||
if (digit & 128) == 0 {
|
||||
break
|
||||
}
|
||||
multiplier += 7
|
||||
|
||||
}
|
||||
return buf, int(rLength)
|
||||
}
|
||||
13
lib/message.go
Normal file
13
lib/message.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package lib
|
||||
|
||||
import "sync"
|
||||
|
||||
type MessagePool struct {
|
||||
sync.Mutex
|
||||
queue chan *Message
|
||||
}
|
||||
|
||||
func (p *MessagePool) Init(len int, maxusernum int) {
|
||||
p.maxuser = maxusernum
|
||||
p.queue = make(chan *Message, len)
|
||||
}
|
||||
25
main.go
Normal file
25
main.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fhmq/broker"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
log "github.com/cihub/seelog"
|
||||
)
|
||||
|
||||
func main() {
|
||||
broker := broker.NewBroker()
|
||||
broker.StartListening()
|
||||
|
||||
s := waitForSignal()
|
||||
log.Infof("signal got: %v ,broker closed.", s)
|
||||
}
|
||||
func waitForSignal() os.Signal {
|
||||
signalChan := make(chan os.Signal, 1)
|
||||
defer close(signalChan)
|
||||
signal.Notify(signalChan, os.Kill, os.Interrupt)
|
||||
s := <-signalChan
|
||||
signal.Stop(signalChan)
|
||||
return s
|
||||
}
|
||||
64
worker.go
Normal file
64
worker.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
var (
|
||||
MaxWorker = os.Getenv("MAX_WORKERS")
|
||||
MaxQueue = os.Getenv("MAX_QUEUE")
|
||||
)
|
||||
|
||||
// Job represents the job to be run
|
||||
type Job struct {
|
||||
Payload Payload
|
||||
}
|
||||
|
||||
// A buffered channel that we can send work requests on.
|
||||
var JobQueue chan Job
|
||||
|
||||
// Worker represents the worker that executes the job
|
||||
type Worker struct {
|
||||
WorkerPool chan chan Job
|
||||
JobChannel chan Job
|
||||
quit chan bool
|
||||
}
|
||||
|
||||
func NewWorker(workerPool chan chan Job) Worker {
|
||||
return Worker{
|
||||
WorkerPool: workerPool,
|
||||
JobChannel: make(chan Job),
|
||||
quit: make(chan bool)}
|
||||
}
|
||||
|
||||
// Start method starts the run loop for the worker, listening for a quit channel in
|
||||
// case we need to stop it
|
||||
func (w Worker) Start() {
|
||||
go func() {
|
||||
for {
|
||||
// register the current worker into the worker queue.
|
||||
w.WorkerPool <- w.JobChannel
|
||||
|
||||
select {
|
||||
case job := <-w.JobChannel:
|
||||
// we have received a work request.
|
||||
fmt.Println("process msg ")
|
||||
// if err := job.Payload.UploadToS3(); err != nil {
|
||||
// log.Errorf("Error uploading to S3: %s", err.Error())
|
||||
// }
|
||||
|
||||
case <-w.quit:
|
||||
// we have received a signal to stop
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop signals the worker to stop listening for work requests.
|
||||
func (w Worker) Stop() {
|
||||
go func() {
|
||||
w.quit <- true
|
||||
}()
|
||||
}
|
||||
Reference in New Issue
Block a user