diff options
| author | Shulhan <ms@kilabit.info> | 2018-04-01 05:01:02 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2018-04-01 05:01:02 +0700 |
| commit | 3987f51dda643f770729ac39729def7d64ebb9f9 (patch) | |
| tree | 0b20f329efc1972fe76dfda3de03de4af9ad9587 | |
| download | haminer-3987f51dda643f770729ac39729def7d64ebb9f9.tar.xz | |
haminer: Library and program to parse and forward HAProxy logs
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | LICENSE | 31 | ||||
| -rw-r--r-- | Makefile | 12 | ||||
| -rw-r--r-- | README.md | 71 | ||||
| -rw-r--r-- | cmd/haminer/haminer.conf | 39 | ||||
| -rw-r--r-- | cmd/haminer/main.go | 74 | ||||
| -rw-r--r-- | config.go | 121 | ||||
| -rw-r--r-- | forwarder.go | 9 | ||||
| -rw-r--r-- | halog.go | 367 | ||||
| -rw-r--r-- | haminer.go | 178 | ||||
| -rw-r--r-- | influxdb.go | 142 | ||||
| -rw-r--r-- | udppacket.go | 41 |
12 files changed, 1086 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..745b3b6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/haminer @@ -0,0 +1,31 @@ +Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of copyright holder nor the names of its contributors may + be used to endorse or promote products derived from this software without + specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +-- +Website: http://kilabit.info +Contact: ms@kilabit.info +Repository: https://github.com/shuLhan/haminer diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..8f971a7 --- /dev/null +++ b/Makefile @@ -0,0 +1,12 @@ +.PHONE: all build install + +all: install + +build: + go build -v ./cmd/haminer + +lint: + -gometalinter --sort=path --disable=maligned ./... + +install: build lint + go install -v ./cmd/haminer diff --git a/README.md b/README.md new file mode 100644 index 0000000..95571e1 --- /dev/null +++ b/README.md @@ -0,0 +1,71 @@ +# haminer + +Library and program to parse and forward HAProxy logs. + +Supported forwarder, + +* Influxdb + + +## Requirements + +* [[ https://golang.org | Go ]] for building from source code +* [[ https://github.com/alecthomas/gometalinter | gometalinter ]] (optional) +* [[ https://git-scm.com/ | git ]] for downloading source code +* [[ https://portal.influxdata.com/downloads | Influxdb ]] for storing + HAProxy log. +* [[ https://portal.influxdata.com/downloads | Chronograf ]] for viewing + influxdb data with graph. + +## Building + +This steps assume that you already installed `Go`, `git`, `gometalinter`, and +`influxdb`. + +Get the source code using git, + + $ git clone git@github.com:shuLhan/haminer.git + $ make + +The binary will be installed on `$GOPATH/bin/haminer`. + + +## Configuration + +`haminer` by default will load it's config from `/etc/haminer.conf`, if not +specified when running the program. + +See `cmd/haminer/haminer.conf` for an example of possible configuration. + + +## Installation + +(1) Copy configuration from `$SOURCE/cmd/haminer/haminer/conf` to +`/etc/haminer.conf` + +(2) Update haminer configuration in `/etc/haminer.conf` + +(3) Update HAProxy config to forward log to UDP port other than rsyslog, for +example, + +``` +global + ... + log 127.0.0.1:5140 haminer + ... +``` + +Then reload or restart HAProxy. + +(4) Create user and database in Influxdb, + + $ influx + > CREATE USER "haminer" WITH PASSWORD 'haminer' + > CREATE DATABASE haminer + > GRANT ALL ON haminer TO haminer + +## Running + +Run the haminer program manually, + + $ $GOPATH/bin/haminer diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf new file mode 100644 index 0000000..b8203bc --- /dev/null +++ b/cmd/haminer/haminer.conf @@ -0,0 +1,39 @@ +## +## Set default listen address in UDP. +## +## Format: <ADDR:PORT> +## Default: 127.0.0.1:5140 +## +## Examples, +##listen=127.0.0.1:5140 +##listen=192.168.56.1:5140 +## + +#listen= + +## +## List of HAProxy backend to be accepted and forwarded to Influxdb. Each +## backend name is separated by comma. +## +## Format: [name],... +## Default: "", no filter (all backend are accepted). +## +## Examples, +## accept_backend=api_01,api_02 +## + +#accept_backend= + +## +## The endpoint for Influxdb HTTP API write, must include database name, and +## optional authentication in query parameters. +## +## Format: http://<hostname|IP-address>:<port>/write?db=<influxdb-name>[&u=username][&p=password] +## Default: "", empty. If empty the log will not forwarded to Influxdb. +## +## Examples, +##influxdb_api_write=http://127.0.0.1:8086/write?db=haminer&u=haminer&p=haminer +##influxdb_api_write=http://192.168.56.10:8086/write?db=haminer&u=haminer&p=haminer&precision=ns +## + +#influxdb_api_write= diff --git a/cmd/haminer/main.go b/cmd/haminer/main.go new file mode 100644 index 0000000..b457070 --- /dev/null +++ b/cmd/haminer/main.go @@ -0,0 +1,74 @@ +// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package main + +import ( + "flag" + "fmt" + "log" + "strings" + + "github.com/shuLhan/haminer" +) + +const ( + defLogPrefix = "haminer: " + defConfig = "/etc/haminer.conf" +) + +func initConfig() (cfg *haminer.Config) { + var ( + flagConfig, flagListen, flagAcceptBackend string + flagInfluxAPIWrite string + ) + + log.SetPrefix(defLogPrefix) + + cfg = haminer.NewConfig() + + flag.StringVar(&flagConfig, "config", defConfig, + "Load configuration from file (default to '/etc/haminer.conf')", + ) + flag.StringVar(&flagListen, haminer.ConfigKeyListen, "", + "Listen for HAProxy log using UDP at ADDRESS:PORT", + ) + flag.StringVar(&flagAcceptBackend, haminer.ConfigKeyAcceptBackend, "", + "List of accepted backend to be filtered (comma separated)", + ) + flag.StringVar(&flagInfluxAPIWrite, haminer.ConfigKeyInfluxAPIWrite, + "", + "HTTP API endpoint to write to Influxdb", + ) + + flag.Parse() + + if len(flagConfig) > 0 { + cfg.Load(flagConfig) + } + if len(flagListen) > 0 { + cfg.SetListen(flagListen) + } + if len(flagAcceptBackend) > 0 { + cfg.AcceptBackend = strings.Split(flagAcceptBackend, ",") + } + if len(flagInfluxAPIWrite) > 0 { + cfg.InfluxAPIWrite = flagInfluxAPIWrite + } + + return +} + +func main() { + cfg := initConfig() + + fmt.Printf("Starting Haminer with config: %+v\n", cfg) + + h := haminer.NewHaminer(cfg) + + err := h.Start() + if err != nil { + log.Println(err) + } +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..9a8ed80 --- /dev/null +++ b/config.go @@ -0,0 +1,121 @@ +// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package haminer + +import ( + "bytes" + "io/ioutil" + "log" + "strconv" + "strings" +) + +// List of config keys. +const ( + ConfigKeyListen = "listen" + ConfigKeyAcceptBackend = "accept_backend" + ConfigKeyInfluxAPIWrite = "influxdb_api_write" +) + +// List of default config key values. +const ( + DefListenAddr = "127.0.0.1" + DefListenPort = 5140 + DefInfluxAPIWrite = "http://127.0.0.1:8086/write?db=haproxy" + DefMaxBufferedLogs = 10 +) + +// +// Config define options to create and run Haminer instance. +// +type Config struct { + // ListenAddr is an IP address where Haminer will bind and receiving + // log from HAProxy. + ListenAddr string + ListenPort int + + // AcceptBackend list of backend to be filtered. + AcceptBackend []string + + // InfluxAPIWrite define HTTP API to write to Influxdb. + InfluxAPIWrite string + + // MaxBufferedLogs define a number of logs that will be keep in buffer + // before being forwarded. + MaxBufferedLogs int +} + +// +// NewConfig will create, initialize, and return new config with defautl +// values. +// +func NewConfig() (cfg *Config) { + return &Config{ + ListenAddr: DefListenAddr, + ListenPort: DefListenPort, + MaxBufferedLogs: DefMaxBufferedLogs, + } +} + +// +// SetListen will parse `v` value as "addr:port", and set config address and port +// based on it. +// +func (cfg *Config) SetListen(v string) { + var err error + + addrPort := strings.Split(v, ":") + switch len(addrPort) { + case 0: + return + case 1: + cfg.ListenAddr = addrPort[0] + case 2: + cfg.ListenAddr = addrPort[0] + cfg.ListenPort, err = strconv.Atoi(addrPort[1]) + if err != nil { + cfg.ListenPort = DefListenPort + } + } +} + +// +// Load will read configuration from file defined by `path`. +// +func (cfg *Config) Load(path string) { + bb, err := ioutil.ReadFile(path) + if err != nil { + log.Println(err) + return + } + + lines := bytes.Split(bb, []byte("\n")) + + for _, line := range lines { + if len(line) == 0 { + continue + } + if line[0] == '#' { + continue + } + + kv := bytes.SplitN(line, []byte("="), 2) + if len(kv) != 2 { + continue + } + + switch string(kv[0]) { + case ConfigKeyListen: + cfg.SetListen(string(kv[1])) + case ConfigKeyAcceptBackend: + v := string(bytes.TrimSpace(kv[1])) + if len(v) > 0 { + cfg.AcceptBackend = strings.Split(v, ",") + } + case ConfigKeyInfluxAPIWrite: + cfg.InfluxAPIWrite = string(kv[1]) + } + } +} diff --git a/forwarder.go b/forwarder.go new file mode 100644 index 0000000..0d59156 --- /dev/null +++ b/forwarder.go @@ -0,0 +1,9 @@ +package haminer + +// +// Forwarder define an interface to forward parsed HAProxy log to storage +// engine. +// +type Forwarder interface { + Forwards(halogs []*Halog) +} diff --git a/halog.go b/halog.go new file mode 100644 index 0000000..811ec16 --- /dev/null +++ b/halog.go @@ -0,0 +1,367 @@ +// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package haminer + +import ( + "bytes" + "strconv" + "strings" + "time" +) + +var ( + timestampLayout = "2/Jan/2006:15:04:05.000" +) + +// +// Halog contains the mapping of haproxy HTTP log format to Go struct. +// +// Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3 +// +type Halog struct { + Timestamp time.Time + ClientIP string + ClientPort int32 + FrontendName string + BackendName string + ServerName string + TimeReq int32 + TimeWait int32 + TimeConnect int32 + TimeRsp int32 + TimeAll int32 + HTTPStatus int32 + BytesRead int64 + CookieReq string + CookieRsp string + TermState string + ConnActive int32 + ConnFrontend int32 + ConnBackend int32 + ConnServer int32 + ConnRetries int32 + QueueServer int32 + QueueBackend int32 + HTTPMethod string + HTTPURL string + HTTPQuery string + HTTPProto string +} + +// +// cleanPrefix will remove `<date-time> <process-name>[pid]: ` prefix (which +// come from systemd/rsyslog) in input. +// +func cleanPrefix(in []byte) bool { + start := bytes.IndexByte(in, '[') + if start < 0 { + return false + } + + end := bytes.IndexByte(in[start:], ']') + if end < 0 { + return false + } + + end = start + end + 3 + + copy(in[0:], in[end:]) + + return true +} + +func parseToString(in []byte, sep byte) (string, bool) { + end := bytes.IndexByte(in, sep) + if end < 0 { + return "", false + } + + v := string(in[:end]) + copy(in, in[end+1:]) + + return v, true +} + +func parseToInt32(in []byte, sep byte) (int32, bool) { + end := bytes.IndexByte(in, sep) + if end < 0 { + return 0, false + } + + v, err := strconv.Atoi(string(in[:end])) + if err != nil { + return 0, false + } + + copy(in, in[end+1:]) + + return int32(v), true +} + +func parseToInt64(in []byte, sep byte) (int64, bool) { + end := bytes.IndexByte(in, sep) + if end < 0 { + return 0, false + } + + v, err := strconv.ParseInt(string(in[:end]), 10, 64) + if err != nil { + return 0, false + } + + copy(in, in[end+1:]) + + return v, true +} + +func (halog *Halog) parseTimes(in []byte) (ok bool) { + halog.TimeReq, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.TimeWait, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.TimeConnect, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.TimeRsp, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.TimeAll, ok = parseToInt32(in, ' ') + if !ok { + return + } + + return +} + +func (halog *Halog) parseConns(in []byte) (ok bool) { + halog.ConnActive, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.ConnFrontend, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.ConnBackend, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.ConnServer, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.ConnRetries, ok = parseToInt32(in, ' ') + if !ok { + return + } + + return +} + +func (halog *Halog) parseQueue(in []byte) (ok bool) { + halog.QueueServer, ok = parseToInt32(in, '/') + if !ok { + return + } + + halog.QueueBackend, ok = parseToInt32(in, ' ') + + return +} + +func (halog *Halog) parseHTTP(in []byte) (ok bool) { + halog.HTTPMethod, ok = parseToString(in, ' ') + if !ok { + return + } + + v, ok := parseToString(in, ' ') + if !ok { + return + } + urlQuery := strings.SplitN(v, "?", 2) + halog.HTTPURL = urlQuery[0] + if len(urlQuery) == 2 { + halog.HTTPQuery = urlQuery[1] + } + + halog.HTTPProto, ok = parseToString(in, '"') + + return +} + +// +// Parse will parse one line of HAProxy log format into Halog. +// +// (1) Remove prefix from systemd/rsyslog +// (2) parse client IP +// (3) parse client port +// (4) parse timestamp, remove '[' and parse until ']' +// (5) parse frontend name +// (6) parse backend name +// (7) parse server name +// (8) parse times +// (9) parse HTTP status code +// (10) parse bytes read +// (11) parse request cookie +// (12) parse response cookie +// (13) parse termination state +// (14) parse number of connections +// (15) parse number of queue state +// (16) parse HTTP +// +// nolint: gocyclo +func (halog *Halog) Parse(in []byte) (ok bool) { + var err error + + // (1) + ok = cleanPrefix(in) + if !ok { + return + } + + // (2) + halog.ClientIP, ok = parseToString(in, ':') + if !ok { + return + } + + // (3) + halog.ClientPort, ok = parseToInt32(in, ' ') + if !ok { + return + } + + // (4) + in = in[1:] + ts, ok := parseToString(in, ']') + if !ok { + return + } + + halog.Timestamp, err = time.Parse(timestampLayout, ts) + if err != nil { + return false + } + + // (5) + in = in[1:] + halog.FrontendName, ok = parseToString(in, ' ') + if !ok { + return + } + + // (6) + halog.BackendName, ok = parseToString(in, '/') + if !ok { + return + } + + // (7) + halog.ServerName, ok = parseToString(in, ' ') + if !ok { + return + } + + // (8) + ok = halog.parseTimes(in) + if !ok { + return + } + + // (9) + halog.HTTPStatus, ok = parseToInt32(in, ' ') + if !ok { + return + } + + // (10) + halog.BytesRead, ok = parseToInt64(in, ' ') + if !ok { + return + } + + // (11) + halog.CookieReq, ok = parseToString(in, ' ') + if !ok { + return + } + + // (12) + halog.CookieRsp, ok = parseToString(in, ' ') + if !ok { + return + } + + // (13) + halog.TermState, ok = parseToString(in, ' ') + if !ok { + return + } + + // (14) + ok = halog.parseConns(in) + if !ok { + return + } + + // (15) + ok = halog.parseQueue(in) + if !ok { + return + } + + // (16) + in = in[1:] + ok = halog.parseHTTP(in) + + return +} + +// +// ParseUDPPacket will convert UDP packet (in bytes) to instance of +// Halog. +// +// It will return nil and false if UDP packet is nil, have zero length, or +// cannot be parsed (rejected). +// +func (halog *Halog) ParseUDPPacket(p *UDPPacket) bool { + if p == nil { + return false + } + if len(p.Bytes) == 0 { + return false + } + + var in []byte + + if p.Bytes[0] == '<' { + endIdx := bytes.IndexByte(p.Bytes, '>') + if endIdx < 0 { + return false + } + + in = make([]byte, len(p.Bytes)) + copy(in, p.Bytes[endIdx+1:]) + } else { + in = p.Bytes + } + + return halog.Parse(in) +} diff --git a/haminer.go b/haminer.go new file mode 100644 index 0000000..b501a65 --- /dev/null +++ b/haminer.go @@ -0,0 +1,178 @@ +// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package haminer + +import ( + "fmt" + "log" + "net" + "os" + "os/signal" + "syscall" +) + +// +// Haminer define the log consumer and producer. +// +type Haminer struct { + cfg *Config + udpConn *net.UDPConn + isRunning bool + chSignal chan os.Signal + chHalog chan *Halog + ff []Forwarder +} + +// +// NewHaminer create, initialize, and return new Haminer instance. If config +// parameter is nil, it will use the default options. +// +func NewHaminer(cfg *Config) (h *Haminer) { + if cfg == nil { + cfg = NewConfig() + } + + h = &Haminer{ + cfg: cfg, + chSignal: make(chan os.Signal, 1), + chHalog: make(chan *Halog, 30), + ff: make([]Forwarder, 0), + } + + signal.Notify(h.chSignal, syscall.SIGHUP, syscall.SIGINT, + syscall.SIGTERM, syscall.SIGQUIT) + + h.createForwarder() + + return +} + +func (h *Haminer) createForwarder() { + if len(h.cfg.InfluxAPIWrite) > 0 { + fwder := NewInfluxdbClient(h.cfg.InfluxAPIWrite) + + h.ff = append(h.ff, fwder) + } +} + +// +// Start will listen for UDP packet and start consuming log, parse, and +// publish it to analytic server. +// +func (h *Haminer) Start() (err error) { + udpAddr := &net.UDPAddr{ + IP: net.ParseIP(h.cfg.ListenAddr), + Port: h.cfg.ListenPort, + } + + h.udpConn, err = net.ListenUDP("udp", udpAddr) + if err != nil { + return + } + + h.isRunning = true + + go h.consume() + go h.produce() + + <-h.chSignal + + h.Stop() + + return +} + +// +// filter will return true if log is accepted; otherwise it will return false. +// +func (h *Haminer) filter(halog *Halog) bool { + if halog == nil { + return false + } + if len(h.cfg.AcceptBackend) == 0 { + return true + } + + for _, be := range h.cfg.AcceptBackend { + if halog.BackendName == be { + return true + } + } + + return false +} + +func (h *Haminer) consume() { + var ( + err error + ok bool + p = NewUDPPacket(0) + ) + + for h.isRunning { + _, err = h.udpConn.Read(p.Bytes) + if err != nil { + continue + } + + halog := &Halog{} + + ok = halog.ParseUDPPacket(p) + if !ok { + continue + } + + ok = h.filter(halog) + if !ok { + continue + } + + h.chHalog <- halog + + p.Reset() + } +} + +func (h *Haminer) forwards(halogs []*Halog) { + for _, fwder := range h.ff { + fwder.Forwards(halogs) + } +} + +func (h *Haminer) produce() { + halogs := make([]*Halog, 0) + + for h.isRunning { + halog, ok := <-h.chHalog + if !ok { + continue + } + + halogs = append(halogs, halog) + + if len(halogs) >= h.cfg.MaxBufferedLogs { + h.forwards(halogs) + halogs = make([]*Halog, 0) + } + } +} + +// +// Stop will close UDP server and clear all resources. +// +func (h *Haminer) Stop() { + h.isRunning = false + + signal.Stop(h.chSignal) + + if h.udpConn != nil { + err := h.udpConn.Close() + if err != nil { + log.Println(err) + } + } + + fmt.Println("Stopped") +} diff --git a/influxdb.go b/influxdb.go new file mode 100644 index 0000000..9e479e1 --- /dev/null +++ b/influxdb.go @@ -0,0 +1,142 @@ +package haminer + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/http" + "os" +) + +const ( + envHostname = "HOSTNAME" + defHostname = "localhost" + defContentType = "application/octet-stream" + influxdbFormat = "" + + // measurements + "haproxy," + + // tags + "host=%q," + + "frontend=%q,backend=%q,server=%q" + + " " + + // fields + "http_proto=%q,http_method=%q,http_url=%q," + + "http_query=\"%s\",http_status=%d," + + "term_state=%q," + + "client_ip=%q,client_port=%d," + + "time_req=%d,time_wait=%d,time_connect=%d," + + "time_rsp=%d,time_all=%d," + + "conn_active=%d,conn_frontend=%d,conn_backend=%d," + + "conn_server=%d,conn_retries=%d," + + "queue_server=%d,queue_backend=%d," + + "bytes_read=%d" + + " " + + // timestamp + "%d\n" +) + +// +// InfluxdbClient contains HTTP connection for writing logs to Influxdb. +// +type InfluxdbClient struct { + conn *http.Client + apiWrite string + hostname string + buf bytes.Buffer +} + +// +// NewInfluxdbClient will create, initialize, and return new Influxdb client. +// +func NewInfluxdbClient(apiWrite string) (cl *InfluxdbClient) { + cl = &InfluxdbClient{ + apiWrite: apiWrite, + } + + cl.initHostname() + cl.initConn() + + return +} + +func (cl *InfluxdbClient) initHostname() { + var err error + + cl.hostname, err = os.Hostname() + if err != nil { + cl.hostname = os.Getenv(envHostname) + } + if len(cl.hostname) == 0 { + cl.hostname = defHostname + } +} + +func (cl *InfluxdbClient) initConn() { + tr := &http.Transport{} + + cl.conn = &http.Client{ + Transport: tr, + } +} + +// +// Forwards implement the Forwarder interface. It will write all logs to +// Influxdb. +// +func (cl *InfluxdbClient) Forwards(halogs []*Halog) { + lsrc := "InfluxdbClient.Forwards" + cl.write(halogs) + + rsp, err := cl.conn.Post(cl.apiWrite, defContentType, &cl.buf) + if err != nil { + log.Printf("InfluxdbClient.Forwards: %s", err) + } + + if rsp.StatusCode >= 200 || rsp.StatusCode <= 299 { + return + } + + defer func() { + errClose := rsp.Body.Close() + if errClose != nil { + log.Printf("%s: Body.Close: %s\n", lsrc, err) + } + }() + + rspBody, err := ioutil.ReadAll(rsp.Body) + if err != nil { + log.Printf("%s: ioutil.ReadAll: %s", lsrc, err) + } + + fmt.Printf("%s: response: %d %s\n", lsrc, rsp.StatusCode, rspBody) +} + +func (cl *InfluxdbClient) write(halogs []*Halog) { + var err error + + cl.buf.Reset() + + for _, l := range halogs { + _, err = fmt.Fprintf(&cl.buf, influxdbFormat, + // tags + cl.hostname, + l.FrontendName, l.BackendName, l.ServerName, + // fields + l.HTTPProto, l.HTTPMethod, l.HTTPURL, + l.HTTPQuery, l.HTTPStatus, + l.TermState, + l.ClientIP, l.ClientPort, + l.TimeReq, l.TimeWait, l.TimeConnect, + l.TimeRsp, l.TimeAll, + l.ConnActive, l.ConnFrontend, l.ConnBackend, + l.ConnServer, l.ConnRetries, + l.QueueServer, l.QueueBackend, + l.BytesRead, + l.Timestamp.UnixNano(), + ) + if err != nil { + log.Printf("InfluxdbClient.write: %s", err) + } + } +} diff --git a/udppacket.go b/udppacket.go new file mode 100644 index 0000000..c7b017b --- /dev/null +++ b/udppacket.go @@ -0,0 +1,41 @@ +// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package haminer + +const ( + defSize = 1024 +) + +// +// UDPPacket wrap the slice of bytes for easy manipulation. +// +type UDPPacket struct { + Bytes []byte +} + +// +// NewUDPPacket will create and initialize UDP packet. +// +func NewUDPPacket(size uint32) (p *UDPPacket) { + if size <= 0 { + size = defSize + } + p = &UDPPacket{ + Bytes: make([]byte, size), + } + + return +} + +// +// Reset will set the content of packet data to zero, so it can be used agains +// on Read(). +// +func (p *UDPPacket) Reset() { + p.Bytes[0] = 0 + for x := 1; x < len(p.Bytes); x *= 2 { + copy(p.Bytes[x:], p.Bytes[:x]) + } +} |
