summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2022-08-16 23:27:34 +0700
committerShulhan <ms@kilabit.info>2022-08-17 00:55:14 +0700
commitc0420ca7fb513f8ee8b224f512124819bd2af34e (patch)
tree1b6174409a5cca859b462e74cc5ac2c491c1f74a
parentd769cfe466e2a5f96d396f91f6000c4fc604d836 (diff)
downloadhaminer-c0420ca7fb513f8ee8b224f512124819bd2af34e.tar.xz
all: restructure the log that we send to influxdb
Since the influxdb v2, and since the flux query language introduced, the way the log read and queried kinda changes. Things that we can query on v1, is not possible (or maybe I forgotten) anymore. This changes move all haproxy log fields value that is not number (except HTTP status code) to tags and left all numbers (like time, number of connections, bytes read) in the fields.
-rw-r--r--config.go2
-rw-r--r--influxdb.go108
2 files changed, 70 insertions, 40 deletions
diff --git a/config.go b/config.go
index 857a392..78df898 100644
--- a/config.go
+++ b/config.go
@@ -42,7 +42,7 @@ type Config struct {
// AcceptBackend list of backend to be filtered.
AcceptBackend []string `ini:"haminer::accept_backend"`
- // List of request headers to be parsed and mapped as keys in halog
+ // List of request headers to be parsed and mapped as tags in halog
// output.
RequestHeaders []string `ini:"haminer::capture_request_header"`
diff --git a/influxdb.go b/influxdb.go
index 906f8c0..3868574 100644
--- a/influxdb.go
+++ b/influxdb.go
@@ -3,7 +3,7 @@ package haminer
import (
"bytes"
"fmt"
- "io/ioutil"
+ "io"
"log"
"net/http"
"os"
@@ -13,25 +13,35 @@ const (
envHostname = "HOSTNAME"
defHostname = "localhost"
defContentType = "application/octet-stream"
- influxdbFormat = "" +
- // measurements
- "haproxy," +
- // tags
- "host=%q," +
- "frontend=%q,backend=%q,server=%q," +
- "tag_http_status=%d,tag_http_url=%s,tag_http_method=%s" +
- " " +
- // fields
- "http_proto=%q,http_method=%q,http_url=%q," +
- "http_query=%q,http_status=%d," +
- "term_state=%q," +
- "client_ip=%q,client_port=%d," +
- "time_req=%d,time_wait=%d,time_connect=%d," +
- "time_rsp=%d,time_all=%d," +
- "conn_active=%d,conn_frontend=%d,conn_backend=%d," +
- "conn_server=%d,conn_retries=%d," +
- "queue_server=%d,queue_backend=%d," +
- "bytes_read=%d"
+
+ influxdMeasurement = `haproxy`
+
+ influxdTags = `,host=%s,` +
+ `server=%s,` +
+ `backend=%s,` +
+ `frontend=%s,` +
+ `http_method=%s,` +
+ `http_url=%s,` +
+ `http_query=%q,` +
+ `http_proto=%s,` +
+ `http_status=%d,` +
+ `term_state=%s,` +
+ `client_ip=%s,` +
+ `client_port=%d`
+
+ influxdFields = `time_req=%d,` +
+ `time_wait=%d,` +
+ `time_connect=%d,` +
+ `time_rsp=%d,` +
+ `time_all=%d,` +
+ `conn_active=%d,` +
+ `conn_frontend=%d,` +
+ `conn_backend=%d,` +
+ `conn_server=%d,` +
+ `conn_retries=%d,` +
+ `queue_server=%d,` +
+ `queue_backend=%d,` +
+ `bytes_read=%d`
)
// InfluxdbClient contains HTTP connection for writing logs to Influxdb.
@@ -123,28 +133,55 @@ func (cl *InfluxdbClient) Forwards(halogs []*Halog) {
}
}()
- rspBody, err := ioutil.ReadAll(httpRes.Body)
+ rspBody, err := io.ReadAll(httpRes.Body)
if err != nil {
- log.Printf(`%s: ioutil.ReadAll: %s`, logp, err)
+ log.Printf(`%s: %s`, logp, err)
}
fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody)
}
func (cl *InfluxdbClient) write(halogs []*Halog) (err error) {
+ var (
+ l *Halog
+ k string
+ v string
+ )
+
cl.buf.Reset()
- for _, l := range halogs {
- _, err = fmt.Fprintf(&cl.buf, influxdbFormat,
+ for _, l = range halogs {
+ cl.buf.WriteString(influxdMeasurement)
+
+ _, err = fmt.Fprintf(&cl.buf, influxdTags,
// tags
cl.hostname,
- l.FrontendName, l.BackendName, l.ServerName,
- l.HTTPStatus, l.tagHTTPURL, l.HTTPMethod,
- // fields
- l.HTTPProto, l.HTTPMethod, l.HTTPURL,
- l.HTTPQuery, l.HTTPStatus,
+ l.ServerName,
+ l.BackendName,
+ l.FrontendName,
+ l.HTTPMethod,
+ l.HTTPURL,
+ l.HTTPQuery,
+ l.HTTPProto,
+ l.HTTPStatus,
l.TermState,
- l.ClientIP, l.ClientPort,
+ l.ClientIP,
+ l.ClientPort,
+ )
+ if err != nil {
+ return err
+ }
+
+ for k, v = range l.RequestHeaders {
+ _, err = fmt.Fprintf(&cl.buf, ",%s=%s", k, v)
+ if err != nil {
+ return err
+ }
+ }
+
+ cl.buf.WriteByte(' ')
+
+ _, err = fmt.Fprintf(&cl.buf, influxdFields,
l.TimeReq, l.TimeWait, l.TimeConnect,
l.TimeRsp, l.TimeAll,
l.ConnActive, l.ConnFrontend, l.ConnBackend,
@@ -153,19 +190,12 @@ func (cl *InfluxdbClient) write(halogs []*Halog) (err error) {
l.BytesRead,
)
if err != nil {
- return
- }
-
- for k, v := range l.RequestHeaders {
- _, err = fmt.Fprintf(&cl.buf, ",%s=%q", k, v)
- if err != nil {
- return
- }
+ return err
}
_, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano())
if err != nil {
- return
+ return err
}
}