aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2018-10-29 15:55:46 +0700
committerShulhan <ms@kilabit.info>2018-10-29 15:57:19 +0700
commit57399dafaef33bc715502ad6a7751aa1dfc83036 (patch)
treee863a59cf513ec2a976edc8d898ca493079af7d8
parent5a847bcca1f2513b2d4f12854d8829ab2fb67ec9 (diff)
downloadhaminer-57399dafaef33bc715502ad6a7751aa1dfc83036.tar.xz
Add option to parse HTTP request header
-rw-r--r--cmd/haminer/haminer.conf15
-rw-r--r--config.go26
-rw-r--r--halog.go141
-rw-r--r--haminer.go2
-rw-r--r--influxdb.go33
5 files changed, 150 insertions, 67 deletions
diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf
index b8203bc..80bd2e0 100644
--- a/cmd/haminer/haminer.conf
+++ b/cmd/haminer/haminer.conf
@@ -25,6 +25,21 @@
#accept_backend=
##
+## Parse HTTP request header in log file generated by "capture request header
+## ..." option.
+##
+## Format: name | ...
+## The name should contains only alphabets and underscore.
+## Default: "" (empty)
+##
+## Examples,
+##
+## capture_request_header = host | referer
+##
+
+#capture_request_header=
+
+##
## The endpoint for Influxdb HTTP API write, must include database name, and
## optional authentication in query parameters.
##
diff --git a/config.go b/config.go
index 9a8ed80..01f49c9 100644
--- a/config.go
+++ b/config.go
@@ -14,9 +14,10 @@ import (
// List of config keys.
const (
- ConfigKeyListen = "listen"
- ConfigKeyAcceptBackend = "accept_backend"
- ConfigKeyInfluxAPIWrite = "influxdb_api_write"
+ ConfigKeyAcceptBackend = "accept_backend"
+ ConfigKeyCaptureRequestHeader = "capture_request_header"
+ ConfigKeyInfluxAPIWrite = "influxdb_api_write"
+ ConfigKeyListen = "listen"
)
// List of default config key values.
@@ -39,6 +40,10 @@ type Config struct {
// AcceptBackend list of backend to be filtered.
AcceptBackend []string
+ // List of request headers to be parsed and mapped as keys in halog
+ // output.
+ RequestHeaders []string
+
// InfluxAPIWrite define HTTP API to write to Influxdb.
InfluxAPIWrite string
@@ -82,6 +87,19 @@ func (cfg *Config) SetListen(v string) {
}
//
+// parseCaptureRequestHeader Parse request header names where each name is
+// separated by "|".
+//
+func (cfg *Config) parseCaptureRequestHeader(v []byte) {
+ sep := []byte{'|'}
+ headers := bytes.Split(v, sep)
+ for x := 0; x < len(headers); x++ {
+ headers[x] = bytes.TrimSpace(headers[x])
+ cfg.RequestHeaders = append(cfg.RequestHeaders, string(headers[x]))
+ }
+}
+
+//
// Load will read configuration from file defined by `path`.
//
func (cfg *Config) Load(path string) {
@@ -109,6 +127,8 @@ func (cfg *Config) Load(path string) {
switch string(kv[0]) {
case ConfigKeyListen:
cfg.SetListen(string(kv[1]))
+ case ConfigKeyCaptureRequestHeader:
+ cfg.parseCaptureRequestHeader(kv[1])
case ConfigKeyAcceptBackend:
v := string(bytes.TrimSpace(kv[1]))
if len(v) > 0 {
diff --git a/halog.go b/halog.go
index 811ec16..8d25fd2 100644
--- a/halog.go
+++ b/halog.go
@@ -21,33 +21,44 @@ var (
// Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3
//
type Halog struct {
- Timestamp time.Time
- ClientIP string
- ClientPort int32
+ Timestamp time.Time
+
+ ClientIP string
+ ClientPort int32
+
FrontendName string
BackendName string
ServerName string
- TimeReq int32
- TimeWait int32
- TimeConnect int32
- TimeRsp int32
- TimeAll int32
- HTTPStatus int32
- BytesRead int64
- CookieReq string
- CookieRsp string
- TermState string
+
+ TimeReq int32
+ TimeWait int32
+ TimeConnect int32
+ TimeRsp int32
+ TimeAll int32
+
+ BytesRead int64
+
+ CookieReq string
+ CookieRsp string
+
+ TermState string
+
ConnActive int32
ConnFrontend int32
ConnBackend int32
ConnServer int32
ConnRetries int32
+
QueueServer int32
QueueBackend int32
- HTTPMethod string
- HTTPURL string
- HTTPQuery string
- HTTPProto string
+
+ RequestHeaders map[string]string
+
+ HTTPStatus int32
+ HTTPMethod string
+ HTTPURL string
+ HTTPQuery string
+ HTTPProto string
}
//
@@ -185,6 +196,40 @@ func (halog *Halog) parseQueue(in []byte) (ok bool) {
return
}
+//
+// parserRequestHeaders parse the request header values in log file.
+// The request headers start with '{' and end with '}'.
+// Each header is separated by '|'.
+//
+func (halog *Halog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bool) {
+ if in[0] != '{' {
+ // Skip if we did not find the beginning.
+ return true
+ }
+
+ end := bytes.IndexByte(in, '}')
+ // Either '}' not found or its empty as in '{}'.
+ if end <= 1 {
+ return
+ }
+
+ sep := []byte{'|'}
+ bheaders := bytes.Split(in[1:end], sep)
+
+ if len(reqHeaders) != len(bheaders) {
+ return
+ }
+
+ halog.RequestHeaders = make(map[string]string)
+ for x, name := range reqHeaders {
+ halog.RequestHeaders[name] = string(bheaders[x])
+ }
+
+ copy(in, in[end+2:])
+
+ return true
+}
+
func (halog *Halog) parseHTTP(in []byte) (ok bool) {
halog.HTTPMethod, ok = parseToString(in, ' ')
if !ok {
@@ -209,46 +254,29 @@ func (halog *Halog) parseHTTP(in []byte) (ok bool) {
//
// Parse will parse one line of HAProxy log format into Halog.
//
-// (1) Remove prefix from systemd/rsyslog
-// (2) parse client IP
-// (3) parse client port
-// (4) parse timestamp, remove '[' and parse until ']'
-// (5) parse frontend name
-// (6) parse backend name
-// (7) parse server name
-// (8) parse times
-// (9) parse HTTP status code
-// (10) parse bytes read
-// (11) parse request cookie
-// (12) parse response cookie
-// (13) parse termination state
-// (14) parse number of connections
-// (15) parse number of queue state
-// (16) parse HTTP
-//
// nolint: gocyclo
-func (halog *Halog) Parse(in []byte) (ok bool) {
+func (halog *Halog) Parse(in []byte, reqHeaders []string) (ok bool) {
var err error
- // (1)
+ // Remove prefix from systemd/rsyslog
ok = cleanPrefix(in)
if !ok {
return
}
- // (2)
+ // parse client IP
halog.ClientIP, ok = parseToString(in, ':')
if !ok {
return
}
- // (3)
+ // parse client port
halog.ClientPort, ok = parseToInt32(in, ' ')
if !ok {
return
}
- // (4)
+ // parse timestamp, remove '[' and parse until ']'
in = in[1:]
ts, ok := parseToString(in, ']')
if !ok {
@@ -260,74 +288,81 @@ func (halog *Halog) Parse(in []byte) (ok bool) {
return false
}
- // (5)
+ // parse frontend name
in = in[1:]
halog.FrontendName, ok = parseToString(in, ' ')
if !ok {
return
}
- // (6)
+ // parse backend name
halog.BackendName, ok = parseToString(in, '/')
if !ok {
return
}
- // (7)
+ // parse server name
halog.ServerName, ok = parseToString(in, ' ')
if !ok {
return
}
- // (8)
+ // parse times
ok = halog.parseTimes(in)
if !ok {
return
}
- // (9)
+ // parse HTTP status code
halog.HTTPStatus, ok = parseToInt32(in, ' ')
if !ok {
return
}
- // (10)
+ // parse bytes read
halog.BytesRead, ok = parseToInt64(in, ' ')
if !ok {
return
}
- // (11)
+ // parse request cookie
halog.CookieReq, ok = parseToString(in, ' ')
if !ok {
return
}
- // (12)
+ // parse response cookie
halog.CookieRsp, ok = parseToString(in, ' ')
if !ok {
return
}
- // (13)
+ // parse termination state
halog.TermState, ok = parseToString(in, ' ')
if !ok {
return
}
- // (14)
+ // parse number of connections
ok = halog.parseConns(in)
if !ok {
return
}
- // (15)
+ // parse number of queue state
ok = halog.parseQueue(in)
if !ok {
return
}
- // (16)
+ if len(reqHeaders) > 0 {
+ ok = halog.parseRequestHeaders(in, reqHeaders)
+ if !ok {
+ return
+ }
+ }
+
+ // parse HTTP
in = in[1:]
ok = halog.parseHTTP(in)
@@ -341,7 +376,7 @@ func (halog *Halog) Parse(in []byte) (ok bool) {
// It will return nil and false if UDP packet is nil, have zero length, or
// cannot be parsed (rejected).
//
-func (halog *Halog) ParseUDPPacket(p *UDPPacket) bool {
+func (halog *Halog) ParseUDPPacket(p *UDPPacket, reqHeaders []string) bool {
if p == nil {
return false
}
@@ -363,5 +398,5 @@ func (halog *Halog) ParseUDPPacket(p *UDPPacket) bool {
in = p.Bytes
}
- return halog.Parse(in)
+ return halog.Parse(in, reqHeaders)
}
diff --git a/haminer.go b/haminer.go
index b501a65..245b4f1 100644
--- a/haminer.go
+++ b/haminer.go
@@ -119,7 +119,7 @@ func (h *Haminer) consume() {
halog := &Halog{}
- ok = halog.ParseUDPPacket(p)
+ ok = halog.ParseUDPPacket(p, h.cfg.RequestHeaders)
if !ok {
continue
}
diff --git a/influxdb.go b/influxdb.go
index 9e479e1..de25f4a 100644
--- a/influxdb.go
+++ b/influxdb.go
@@ -30,10 +30,7 @@ const (
"conn_active=%d,conn_frontend=%d,conn_backend=%d," +
"conn_server=%d,conn_retries=%d," +
"queue_server=%d,queue_backend=%d," +
- "bytes_read=%d" +
- " " +
- // timestamp
- "%d\n"
+ "bytes_read=%d"
)
//
@@ -86,11 +83,16 @@ func (cl *InfluxdbClient) initConn() {
//
func (cl *InfluxdbClient) Forwards(halogs []*Halog) {
lsrc := "InfluxdbClient.Forwards"
- cl.write(halogs)
+ err := cl.write(halogs)
+ if err != nil {
+ log.Printf("InfluxdbClient.write: %s", err)
+ return
+ }
rsp, err := cl.conn.Post(cl.apiWrite, defContentType, &cl.buf)
if err != nil {
log.Printf("InfluxdbClient.Forwards: %s", err)
+ return
}
if rsp.StatusCode >= 200 || rsp.StatusCode <= 299 {
@@ -112,9 +114,7 @@ func (cl *InfluxdbClient) Forwards(halogs []*Halog) {
fmt.Printf("%s: response: %d %s\n", lsrc, rsp.StatusCode, rspBody)
}
-func (cl *InfluxdbClient) write(halogs []*Halog) {
- var err error
-
+func (cl *InfluxdbClient) write(halogs []*Halog) (err error) {
cl.buf.Reset()
for _, l := range halogs {
@@ -133,10 +133,23 @@ func (cl *InfluxdbClient) write(halogs []*Halog) {
l.ConnServer, l.ConnRetries,
l.QueueServer, l.QueueBackend,
l.BytesRead,
- l.Timestamp.UnixNano(),
)
if err != nil {
- log.Printf("InfluxdbClient.write: %s", err)
+ return
+ }
+
+ for k, v := range l.RequestHeaders {
+ _, err = fmt.Fprintf(&cl.buf, ",%s=%q", k, v)
+ if err != nil {
+ return
+ }
+ }
+
+ _, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano())
+ if err != nil {
+ return
}
}
+
+ return
}