aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/haminer/haminer.conf50
-rw-r--r--cmd/haminer/main.go51
-rw-r--r--config.go13
-rw-r--r--config_test.go28
-rw-r--r--haminer.go12
-rw-r--r--influxd_config.go91
-rw-r--r--influxdb.go52
-rw-r--r--testdata/haminer.conf5
8 files changed, 205 insertions, 97 deletions
diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf
index 5159194..e355e44 100644
--- a/cmd/haminer/haminer.conf
+++ b/cmd/haminer/haminer.conf
@@ -55,23 +55,6 @@
#capture_request_header=
##
-## The endpoint for Influxdb HTTP API write, must include database name, and
-## optional authentication in query parameters.
-##
-## Format
-##
-## influxdb_api_write = http://<hostname|IP-address>:<port>/write?db=<influxdb-name>[&u=username][&p=password]
-##
-## Default: "", empty. If empty the log will not forwarded to Influxdb.
-##
-## Examples
-##
-## influxdb_api_write=http://127.0.0.1:8086/write?db=haminer&u=haminer&p=haminer
-##
-
-#influxdb_api_write=
-
-##
## Duration, in seconds, when the logs will be forwarded.
##
## Format
@@ -119,3 +102,36 @@
##
[preprocess "tag"]
#http_url =
+
+[forwarder "influxd"]
+
+## The version of influxd to forward the log.
+## This option affect on which HTTP API will be used later.
+## Valid values are "v1" or "v2" (default).
+#version = v2
+
+## The address of influxd.
+## If its empty, the logs will not forwarded.
+##
+## Format
+##
+## http://<hostname|IP-address>:<port>
+##
+#url =
+
+## The destination organization for writes.
+## InfluxDB writes all points in the batch to this organization.
+## For v2, this field is required.
+#org =
+
+## The bucket name where logs will be written.
+## For v1, this is equal to database name ("db" field in query parameter).
+## This field is optional, default to "haproxy"
+#bucket = haproxy
+
+## Authentication for v1.
+#user =
+#password =
+
+## Authorization for v2.
+#token =
diff --git a/cmd/haminer/main.go b/cmd/haminer/main.go
index 2583d20..7c7ac38 100644
--- a/cmd/haminer/main.go
+++ b/cmd/haminer/main.go
@@ -8,7 +8,6 @@ import (
"flag"
"fmt"
"log"
- "strings"
"github.com/shuLhan/haminer"
)
@@ -18,60 +17,22 @@ const (
defConfig = "/etc/haminer.conf"
)
-func initConfig() (cfg *haminer.Config, err error) {
+func main() {
var (
- flagConfig string
- flagListen string
- flagAcceptBackend string
- flagInfluxAPIWrite string
+ cfg *haminer.Config
+ err error
+ flagConfig string
)
log.SetPrefix(defLogPrefix)
cfg = haminer.NewConfig()
- flag.StringVar(&flagConfig, "config", defConfig,
- "Load configuration from file (default to '/etc/haminer.conf')",
- )
- flag.StringVar(&flagListen, haminer.ConfigKeyListen, "",
- "Listen for HAProxy log using UDP at ADDRESS:PORT",
- )
- flag.StringVar(&flagAcceptBackend, haminer.ConfigKeyAcceptBackend, "",
- "List of accepted backend to be filtered (comma separated)",
- )
- flag.StringVar(&flagInfluxAPIWrite, haminer.ConfigKeyInfluxAPIWrite,
- "",
- "HTTP API endpoint to write to Influxdb",
- )
+ flag.StringVar(&flagConfig, `config`, defConfig, `Path to configuration`)
flag.Parse()
- if len(flagConfig) > 0 {
- err = cfg.Load(flagConfig)
- if err != nil {
- return nil, err
- }
- }
- if len(flagListen) > 0 {
- cfg.SetListen(flagListen)
- }
- if len(flagAcceptBackend) > 0 {
- cfg.AcceptBackend = strings.Split(flagAcceptBackend, ",")
- }
- if len(flagInfluxAPIWrite) > 0 {
- cfg.InfluxAPIWrite = flagInfluxAPIWrite
- }
-
- return cfg, nil
-}
-
-func main() {
- var (
- cfg *haminer.Config
- err error
- )
-
- cfg, err = initConfig()
+ err = cfg.Load(flagConfig)
if err != nil {
log.Fatal(err)
}
diff --git a/config.go b/config.go
index 79d08f5..857a392 100644
--- a/config.go
+++ b/config.go
@@ -31,6 +31,8 @@ const (
// Config define options to create and run Haminer instance.
type Config struct {
+ Influxd InfluxdConfig
+
// Listen is the address where Haminer will bind and receiving
// log from HAProxy.
Listen string `ini:"haminer::listen"`
@@ -44,9 +46,6 @@ type Config struct {
// output.
RequestHeaders []string `ini:"haminer::capture_request_header"`
- // InfluxAPIWrite define HTTP API to write to Influxdb.
- InfluxAPIWrite string `ini:"haminer::influxdb_api_write"`
-
HttpUrl []string `ini:"preprocess:tag:http_url"`
// retags contains list of pre-processing rules for tag.
@@ -99,6 +98,11 @@ func (cfg *Config) Load(path string) (err error) {
return fmt.Errorf(`%s: %w`, logp, err)
}
+ err = cfg.Influxd.init()
+ if err != nil {
+ return fmt.Errorf(`%s: %w`, logp, err)
+ }
+
return nil
}
@@ -143,6 +147,9 @@ func (cfg *Config) parsePreprocessTag() (err error) {
if err != nil {
return fmt.Errorf(`%s: %w`, logp, err)
}
+ if retag == nil {
+ continue
+ }
cfg.retags = append(cfg.retags, retag)
}
diff --git a/config_test.go b/config_test.go
index 775743b..a836d86 100644
--- a/config_test.go
+++ b/config_test.go
@@ -36,9 +36,10 @@ func TestNewConfig(t *testing.T) {
func TestLoad(t *testing.T) {
type testCase struct {
- exp *Config
- desc string
- in string
+ exp *Config
+ desc string
+ in string
+ expError string
}
var cases = []testCase{{
@@ -49,17 +50,20 @@ func TestLoad(t *testing.T) {
ForwardInterval: defForwardInterval,
},
}, {
- desc: "With path not exist",
- in: "testdata/notexist.conf",
- exp: &Config{
- listenAddr: defListenAddr,
- listenPort: defListenPort,
- ForwardInterval: defForwardInterval,
- },
+ desc: `With path not exist`,
+ in: `testdata/notexist.conf`,
+ expError: `Load: open testdata/notexist.conf: no such file or directory`,
}, {
desc: "With path exist",
in: "testdata/haminer.conf",
exp: &Config{
+ Influxd: InfluxdConfig{
+ Version: `v2`,
+ Url: `http://127.0.0.1:8086`,
+ Org: `kilabit.info`,
+ Bucket: `haproxy`,
+ apiWrite: `http://127.0.0.1:8086/api/v2/write?bucket=haproxy&org=kilabit.info&precision=ns`,
+ },
Listen: `0.0.0.0:8080`,
listenAddr: `0.0.0.0`,
listenPort: 8080,
@@ -72,7 +76,6 @@ func TestLoad(t *testing.T) {
"host",
"referrer",
},
- InfluxAPIWrite: "http://127.0.0.1:8086/write",
HttpUrl: []string{
`/[0-9]+-\w+-\w+-\w+-\w+-\w+ => /-`,
`/\w+-\w+-\w+-\w+-\w+ => /-`,
@@ -106,7 +109,8 @@ func TestLoad(t *testing.T) {
got = NewConfig()
err = got.Load(c.in)
if err != nil {
- t.Fatal(err)
+ test.Assert(t, `error`, c.expError, err.Error())
+ continue
}
test.Assert(t, `Config`, c.exp, got)
diff --git a/haminer.go b/haminer.go
index 4864a97..10209d3 100644
--- a/haminer.go
+++ b/haminer.go
@@ -47,11 +47,15 @@ func NewHaminer(cfg *Config) (h *Haminer) {
}
func (h *Haminer) createForwarder() {
- if len(h.cfg.InfluxAPIWrite) > 0 {
- fwder := NewInfluxdbClient(h.cfg.InfluxAPIWrite)
-
- h.ff = append(h.ff, fwder)
+ if len(h.cfg.Influxd.Url) == 0 {
+ return
}
+
+ var (
+ fwder = NewInfluxdbClient(&h.cfg.Influxd)
+ )
+
+ h.ff = append(h.ff, fwder)
}
// Start will listen for UDP packet and start consuming log, parse, and
diff --git a/influxd_config.go b/influxd_config.go
new file mode 100644
index 0000000..e2fdba2
--- /dev/null
+++ b/influxd_config.go
@@ -0,0 +1,91 @@
+package haminer
+
+import (
+ "errors"
+ "net/url"
+)
+
+const (
+ defInfluxdBucket = `haproxy`
+
+ influxdVersion1 = `v1`
+ influxdVersion2 = `v2`
+)
+
+// InfluxdConfig contains configuration for forwarding the logs to Influxd.
+type InfluxdConfig struct {
+ Version string `ini:"forwarder:influxd:version"`
+
+ Url string `ini:"forwarder:influxd:url"`
+ apiWrite string
+ headerToken string
+
+ Bucket string `ini:"forwarder:influxd:bucket"`
+
+ // Fields for HTTP API v1.
+
+ User string `ini:"forwarder:influxd:user"`
+ Pass string `ini:"forwarder:influxd:pass"`
+
+ // Fields for HTTP API v2.
+
+ Org string `ini:"forwarder:influxd:org"`
+ Token string `ini:"forwarder:influxd:token"`
+}
+
+// init check, validate, and initialize the configuration values.
+func (cfg *InfluxdConfig) init() (err error) {
+ if len(cfg.Url) == 0 {
+ return
+ }
+
+ switch cfg.Version {
+ case influxdVersion1:
+ case influxdVersion2:
+ default:
+ cfg.Version = influxdVersion2
+ }
+
+ if len(cfg.Bucket) == 0 {
+ cfg.Bucket = defInfluxdBucket
+ }
+
+ var (
+ q = url.Values{}
+
+ url *url.URL
+ )
+
+ url, err = url.Parse(cfg.Url)
+ if err != nil {
+ return err
+ }
+
+ q.Set(`precision`, `ns`)
+
+ if cfg.Version == influxdVersion1 {
+ url.Path = `/write`
+
+ q.Set(`db`, cfg.Bucket)
+ if len(cfg.User) > 0 && len(cfg.Pass) > 0 {
+ q.Set(`u`, cfg.User)
+ q.Set(`p`, cfg.Pass)
+ }
+ } else {
+ cfg.headerToken = `Token ` + cfg.Token
+ url.Path = `/api/v2/write`
+
+ if len(cfg.Org) == 0 {
+ return errors.New(`empty organization field`)
+ }
+
+ q.Set(`org`, cfg.Org)
+ q.Set(`bucket`, cfg.Bucket)
+ }
+
+ url.RawQuery = q.Encode()
+
+ cfg.apiWrite = url.String()
+
+ return nil
+}
diff --git a/influxdb.go b/influxdb.go
index c5c1622..906f8c0 100644
--- a/influxdb.go
+++ b/influxdb.go
@@ -37,15 +37,15 @@ const (
// InfluxdbClient contains HTTP connection for writing logs to Influxdb.
type InfluxdbClient struct {
conn *http.Client
- apiWrite string
+ cfg *InfluxdConfig
hostname string
buf bytes.Buffer
}
// NewInfluxdbClient will create, initialize, and return new Influxdb client.
-func NewInfluxdbClient(apiWrite string) (cl *InfluxdbClient) {
+func NewInfluxdbClient(cfg *InfluxdConfig) (cl *InfluxdbClient) {
cl = &InfluxdbClient{
- apiWrite: apiWrite,
+ cfg: cfg,
}
cl.initHostname()
@@ -77,36 +77,58 @@ func (cl *InfluxdbClient) initConn() {
// Forwards implement the Forwarder interface. It will write all logs to
// Influxdb.
func (cl *InfluxdbClient) Forwards(halogs []*Halog) {
- lsrc := "InfluxdbClient.Forwards"
- err := cl.write(halogs)
+ var (
+ logp = `Forwards`
+
+ httpReq *http.Request
+ httpRes *http.Response
+ err error
+ )
+
+ err = cl.write(halogs)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ return
+ }
+
+ httpReq, err = http.NewRequest(http.MethodPost, cl.cfg.apiWrite, &cl.buf)
if err != nil {
- log.Printf("InfluxdbClient.write: %s", err)
+ log.Printf(`%s: %s`, logp, err)
return
}
- rsp, err := cl.conn.Post(cl.apiWrite, defContentType, &cl.buf)
+ httpReq.Header.Set(`Accept`, `application/json`)
+
+ if cl.cfg.Version == influxdVersion1 {
+ httpReq.Header.Set(`Content-Type`, defContentType)
+ } else {
+ httpReq.Header.Set(`Authorization`, cl.cfg.headerToken)
+ httpReq.Header.Set(`Content-Type`, `text/plain; charset=utf-8`)
+ }
+
+ httpRes, err = cl.conn.Do(httpReq)
if err != nil {
- log.Printf("InfluxdbClient.Forwards: %s", err)
+ log.Printf(`%s: %s`, logp, err)
return
}
- if rsp.StatusCode >= 200 || rsp.StatusCode <= 299 {
+ if httpRes.StatusCode >= 200 || httpRes.StatusCode <= 299 {
return
}
defer func() {
- errClose := rsp.Body.Close()
- if errClose != nil {
- log.Printf("%s: Body.Close: %s\n", lsrc, err)
+ err = httpRes.Body.Close()
+ if err != nil {
+ log.Printf(`%s: Body.Close: %s`, logp, err)
}
}()
- rspBody, err := ioutil.ReadAll(rsp.Body)
+ rspBody, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
- log.Printf("%s: ioutil.ReadAll: %s", lsrc, err)
+ log.Printf(`%s: ioutil.ReadAll: %s`, logp, err)
}
- fmt.Printf("%s: response: %d %s\n", lsrc, rsp.StatusCode, rspBody)
+ fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody)
}
func (cl *InfluxdbClient) write(halogs []*Halog) (err error) {
diff --git a/testdata/haminer.conf b/testdata/haminer.conf
index e0b1d70..1968bd2 100644
--- a/testdata/haminer.conf
+++ b/testdata/haminer.conf
@@ -4,7 +4,6 @@ accept_backend = a
accept_backend = b
capture_request_header = host
capture_request_header = referrer
-influxdb_api_write = http://127.0.0.1:8086/write
forward_interval = 20s
[preprocess "tag"]
@@ -13,3 +12,7 @@ http_url = /[0-9]+-\\w+-\\w+-\\w+-\\w+-\\w+ => /-
http_url = /\\w+-\\w+-\\w+-\\w+-\\w+ => /-
http_url = /[0-9]+ => /-
+
+[forwarder "influxd"]
+url = http://127.0.0.1:8086
+org = kilabit.info