summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2024-03-16 18:07:08 +0700
committerShulhan <ms@kilabit.info>2024-03-16 18:07:08 +0700
commitf3043736f137e3bd37543af22dbca566db2bee81 (patch)
treea0f1e28fc43d2c45a16c254b093ce211eaf5e4a1
parent916d1874a7047162d0894191c0979d47422786a3 (diff)
downloadhaminer-f3043736f137e3bd37543af22dbca566db2bee81.tar.xz
all: comply with all linters recommendations
Some of breaking changes, * Field [Config.HttpUrl] renamed to [Config.HTTPURL] * Field [ConfigForwarder.Url] renamed to [ConfigForwarder.URL] * Struct [HttpLog] renamed to [HTTPLog]
-rw-r--r--config.go8
-rw-r--r--config_forwarder.go6
-rw-r--r--config_test.go14
-rw-r--r--forwarder.go2
-rw-r--r--haminer.go22
-rw-r--r--http_log.go112
-rw-r--r--influxd_client.go13
-rw-r--r--questdb_client.go10
8 files changed, 101 insertions, 86 deletions
diff --git a/config.go b/config.go
index 2c8af49..cc7e9d5 100644
--- a/config.go
+++ b/config.go
@@ -36,7 +36,7 @@ type Config struct {
// output.
RequestHeaders []string `ini:"haminer::capture_request_header"`
- HttpUrl []string `ini:"preprocess:tag:http_url"`
+ HTTPURL []string `ini:"preprocess:tag:http_url"`
// retags contains list of pre-processing rules for tag.
retags []*tagPreprocessor
@@ -127,12 +127,12 @@ func (cfg *Config) parsePreprocessTag() (err error) {
logp = `parsePreprocessTag`
retag *tagPreprocessor
- httpUrl string
+ httpURL string
vals []string
)
- for _, httpUrl = range cfg.HttpUrl {
- vals = strings.Split(httpUrl, "=>")
+ for _, httpURL = range cfg.HTTPURL {
+ vals = strings.Split(httpURL, `=>`)
if len(vals) != 2 {
continue
}
diff --git a/config_forwarder.go b/config_forwarder.go
index 5c7877d..c53b807 100644
--- a/config_forwarder.go
+++ b/config_forwarder.go
@@ -22,7 +22,7 @@ const (
type ConfigForwarder struct {
Version string `ini:"::version"`
- Url string `ini:"::url"`
+ URL string `ini:"::url"`
apiWrite string
headerToken string
@@ -41,7 +41,7 @@ type ConfigForwarder struct {
// init check, validate, and initialize the configuration values.
func (cfg *ConfigForwarder) init(fwName string) (err error) {
- if len(cfg.Url) == 0 {
+ if len(cfg.URL) == 0 {
return
}
@@ -70,7 +70,7 @@ func (cfg *ConfigForwarder) initInfluxd() (err error) {
surl *url.URL
)
- surl, err = url.Parse(cfg.Url)
+ surl, err = url.Parse(cfg.URL)
if err != nil {
return err
}
diff --git a/config_test.go b/config_test.go
index f259dbf..f2b7e96 100644
--- a/config_test.go
+++ b/config_test.go
@@ -66,7 +66,7 @@ func TestLoad(t *testing.T) {
Forwarders: map[string]*ConfigForwarder{
`influxd`: &ConfigForwarder{
Version: `v2`,
- Url: `http://127.0.0.1:8086`,
+ URL: `http://127.0.0.1:8086`,
Org: `kilabit.info`,
Bucket: `haproxy`,
apiWrite: `http://127.0.0.1:8086/api/v2/write?bucket=haproxy&org=kilabit.info&precision=ns`,
@@ -85,7 +85,7 @@ func TestLoad(t *testing.T) {
"host",
"referrer",
},
- HttpUrl: []string{
+ HTTPURL: []string{
`/[0-9]+-\w+-\w+-\w+-\w+-\w+ => /-`,
`/\w+-\w+-\w+-\w+-\w+ => /-`,
`/[0-9]+ => /-`,
@@ -170,7 +170,7 @@ func TestSetListen(t *testing.T) {
func TestParsePreprocessTag(t *testing.T) {
type testCase struct {
desc string
- httpUrl []string
+ httpURL []string
exp []*tagPreprocessor
}
@@ -180,13 +180,13 @@ func TestParsePreprocessTag(t *testing.T) {
var cases = []testCase{{
desc: `With invalid format`,
- httpUrl: []string{``},
+ httpURL: []string{``},
}, {
desc: `With empty regex`,
- httpUrl: []string{`=>`},
+ httpURL: []string{`=>`},
}, {
desc: `With valid value`,
- httpUrl: []string{
+ httpURL: []string{
`/[0-9]+ => /-`,
},
exp: []*tagPreprocessor{{
@@ -205,7 +205,7 @@ func TestParsePreprocessTag(t *testing.T) {
t.Log(c.desc)
cfg.retags = nil
- cfg.HttpUrl = c.httpUrl
+ cfg.HTTPURL = c.httpURL
err = cfg.parsePreprocessTag()
if err != nil {
diff --git a/forwarder.go b/forwarder.go
index 51e214e..b9b08b6 100644
--- a/forwarder.go
+++ b/forwarder.go
@@ -6,5 +6,5 @@ package haminer
// Forwarder define an interface to forward parsed HAProxy log to storage
// engine.
type Forwarder interface {
- Forwards(halogs []*HttpLog)
+ Forwards(halogs []*HTTPLog)
}
diff --git a/haminer.go b/haminer.go
index 3744426..7d866a5 100644
--- a/haminer.go
+++ b/haminer.go
@@ -24,7 +24,7 @@ var (
type Haminer struct {
cfg *Config
udpConn *net.UDPConn
- chHttpLog chan *HttpLog
+ httpLogq chan *HTTPLog
ff []Forwarder
isRunning bool
}
@@ -49,9 +49,9 @@ func NewHaminer(cfg *Config) (h *Haminer) {
}
h = &Haminer{
- cfg: cfg,
- chHttpLog: make(chan *HttpLog, 30),
- ff: make([]Forwarder, 0),
+ cfg: cfg,
+ httpLogq: make(chan *HTTPLog, 30),
+ ff: make([]Forwarder, 0),
}
initHostname()
@@ -117,7 +117,7 @@ func (h *Haminer) Start() (err error) {
}
// filter will return true if log is accepted; otherwise it will return false.
-func (h *Haminer) filter(halog *HttpLog) bool {
+func (h *Haminer) filter(halog *HTTPLog) bool {
if halog == nil {
return false
}
@@ -141,7 +141,7 @@ func (h *Haminer) consume() {
var (
packet = make([]byte, 4096)
- halog *HttpLog
+ halog *HTTPLog
err error
n int
ok bool
@@ -153,7 +153,7 @@ func (h *Haminer) consume() {
continue
}
- halog = &HttpLog{}
+ halog = &HTTPLog{}
ok = halog.ParseUDPPacket(packet[:n], h.cfg.RequestHeaders)
if !ok {
@@ -165,11 +165,11 @@ func (h *Haminer) consume() {
continue
}
- h.chHttpLog <- halog
+ h.httpLogq <- halog
}
}
-func (h *Haminer) preprocess(halog *HttpLog) {
+func (h *Haminer) preprocess(halog *HTTPLog) {
halog.tagHTTPURL = halog.HTTPURL
for _, retag := range h.cfg.retags {
halog.tagHTTPURL = retag.preprocess("http_url", halog.tagHTTPURL)
@@ -178,11 +178,11 @@ func (h *Haminer) preprocess(halog *HttpLog) {
func (h *Haminer) produce() {
ticker := time.NewTicker(h.cfg.ForwardInterval)
- halogs := make([]*HttpLog, 0)
+ halogs := make([]*HTTPLog, 0)
for h.isRunning {
select {
- case halog := <-h.chHttpLog:
+ case halog := <-h.httpLogq:
h.preprocess(halog)
halogs = append(halogs, halog)
diff --git a/http_log.go b/http_log.go
index e5685ae..bce442f 100644
--- a/http_log.go
+++ b/http_log.go
@@ -7,6 +7,7 @@ import (
"bytes"
"fmt"
"io"
+ "math"
"strconv"
"strings"
"time"
@@ -43,10 +44,10 @@ const (
`bytes_read=%d`
)
-// HttpLog contains the mapping of haproxy HTTP log format to Go struct.
+// HTTPLog contains the mapping of haproxy HTTP log format to Go struct.
//
// Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3
-type HttpLog struct {
+type HTTPLog struct {
Timestamp time.Time
RequestHeaders map[string]string
@@ -122,19 +123,30 @@ func parseToString(in []byte, sep byte) (string, bool) {
}
func parseToInt32(in []byte, sep byte) (int32, bool) {
- end := bytes.IndexByte(in, sep)
+ var end = bytes.IndexByte(in, sep)
if end < 0 {
return 0, false
}
- v, err := strconv.Atoi(string(in[:end]))
+ var (
+ v int64
+ err error
+ )
+
+ v, err = strconv.ParseInt(string(in[:end]), 10, 32)
if err != nil {
return 0, false
}
copy(in, in[end+1:])
- return int32(v), true
+ if v > math.MaxInt32 {
+ return 0, false
+ }
+
+ var vi32 = int32(v)
+
+ return vi32, true
}
func parseToInt64(in []byte, sep byte) (int64, bool) {
@@ -153,28 +165,28 @@ func parseToInt64(in []byte, sep byte) (int64, bool) {
return v, true
}
-func (halog *HttpLog) parseTimes(in []byte) (ok bool) {
- halog.TimeReq, ok = parseToInt32(in, '/')
+func (httpLog *HTTPLog) parseTimes(in []byte) (ok bool) {
+ httpLog.TimeReq, ok = parseToInt32(in, '/')
if !ok {
return
}
- halog.TimeWait, ok = parseToInt32(in, '/')
+ httpLog.TimeWait, ok = parseToInt32(in, '/')
if !ok {
return
}
- halog.TimeConnect, ok = parseToInt32(in, '/')
+ httpLog.TimeConnect, ok = parseToInt32(in, '/')
if !ok {
return
}
- halog.TimeRsp, ok = parseToInt32(in, '/')
+ httpLog.TimeRsp, ok = parseToInt32(in, '/')
if !ok {
return
}
- halog.TimeAll, ok = parseToInt32(in, ' ')
+ httpLog.TimeAll, ok = parseToInt32(in, ' ')
if !ok {
return
}
@@ -182,28 +194,28 @@ func (halog *HttpLog) parseTimes(in []byte) (ok bool) {
return
}
-func (halog *HttpLog) parseConns(in []byte) (ok bool) {
- halog.ConnActive, ok = parseToInt32(in, '/')
+func (httpLog *HTTPLog) parseConns(in []byte) (ok bool) {
+ httpLog.ConnActive, ok = parseToInt32(in, '/')
if !ok {
return
}
- halog.ConnFrontend, ok = parseToInt32(in, '/')
+ httpLog.ConnFrontend, ok = parseToInt32(in, '/')
if !ok {
return
}
- halog.ConnBackend, ok = parseToInt32(in, '/')
+ httpLog.ConnBackend, ok = parseToInt32(in, '/')
if !ok {
return
}
- halog.ConnServer, ok = parseToInt32(in, '/')
+ httpLog.ConnServer, ok = parseToInt32(in, '/')
if !ok {
return
}
- halog.ConnRetries, ok = parseToInt32(in, ' ')
+ httpLog.ConnRetries, ok = parseToInt32(in, ' ')
if !ok {
return
}
@@ -211,13 +223,13 @@ func (halog *HttpLog) parseConns(in []byte) (ok bool) {
return
}
-func (halog *HttpLog) parseQueue(in []byte) (ok bool) {
- halog.QueueServer, ok = parseToInt32(in, '/')
+func (httpLog *HTTPLog) parseQueue(in []byte) (ok bool) {
+ httpLog.QueueServer, ok = parseToInt32(in, '/')
if !ok {
return
}
- halog.QueueBackend, ok = parseToInt32(in, ' ')
+ httpLog.QueueBackend, ok = parseToInt32(in, ' ')
return
}
@@ -225,7 +237,7 @@ func (halog *HttpLog) parseQueue(in []byte) (ok bool) {
// parserRequestHeaders parse the request header values in log file.
// The request headers start with '{' and end with '}'.
// Each header is separated by '|'.
-func (halog *HttpLog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bool) {
+func (httpLog *HTTPLog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bool) {
if in[0] != '{' {
// Skip if we did not find the beginning.
return true
@@ -244,9 +256,9 @@ func (halog *HttpLog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bo
return
}
- halog.RequestHeaders = make(map[string]string)
+ httpLog.RequestHeaders = make(map[string]string)
for x, name := range reqHeaders {
- halog.RequestHeaders[name] = string(bheaders[x])
+ httpLog.RequestHeaders[name] = string(bheaders[x])
}
copy(in, in[end+2:])
@@ -254,8 +266,8 @@ func (halog *HttpLog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bo
return true
}
-func (halog *HttpLog) parseHTTP(in []byte) (ok bool) {
- halog.HTTPMethod, ok = parseToString(in, ' ')
+func (httpLog *HTTPLog) parseHTTP(in []byte) (ok bool) {
+ httpLog.HTTPMethod, ok = parseToString(in, ' ')
if !ok {
return
}
@@ -265,20 +277,20 @@ func (halog *HttpLog) parseHTTP(in []byte) (ok bool) {
return
}
urlQuery := strings.SplitN(v, "?", 2)
- halog.HTTPURL = urlQuery[0]
+ httpLog.HTTPURL = urlQuery[0]
if len(urlQuery) == 2 {
- halog.HTTPQuery = urlQuery[1]
+ httpLog.HTTPQuery = urlQuery[1]
}
- halog.HTTPProto, ok = parseToString(in, '"')
+ httpLog.HTTPProto, ok = parseToString(in, '"')
return ok
}
-// Parse will parse one line of HAProxy log format into HttpLog.
+// Parse will parse one line of HAProxy log format into HTTPLog.
//
// nolint: gocyclo
-func (halog *HttpLog) Parse(in []byte, reqHeaders []string) (ok bool) {
+func (httpLog *HTTPLog) Parse(in []byte, reqHeaders []string) (ok bool) {
var err error
// Remove prefix from systemd/rsyslog
@@ -288,13 +300,13 @@ func (halog *HttpLog) Parse(in []byte, reqHeaders []string) (ok bool) {
}
// parse client IP
- halog.ClientIP, ok = parseToString(in, ':')
+ httpLog.ClientIP, ok = parseToString(in, ':')
if !ok {
return
}
// parse client port
- halog.ClientPort, ok = parseToInt32(in, ' ')
+ httpLog.ClientPort, ok = parseToInt32(in, ' ')
if !ok {
return
}
@@ -306,80 +318,80 @@ func (halog *HttpLog) Parse(in []byte, reqHeaders []string) (ok bool) {
return
}
- halog.Timestamp, err = time.Parse("2/Jan/2006:15:04:05.000", ts)
+ httpLog.Timestamp, err = time.Parse(`2/Jan/2006:15:04:05.000`, ts)
if err != nil {
return false
}
// parse frontend name
in = in[1:]
- halog.FrontendName, ok = parseToString(in, ' ')
+ httpLog.FrontendName, ok = parseToString(in, ' ')
if !ok {
return
}
// parse backend name
- halog.BackendName, ok = parseToString(in, '/')
+ httpLog.BackendName, ok = parseToString(in, '/')
if !ok {
return
}
// parse server name
- halog.ServerName, ok = parseToString(in, ' ')
+ httpLog.ServerName, ok = parseToString(in, ' ')
if !ok {
return
}
// parse times
- ok = halog.parseTimes(in)
+ ok = httpLog.parseTimes(in)
if !ok {
return
}
// parse HTTP status code
- halog.HTTPStatus, ok = parseToInt32(in, ' ')
+ httpLog.HTTPStatus, ok = parseToInt32(in, ' ')
if !ok {
return
}
// parse bytes read
- halog.BytesRead, ok = parseToInt64(in, ' ')
+ httpLog.BytesRead, ok = parseToInt64(in, ' ')
if !ok {
return
}
// parse request cookie
- halog.CookieReq, ok = parseToString(in, ' ')
+ httpLog.CookieReq, ok = parseToString(in, ' ')
if !ok {
return
}
// parse response cookie
- halog.CookieRsp, ok = parseToString(in, ' ')
+ httpLog.CookieRsp, ok = parseToString(in, ' ')
if !ok {
return
}
// parse termination state
- halog.TermState, ok = parseToString(in, ' ')
+ httpLog.TermState, ok = parseToString(in, ' ')
if !ok {
return
}
// parse number of connections
- ok = halog.parseConns(in)
+ ok = httpLog.parseConns(in)
if !ok {
return
}
// parse number of queue state
- ok = halog.parseQueue(in)
+ ok = httpLog.parseQueue(in)
if !ok {
return
}
if len(reqHeaders) > 0 {
- ok = halog.parseRequestHeaders(in, reqHeaders)
+ ok = httpLog.parseRequestHeaders(in, reqHeaders)
if !ok {
return
}
@@ -387,17 +399,17 @@ func (halog *HttpLog) Parse(in []byte, reqHeaders []string) (ok bool) {
// parse HTTP
in = in[1:]
- ok = halog.parseHTTP(in)
+ ok = httpLog.parseHTTP(in)
return ok
}
// ParseUDPPacket will convert UDP packet (in bytes) to instance of
-// HttpLog.
+// HTTPLog.
//
// It will return nil and false if UDP packet is nil, have zero length, or
// cannot be parsed (rejected).
-func (halog *HttpLog) ParseUDPPacket(packet []byte, reqHeaders []string) bool {
+func (httpLog *HTTPLog) ParseUDPPacket(packet []byte, reqHeaders []string) bool {
if len(packet) == 0 {
return false
}
@@ -418,11 +430,11 @@ func (halog *HttpLog) ParseUDPPacket(packet []byte, reqHeaders []string) bool {
in = packet
}
- return halog.Parse(in, reqHeaders)
+ return httpLog.Parse(in, reqHeaders)
}
// writeIlp write the HTTP log as Influxdb Line Protocol.
-func (httpLog *HttpLog) writeIlp(out io.Writer) (err error) {
+func (httpLog *HTTPLog) writeIlp(out io.Writer) (err error) {
var (
k string
v string
diff --git a/influxd_client.go b/influxd_client.go
index 754a62d..481f641 100644
--- a/influxd_client.go
+++ b/influxd_client.go
@@ -5,6 +5,7 @@ package haminer
import (
"bytes"
+ "context"
"fmt"
"io"
"log"
@@ -26,7 +27,7 @@ type InfluxdClient struct {
// NewInfluxdClient will create, initialize, and return new Influxd client.
func NewInfluxdClient(cfg *ConfigForwarder) (cl *InfluxdClient) {
- if len(cfg.Url) == 0 {
+ if len(cfg.URL) == 0 {
return nil
}
@@ -62,7 +63,7 @@ func (cl *InfluxdClient) initConn() {
// Forwards implement the Forwarder interface. It will write all logs to
// Influxd.
-func (cl *InfluxdClient) Forwards(halogs []*HttpLog) {
+func (cl *InfluxdClient) Forwards(halogs []*HTTPLog) {
var (
logp = `influxdClient: Forwards`
@@ -77,7 +78,9 @@ func (cl *InfluxdClient) Forwards(halogs []*HttpLog) {
return
}
- httpReq, err = http.NewRequest(http.MethodPost, cl.cfg.apiWrite, &cl.buf)
+ var ctx = context.Background()
+
+ httpReq, err = http.NewRequestWithContext(ctx, http.MethodPost, cl.cfg.apiWrite, &cl.buf)
if err != nil {
log.Printf(`%s: %s`, logp, err)
return
@@ -117,9 +120,9 @@ func (cl *InfluxdClient) Forwards(halogs []*HttpLog) {
fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody)
}
-func (cl *InfluxdClient) write(halogs []*HttpLog) (err error) {
+func (cl *InfluxdClient) write(halogs []*HTTPLog) (err error) {
var (
- l *HttpLog
+ l *HTTPLog
k string
v string
)
diff --git a/questdb_client.go b/questdb_client.go
index 562e55c..d185978 100644
--- a/questdb_client.go
+++ b/questdb_client.go
@@ -24,10 +24,10 @@ type questdbClient struct {
buf bytes.Buffer
}
-// newQuestdbClient create and initialize client connection using the Url in
+// newQuestdbClient create and initialize client connection using the URL in
// the ConfigForwarder.
func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) {
- if cfg == nil || len(cfg.Url) == 0 {
+ if cfg == nil || len(cfg.URL) == 0 {
return nil, nil
}
@@ -41,7 +41,7 @@ func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) {
port uint16
)
- surl, err = url.Parse(cfg.Url)
+ surl, err = url.Parse(cfg.URL)
if err != nil {
return nil, fmt.Errorf(`%s: %w`, logp, err)
}
@@ -69,12 +69,12 @@ func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) {
// Forwards implement the Forwarder interface.
// It will write all logs to questdb.
-func (questc *questdbClient) Forwards(logs []*HttpLog) {
+func (questc *questdbClient) Forwards(logs []*HTTPLog) {
var (
logp = `questdbClient: Forwards`
now = time.Now()
- httpLog *HttpLog
+ httpLog *HTTPLog
data []byte
err error
)