From f3043736f137e3bd37543af22dbca566db2bee81 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Sat, 16 Mar 2024 18:07:08 +0700 Subject: all: comply with all linters recommendations Some of breaking changes, * Field [Config.HttpUrl] renamed to [Config.HTTPURL] * Field [ConfigForwarder.Url] renamed to [ConfigForwarder.URL] * Struct [HttpLog] renamed to [HTTPLog] --- config.go | 8 ++-- config_forwarder.go | 6 +-- config_test.go | 14 +++---- forwarder.go | 2 +- haminer.go | 22 +++++------ http_log.go | 112 +++++++++++++++++++++++++++++----------------------- influxd_client.go | 13 +++--- questdb_client.go | 10 ++--- 8 files changed, 101 insertions(+), 86 deletions(-) diff --git a/config.go b/config.go index 2c8af49..cc7e9d5 100644 --- a/config.go +++ b/config.go @@ -36,7 +36,7 @@ type Config struct { // output. RequestHeaders []string `ini:"haminer::capture_request_header"` - HttpUrl []string `ini:"preprocess:tag:http_url"` + HTTPURL []string `ini:"preprocess:tag:http_url"` // retags contains list of pre-processing rules for tag. retags []*tagPreprocessor @@ -127,12 +127,12 @@ func (cfg *Config) parsePreprocessTag() (err error) { logp = `parsePreprocessTag` retag *tagPreprocessor - httpUrl string + httpURL string vals []string ) - for _, httpUrl = range cfg.HttpUrl { - vals = strings.Split(httpUrl, "=>") + for _, httpURL = range cfg.HTTPURL { + vals = strings.Split(httpURL, `=>`) if len(vals) != 2 { continue } diff --git a/config_forwarder.go b/config_forwarder.go index 5c7877d..c53b807 100644 --- a/config_forwarder.go +++ b/config_forwarder.go @@ -22,7 +22,7 @@ const ( type ConfigForwarder struct { Version string `ini:"::version"` - Url string `ini:"::url"` + URL string `ini:"::url"` apiWrite string headerToken string @@ -41,7 +41,7 @@ type ConfigForwarder struct { // init check, validate, and initialize the configuration values. func (cfg *ConfigForwarder) init(fwName string) (err error) { - if len(cfg.Url) == 0 { + if len(cfg.URL) == 0 { return } @@ -70,7 +70,7 @@ func (cfg *ConfigForwarder) initInfluxd() (err error) { surl *url.URL ) - surl, err = url.Parse(cfg.Url) + surl, err = url.Parse(cfg.URL) if err != nil { return err } diff --git a/config_test.go b/config_test.go index f259dbf..f2b7e96 100644 --- a/config_test.go +++ b/config_test.go @@ -66,7 +66,7 @@ func TestLoad(t *testing.T) { Forwarders: map[string]*ConfigForwarder{ `influxd`: &ConfigForwarder{ Version: `v2`, - Url: `http://127.0.0.1:8086`, + URL: `http://127.0.0.1:8086`, Org: `kilabit.info`, Bucket: `haproxy`, apiWrite: `http://127.0.0.1:8086/api/v2/write?bucket=haproxy&org=kilabit.info&precision=ns`, @@ -85,7 +85,7 @@ func TestLoad(t *testing.T) { "host", "referrer", }, - HttpUrl: []string{ + HTTPURL: []string{ `/[0-9]+-\w+-\w+-\w+-\w+-\w+ => /-`, `/\w+-\w+-\w+-\w+-\w+ => /-`, `/[0-9]+ => /-`, @@ -170,7 +170,7 @@ func TestSetListen(t *testing.T) { func TestParsePreprocessTag(t *testing.T) { type testCase struct { desc string - httpUrl []string + httpURL []string exp []*tagPreprocessor } @@ -180,13 +180,13 @@ func TestParsePreprocessTag(t *testing.T) { var cases = []testCase{{ desc: `With invalid format`, - httpUrl: []string{``}, + httpURL: []string{``}, }, { desc: `With empty regex`, - httpUrl: []string{`=>`}, + httpURL: []string{`=>`}, }, { desc: `With valid value`, - httpUrl: []string{ + httpURL: []string{ `/[0-9]+ => /-`, }, exp: []*tagPreprocessor{{ @@ -205,7 +205,7 @@ func TestParsePreprocessTag(t *testing.T) { t.Log(c.desc) cfg.retags = nil - cfg.HttpUrl = c.httpUrl + cfg.HTTPURL = c.httpURL err = cfg.parsePreprocessTag() if err != nil { diff --git a/forwarder.go b/forwarder.go index 51e214e..b9b08b6 100644 --- a/forwarder.go +++ b/forwarder.go @@ -6,5 +6,5 @@ package haminer // Forwarder define an interface to forward parsed HAProxy log to storage // engine. type Forwarder interface { - Forwards(halogs []*HttpLog) + Forwards(halogs []*HTTPLog) } diff --git a/haminer.go b/haminer.go index 3744426..7d866a5 100644 --- a/haminer.go +++ b/haminer.go @@ -24,7 +24,7 @@ var ( type Haminer struct { cfg *Config udpConn *net.UDPConn - chHttpLog chan *HttpLog + httpLogq chan *HTTPLog ff []Forwarder isRunning bool } @@ -49,9 +49,9 @@ func NewHaminer(cfg *Config) (h *Haminer) { } h = &Haminer{ - cfg: cfg, - chHttpLog: make(chan *HttpLog, 30), - ff: make([]Forwarder, 0), + cfg: cfg, + httpLogq: make(chan *HTTPLog, 30), + ff: make([]Forwarder, 0), } initHostname() @@ -117,7 +117,7 @@ func (h *Haminer) Start() (err error) { } // filter will return true if log is accepted; otherwise it will return false. -func (h *Haminer) filter(halog *HttpLog) bool { +func (h *Haminer) filter(halog *HTTPLog) bool { if halog == nil { return false } @@ -141,7 +141,7 @@ func (h *Haminer) consume() { var ( packet = make([]byte, 4096) - halog *HttpLog + halog *HTTPLog err error n int ok bool @@ -153,7 +153,7 @@ func (h *Haminer) consume() { continue } - halog = &HttpLog{} + halog = &HTTPLog{} ok = halog.ParseUDPPacket(packet[:n], h.cfg.RequestHeaders) if !ok { @@ -165,11 +165,11 @@ func (h *Haminer) consume() { continue } - h.chHttpLog <- halog + h.httpLogq <- halog } } -func (h *Haminer) preprocess(halog *HttpLog) { +func (h *Haminer) preprocess(halog *HTTPLog) { halog.tagHTTPURL = halog.HTTPURL for _, retag := range h.cfg.retags { halog.tagHTTPURL = retag.preprocess("http_url", halog.tagHTTPURL) @@ -178,11 +178,11 @@ func (h *Haminer) preprocess(halog *HttpLog) { func (h *Haminer) produce() { ticker := time.NewTicker(h.cfg.ForwardInterval) - halogs := make([]*HttpLog, 0) + halogs := make([]*HTTPLog, 0) for h.isRunning { select { - case halog := <-h.chHttpLog: + case halog := <-h.httpLogq: h.preprocess(halog) halogs = append(halogs, halog) diff --git a/http_log.go b/http_log.go index e5685ae..bce442f 100644 --- a/http_log.go +++ b/http_log.go @@ -7,6 +7,7 @@ import ( "bytes" "fmt" "io" + "math" "strconv" "strings" "time" @@ -43,10 +44,10 @@ const ( `bytes_read=%d` ) -// HttpLog contains the mapping of haproxy HTTP log format to Go struct. +// HTTPLog contains the mapping of haproxy HTTP log format to Go struct. // // Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3 -type HttpLog struct { +type HTTPLog struct { Timestamp time.Time RequestHeaders map[string]string @@ -122,19 +123,30 @@ func parseToString(in []byte, sep byte) (string, bool) { } func parseToInt32(in []byte, sep byte) (int32, bool) { - end := bytes.IndexByte(in, sep) + var end = bytes.IndexByte(in, sep) if end < 0 { return 0, false } - v, err := strconv.Atoi(string(in[:end])) + var ( + v int64 + err error + ) + + v, err = strconv.ParseInt(string(in[:end]), 10, 32) if err != nil { return 0, false } copy(in, in[end+1:]) - return int32(v), true + if v > math.MaxInt32 { + return 0, false + } + + var vi32 = int32(v) + + return vi32, true } func parseToInt64(in []byte, sep byte) (int64, bool) { @@ -153,28 +165,28 @@ func parseToInt64(in []byte, sep byte) (int64, bool) { return v, true } -func (halog *HttpLog) parseTimes(in []byte) (ok bool) { - halog.TimeReq, ok = parseToInt32(in, '/') +func (httpLog *HTTPLog) parseTimes(in []byte) (ok bool) { + httpLog.TimeReq, ok = parseToInt32(in, '/') if !ok { return } - halog.TimeWait, ok = parseToInt32(in, '/') + httpLog.TimeWait, ok = parseToInt32(in, '/') if !ok { return } - halog.TimeConnect, ok = parseToInt32(in, '/') + httpLog.TimeConnect, ok = parseToInt32(in, '/') if !ok { return } - halog.TimeRsp, ok = parseToInt32(in, '/') + httpLog.TimeRsp, ok = parseToInt32(in, '/') if !ok { return } - halog.TimeAll, ok = parseToInt32(in, ' ') + httpLog.TimeAll, ok = parseToInt32(in, ' ') if !ok { return } @@ -182,28 +194,28 @@ func (halog *HttpLog) parseTimes(in []byte) (ok bool) { return } -func (halog *HttpLog) parseConns(in []byte) (ok bool) { - halog.ConnActive, ok = parseToInt32(in, '/') +func (httpLog *HTTPLog) parseConns(in []byte) (ok bool) { + httpLog.ConnActive, ok = parseToInt32(in, '/') if !ok { return } - halog.ConnFrontend, ok = parseToInt32(in, '/') + httpLog.ConnFrontend, ok = parseToInt32(in, '/') if !ok { return } - halog.ConnBackend, ok = parseToInt32(in, '/') + httpLog.ConnBackend, ok = parseToInt32(in, '/') if !ok { return } - halog.ConnServer, ok = parseToInt32(in, '/') + httpLog.ConnServer, ok = parseToInt32(in, '/') if !ok { return } - halog.ConnRetries, ok = parseToInt32(in, ' ') + httpLog.ConnRetries, ok = parseToInt32(in, ' ') if !ok { return } @@ -211,13 +223,13 @@ func (halog *HttpLog) parseConns(in []byte) (ok bool) { return } -func (halog *HttpLog) parseQueue(in []byte) (ok bool) { - halog.QueueServer, ok = parseToInt32(in, '/') +func (httpLog *HTTPLog) parseQueue(in []byte) (ok bool) { + httpLog.QueueServer, ok = parseToInt32(in, '/') if !ok { return } - halog.QueueBackend, ok = parseToInt32(in, ' ') + httpLog.QueueBackend, ok = parseToInt32(in, ' ') return } @@ -225,7 +237,7 @@ func (halog *HttpLog) parseQueue(in []byte) (ok bool) { // parserRequestHeaders parse the request header values in log file. // The request headers start with '{' and end with '}'. // Each header is separated by '|'. -func (halog *HttpLog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bool) { +func (httpLog *HTTPLog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bool) { if in[0] != '{' { // Skip if we did not find the beginning. return true @@ -244,9 +256,9 @@ func (halog *HttpLog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bo return } - halog.RequestHeaders = make(map[string]string) + httpLog.RequestHeaders = make(map[string]string) for x, name := range reqHeaders { - halog.RequestHeaders[name] = string(bheaders[x]) + httpLog.RequestHeaders[name] = string(bheaders[x]) } copy(in, in[end+2:]) @@ -254,8 +266,8 @@ func (halog *HttpLog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bo return true } -func (halog *HttpLog) parseHTTP(in []byte) (ok bool) { - halog.HTTPMethod, ok = parseToString(in, ' ') +func (httpLog *HTTPLog) parseHTTP(in []byte) (ok bool) { + httpLog.HTTPMethod, ok = parseToString(in, ' ') if !ok { return } @@ -265,20 +277,20 @@ func (halog *HttpLog) parseHTTP(in []byte) (ok bool) { return } urlQuery := strings.SplitN(v, "?", 2) - halog.HTTPURL = urlQuery[0] + httpLog.HTTPURL = urlQuery[0] if len(urlQuery) == 2 { - halog.HTTPQuery = urlQuery[1] + httpLog.HTTPQuery = urlQuery[1] } - halog.HTTPProto, ok = parseToString(in, '"') + httpLog.HTTPProto, ok = parseToString(in, '"') return ok } -// Parse will parse one line of HAProxy log format into HttpLog. +// Parse will parse one line of HAProxy log format into HTTPLog. // // nolint: gocyclo -func (halog *HttpLog) Parse(in []byte, reqHeaders []string) (ok bool) { +func (httpLog *HTTPLog) Parse(in []byte, reqHeaders []string) (ok bool) { var err error // Remove prefix from systemd/rsyslog @@ -288,13 +300,13 @@ func (halog *HttpLog) Parse(in []byte, reqHeaders []string) (ok bool) { } // parse client IP - halog.ClientIP, ok = parseToString(in, ':') + httpLog.ClientIP, ok = parseToString(in, ':') if !ok { return } // parse client port - halog.ClientPort, ok = parseToInt32(in, ' ') + httpLog.ClientPort, ok = parseToInt32(in, ' ') if !ok { return } @@ -306,80 +318,80 @@ func (halog *HttpLog) Parse(in []byte, reqHeaders []string) (ok bool) { return } - halog.Timestamp, err = time.Parse("2/Jan/2006:15:04:05.000", ts) + httpLog.Timestamp, err = time.Parse(`2/Jan/2006:15:04:05.000`, ts) if err != nil { return false } // parse frontend name in = in[1:] - halog.FrontendName, ok = parseToString(in, ' ') + httpLog.FrontendName, ok = parseToString(in, ' ') if !ok { return } // parse backend name - halog.BackendName, ok = parseToString(in, '/') + httpLog.BackendName, ok = parseToString(in, '/') if !ok { return } // parse server name - halog.ServerName, ok = parseToString(in, ' ') + httpLog.ServerName, ok = parseToString(in, ' ') if !ok { return } // parse times - ok = halog.parseTimes(in) + ok = httpLog.parseTimes(in) if !ok { return } // parse HTTP status code - halog.HTTPStatus, ok = parseToInt32(in, ' ') + httpLog.HTTPStatus, ok = parseToInt32(in, ' ') if !ok { return } // parse bytes read - halog.BytesRead, ok = parseToInt64(in, ' ') + httpLog.BytesRead, ok = parseToInt64(in, ' ') if !ok { return } // parse request cookie - halog.CookieReq, ok = parseToString(in, ' ') + httpLog.CookieReq, ok = parseToString(in, ' ') if !ok { return } // parse response cookie - halog.CookieRsp, ok = parseToString(in, ' ') + httpLog.CookieRsp, ok = parseToString(in, ' ') if !ok { return } // parse termination state - halog.TermState, ok = parseToString(in, ' ') + httpLog.TermState, ok = parseToString(in, ' ') if !ok { return } // parse number of connections - ok = halog.parseConns(in) + ok = httpLog.parseConns(in) if !ok { return } // parse number of queue state - ok = halog.parseQueue(in) + ok = httpLog.parseQueue(in) if !ok { return } if len(reqHeaders) > 0 { - ok = halog.parseRequestHeaders(in, reqHeaders) + ok = httpLog.parseRequestHeaders(in, reqHeaders) if !ok { return } @@ -387,17 +399,17 @@ func (halog *HttpLog) Parse(in []byte, reqHeaders []string) (ok bool) { // parse HTTP in = in[1:] - ok = halog.parseHTTP(in) + ok = httpLog.parseHTTP(in) return ok } // ParseUDPPacket will convert UDP packet (in bytes) to instance of -// HttpLog. +// HTTPLog. // // It will return nil and false if UDP packet is nil, have zero length, or // cannot be parsed (rejected). -func (halog *HttpLog) ParseUDPPacket(packet []byte, reqHeaders []string) bool { +func (httpLog *HTTPLog) ParseUDPPacket(packet []byte, reqHeaders []string) bool { if len(packet) == 0 { return false } @@ -418,11 +430,11 @@ func (halog *HttpLog) ParseUDPPacket(packet []byte, reqHeaders []string) bool { in = packet } - return halog.Parse(in, reqHeaders) + return httpLog.Parse(in, reqHeaders) } // writeIlp write the HTTP log as Influxdb Line Protocol. -func (httpLog *HttpLog) writeIlp(out io.Writer) (err error) { +func (httpLog *HTTPLog) writeIlp(out io.Writer) (err error) { var ( k string v string diff --git a/influxd_client.go b/influxd_client.go index 754a62d..481f641 100644 --- a/influxd_client.go +++ b/influxd_client.go @@ -5,6 +5,7 @@ package haminer import ( "bytes" + "context" "fmt" "io" "log" @@ -26,7 +27,7 @@ type InfluxdClient struct { // NewInfluxdClient will create, initialize, and return new Influxd client. func NewInfluxdClient(cfg *ConfigForwarder) (cl *InfluxdClient) { - if len(cfg.Url) == 0 { + if len(cfg.URL) == 0 { return nil } @@ -62,7 +63,7 @@ func (cl *InfluxdClient) initConn() { // Forwards implement the Forwarder interface. It will write all logs to // Influxd. -func (cl *InfluxdClient) Forwards(halogs []*HttpLog) { +func (cl *InfluxdClient) Forwards(halogs []*HTTPLog) { var ( logp = `influxdClient: Forwards` @@ -77,7 +78,9 @@ func (cl *InfluxdClient) Forwards(halogs []*HttpLog) { return } - httpReq, err = http.NewRequest(http.MethodPost, cl.cfg.apiWrite, &cl.buf) + var ctx = context.Background() + + httpReq, err = http.NewRequestWithContext(ctx, http.MethodPost, cl.cfg.apiWrite, &cl.buf) if err != nil { log.Printf(`%s: %s`, logp, err) return @@ -117,9 +120,9 @@ func (cl *InfluxdClient) Forwards(halogs []*HttpLog) { fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) } -func (cl *InfluxdClient) write(halogs []*HttpLog) (err error) { +func (cl *InfluxdClient) write(halogs []*HTTPLog) (err error) { var ( - l *HttpLog + l *HTTPLog k string v string ) diff --git a/questdb_client.go b/questdb_client.go index 562e55c..d185978 100644 --- a/questdb_client.go +++ b/questdb_client.go @@ -24,10 +24,10 @@ type questdbClient struct { buf bytes.Buffer } -// newQuestdbClient create and initialize client connection using the Url in +// newQuestdbClient create and initialize client connection using the URL in // the ConfigForwarder. func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) { - if cfg == nil || len(cfg.Url) == 0 { + if cfg == nil || len(cfg.URL) == 0 { return nil, nil } @@ -41,7 +41,7 @@ func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) { port uint16 ) - surl, err = url.Parse(cfg.Url) + surl, err = url.Parse(cfg.URL) if err != nil { return nil, fmt.Errorf(`%s: %w`, logp, err) } @@ -69,12 +69,12 @@ func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) { // Forwards implement the Forwarder interface. // It will write all logs to questdb. -func (questc *questdbClient) Forwards(logs []*HttpLog) { +func (questc *questdbClient) Forwards(logs []*HTTPLog) { var ( logp = `questdbClient: Forwards` now = time.Now() - httpLog *HttpLog + httpLog *HTTPLog data []byte err error ) -- cgit v1.3