From 05b4830d0fd5aaec157139d88beffa1cf0ce0615 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Wed, 17 Aug 2022 13:32:00 +0700 Subject: all: rename struct type Halog to HttpLog Halog contains the parsed HTTP log, so the code is more readable if we rename the type. --- forwarder.go | 2 +- halog.go | 390 ------------------------------------------------------ haminer.go | 24 ++-- http_log.go | 390 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ influxd_client.go | 6 +- 5 files changed, 406 insertions(+), 406 deletions(-) delete mode 100644 halog.go create mode 100644 http_log.go diff --git a/forwarder.go b/forwarder.go index 79ab2bc..7c56d20 100644 --- a/forwarder.go +++ b/forwarder.go @@ -3,5 +3,5 @@ package haminer // Forwarder define an interface to forward parsed HAProxy log to storage // engine. type Forwarder interface { - Forwards(halogs []*Halog) + Forwards(halogs []*HttpLog) } diff --git a/halog.go b/halog.go deleted file mode 100644 index 8d4f304..0000000 --- a/halog.go +++ /dev/null @@ -1,390 +0,0 @@ -// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -package haminer - -import ( - "bytes" - "strconv" - "strings" - "time" -) - -// Halog contains the mapping of haproxy HTTP log format to Go struct. 
-// -// Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3 -type Halog struct { // nolint: maligned - Timestamp time.Time - - ClientIP string - ClientPort int32 - - FrontendName string - BackendName string - ServerName string - - TimeReq int32 - TimeWait int32 - TimeConnect int32 - TimeRsp int32 - TimeAll int32 - - BytesRead int64 - - CookieReq string - CookieRsp string - - TermState string - - ConnActive int32 - ConnFrontend int32 - ConnBackend int32 - ConnServer int32 - ConnRetries int32 - - QueueServer int32 - QueueBackend int32 - - RequestHeaders map[string]string - - HTTPStatus int32 - HTTPMethod string - HTTPURL string - HTTPQuery string - HTTPProto string - - tagHTTPURL string -} - -// cleanPrefix will remove ` [pid]: ` prefix (which -// come from systemd/rsyslog) in input. -func cleanPrefix(in []byte) bool { - start := bytes.IndexByte(in, '[') - if start < 0 { - return false - } - - end := bytes.IndexByte(in[start:], ']') - if end < 0 { - return false - } - - end = start + end + 3 - - copy(in[0:], in[end:]) - - return true -} - -func parseToString(in []byte, sep byte) (string, bool) { - end := bytes.IndexByte(in, sep) - if end < 0 { - return "", false - } - - v := string(in[:end]) - copy(in, in[end+1:]) - - return v, true -} - -func parseToInt32(in []byte, sep byte) (int32, bool) { - end := bytes.IndexByte(in, sep) - if end < 0 { - return 0, false - } - - v, err := strconv.Atoi(string(in[:end])) - if err != nil { - return 0, false - } - - copy(in, in[end+1:]) - - return int32(v), true -} - -func parseToInt64(in []byte, sep byte) (int64, bool) { - end := bytes.IndexByte(in, sep) - if end < 0 { - return 0, false - } - - v, err := strconv.ParseInt(string(in[:end]), 10, 64) - if err != nil { - return 0, false - } - - copy(in, in[end+1:]) - - return v, true -} - -func (halog *Halog) parseTimes(in []byte) (ok bool) { - halog.TimeReq, ok = parseToInt32(in, '/') - if !ok { - return - } - - halog.TimeWait, ok = parseToInt32(in, '/') - if 
!ok { - return - } - - halog.TimeConnect, ok = parseToInt32(in, '/') - if !ok { - return - } - - halog.TimeRsp, ok = parseToInt32(in, '/') - if !ok { - return - } - - halog.TimeAll, ok = parseToInt32(in, ' ') - if !ok { - return - } - - return -} - -func (halog *Halog) parseConns(in []byte) (ok bool) { - halog.ConnActive, ok = parseToInt32(in, '/') - if !ok { - return - } - - halog.ConnFrontend, ok = parseToInt32(in, '/') - if !ok { - return - } - - halog.ConnBackend, ok = parseToInt32(in, '/') - if !ok { - return - } - - halog.ConnServer, ok = parseToInt32(in, '/') - if !ok { - return - } - - halog.ConnRetries, ok = parseToInt32(in, ' ') - if !ok { - return - } - - return -} - -func (halog *Halog) parseQueue(in []byte) (ok bool) { - halog.QueueServer, ok = parseToInt32(in, '/') - if !ok { - return - } - - halog.QueueBackend, ok = parseToInt32(in, ' ') - - return -} - -// parserRequestHeaders parse the request header values in log file. -// The request headers start with '{' and end with '}'. -// Each header is separated by '|'. -func (halog *Halog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bool) { - if in[0] != '{' { - // Skip if we did not find the beginning. - return true - } - - end := bytes.IndexByte(in, '}') - // Either '}' not found or its empty as in '{}'. 
- if end <= 1 { - return - } - - sep := []byte{'|'} - bheaders := bytes.Split(in[1:end], sep) - - if len(reqHeaders) != len(bheaders) { - return - } - - halog.RequestHeaders = make(map[string]string) - for x, name := range reqHeaders { - halog.RequestHeaders[name] = string(bheaders[x]) - } - - copy(in, in[end+2:]) - - return true -} - -func (halog *Halog) parseHTTP(in []byte) (ok bool) { - halog.HTTPMethod, ok = parseToString(in, ' ') - if !ok { - return - } - - v, ok := parseToString(in, ' ') - if !ok { - return - } - urlQuery := strings.SplitN(v, "?", 2) - halog.HTTPURL = urlQuery[0] - if len(urlQuery) == 2 { - halog.HTTPQuery = urlQuery[1] - } - - halog.HTTPProto, ok = parseToString(in, '"') - - return ok -} - -// Parse will parse one line of HAProxy log format into Halog. -// -// nolint: gocyclo -func (halog *Halog) Parse(in []byte, reqHeaders []string) (ok bool) { - var err error - - // Remove prefix from systemd/rsyslog - ok = cleanPrefix(in) - if !ok { - return - } - - // parse client IP - halog.ClientIP, ok = parseToString(in, ':') - if !ok { - return - } - - // parse client port - halog.ClientPort, ok = parseToInt32(in, ' ') - if !ok { - return - } - - // parse timestamp, remove '[' and parse until ']' - in = in[1:] - ts, ok := parseToString(in, ']') - if !ok { - return - } - - halog.Timestamp, err = time.Parse("2/Jan/2006:15:04:05.000", ts) - if err != nil { - return false - } - - // parse frontend name - in = in[1:] - halog.FrontendName, ok = parseToString(in, ' ') - if !ok { - return - } - - // parse backend name - halog.BackendName, ok = parseToString(in, '/') - if !ok { - return - } - - // parse server name - halog.ServerName, ok = parseToString(in, ' ') - if !ok { - return - } - - // parse times - ok = halog.parseTimes(in) - if !ok { - return - } - - // parse HTTP status code - halog.HTTPStatus, ok = parseToInt32(in, ' ') - if !ok { - return - } - - // parse bytes read - halog.BytesRead, ok = parseToInt64(in, ' ') - if !ok { - return - } - - // parse 
request cookie - halog.CookieReq, ok = parseToString(in, ' ') - if !ok { - return - } - - // parse response cookie - halog.CookieRsp, ok = parseToString(in, ' ') - if !ok { - return - } - - // parse termination state - halog.TermState, ok = parseToString(in, ' ') - if !ok { - return - } - - // parse number of connections - ok = halog.parseConns(in) - if !ok { - return - } - - // parse number of queue state - ok = halog.parseQueue(in) - if !ok { - return - } - - if len(reqHeaders) > 0 { - ok = halog.parseRequestHeaders(in, reqHeaders) - if !ok { - return - } - } - - // parse HTTP - in = in[1:] - ok = halog.parseHTTP(in) - - return ok -} - -// ParseUDPPacket will convert UDP packet (in bytes) to instance of -// Halog. -// -// It will return nil and false if UDP packet is nil, have zero length, or -// cannot be parsed (rejected). -func (halog *Halog) ParseUDPPacket(packet []byte, reqHeaders []string) bool { - if len(packet) == 0 { - return false - } - - var ( - endIdx int - in []byte - ) - - if packet[0] == '<' { - endIdx = bytes.IndexByte(packet, '>') - if endIdx < 0 { - return false - } - - in = packet[endIdx+1:] - } else { - in = packet - } - - return halog.Parse(in, reqHeaders) -} diff --git a/haminer.go b/haminer.go index 8b0799f..45c15c7 100644 --- a/haminer.go +++ b/haminer.go @@ -19,7 +19,7 @@ type Haminer struct { cfg *Config udpConn *net.UDPConn chSignal chan os.Signal - chHalog chan *Halog + chHttpLog chan *HttpLog ff []Forwarder isRunning bool } @@ -32,10 +32,10 @@ func NewHaminer(cfg *Config) (h *Haminer) { } h = &Haminer{ - cfg: cfg, - chSignal: make(chan os.Signal, 1), - chHalog: make(chan *Halog, 30), - ff: make([]Forwarder, 0), + cfg: cfg, + chSignal: make(chan os.Signal, 1), + chHttpLog: make(chan *HttpLog, 30), + ff: make([]Forwarder, 0), } signal.Notify(h.chSignal, syscall.SIGHUP, syscall.SIGINT, @@ -84,7 +84,7 @@ func (h *Haminer) Start() (err error) { } // filter will return true if log is accepted; otherwise it will return false. 
-func (h *Haminer) filter(halog *Halog) bool { +func (h *Haminer) filter(halog *HttpLog) bool { if halog == nil { return false } @@ -108,7 +108,7 @@ func (h *Haminer) consume() { var ( packet = make([]byte, 4096) - halog *Halog + halog *HttpLog err error n int ok bool @@ -120,7 +120,7 @@ func (h *Haminer) consume() { continue } - halog = &Halog{} + halog = &HttpLog{} ok = halog.ParseUDPPacket(packet[:n], h.cfg.RequestHeaders) if !ok { @@ -132,11 +132,11 @@ func (h *Haminer) consume() { continue } - h.chHalog <- halog + h.chHttpLog <- halog } } -func (h *Haminer) preprocess(halog *Halog) { +func (h *Haminer) preprocess(halog *HttpLog) { halog.tagHTTPURL = halog.HTTPURL for _, retag := range h.cfg.retags { halog.tagHTTPURL = retag.preprocess("http_url", halog.tagHTTPURL) @@ -145,11 +145,11 @@ func (h *Haminer) preprocess(halog *Halog) { func (h *Haminer) produce() { ticker := time.NewTicker(h.cfg.ForwardInterval) - halogs := make([]*Halog, 0) + halogs := make([]*HttpLog, 0) for h.isRunning { select { - case halog := <-h.chHalog: + case halog := <-h.chHttpLog: h.preprocess(halog) halogs = append(halogs, halog) diff --git a/http_log.go b/http_log.go new file mode 100644 index 0000000..bbaf225 --- /dev/null +++ b/http_log.go @@ -0,0 +1,390 @@ +// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package haminer + +import ( + "bytes" + "strconv" + "strings" + "time" +) + +// HttpLog contains the mapping of haproxy HTTP log format to Go struct. 
+// +// Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3 +type HttpLog struct { // nolint: maligned + Timestamp time.Time + + ClientIP string + ClientPort int32 + + FrontendName string + BackendName string + ServerName string + + TimeReq int32 + TimeWait int32 + TimeConnect int32 + TimeRsp int32 + TimeAll int32 + + BytesRead int64 + + CookieReq string + CookieRsp string + + TermState string + + ConnActive int32 + ConnFrontend int32 + ConnBackend int32 + ConnServer int32 + ConnRetries int32 + + QueueServer int32 + QueueBackend int32 + + RequestHeaders map[string]string + + HTTPStatus int32 + HTTPMethod string + HTTPURL string + HTTPQuery string + HTTPProto string + + tagHTTPURL string +} + +// cleanPrefix will remove ` [pid]: ` prefix (which +// come from systemd/rsyslog) in input. +func cleanPrefix(in []byte) bool { + start := bytes.IndexByte(in, '[') + if start < 0 { + return false + } + + end := bytes.IndexByte(in[start:], ']') + if end < 0 { + return false + } + + end = start + end + 3 + + copy(in[0:], in[end:]) + + return true +} + +func parseToString(in []byte, sep byte) (string, bool) { + end := bytes.IndexByte(in, sep) + if end < 0 { + return "", false + } + + v := string(in[:end]) + copy(in, in[end+1:]) + + return v, true +} + +func parseToInt32(in []byte, sep byte) (int32, bool) { + end := bytes.IndexByte(in, sep) + if end < 0 { + return 0, false + } + + v, err := strconv.Atoi(string(in[:end])) + if err != nil { + return 0, false + } + + copy(in, in[end+1:]) + + return int32(v), true +} + +func parseToInt64(in []byte, sep byte) (int64, bool) { + end := bytes.IndexByte(in, sep) + if end < 0 { + return 0, false + } + + v, err := strconv.ParseInt(string(in[:end]), 10, 64) + if err != nil { + return 0, false + } + + copy(in, in[end+1:]) + + return v, true +} + +func (halog *HttpLog) parseTimes(in []byte) (ok bool) { + halog.TimeReq, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.TimeWait, ok = parseToInt32(in, '/') + 
if !ok { + return + } + + halog.TimeConnect, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.TimeRsp, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.TimeAll, ok = parseToInt32(in, ' ') + if !ok { + return + } + + return +} + +func (halog *HttpLog) parseConns(in []byte) (ok bool) { + halog.ConnActive, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.ConnFrontend, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.ConnBackend, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.ConnServer, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.ConnRetries, ok = parseToInt32(in, ' ') + if !ok { + return + } + + return +} + +func (halog *HttpLog) parseQueue(in []byte) (ok bool) { + halog.QueueServer, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.QueueBackend, ok = parseToInt32(in, ' ') + + return +} + +// parserRequestHeaders parse the request header values in log file. +// The request headers start with '{' and end with '}'. +// Each header is separated by '|'. +func (halog *HttpLog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bool) { + if in[0] != '{' { + // Skip if we did not find the beginning. + return true + } + + end := bytes.IndexByte(in, '}') + // Either '}' not found or its empty as in '{}'. 
+ if end <= 1 { + return + } + + sep := []byte{'|'} + bheaders := bytes.Split(in[1:end], sep) + + if len(reqHeaders) != len(bheaders) { + return + } + + halog.RequestHeaders = make(map[string]string) + for x, name := range reqHeaders { + halog.RequestHeaders[name] = string(bheaders[x]) + } + + copy(in, in[end+2:]) + + return true +} + +func (halog *HttpLog) parseHTTP(in []byte) (ok bool) { + halog.HTTPMethod, ok = parseToString(in, ' ') + if !ok { + return + } + + v, ok := parseToString(in, ' ') + if !ok { + return + } + urlQuery := strings.SplitN(v, "?", 2) + halog.HTTPURL = urlQuery[0] + if len(urlQuery) == 2 { + halog.HTTPQuery = urlQuery[1] + } + + halog.HTTPProto, ok = parseToString(in, '"') + + return ok +} + +// Parse will parse one line of HAProxy log format into HttpLog. +// +// nolint: gocyclo +func (halog *HttpLog) Parse(in []byte, reqHeaders []string) (ok bool) { + var err error + + // Remove prefix from systemd/rsyslog + ok = cleanPrefix(in) + if !ok { + return + } + + // parse client IP + halog.ClientIP, ok = parseToString(in, ':') + if !ok { + return + } + + // parse client port + halog.ClientPort, ok = parseToInt32(in, ' ') + if !ok { + return + } + + // parse timestamp, remove '[' and parse until ']' + in = in[1:] + ts, ok := parseToString(in, ']') + if !ok { + return + } + + halog.Timestamp, err = time.Parse("2/Jan/2006:15:04:05.000", ts) + if err != nil { + return false + } + + // parse frontend name + in = in[1:] + halog.FrontendName, ok = parseToString(in, ' ') + if !ok { + return + } + + // parse backend name + halog.BackendName, ok = parseToString(in, '/') + if !ok { + return + } + + // parse server name + halog.ServerName, ok = parseToString(in, ' ') + if !ok { + return + } + + // parse times + ok = halog.parseTimes(in) + if !ok { + return + } + + // parse HTTP status code + halog.HTTPStatus, ok = parseToInt32(in, ' ') + if !ok { + return + } + + // parse bytes read + halog.BytesRead, ok = parseToInt64(in, ' ') + if !ok { + return + } + + // 
parse request cookie + halog.CookieReq, ok = parseToString(in, ' ') + if !ok { + return + } + + // parse response cookie + halog.CookieRsp, ok = parseToString(in, ' ') + if !ok { + return + } + + // parse termination state + halog.TermState, ok = parseToString(in, ' ') + if !ok { + return + } + + // parse number of connections + ok = halog.parseConns(in) + if !ok { + return + } + + // parse number of queue state + ok = halog.parseQueue(in) + if !ok { + return + } + + if len(reqHeaders) > 0 { + ok = halog.parseRequestHeaders(in, reqHeaders) + if !ok { + return + } + } + + // parse HTTP + in = in[1:] + ok = halog.parseHTTP(in) + + return ok +} + +// ParseUDPPacket will convert UDP packet (in bytes) to instance of +// HttpLog. +// +// It will return nil and false if UDP packet is nil, have zero length, or +// cannot be parsed (rejected). +func (halog *HttpLog) ParseUDPPacket(packet []byte, reqHeaders []string) bool { + if len(packet) == 0 { + return false + } + + var ( + endIdx int + in []byte + ) + + if packet[0] == '<' { + endIdx = bytes.IndexByte(packet, '>') + if endIdx < 0 { + return false + } + + in = packet[endIdx+1:] + } else { + in = packet + } + + return halog.Parse(in, reqHeaders) +} diff --git a/influxd_client.go b/influxd_client.go index 639167e..e096624 100644 --- a/influxd_client.go +++ b/influxd_client.go @@ -86,7 +86,7 @@ func (cl *InfluxdClient) initConn() { // Forwards implement the Forwarder interface. It will write all logs to // Influxd. -func (cl *InfluxdClient) Forwards(halogs []*Halog) { +func (cl *InfluxdClient) Forwards(halogs []*HttpLog) { var ( logp = `Forwards` @@ -141,9 +141,9 @@ func (cl *InfluxdClient) Forwards(halogs []*Halog) { fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) } -func (cl *InfluxdClient) write(halogs []*Halog) (err error) { +func (cl *InfluxdClient) write(halogs []*HttpLog) (err error) { var ( - l *Halog + l *HttpLog k string v string ) -- cgit v1.3