diff options
| author | Shulhan <ms@kilabit.info> | 2022-08-15 20:13:55 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2022-08-15 20:13:55 +0700 |
| commit | 8a6eaebb36c0761b21398e72d934c072ac67fa7f (patch) | |
| tree | 35150d19108888fb2c85b6d4743411121de4e493 /influxdb.go | |
| parent | 2965b17ccc24abde2346c20ee1f9384ae6e12f20 (diff) | |
| download | haminer-8a6eaebb36c0761b21398e72d934c072ac67fa7f.tar.xz | |
all: add support for influxd API v2
This changes replace the "influxdb_api_write" with new section
`[forwarder "influxd"]`.
The section contains version, url, org, bucket, user, password, and
token.
The version field define the API version to be used when writing log
to Influxd.
Diffstat (limited to 'influxdb.go')
| -rw-r--r-- | influxdb.go | 52 |
1 files changed, 37 insertions, 15 deletions
diff --git a/influxdb.go b/influxdb.go index c5c1622..906f8c0 100644 --- a/influxdb.go +++ b/influxdb.go @@ -37,15 +37,15 @@ const ( // InfluxdbClient contains HTTP connection for writing logs to Influxdb. type InfluxdbClient struct { conn *http.Client - apiWrite string + cfg *InfluxdConfig hostname string buf bytes.Buffer } // NewInfluxdbClient will create, initialize, and return new Influxdb client. -func NewInfluxdbClient(apiWrite string) (cl *InfluxdbClient) { +func NewInfluxdbClient(cfg *InfluxdConfig) (cl *InfluxdbClient) { cl = &InfluxdbClient{ - apiWrite: apiWrite, + cfg: cfg, } cl.initHostname() @@ -77,36 +77,58 @@ func (cl *InfluxdbClient) initConn() { // Forwards implement the Forwarder interface. It will write all logs to // Influxdb. func (cl *InfluxdbClient) Forwards(halogs []*Halog) { - lsrc := "InfluxdbClient.Forwards" - err := cl.write(halogs) + var ( + logp = `Forwards` + + httpReq *http.Request + httpRes *http.Response + err error + ) + + err = cl.write(halogs) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + httpReq, err = http.NewRequest(http.MethodPost, cl.cfg.apiWrite, &cl.buf) if err != nil { - log.Printf("InfluxdbClient.write: %s", err) + log.Printf(`%s: %s`, logp, err) return } - rsp, err := cl.conn.Post(cl.apiWrite, defContentType, &cl.buf) + httpReq.Header.Set(`Accept`, `application/json`) + + if cl.cfg.Version == influxdVersion1 { + httpReq.Header.Set(`Content-Type`, defContentType) + } else { + httpReq.Header.Set(`Authorization`, cl.cfg.headerToken) + httpReq.Header.Set(`Content-Type`, `text/plain; charset=utf-8`) + } + + httpRes, err = cl.conn.Do(httpReq) if err != nil { - log.Printf("InfluxdbClient.Forwards: %s", err) + log.Printf(`%s: %s`, logp, err) return } - if rsp.StatusCode >= 200 || rsp.StatusCode <= 299 { + if httpRes.StatusCode >= 200 || httpRes.StatusCode <= 299 { return } defer func() { - errClose := rsp.Body.Close() - if errClose != nil { - log.Printf("%s: Body.Close: %s\n", lsrc, err) + err = httpRes.Body.Close() + if err != nil { + log.Printf(`%s: Body.Close: %s`, logp, err) } }() - rspBody, err := ioutil.ReadAll(rsp.Body) + rspBody, err := ioutil.ReadAll(httpRes.Body) if err != nil { - log.Printf("%s: ioutil.ReadAll: %s", lsrc, err) + log.Printf(`%s: ioutil.ReadAll: %s`, logp, err) } - fmt.Printf("%s: response: %d %s\n", lsrc, rsp.StatusCode, rspBody) + fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) } func (cl *InfluxdbClient) write(halogs []*Halog) (err error) { |
