diff options
| author | Shulhan <ms@kilabit.info> | 2022-08-16 23:27:34 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2022-08-17 00:55:14 +0700 |
| commit | c0420ca7fb513f8ee8b224f512124819bd2af34e (patch) | |
| tree | 1b6174409a5cca859b462e74cc5ac2c491c1f74a | |
| parent | d769cfe466e2a5f96d396f91f6000c4fc604d836 (diff) | |
| download | haminer-c0420ca7fb513f8ee8b224f512124819bd2af34e.tar.xz | |
all: restructure the log that we send to influxdb
Since the influxdb v2, and since the flux query language introduced,
the way the log read and queried kinda changes.
Things that we can query on v1, is not possible (or maybe I forgotten)
anymore.
This changes move all haproxy log fields value that is not number (except
HTTP status code) to tags and left all numbers (like time, number of
connections, bytes read) in the fields.
| -rw-r--r-- | config.go | 2 | ||||
| -rw-r--r-- | influxdb.go | 108 |
2 files changed, 70 insertions, 40 deletions
@@ -42,7 +42,7 @@ type Config struct { // AcceptBackend list of backend to be filtered. AcceptBackend []string `ini:"haminer::accept_backend"` - // List of request headers to be parsed and mapped as keys in halog + // List of request headers to be parsed and mapped as tags in halog // output. RequestHeaders []string `ini:"haminer::capture_request_header"` diff --git a/influxdb.go b/influxdb.go index 906f8c0..3868574 100644 --- a/influxdb.go +++ b/influxdb.go @@ -3,7 +3,7 @@ package haminer import ( "bytes" "fmt" - "io/ioutil" + "io" "log" "net/http" "os" @@ -13,25 +13,35 @@ const ( envHostname = "HOSTNAME" defHostname = "localhost" defContentType = "application/octet-stream" - influxdbFormat = "" + - // measurements - "haproxy," + - // tags - "host=%q," + - "frontend=%q,backend=%q,server=%q," + - "tag_http_status=%d,tag_http_url=%s,tag_http_method=%s" + - " " + - // fields - "http_proto=%q,http_method=%q,http_url=%q," + - "http_query=%q,http_status=%d," + - "term_state=%q," + - "client_ip=%q,client_port=%d," + - "time_req=%d,time_wait=%d,time_connect=%d," + - "time_rsp=%d,time_all=%d," + - "conn_active=%d,conn_frontend=%d,conn_backend=%d," + - "conn_server=%d,conn_retries=%d," + - "queue_server=%d,queue_backend=%d," + - "bytes_read=%d" + + influxdMeasurement = `haproxy` + + influxdTags = `,host=%s,` + + `server=%s,` + + `backend=%s,` + + `frontend=%s,` + + `http_method=%s,` + + `http_url=%s,` + + `http_query=%q,` + + `http_proto=%s,` + + `http_status=%d,` + + `term_state=%s,` + + `client_ip=%s,` + + `client_port=%d` + + influxdFields = `time_req=%d,` + + `time_wait=%d,` + + `time_connect=%d,` + + `time_rsp=%d,` + + `time_all=%d,` + + `conn_active=%d,` + + `conn_frontend=%d,` + + `conn_backend=%d,` + + `conn_server=%d,` + + `conn_retries=%d,` + + `queue_server=%d,` + + `queue_backend=%d,` + + `bytes_read=%d` ) // InfluxdbClient contains HTTP connection for writing logs to Influxdb. @@ -123,28 +133,55 @@ func (cl *InfluxdbClient) Forwards(halogs []*Halog) { } }() - rspBody, err := ioutil.ReadAll(httpRes.Body) + rspBody, err := io.ReadAll(httpRes.Body) if err != nil { - log.Printf(`%s: ioutil.ReadAll: %s`, logp, err) + log.Printf(`%s: %s`, logp, err) } fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) } func (cl *InfluxdbClient) write(halogs []*Halog) (err error) { + var ( + l *Halog + k string + v string + ) + cl.buf.Reset() - for _, l := range halogs { - _, err = fmt.Fprintf(&cl.buf, influxdbFormat, + for _, l = range halogs { + cl.buf.WriteString(influxdMeasurement) + + _, err = fmt.Fprintf(&cl.buf, influxdTags, // tags cl.hostname, - l.FrontendName, l.BackendName, l.ServerName, - l.HTTPStatus, l.tagHTTPURL, l.HTTPMethod, - // fields - l.HTTPProto, l.HTTPMethod, l.HTTPURL, - l.HTTPQuery, l.HTTPStatus, + l.ServerName, + l.BackendName, + l.FrontendName, + l.HTTPMethod, + l.HTTPURL, + l.HTTPQuery, + l.HTTPProto, + l.HTTPStatus, l.TermState, - l.ClientIP, l.ClientPort, + l.ClientIP, + l.ClientPort, + ) + if err != nil { + return err + } + + for k, v = range l.RequestHeaders { + _, err = fmt.Fprintf(&cl.buf, ",%s=%s", k, v) + if err != nil { + return err + } + } + + cl.buf.WriteByte(' ') + + _, err = fmt.Fprintf(&cl.buf, influxdFields, l.TimeReq, l.TimeWait, l.TimeConnect, l.TimeRsp, l.TimeAll, l.ConnActive, l.ConnFrontend, l.ConnBackend, @@ -153,19 +190,12 @@ func (cl *InfluxdbClient) write(halogs []*Halog) (err error) { l.BytesRead, ) if err != nil { - return - } - - for k, v := range l.RequestHeaders { - _, err = fmt.Fprintf(&cl.buf, ",%s=%q", k, v) - if err != nil { - return - } + return err } _, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano()) if err != nil { - return + return err } } |
