From fdf8bfaac65122b5e77645b3ad9a1f012a9c4ddb Mon Sep 17 00:00:00 2001 From: "joy.zhou" Date: Fri, 12 Jul 2019 19:39:27 +0800 Subject: [PATCH] update --- plugins/authhttp/authhttp.go | 79 ++++++++++++++++++++++++++++-------- plugins/kafka/kafka.go | 2 +- 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/plugins/authhttp/authhttp.go b/plugins/authhttp/authhttp.go index 2b82035..66f41c9 100644 --- a/plugins/authhttp/authhttp.go +++ b/plugins/authhttp/authhttp.go @@ -2,10 +2,12 @@ package authhttp import ( "encoding/json" - "fmt" "io/ioutil" "net/http" + "net/url" + "strconv" "strings" + "time" "github.com/fhmq/hmq/logger" "go.uber.org/zap" @@ -17,8 +19,9 @@ const ( ) var ( - config Config - log = logger.Get().Named("http") + config Config + log = logger.Get().Named("http") + httpClient *http.Client ) //Config device kafka config @@ -41,16 +44,36 @@ func Init() { log.Fatal("Unmarshal config file error: ", zap.Error(err)) } + httpClient = &http.Client{ + Transport: &http.Transport{ + MaxConnsPerHost: 100, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + }, + Timeout: time.Second * 100, + } + } //CheckAuth check mqtt connect func CheckAuth(clientID, username, password string) bool { - payload := fmt.Sprintf("username=%s&password=%s&clientid=%s", username, password, clientID) - resp, err := http.Post(config.AuthURL, - "application/x-www-form-urlencoded", - strings.NewReader(payload)) + data := url.Values{} + data.Add("username", username) + data.Add("clientid", clientID) + data.Add("password", password) + + req, err := http.NewRequest("POST", config.AuthURL, strings.NewReader(data.Encode())) if err != nil { - log.Error("request acl: ", zap.Error(err)) + log.Error("new request super: ", zap.Error(err)) + return false + } + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + + resp, err := httpClient.Do(req) + if err != nil { + log.Error("request super: ", zap.Error(err)) + return false } defer resp.Body.Close() @@ -62,12 +85,23 @@ func CheckAuth(clientID, username, password string) bool { //CheckSuper check mqtt connect func CheckSuper(clientID, username, password string) bool { - payload := fmt.Sprintf("username=%s&password=%s&clientid=%s", username, password, clientID) - resp, err := http.Post(config.SuperURL, - "application/x-www-form-urlencoded", - strings.NewReader(payload)) + data := url.Values{} + data.Add("username", username) + data.Add("clientid", clientID) + data.Add("password", password) + + req, err := http.NewRequest("POST", config.SuperURL, strings.NewReader(data.Encode())) if err != nil { - log.Error("request acl: ", zap.Error(err)) + log.Error("new request super: ", zap.Error(err)) + return false + } + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + + resp, err := httpClient.Do(req) + if err != nil { + log.Error("request super: ", zap.Error(err)) + return false } defer resp.Body.Close() @@ -79,10 +113,23 @@ func CheckSuper(clientID, username, password string) bool { //CheckACL check mqtt connect func CheckACL(username, access, topic string) bool { - url := fmt.Sprintf(config.ACLURL+"?username=%s&access=%s&topic=%s", username, access, topic) - resp, err := http.Get(url) + req, err := http.NewRequest("GET", config.ACLURL, nil) if err != nil { - // handle error + log.Error("get acl: ", zap.Error(err)) + return false + } + + data := req.URL.Query() + + data.Add("username", username) + data.Add("topic", topic) + data.Add("access", access) + req.URL.RawQuery = data.Encode() + // log.Debugf("req is :%v", req) + resp, err := httpClient.Do(req) + if err != nil { + log.Error("request acl: ", zap.Error(err)) + return false } defer resp.Body.Close() diff --git a/plugins/kafka/kafka.go b/plugins/kafka/kafka.go index ca86d0a..fe76bb4 100644 --- a/plugins/kafka/kafka.go +++ b/plugins/kafka/kafka.go @@ -46,7 +46,7 @@ func Init() { if err != nil { log.Fatal("Unmarshal config file error: ", zap.Error(err)) } - + connect() } //connect