diff options
| author | Shulhan <ms@kilabit.info> | 2022-08-15 20:13:55 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2022-08-15 20:13:55 +0700 |
| commit | 8a6eaebb36c0761b21398e72d934c072ac67fa7f (patch) | |
| tree | 35150d19108888fb2c85b6d4743411121de4e493 | |
| parent | 2965b17ccc24abde2346c20ee1f9384ae6e12f20 (diff) | |
| download | haminer-8a6eaebb36c0761b21398e72d934c072ac67fa7f.tar.xz | |
all: add support for influxd API v2
This changes replace the "influxdb_api_write" with new section
`[forwarder "influxd"]`.
The section contains version, url, org, bucket, user, password, and
token.
The version field define the API version to be used when writing log
to Influxd.
| -rw-r--r-- | cmd/haminer/haminer.conf | 50 | ||||
| -rw-r--r-- | cmd/haminer/main.go | 51 | ||||
| -rw-r--r-- | config.go | 13 | ||||
| -rw-r--r-- | config_test.go | 28 | ||||
| -rw-r--r-- | haminer.go | 12 | ||||
| -rw-r--r-- | influxd_config.go | 91 | ||||
| -rw-r--r-- | influxdb.go | 52 | ||||
| -rw-r--r-- | testdata/haminer.conf | 5 |
8 files changed, 205 insertions, 97 deletions
diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf index 5159194..e355e44 100644 --- a/cmd/haminer/haminer.conf +++ b/cmd/haminer/haminer.conf @@ -55,23 +55,6 @@ #capture_request_header= ## -## The endpoint for Influxdb HTTP API write, must include database name, and -## optional authentication in query parameters. -## -## Format -## -## influxdb_api_write = http://<hostname|IP-address>:<port>/write?db=<influxdb-name>[&u=username][&p=password] -## -## Default: "", empty. If empty the log will not forwarded to Influxdb. -## -## Examples -## -## influxdb_api_write=http://127.0.0.1:8086/write?db=haminer&u=haminer&p=haminer -## - -#influxdb_api_write= - -## ## Duration, in seconds, when the logs will be forwarded. ## ## Format @@ -119,3 +102,36 @@ ## [preprocess "tag"] #http_url = + +[forwarder "influxd"] + +## The version of influxd to forward the log. +## This option affect on which HTTP API will be used later. +## Valid values are "v1" or "v2" (default). +#version = v2 + +## The address of influxd. +## If its empty, the logs will not forwarded. +## +## Format +## +## http://<hostname|IP-address>:<port> +## +#url = + +## The destination organization for writes. +## InfluxDB writes all points in the batch to this organization. +## For v2, this field is required. +#org = + +## The bucket name where logs will be written. +## For v1, this is equal to database name ("db" field in query parameter). +## This field is optional, default to "haproxy" +#bucket = haproxy + +## Authentication for v1. +#user = +#password = + +## Authorization for v2. +#token = diff --git a/cmd/haminer/main.go b/cmd/haminer/main.go index 2583d20..7c7ac38 100644 --- a/cmd/haminer/main.go +++ b/cmd/haminer/main.go @@ -8,7 +8,6 @@ import ( "flag" "fmt" "log" - "strings" "github.com/shuLhan/haminer" ) @@ -18,60 +17,22 @@ const ( defConfig = "/etc/haminer.conf" ) -func initConfig() (cfg *haminer.Config, err error) { +func main() { var ( - flagConfig string - flagListen string - flagAcceptBackend string - flagInfluxAPIWrite string + cfg *haminer.Config + err error + flagConfig string ) log.SetPrefix(defLogPrefix) cfg = haminer.NewConfig() - flag.StringVar(&flagConfig, "config", defConfig, - "Load configuration from file (default to '/etc/haminer.conf')", - ) - flag.StringVar(&flagListen, haminer.ConfigKeyListen, "", - "Listen for HAProxy log using UDP at ADDRESS:PORT", - ) - flag.StringVar(&flagAcceptBackend, haminer.ConfigKeyAcceptBackend, "", - "List of accepted backend to be filtered (comma separated)", - ) - flag.StringVar(&flagInfluxAPIWrite, haminer.ConfigKeyInfluxAPIWrite, - "", - "HTTP API endpoint to write to Influxdb", - ) + flag.StringVar(&flagConfig, `config`, defConfig, `Path to configuration`) flag.Parse() - if len(flagConfig) > 0 { - err = cfg.Load(flagConfig) - if err != nil { - return nil, err - } - } - if len(flagListen) > 0 { - cfg.SetListen(flagListen) - } - if len(flagAcceptBackend) > 0 { - cfg.AcceptBackend = strings.Split(flagAcceptBackend, ",") - } - if len(flagInfluxAPIWrite) > 0 { - cfg.InfluxAPIWrite = flagInfluxAPIWrite - } - - return cfg, nil -} - -func main() { - var ( - cfg *haminer.Config - err error - ) - - cfg, err = initConfig() + err = cfg.Load(flagConfig) if err != nil { log.Fatal(err) } @@ -31,6 +31,8 @@ const ( // Config define options to create and run Haminer instance. type Config struct { + Influxd InfluxdConfig + // Listen is the address where Haminer will bind and receiving // log from HAProxy. Listen string `ini:"haminer::listen"` @@ -44,9 +46,6 @@ type Config struct { // output. RequestHeaders []string `ini:"haminer::capture_request_header"` - // InfluxAPIWrite define HTTP API to write to Influxdb. - InfluxAPIWrite string `ini:"haminer::influxdb_api_write"` - HttpUrl []string `ini:"preprocess:tag:http_url"` // retags contains list of pre-processing rules for tag. @@ -99,6 +98,11 @@ func (cfg *Config) Load(path string) (err error) { return fmt.Errorf(`%s: %w`, logp, err) } + err = cfg.Influxd.init() + if err != nil { + return fmt.Errorf(`%s: %w`, logp, err) + } + return nil } @@ -143,6 +147,9 @@ func (cfg *Config) parsePreprocessTag() (err error) { if err != nil { return fmt.Errorf(`%s: %w`, logp, err) } + if retag == nil { + continue + } cfg.retags = append(cfg.retags, retag) } diff --git a/config_test.go b/config_test.go index 775743b..a836d86 100644 --- a/config_test.go +++ b/config_test.go @@ -36,9 +36,10 @@ func TestNewConfig(t *testing.T) { func TestLoad(t *testing.T) { type testCase struct { - exp *Config - desc string - in string + exp *Config + desc string + in string + expError string } var cases = []testCase{{ @@ -49,17 +50,20 @@ func TestLoad(t *testing.T) { ForwardInterval: defForwardInterval, }, }, { - desc: "With path not exist", - in: "testdata/notexist.conf", - exp: &Config{ - listenAddr: defListenAddr, - listenPort: defListenPort, - ForwardInterval: defForwardInterval, - }, + desc: `With path not exist`, + in: `testdata/notexist.conf`, + expError: `Load: open testdata/notexist.conf: no such file or directory`, }, { desc: "With path exist", in: "testdata/haminer.conf", exp: &Config{ + Influxd: InfluxdConfig{ + Version: `v2`, + Url: `http://127.0.0.1:8086`, + Org: `kilabit.info`, + Bucket: `haproxy`, + apiWrite: `http://127.0.0.1:8086/api/v2/write?bucket=haproxy&org=kilabit.info&precision=ns`, + }, Listen: `0.0.0.0:8080`, listenAddr: `0.0.0.0`, listenPort: 8080, @@ -72,7 +76,6 @@ func TestLoad(t *testing.T) { "host", "referrer", }, - InfluxAPIWrite: "http://127.0.0.1:8086/write", HttpUrl: []string{ `/[0-9]+-\w+-\w+-\w+-\w+-\w+ => /-`, `/\w+-\w+-\w+-\w+-\w+ => /-`, @@ -106,7 +109,8 @@ func TestLoad(t *testing.T) { got = NewConfig() err = got.Load(c.in) if err != nil { - t.Fatal(err) + test.Assert(t, `error`, c.expError, err.Error()) + continue } test.Assert(t, `Config`, c.exp, got) @@ -47,11 +47,15 @@ func NewHaminer(cfg *Config) (h *Haminer) { } func (h *Haminer) createForwarder() { - if len(h.cfg.InfluxAPIWrite) > 0 { - fwder := NewInfluxdbClient(h.cfg.InfluxAPIWrite) - - h.ff = append(h.ff, fwder) + if len(h.cfg.Influxd.Url) == 0 { + return } + + var ( + fwder = NewInfluxdbClient(&h.cfg.Influxd) + ) + + h.ff = append(h.ff, fwder) } // Start will listen for UDP packet and start consuming log, parse, and diff --git a/influxd_config.go b/influxd_config.go new file mode 100644 index 0000000..e2fdba2 --- /dev/null +++ b/influxd_config.go @@ -0,0 +1,91 @@ +package haminer + +import ( + "errors" + "net/url" +) + +const ( + defInfluxdBucket = `haproxy` + + influxdVersion1 = `v1` + influxdVersion2 = `v2` +) + +// InfluxdConfig contains configuration for forwarding the logs to Influxd. +type InfluxdConfig struct { + Version string `ini:"forwarder:influxd:version"` + + Url string `ini:"forwarder:influxd:url"` + apiWrite string + headerToken string + + Bucket string `ini:"forwarder:influxd:bucket"` + + // Fields for HTTP API v1. + + User string `ini:"forwarder:influxd:user"` + Pass string `ini:"forwarder:influxd:pass"` + + // Fields for HTTP API v2. + + Org string `ini:"forwarder:influxd:org"` + Token string `ini:"forwarder:influxd:token"` +} + +// init check, validate, and initialize the configuration values. +func (cfg *InfluxdConfig) init() (err error) { + if len(cfg.Url) == 0 { + return + } + + switch cfg.Version { + case influxdVersion1: + case influxdVersion2: + default: + cfg.Version = influxdVersion2 + } + + if len(cfg.Bucket) == 0 { + cfg.Bucket = defInfluxdBucket + } + + var ( + q = url.Values{} + + url *url.URL + ) + + url, err = url.Parse(cfg.Url) + if err != nil { + return err + } + + q.Set(`precision`, `ns`) + + if cfg.Version == influxdVersion1 { + url.Path = `/write` + + q.Set(`db`, cfg.Bucket) + if len(cfg.User) > 0 && len(cfg.Pass) > 0 { + q.Set(`u`, cfg.User) + q.Set(`p`, cfg.Pass) + } + } else { + cfg.headerToken = `Token ` + cfg.Token + url.Path = `/api/v2/write` + + if len(cfg.Org) == 0 { + return errors.New(`empty organization field`) + } + + q.Set(`org`, cfg.Org) + q.Set(`bucket`, cfg.Bucket) + } + + url.RawQuery = q.Encode() + + cfg.apiWrite = url.String() + + return nil +} diff --git a/influxdb.go b/influxdb.go index c5c1622..906f8c0 100644 --- a/influxdb.go +++ b/influxdb.go @@ -37,15 +37,15 @@ const ( // InfluxdbClient contains HTTP connection for writing logs to Influxdb. type InfluxdbClient struct { conn *http.Client - apiWrite string + cfg *InfluxdConfig hostname string buf bytes.Buffer } // NewInfluxdbClient will create, initialize, and return new Influxdb client. -func NewInfluxdbClient(apiWrite string) (cl *InfluxdbClient) { +func NewInfluxdbClient(cfg *InfluxdConfig) (cl *InfluxdbClient) { cl = &InfluxdbClient{ - apiWrite: apiWrite, + cfg: cfg, } cl.initHostname() @@ -77,36 +77,58 @@ func (cl *InfluxdbClient) initConn() { // Forwards implement the Forwarder interface. It will write all logs to // Influxdb. func (cl *InfluxdbClient) Forwards(halogs []*Halog) { - lsrc := "InfluxdbClient.Forwards" - err := cl.write(halogs) + var ( + logp = `Forwards` + + httpReq *http.Request + httpRes *http.Response + err error + ) + + err = cl.write(halogs) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + httpReq, err = http.NewRequest(http.MethodPost, cl.cfg.apiWrite, &cl.buf) if err != nil { - log.Printf("InfluxdbClient.write: %s", err) + log.Printf(`%s: %s`, logp, err) return } - rsp, err := cl.conn.Post(cl.apiWrite, defContentType, &cl.buf) + httpReq.Header.Set(`Accept`, `application/json`) + + if cl.cfg.Version == influxdVersion1 { + httpReq.Header.Set(`Content-Type`, defContentType) + } else { + httpReq.Header.Set(`Authorization`, cl.cfg.headerToken) + httpReq.Header.Set(`Content-Type`, `text/plain; charset=utf-8`) + } + + httpRes, err = cl.conn.Do(httpReq) if err != nil { - log.Printf("InfluxdbClient.Forwards: %s", err) + log.Printf(`%s: %s`, logp, err) return } - if rsp.StatusCode >= 200 || rsp.StatusCode <= 299 { + if httpRes.StatusCode >= 200 || httpRes.StatusCode <= 299 { return } defer func() { - errClose := rsp.Body.Close() - if errClose != nil { - log.Printf("%s: Body.Close: %s\n", lsrc, err) + err = httpRes.Body.Close() + if err != nil { + log.Printf(`%s: Body.Close: %s`, logp, err) } }() - rspBody, err := ioutil.ReadAll(rsp.Body) + rspBody, err := ioutil.ReadAll(httpRes.Body) if err != nil { - log.Printf("%s: ioutil.ReadAll: %s", lsrc, err) + log.Printf(`%s: ioutil.ReadAll: %s`, logp, err) } - fmt.Printf("%s: response: %d %s\n", lsrc, rsp.StatusCode, rspBody) + fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) } func (cl *InfluxdbClient) write(halogs []*Halog) (err error) { diff --git a/testdata/haminer.conf b/testdata/haminer.conf index e0b1d70..1968bd2 100644 --- a/testdata/haminer.conf +++ b/testdata/haminer.conf @@ -4,7 +4,6 @@ accept_backend = a accept_backend = b capture_request_header = host capture_request_header = referrer -influxdb_api_write = http://127.0.0.1:8086/write forward_interval = 20s [preprocess "tag"] @@ -13,3 +12,7 @@ http_url = /[0-9]+-\\w+-\\w+-\\w+-\\w+-\\w+ => /- http_url = /\\w+-\\w+-\\w+-\\w+-\\w+ => /- http_url = /[0-9]+ => /- + +[forwarder "influxd"] +url = http://127.0.0.1:8086 +org = kilabit.info |
