aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2018-04-01 05:01:02 +0700
committerShulhan <ms@kilabit.info>2018-04-01 05:01:02 +0700
commit3987f51dda643f770729ac39729def7d64ebb9f9 (patch)
tree0b20f329efc1972fe76dfda3de03de4af9ad9587
downloadhaminer-3987f51dda643f770729ac39729def7d64ebb9f9.tar.xz
haminer: Library and program to parse and forward HAProxy logs
-rw-r--r--.gitignore1
-rw-r--r--LICENSE31
-rw-r--r--Makefile12
-rw-r--r--README.md71
-rw-r--r--cmd/haminer/haminer.conf39
-rw-r--r--cmd/haminer/main.go74
-rw-r--r--config.go121
-rw-r--r--forwarder.go9
-rw-r--r--halog.go367
-rw-r--r--haminer.go178
-rw-r--r--influxdb.go142
-rw-r--r--udppacket.go41
12 files changed, 1086 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..745b3b6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+/haminer
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..a5754fd
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,31 @@
+Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+* Neither the name of copyright holder nor the names of its contributors may
+ be used to endorse or promote products derived from this software without
+ specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+--
+Website: http://kilabit.info
+Contact: ms@kilabit.info
+Repository: https://github.com/shuLhan/haminer
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..8f971a7
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,12 @@
+# Declare targets that do not produce files, so make always runs them.
+.PHONY: all build install lint
+
+all: install
+
+build:
+	go build -v ./cmd/haminer
+
+lint:
+	-gometalinter --sort=path --disable=maligned ./...
+
+install: build lint
+	go install -v ./cmd/haminer
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..95571e1
--- /dev/null
+++ b/README.md
@@ -0,0 +1,71 @@
+# haminer
+
+Library and program to parse and forward HAProxy logs.
+
+Supported forwarder,
+
+* Influxdb
+
+
+## Requirements
+
+* [[ https://golang.org | Go ]] for building from source code
+* [[ https://github.com/alecthomas/gometalinter | gometalinter ]] (optional)
+* [[ https://git-scm.com/ | git ]] for downloading source code
+* [[ https://portal.influxdata.com/downloads | Influxdb ]] for storing
+ HAProxy log.
+* [[ https://portal.influxdata.com/downloads | Chronograf ]] for viewing
+ influxdb data with graph.
+
+## Building
+
+This steps assume that you already installed `Go`, `git`, `gometalinter`, and
+`influxdb`.
+
+Get the source code using git,
+
+ $ git clone git@github.com:shuLhan/haminer.git
+ $ make
+
+The binary will be installed on `$GOPATH/bin/haminer`.
+
+
+## Configuration
+
+`haminer` by default will load its config from `/etc/haminer.conf`, if not
+specified when running the program.
+
+See `cmd/haminer/haminer.conf` for an example of possible configuration.
+
+
+## Installation
+
+(1) Copy configuration from `$SOURCE/cmd/haminer/haminer.conf` to
+`/etc/haminer.conf`
+
+(2) Update haminer configuration in `/etc/haminer.conf`
+
+(3) Update HAProxy config to forward log to UDP port other than rsyslog, for
+example,
+
+```
+global
+ ...
+ log 127.0.0.1:5140 haminer
+ ...
+```
+
+Then reload or restart HAProxy.
+
+(4) Create user and database in Influxdb,
+
+ $ influx
+ > CREATE USER "haminer" WITH PASSWORD 'haminer'
+ > CREATE DATABASE haminer
+ > GRANT ALL ON haminer TO haminer
+
+## Running
+
+Run the haminer program manually,
+
+ $ $GOPATH/bin/haminer
diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf
new file mode 100644
index 0000000..b8203bc
--- /dev/null
+++ b/cmd/haminer/haminer.conf
@@ -0,0 +1,39 @@
+##
+## Set default listen address in UDP.
+##
+## Format: <ADDR:PORT>
+## Default: 127.0.0.1:5140
+##
+## Examples,
+##listen=127.0.0.1:5140
+##listen=192.168.56.1:5140
+##
+
+#listen=
+
+##
+## List of HAProxy backend to be accepted and forwarded to Influxdb. Each
+## backend name is separated by comma.
+##
+## Format: [name],...
+## Default: "", no filter (all backends are accepted).
+##
+## Examples,
+## accept_backend=api_01,api_02
+##
+
+#accept_backend=
+
+##
+## The endpoint for Influxdb HTTP API write, must include database name, and
+## optional authentication in query parameters.
+##
+## Format: http://<hostname|IP-address>:<port>/write?db=<influxdb-name>[&u=username][&p=password]
+## Default: "", empty. If empty the log will not be forwarded to Influxdb.
+##
+## Examples,
+##influxdb_api_write=http://127.0.0.1:8086/write?db=haminer&u=haminer&p=haminer
+##influxdb_api_write=http://192.168.56.10:8086/write?db=haminer&u=haminer&p=haminer&precision=ns
+##
+
+#influxdb_api_write=
diff --git a/cmd/haminer/main.go b/cmd/haminer/main.go
new file mode 100644
index 0000000..b457070
--- /dev/null
+++ b/cmd/haminer/main.go
@@ -0,0 +1,74 @@
+// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "strings"
+
+ "github.com/shuLhan/haminer"
+)
+
+const (
+ defLogPrefix = "haminer: "
+ defConfig = "/etc/haminer.conf"
+)
+
+func initConfig() (cfg *haminer.Config) {
+ var (
+ flagConfig, flagListen, flagAcceptBackend string
+ flagInfluxAPIWrite string
+ )
+
+ log.SetPrefix(defLogPrefix)
+
+ cfg = haminer.NewConfig()
+
+ flag.StringVar(&flagConfig, "config", defConfig,
+ "Load configuration from file (default to '/etc/haminer.conf')",
+ )
+ flag.StringVar(&flagListen, haminer.ConfigKeyListen, "",
+ "Listen for HAProxy log using UDP at ADDRESS:PORT",
+ )
+ flag.StringVar(&flagAcceptBackend, haminer.ConfigKeyAcceptBackend, "",
+ "List of accepted backend to be filtered (comma separated)",
+ )
+ flag.StringVar(&flagInfluxAPIWrite, haminer.ConfigKeyInfluxAPIWrite,
+ "",
+ "HTTP API endpoint to write to Influxdb",
+ )
+
+ flag.Parse()
+
+ if len(flagConfig) > 0 {
+ cfg.Load(flagConfig)
+ }
+ if len(flagListen) > 0 {
+ cfg.SetListen(flagListen)
+ }
+ if len(flagAcceptBackend) > 0 {
+ cfg.AcceptBackend = strings.Split(flagAcceptBackend, ",")
+ }
+ if len(flagInfluxAPIWrite) > 0 {
+ cfg.InfluxAPIWrite = flagInfluxAPIWrite
+ }
+
+ return
+}
+
+func main() {
+ cfg := initConfig()
+
+ fmt.Printf("Starting Haminer with config: %+v\n", cfg)
+
+ h := haminer.NewHaminer(cfg)
+
+ err := h.Start()
+ if err != nil {
+ log.Println(err)
+ }
+}
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..9a8ed80
--- /dev/null
+++ b/config.go
@@ -0,0 +1,121 @@
+// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package haminer
+
+import (
+ "bytes"
+ "io/ioutil"
+ "log"
+ "strconv"
+ "strings"
+)
+
+// List of config keys.
+const (
+ ConfigKeyListen = "listen"
+ ConfigKeyAcceptBackend = "accept_backend"
+ ConfigKeyInfluxAPIWrite = "influxdb_api_write"
+)
+
+// List of default config key values.
+const (
+ DefListenAddr = "127.0.0.1"
+ DefListenPort = 5140
+ DefInfluxAPIWrite = "http://127.0.0.1:8086/write?db=haproxy"
+ DefMaxBufferedLogs = 10
+)
+
+//
+// Config define options to create and run Haminer instance.
+//
+type Config struct {
+ // ListenAddr is an IP address where Haminer will bind and receiving
+ // log from HAProxy.
+ ListenAddr string
+ ListenPort int
+
+ // AcceptBackend list of backend to be filtered.
+ AcceptBackend []string
+
+ // InfluxAPIWrite define HTTP API to write to Influxdb.
+ InfluxAPIWrite string
+
+ // MaxBufferedLogs define a number of logs that will be keep in buffer
+ // before being forwarded.
+ MaxBufferedLogs int
+}
+
+//
+// NewConfig will create, initialize, and return new config with default
+// values.
+//
+func NewConfig() (cfg *Config) {
+ return &Config{
+ ListenAddr: DefListenAddr,
+ ListenPort: DefListenPort,
+ MaxBufferedLogs: DefMaxBufferedLogs,
+ }
+}
+
+//
+// SetListen will parse `v` value as "addr:port", and set config address and port
+// based on it.
+//
+func (cfg *Config) SetListen(v string) {
+ var err error
+
+ addrPort := strings.Split(v, ":")
+ switch len(addrPort) {
+ case 0:
+ return
+ case 1:
+ cfg.ListenAddr = addrPort[0]
+ case 2:
+ cfg.ListenAddr = addrPort[0]
+ cfg.ListenPort, err = strconv.Atoi(addrPort[1])
+ if err != nil {
+ cfg.ListenPort = DefListenPort
+ }
+ }
+}
+
+//
+// Load will read configuration from file defined by `path`.
+//
+func (cfg *Config) Load(path string) {
+ bb, err := ioutil.ReadFile(path)
+ if err != nil {
+ log.Println(err)
+ return
+ }
+
+ lines := bytes.Split(bb, []byte("\n"))
+
+ for _, line := range lines {
+ if len(line) == 0 {
+ continue
+ }
+ if line[0] == '#' {
+ continue
+ }
+
+ kv := bytes.SplitN(line, []byte("="), 2)
+ if len(kv) != 2 {
+ continue
+ }
+
+ switch string(kv[0]) {
+ case ConfigKeyListen:
+ cfg.SetListen(string(kv[1]))
+ case ConfigKeyAcceptBackend:
+ v := string(bytes.TrimSpace(kv[1]))
+ if len(v) > 0 {
+ cfg.AcceptBackend = strings.Split(v, ",")
+ }
+ case ConfigKeyInfluxAPIWrite:
+ cfg.InfluxAPIWrite = string(kv[1])
+ }
+ }
+}
diff --git a/forwarder.go b/forwarder.go
new file mode 100644
index 0000000..0d59156
--- /dev/null
+++ b/forwarder.go
@@ -0,0 +1,9 @@
+package haminer
+
+//
+// Forwarder define an interface to forward parsed HAProxy log to storage
+// engine.
+//
+type Forwarder interface {
+ Forwards(halogs []*Halog)
+}
diff --git a/halog.go b/halog.go
new file mode 100644
index 0000000..811ec16
--- /dev/null
+++ b/halog.go
@@ -0,0 +1,367 @@
+// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package haminer
+
+import (
+ "bytes"
+ "strconv"
+ "strings"
+ "time"
+)
+
+var (
+ timestampLayout = "2/Jan/2006:15:04:05.000"
+)
+
+//
+// Halog contains the mapping of haproxy HTTP log format to Go struct.
+//
+// Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3
+//
+type Halog struct {
+ Timestamp time.Time
+ ClientIP string
+ ClientPort int32
+ FrontendName string
+ BackendName string
+ ServerName string
+ TimeReq int32
+ TimeWait int32
+ TimeConnect int32
+ TimeRsp int32
+ TimeAll int32
+ HTTPStatus int32
+ BytesRead int64
+ CookieReq string
+ CookieRsp string
+ TermState string
+ ConnActive int32
+ ConnFrontend int32
+ ConnBackend int32
+ ConnServer int32
+ ConnRetries int32
+ QueueServer int32
+ QueueBackend int32
+ HTTPMethod string
+ HTTPURL string
+ HTTPQuery string
+ HTTPProto string
+}
+
+//
+// cleanPrefix will remove `<date-time> <process-name>[pid]: ` prefix (which
+// come from systemd/rsyslog) in input.
+//
+func cleanPrefix(in []byte) bool {
+ start := bytes.IndexByte(in, '[')
+ if start < 0 {
+ return false
+ }
+
+ end := bytes.IndexByte(in[start:], ']')
+ if end < 0 {
+ return false
+ }
+
+ end = start + end + 3
+
+ copy(in[0:], in[end:])
+
+ return true
+}
+
+func parseToString(in []byte, sep byte) (string, bool) {
+ end := bytes.IndexByte(in, sep)
+ if end < 0 {
+ return "", false
+ }
+
+ v := string(in[:end])
+ copy(in, in[end+1:])
+
+ return v, true
+}
+
+func parseToInt32(in []byte, sep byte) (int32, bool) {
+ end := bytes.IndexByte(in, sep)
+ if end < 0 {
+ return 0, false
+ }
+
+ v, err := strconv.Atoi(string(in[:end]))
+ if err != nil {
+ return 0, false
+ }
+
+ copy(in, in[end+1:])
+
+ return int32(v), true
+}
+
+func parseToInt64(in []byte, sep byte) (int64, bool) {
+ end := bytes.IndexByte(in, sep)
+ if end < 0 {
+ return 0, false
+ }
+
+ v, err := strconv.ParseInt(string(in[:end]), 10, 64)
+ if err != nil {
+ return 0, false
+ }
+
+ copy(in, in[end+1:])
+
+ return v, true
+}
+
+func (halog *Halog) parseTimes(in []byte) (ok bool) {
+ halog.TimeReq, ok = parseToInt32(in, '/')
+ if !ok {
+ return
+ }
+
+ halog.TimeWait, ok = parseToInt32(in, '/')
+ if !ok {
+ return
+ }
+
+ halog.TimeConnect, ok = parseToInt32(in, '/')
+ if !ok {
+ return
+ }
+
+ halog.TimeRsp, ok = parseToInt32(in, '/')
+ if !ok {
+ return
+ }
+
+ halog.TimeAll, ok = parseToInt32(in, ' ')
+ if !ok {
+ return
+ }
+
+ return
+}
+
+func (halog *Halog) parseConns(in []byte) (ok bool) {
+ halog.ConnActive, ok = parseToInt32(in, '/')
+ if !ok {
+ return
+ }
+
+ halog.ConnFrontend, ok = parseToInt32(in, '/')
+ if !ok {
+ return
+ }
+
+ halog.ConnBackend, ok = parseToInt32(in, '/')
+ if !ok {
+ return
+ }
+
+ halog.ConnServer, ok = parseToInt32(in, '/')
+ if !ok {
+ return
+ }
+
+ halog.ConnRetries, ok = parseToInt32(in, ' ')
+ if !ok {
+ return
+ }
+
+ return
+}
+
+func (halog *Halog) parseQueue(in []byte) (ok bool) {
+ halog.QueueServer, ok = parseToInt32(in, '/')
+ if !ok {
+ return
+ }
+
+ halog.QueueBackend, ok = parseToInt32(in, ' ')
+
+ return
+}
+
+func (halog *Halog) parseHTTP(in []byte) (ok bool) {
+ halog.HTTPMethod, ok = parseToString(in, ' ')
+ if !ok {
+ return
+ }
+
+ v, ok := parseToString(in, ' ')
+ if !ok {
+ return
+ }
+ urlQuery := strings.SplitN(v, "?", 2)
+ halog.HTTPURL = urlQuery[0]
+ if len(urlQuery) == 2 {
+ halog.HTTPQuery = urlQuery[1]
+ }
+
+ halog.HTTPProto, ok = parseToString(in, '"')
+
+ return
+}
+
+//
+// Parse will parse one line of HAProxy log format into Halog.
+//
+// (1) Remove prefix from systemd/rsyslog
+// (2) parse client IP
+// (3) parse client port
+// (4) parse timestamp, remove '[' and parse until ']'
+// (5) parse frontend name
+// (6) parse backend name
+// (7) parse server name
+// (8) parse times
+// (9) parse HTTP status code
+// (10) parse bytes read
+// (11) parse request cookie
+// (12) parse response cookie
+// (13) parse termination state
+// (14) parse number of connections
+// (15) parse number of queue state
+// (16) parse HTTP
+//
+// nolint: gocyclo
+func (halog *Halog) Parse(in []byte) (ok bool) {
+ var err error
+
+ // (1)
+ ok = cleanPrefix(in)
+ if !ok {
+ return
+ }
+
+ // (2)
+ halog.ClientIP, ok = parseToString(in, ':')
+ if !ok {
+ return
+ }
+
+ // (3)
+ halog.ClientPort, ok = parseToInt32(in, ' ')
+ if !ok {
+ return
+ }
+
+ // (4)
+ in = in[1:]
+ ts, ok := parseToString(in, ']')
+ if !ok {
+ return
+ }
+
+ halog.Timestamp, err = time.Parse(timestampLayout, ts)
+ if err != nil {
+ return false
+ }
+
+ // (5)
+ in = in[1:]
+ halog.FrontendName, ok = parseToString(in, ' ')
+ if !ok {
+ return
+ }
+
+ // (6)
+ halog.BackendName, ok = parseToString(in, '/')
+ if !ok {
+ return
+ }
+
+ // (7)
+ halog.ServerName, ok = parseToString(in, ' ')
+ if !ok {
+ return
+ }
+
+ // (8)
+ ok = halog.parseTimes(in)
+ if !ok {
+ return
+ }
+
+ // (9)
+ halog.HTTPStatus, ok = parseToInt32(in, ' ')
+ if !ok {
+ return
+ }
+
+ // (10)
+ halog.BytesRead, ok = parseToInt64(in, ' ')
+ if !ok {
+ return
+ }
+
+ // (11)
+ halog.CookieReq, ok = parseToString(in, ' ')
+ if !ok {
+ return
+ }
+
+ // (12)
+ halog.CookieRsp, ok = parseToString(in, ' ')
+ if !ok {
+ return
+ }
+
+ // (13)
+ halog.TermState, ok = parseToString(in, ' ')
+ if !ok {
+ return
+ }
+
+ // (14)
+ ok = halog.parseConns(in)
+ if !ok {
+ return
+ }
+
+ // (15)
+ ok = halog.parseQueue(in)
+ if !ok {
+ return
+ }
+
+ // (16)
+ in = in[1:]
+ ok = halog.parseHTTP(in)
+
+ return
+}
+
+//
+// ParseUDPPacket will convert UDP packet (in bytes) to instance of
+// Halog.
+//
+// It will return nil and false if UDP packet is nil, have zero length, or
+// cannot be parsed (rejected).
+//
+func (halog *Halog) ParseUDPPacket(p *UDPPacket) bool {
+ if p == nil {
+ return false
+ }
+ if len(p.Bytes) == 0 {
+ return false
+ }
+
+ var in []byte
+
+ if p.Bytes[0] == '<' {
+ endIdx := bytes.IndexByte(p.Bytes, '>')
+ if endIdx < 0 {
+ return false
+ }
+
+ in = make([]byte, len(p.Bytes))
+ copy(in, p.Bytes[endIdx+1:])
+ } else {
+ in = p.Bytes
+ }
+
+ return halog.Parse(in)
+}
diff --git a/haminer.go b/haminer.go
new file mode 100644
index 0000000..b501a65
--- /dev/null
+++ b/haminer.go
@@ -0,0 +1,178 @@
+// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package haminer
+
+import (
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "os/signal"
+ "syscall"
+)
+
+//
+// Haminer define the log consumer and producer.
+//
+type Haminer struct {
+ cfg *Config
+ udpConn *net.UDPConn
+ isRunning bool
+ chSignal chan os.Signal
+ chHalog chan *Halog
+ ff []Forwarder
+}
+
+//
+// NewHaminer create, initialize, and return new Haminer instance. If config
+// parameter is nil, it will use the default options.
+//
+func NewHaminer(cfg *Config) (h *Haminer) {
+ if cfg == nil {
+ cfg = NewConfig()
+ }
+
+ h = &Haminer{
+ cfg: cfg,
+ chSignal: make(chan os.Signal, 1),
+ chHalog: make(chan *Halog, 30),
+ ff: make([]Forwarder, 0),
+ }
+
+ signal.Notify(h.chSignal, syscall.SIGHUP, syscall.SIGINT,
+ syscall.SIGTERM, syscall.SIGQUIT)
+
+ h.createForwarder()
+
+ return
+}
+
+func (h *Haminer) createForwarder() {
+ if len(h.cfg.InfluxAPIWrite) > 0 {
+ fwder := NewInfluxdbClient(h.cfg.InfluxAPIWrite)
+
+ h.ff = append(h.ff, fwder)
+ }
+}
+
+//
+// Start will listen for UDP packet and start consuming log, parse, and
+// publish it to analytic server.
+//
+func (h *Haminer) Start() (err error) {
+ udpAddr := &net.UDPAddr{
+ IP: net.ParseIP(h.cfg.ListenAddr),
+ Port: h.cfg.ListenPort,
+ }
+
+ h.udpConn, err = net.ListenUDP("udp", udpAddr)
+ if err != nil {
+ return
+ }
+
+ h.isRunning = true
+
+ go h.consume()
+ go h.produce()
+
+ <-h.chSignal
+
+ h.Stop()
+
+ return
+}
+
+//
+// filter will return true if log is accepted; otherwise it will return false.
+//
+func (h *Haminer) filter(halog *Halog) bool {
+ if halog == nil {
+ return false
+ }
+ if len(h.cfg.AcceptBackend) == 0 {
+ return true
+ }
+
+ for _, be := range h.cfg.AcceptBackend {
+ if halog.BackendName == be {
+ return true
+ }
+ }
+
+ return false
+}
+
+func (h *Haminer) consume() {
+ var (
+ err error
+ ok bool
+ p = NewUDPPacket(0)
+ )
+
+ for h.isRunning {
+ _, err = h.udpConn.Read(p.Bytes)
+ if err != nil {
+ continue
+ }
+
+ halog := &Halog{}
+
+ ok = halog.ParseUDPPacket(p)
+ if !ok {
+ continue
+ }
+
+ ok = h.filter(halog)
+ if !ok {
+ continue
+ }
+
+ h.chHalog <- halog
+
+ p.Reset()
+ }
+}
+
+func (h *Haminer) forwards(halogs []*Halog) {
+ for _, fwder := range h.ff {
+ fwder.Forwards(halogs)
+ }
+}
+
+func (h *Haminer) produce() {
+ halogs := make([]*Halog, 0)
+
+ for h.isRunning {
+ halog, ok := <-h.chHalog
+ if !ok {
+ continue
+ }
+
+ halogs = append(halogs, halog)
+
+ if len(halogs) >= h.cfg.MaxBufferedLogs {
+ h.forwards(halogs)
+ halogs = make([]*Halog, 0)
+ }
+ }
+}
+
+//
+// Stop will close UDP server and clear all resources.
+//
+func (h *Haminer) Stop() {
+ h.isRunning = false
+
+ signal.Stop(h.chSignal)
+
+ if h.udpConn != nil {
+ err := h.udpConn.Close()
+ if err != nil {
+ log.Println(err)
+ }
+ }
+
+ fmt.Println("Stopped")
+}
diff --git a/influxdb.go b/influxdb.go
new file mode 100644
index 0000000..9e479e1
--- /dev/null
+++ b/influxdb.go
@@ -0,0 +1,142 @@
+package haminer
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "os"
+)
+
+const (
+ envHostname = "HOSTNAME"
+ defHostname = "localhost"
+ defContentType = "application/octet-stream"
+ influxdbFormat = "" +
+ // measurements
+ "haproxy," +
+ // tags
+ "host=%q," +
+ "frontend=%q,backend=%q,server=%q" +
+ " " +
+ // fields
+ "http_proto=%q,http_method=%q,http_url=%q," +
+ "http_query=\"%s\",http_status=%d," +
+ "term_state=%q," +
+ "client_ip=%q,client_port=%d," +
+ "time_req=%d,time_wait=%d,time_connect=%d," +
+ "time_rsp=%d,time_all=%d," +
+ "conn_active=%d,conn_frontend=%d,conn_backend=%d," +
+ "conn_server=%d,conn_retries=%d," +
+ "queue_server=%d,queue_backend=%d," +
+ "bytes_read=%d" +
+ " " +
+ // timestamp
+ "%d\n"
+)
+
+//
+// InfluxdbClient contains HTTP connection for writing logs to Influxdb.
+//
+type InfluxdbClient struct {
+ conn *http.Client
+ apiWrite string
+ hostname string
+ buf bytes.Buffer
+}
+
+//
+// NewInfluxdbClient will create, initialize, and return new Influxdb client.
+//
+func NewInfluxdbClient(apiWrite string) (cl *InfluxdbClient) {
+ cl = &InfluxdbClient{
+ apiWrite: apiWrite,
+ }
+
+ cl.initHostname()
+ cl.initConn()
+
+ return
+}
+
+func (cl *InfluxdbClient) initHostname() {
+ var err error
+
+ cl.hostname, err = os.Hostname()
+ if err != nil {
+ cl.hostname = os.Getenv(envHostname)
+ }
+ if len(cl.hostname) == 0 {
+ cl.hostname = defHostname
+ }
+}
+
+func (cl *InfluxdbClient) initConn() {
+ tr := &http.Transport{}
+
+ cl.conn = &http.Client{
+ Transport: tr,
+ }
+}
+
+//
+// Forwards implement the Forwarder interface. It will write all logs to
+// Influxdb in a single HTTP POST, logging (but not returning) any error.
+//
+func (cl *InfluxdbClient) Forwards(halogs []*Halog) {
+	lsrc := "InfluxdbClient.Forwards"
+	cl.write(halogs)
+
+	rsp, err := cl.conn.Post(cl.apiWrite, defContentType, &cl.buf)
+	if err != nil {
+		// On error rsp is nil: there is nothing to close or read.
+		log.Printf("InfluxdbClient.Forwards: %s", err)
+		return
+	}
+
+	// Always close the response body so the transport can reuse the
+	// underlying connection.
+	defer func() {
+		errClose := rsp.Body.Close()
+		if errClose != nil {
+			log.Printf("%s: Body.Close: %s\n", lsrc, errClose)
+		}
+	}()
+
+	// Any 2xx status means Influxdb accepted the write.
+	if rsp.StatusCode >= 200 && rsp.StatusCode <= 299 {
+		return
+	}
+
+	// Non-2xx: dump the response body to help diagnose the rejection.
+	rspBody, err := ioutil.ReadAll(rsp.Body)
+	if err != nil {
+		log.Printf("%s: ioutil.ReadAll: %s", lsrc, err)
+	}
+
+	fmt.Printf("%s: response: %d %s\n", lsrc, rsp.StatusCode, rspBody)
+}
+
+func (cl *InfluxdbClient) write(halogs []*Halog) {
+ var err error
+
+ cl.buf.Reset()
+
+ for _, l := range halogs {
+ _, err = fmt.Fprintf(&cl.buf, influxdbFormat,
+ // tags
+ cl.hostname,
+ l.FrontendName, l.BackendName, l.ServerName,
+ // fields
+ l.HTTPProto, l.HTTPMethod, l.HTTPURL,
+ l.HTTPQuery, l.HTTPStatus,
+ l.TermState,
+ l.ClientIP, l.ClientPort,
+ l.TimeReq, l.TimeWait, l.TimeConnect,
+ l.TimeRsp, l.TimeAll,
+ l.ConnActive, l.ConnFrontend, l.ConnBackend,
+ l.ConnServer, l.ConnRetries,
+ l.QueueServer, l.QueueBackend,
+ l.BytesRead,
+ l.Timestamp.UnixNano(),
+ )
+ if err != nil {
+ log.Printf("InfluxdbClient.write: %s", err)
+ }
+ }
+}
diff --git a/udppacket.go b/udppacket.go
new file mode 100644
index 0000000..c7b017b
--- /dev/null
+++ b/udppacket.go
@@ -0,0 +1,41 @@
+// Copyright 2018, M. Shulhan (ms@kilabit.info). All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package haminer
+
+const (
+ defSize = 1024
+)
+
+//
+// UDPPacket wrap the slice of bytes for easy manipulation.
+//
+type UDPPacket struct {
+ Bytes []byte
+}
+
+//
+// NewUDPPacket will create and initialize UDP packet with a buffer of
+// `size` bytes, falling back to the default size when zero is given.
+//
+func NewUDPPacket(size uint32) (p *UDPPacket) {
+	// size is unsigned, so zero is the only "unset" value possible.
+	if size == 0 {
+		size = defSize
+	}
+	p = &UDPPacket{
+		Bytes: make([]byte, size),
+	}
+
+	return
+}
+
+//
+// Reset will set the content of packet data to zero, so it can be used again
+// on Read().
+//
+func (p *UDPPacket) Reset() {
+ p.Bytes[0] = 0
+ for x := 1; x < len(p.Bytes); x *= 2 {
+ copy(p.Bytes[x:], p.Bytes[:x])
+ }
+}