diff options
| author | Shulhan <ms@kilabit.info> | 2018-04-01 05:01:02 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2018-04-01 05:01:02 +0700 |
| commit | 3987f51dda643f770729ac39729def7d64ebb9f9 (patch) | |
| tree | 0b20f329efc1972fe76dfda3de03de4af9ad9587 /influxdb.go | |
| download | haminer-3987f51dda643f770729ac39729def7d64ebb9f9.tar.xz | |
haminer: Library and program to parse and forward HAProxy logs
Diffstat (limited to 'influxdb.go')
| -rw-r--r-- | influxdb.go | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/influxdb.go b/influxdb.go new file mode 100644 index 0000000..9e479e1 --- /dev/null +++ b/influxdb.go @@ -0,0 +1,142 @@ +package haminer + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/http" + "os" +) + +const ( + envHostname = "HOSTNAME" + defHostname = "localhost" + defContentType = "application/octet-stream" + influxdbFormat = "" + + // measurements + "haproxy," + + // tags + "host=%q," + + "frontend=%q,backend=%q,server=%q" + + " " + + // fields + "http_proto=%q,http_method=%q,http_url=%q," + + "http_query=\"%s\",http_status=%d," + + "term_state=%q," + + "client_ip=%q,client_port=%d," + + "time_req=%d,time_wait=%d,time_connect=%d," + + "time_rsp=%d,time_all=%d," + + "conn_active=%d,conn_frontend=%d,conn_backend=%d," + + "conn_server=%d,conn_retries=%d," + + "queue_server=%d,queue_backend=%d," + + "bytes_read=%d" + + " " + + // timestamp + "%d\n" +) + +// +// InfluxdbClient contains HTTP connection for writing logs to Influxdb. +// +type InfluxdbClient struct { + conn *http.Client + apiWrite string + hostname string + buf bytes.Buffer +} + +// +// NewInfluxdbClient will create, initialize, and return new Influxdb client. +// +func NewInfluxdbClient(apiWrite string) (cl *InfluxdbClient) { + cl = &InfluxdbClient{ + apiWrite: apiWrite, + } + + cl.initHostname() + cl.initConn() + + return +} + +func (cl *InfluxdbClient) initHostname() { + var err error + + cl.hostname, err = os.Hostname() + if err != nil { + cl.hostname = os.Getenv(envHostname) + } + if len(cl.hostname) == 0 { + cl.hostname = defHostname + } +} + +func (cl *InfluxdbClient) initConn() { + tr := &http.Transport{} + + cl.conn = &http.Client{ + Transport: tr, + } +} + +// +// Forwards implement the Forwarder interface. It will write all logs to +// Influxdb. +// +func (cl *InfluxdbClient) Forwards(halogs []*Halog) { + lsrc := "InfluxdbClient.Forwards" + cl.write(halogs) + + rsp, err := cl.conn.Post(cl.apiWrite, defContentType, &cl.buf) + if err != nil { + log.Printf("InfluxdbClient.Forwards: %s", err) + } + + if rsp.StatusCode >= 200 || rsp.StatusCode <= 299 { + return + } + + defer func() { + errClose := rsp.Body.Close() + if errClose != nil { + log.Printf("%s: Body.Close: %s\n", lsrc, err) + } + }() + + rspBody, err := ioutil.ReadAll(rsp.Body) + if err != nil { + log.Printf("%s: ioutil.ReadAll: %s", lsrc, err) + } + + fmt.Printf("%s: response: %d %s\n", lsrc, rsp.StatusCode, rspBody) +} + +func (cl *InfluxdbClient) write(halogs []*Halog) { + var err error + + cl.buf.Reset() + + for _, l := range halogs { + _, err = fmt.Fprintf(&cl.buf, influxdbFormat, + // tags + cl.hostname, + l.FrontendName, l.BackendName, l.ServerName, + // fields + l.HTTPProto, l.HTTPMethod, l.HTTPURL, + l.HTTPQuery, l.HTTPStatus, + l.TermState, + l.ClientIP, l.ClientPort, + l.TimeReq, l.TimeWait, l.TimeConnect, + l.TimeRsp, l.TimeAll, + l.ConnActive, l.ConnFrontend, l.ConnBackend, + l.ConnServer, l.ConnRetries, + l.QueueServer, l.QueueBackend, + l.BytesRead, + l.Timestamp.UnixNano(), + ) + if err != nil { + log.Printf("InfluxdbClient.write: %s", err) + } + } +} |
