diff options
| -rw-r--r-- | cmd/haminer/haminer.conf | 15 | ||||
| -rw-r--r-- | config.go | 26 | ||||
| -rw-r--r-- | halog.go | 141 | ||||
| -rw-r--r-- | haminer.go | 2 | ||||
| -rw-r--r-- | influxdb.go | 33 |
5 files changed, 150 insertions, 67 deletions
diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf index b8203bc..80bd2e0 100644 --- a/cmd/haminer/haminer.conf +++ b/cmd/haminer/haminer.conf @@ -25,6 +25,21 @@ #accept_backend= ## +## Parse HTTP request header in log file generated by "capture request header +## ..." option. +## +## Format: name | ... +## The name should contains only alphabets and underscore. +## Default: "" (empty) +## +## Examples, +## +## capture_request_header = host | referer +## + +#capture_request_header= + +## ## The endpoint for Influxdb HTTP API write, must include database name, and ## optional authentication in query parameters. ## @@ -14,9 +14,10 @@ import ( // List of config keys. const ( - ConfigKeyListen = "listen" - ConfigKeyAcceptBackend = "accept_backend" - ConfigKeyInfluxAPIWrite = "influxdb_api_write" + ConfigKeyAcceptBackend = "accept_backend" + ConfigKeyCaptureRequestHeader = "capture_request_header" + ConfigKeyInfluxAPIWrite = "influxdb_api_write" + ConfigKeyListen = "listen" ) // List of default config key values. @@ -39,6 +40,10 @@ type Config struct { // AcceptBackend list of backend to be filtered. AcceptBackend []string + // List of request headers to be parsed and mapped as keys in halog + // output. + RequestHeaders []string + // InfluxAPIWrite define HTTP API to write to Influxdb. InfluxAPIWrite string @@ -82,6 +87,19 @@ func (cfg *Config) SetListen(v string) { } // +// parseCaptureRequestHeader Parse request header names where each name is +// separated by "|". +// +func (cfg *Config) parseCaptureRequestHeader(v []byte) { + sep := []byte{'|'} + headers := bytes.Split(v, sep) + for x := 0; x < len(headers); x++ { + headers[x] = bytes.TrimSpace(headers[x]) + cfg.RequestHeaders = append(cfg.RequestHeaders, string(headers[x])) + } +} + +// // Load will read configuration from file defined by `path`. // func (cfg *Config) Load(path string) { @@ -109,6 +127,8 @@ func (cfg *Config) Load(path string) { switch string(kv[0]) { case ConfigKeyListen: cfg.SetListen(string(kv[1])) + case ConfigKeyCaptureRequestHeader: + cfg.parseCaptureRequestHeader(kv[1]) case ConfigKeyAcceptBackend: v := string(bytes.TrimSpace(kv[1])) if len(v) > 0 { @@ -21,33 +21,44 @@ var ( // Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3 // type Halog struct { - Timestamp time.Time - ClientIP string - ClientPort int32 + Timestamp time.Time + + ClientIP string + ClientPort int32 + FrontendName string BackendName string ServerName string - TimeReq int32 - TimeWait int32 - TimeConnect int32 - TimeRsp int32 - TimeAll int32 - HTTPStatus int32 - BytesRead int64 - CookieReq string - CookieRsp string - TermState string + + TimeReq int32 + TimeWait int32 + TimeConnect int32 + TimeRsp int32 + TimeAll int32 + + BytesRead int64 + + CookieReq string + CookieRsp string + + TermState string + ConnActive int32 ConnFrontend int32 ConnBackend int32 ConnServer int32 ConnRetries int32 + QueueServer int32 QueueBackend int32 - HTTPMethod string - HTTPURL string - HTTPQuery string - HTTPProto string + + RequestHeaders map[string]string + + HTTPStatus int32 + HTTPMethod string + HTTPURL string + HTTPQuery string + HTTPProto string } // @@ -185,6 +196,40 @@ func (halog *Halog) parseQueue(in []byte) (ok bool) { return } +// +// parserRequestHeaders parse the request header values in log file. +// The request headers start with '{' and end with '}'. +// Each header is separated by '|'. +// +func (halog *Halog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bool) { + if in[0] != '{' { + // Skip if we did not find the beginning. + return true + } + + end := bytes.IndexByte(in, '}') + // Either '}' not found or its empty as in '{}'. + if end <= 1 { + return + } + + sep := []byte{'|'} + bheaders := bytes.Split(in[1:end], sep) + + if len(reqHeaders) != len(bheaders) { + return + } + + halog.RequestHeaders = make(map[string]string) + for x, name := range reqHeaders { + halog.RequestHeaders[name] = string(bheaders[x]) + } + + copy(in, in[end+2:]) + + return true +} + func (halog *Halog) parseHTTP(in []byte) (ok bool) { halog.HTTPMethod, ok = parseToString(in, ' ') if !ok { @@ -209,46 +254,29 @@ func (halog *Halog) parseHTTP(in []byte) (ok bool) { // // Parse will parse one line of HAProxy log format into Halog. // -// (1) Remove prefix from systemd/rsyslog -// (2) parse client IP -// (3) parse client port -// (4) parse timestamp, remove '[' and parse until ']' -// (5) parse frontend name -// (6) parse backend name -// (7) parse server name -// (8) parse times -// (9) parse HTTP status code -// (10) parse bytes read -// (11) parse request cookie -// (12) parse response cookie -// (13) parse termination state -// (14) parse number of connections -// (15) parse number of queue state -// (16) parse HTTP -// // nolint: gocyclo -func (halog *Halog) Parse(in []byte) (ok bool) { +func (halog *Halog) Parse(in []byte, reqHeaders []string) (ok bool) { var err error - // (1) + // Remove prefix from systemd/rsyslog ok = cleanPrefix(in) if !ok { return } - // (2) + // parse client IP halog.ClientIP, ok = parseToString(in, ':') if !ok { return } - // (3) + // parse client port halog.ClientPort, ok = parseToInt32(in, ' ') if !ok { return } - // (4) + // parse timestamp, remove '[' and parse until ']' in = in[1:] ts, ok := parseToString(in, ']') if !ok { @@ -260,74 +288,81 @@ func (halog *Halog) Parse(in []byte) (ok bool) { return false } - // (5) + // parse frontend name in = in[1:] halog.FrontendName, ok = parseToString(in, ' ') if !ok { return } - // (6) + // parse backend name halog.BackendName, ok = parseToString(in, '/') if !ok { return } - // (7) + // parse server name halog.ServerName, ok = parseToString(in, ' ') if !ok { return } - // (8) + // parse times ok = halog.parseTimes(in) if !ok { return } - // (9) + // parse HTTP status code halog.HTTPStatus, ok = parseToInt32(in, ' ') if !ok { return } - // (10) + // parse bytes read halog.BytesRead, ok = parseToInt64(in, ' ') if !ok { return } - // (11) + // parse request cookie halog.CookieReq, ok = parseToString(in, ' ') if !ok { return } - // (12) + // parse response cookie halog.CookieRsp, ok = parseToString(in, ' ') if !ok { return } - // (13) + // parse termination state halog.TermState, ok = parseToString(in, ' ') if !ok { return } - // (14) + // parse number of connections ok = halog.parseConns(in) if !ok { return } - // (15) + // parse number of queue state ok = halog.parseQueue(in) if !ok { return } - // (16) + if len(reqHeaders) > 0 { + ok = halog.parseRequestHeaders(in, reqHeaders) + if !ok { + return + } + } + + // parse HTTP in = in[1:] ok = halog.parseHTTP(in) @@ -341,7 +376,7 @@ func (halog *Halog) Parse(in []byte) (ok bool) { // It will return nil and false if UDP packet is nil, have zero length, or // cannot be parsed (rejected). // -func (halog *Halog) ParseUDPPacket(p *UDPPacket) bool { +func (halog *Halog) ParseUDPPacket(p *UDPPacket, reqHeaders []string) bool { if p == nil { return false } @@ -363,5 +398,5 @@ func (halog *Halog) ParseUDPPacket(p *UDPPacket) bool { in = p.Bytes } - return halog.Parse(in) + return halog.Parse(in, reqHeaders) } @@ -119,7 +119,7 @@ func (h *Haminer) consume() { halog := &Halog{} - ok = halog.ParseUDPPacket(p) + ok = halog.ParseUDPPacket(p, h.cfg.RequestHeaders) if !ok { continue } diff --git a/influxdb.go b/influxdb.go index 9e479e1..de25f4a 100644 --- a/influxdb.go +++ b/influxdb.go @@ -30,10 +30,7 @@ const ( "conn_active=%d,conn_frontend=%d,conn_backend=%d," + "conn_server=%d,conn_retries=%d," + "queue_server=%d,queue_backend=%d," + - "bytes_read=%d" + - " " + - // timestamp - "%d\n" + "bytes_read=%d" ) // @@ -86,11 +83,16 @@ func (cl *InfluxdbClient) initConn() { // func (cl *InfluxdbClient) Forwards(halogs []*Halog) { lsrc := "InfluxdbClient.Forwards" - cl.write(halogs) + err := cl.write(halogs) + if err != nil { + log.Printf("InfluxdbClient.write: %s", err) + return + } rsp, err := cl.conn.Post(cl.apiWrite, defContentType, &cl.buf) if err != nil { log.Printf("InfluxdbClient.Forwards: %s", err) + return } if rsp.StatusCode >= 200 || rsp.StatusCode <= 299 { @@ -112,9 +114,7 @@ func (cl *InfluxdbClient) Forwards(halogs []*Halog) { fmt.Printf("%s: response: %d %s\n", lsrc, rsp.StatusCode, rspBody) } -func (cl *InfluxdbClient) write(halogs []*Halog) { - var err error - +func (cl *InfluxdbClient) write(halogs []*Halog) (err error) { cl.buf.Reset() for _, l := range halogs { @@ -133,10 +133,23 @@ func (cl *InfluxdbClient) write(halogs []*Halog) { l.ConnServer, l.ConnRetries, l.QueueServer, l.QueueBackend, l.BytesRead, - l.Timestamp.UnixNano(), ) if err != nil { - log.Printf("InfluxdbClient.write: %s", err) + return + } + + for k, v := range l.RequestHeaders { + _, err = fmt.Fprintf(&cl.buf, ",%s=%q", k, v) + if err != nil { + return + } + } + + _, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano()) + if err != nil { + return } } + + return } |
