aboutsummaryrefslogtreecommitdiff
path: root/forwarder_influxd.go
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2024-03-17 17:57:51 +0700
committerShulhan <ms@kilabit.info>2024-03-18 21:55:35 +0700
commitb86893f6dd315cb253ec5479501791c500d82876 (patch)
tree2122d00405271e4f8a5ac4f048693562d17a4319 /forwarder_influxd.go
parent182a79905737010fa135d101be656144b7b0f39a (diff)
downloadhaminer-b86893f6dd315cb253ec5479501791c500d82876.tar.xz
all: rename the forwarder files by using prefix "forwarder_"
Diffstat (limited to 'forwarder_influxd.go')
-rw-r--r--forwarder_influxd.go182
1 files changed, 182 insertions, 0 deletions
diff --git a/forwarder_influxd.go b/forwarder_influxd.go
new file mode 100644
index 0000000..3ee358c
--- /dev/null
+++ b/forwarder_influxd.go
@@ -0,0 +1,182 @@
+// SPDX-FileCopyrightText: 2018 M. Shulhan <ms@kilabit.info>
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package haminer
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "os"
+)
+
+const (
+ defContentType = "application/octet-stream"
+)
+
+// forwarderInfluxd contains HTTP connection for writing logs to Influxd.
+type forwarderInfluxd struct {
+ conn *http.Client
+ cfg *ConfigForwarder
+ hostname string
+ buf bytes.Buffer
+}
+
+// newForwarderInfluxd will create, initialize, and return new Influxd client.
+func newForwarderInfluxd(cfg *ConfigForwarder) (cl *forwarderInfluxd) {
+ if len(cfg.URL) == 0 {
+ return nil
+ }
+
+ cl = &forwarderInfluxd{
+ cfg: cfg,
+ }
+
+ cl.initHostname()
+ cl.initConn()
+
+ return
+}
+
+func (cl *forwarderInfluxd) initHostname() {
+ var err error
+
+ cl.hostname, err = os.Hostname()
+ if err != nil {
+ cl.hostname = os.Getenv(envHostname)
+ }
+ if len(cl.hostname) == 0 {
+ cl.hostname = defHostname
+ }
+}
+
+func (cl *forwarderInfluxd) initConn() {
+ tr := &http.Transport{}
+
+ cl.conn = &http.Client{
+ Transport: tr,
+ }
+}
+
+// Forwards implement the Forwarder interface. It will write all logs to
+// Influxd.
+func (cl *forwarderInfluxd) Forwards(halogs []*HTTPLog) {
+ var (
+ logp = `influxdClient: Forwards`
+
+ httpReq *http.Request
+ httpRes *http.Response
+ err error
+ )
+
+ err = cl.write(halogs)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ return
+ }
+
+ var ctx = context.Background()
+
+ httpReq, err = http.NewRequestWithContext(ctx, http.MethodPost, cl.cfg.apiWrite, &cl.buf)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ return
+ }
+
+ httpReq.Header.Set(`Accept`, `application/json`)
+
+ if cl.cfg.Version == influxdVersion1 {
+ httpReq.Header.Set(`Content-Type`, defContentType)
+ } else {
+ httpReq.Header.Set(`Authorization`, cl.cfg.headerToken)
+ httpReq.Header.Set(`Content-Type`, `text/plain; charset=utf-8`)
+ }
+
+ httpRes, err = cl.conn.Do(httpReq)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ return
+ }
+
+ if httpRes.StatusCode >= 200 || httpRes.StatusCode <= 299 {
+ return
+ }
+
+ defer func() {
+ err = httpRes.Body.Close()
+ if err != nil {
+ log.Printf(`%s: Body.Close: %s`, logp, err)
+ }
+ }()
+
+ rspBody, err := io.ReadAll(httpRes.Body)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ }
+
+ fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody)
+}
+
+func (cl *forwarderInfluxd) write(halogs []*HTTPLog) (err error) {
+ var (
+ l *HTTPLog
+ k string
+ v string
+ )
+
+ cl.buf.Reset()
+
+ for _, l = range halogs {
+ cl.buf.WriteString(influxdMeasurement)
+
+ _, err = fmt.Fprintf(&cl.buf, influxdTags,
+ // tags
+ cl.hostname,
+ l.ServerName,
+ l.BackendName,
+ l.FrontendName,
+ l.HTTPMethod,
+ l.HTTPURL,
+ l.HTTPQuery,
+ l.HTTPProto,
+ l.HTTPStatus,
+ l.TermState,
+ l.ClientIP,
+ l.ClientPort,
+ )
+ if err != nil {
+ return err
+ }
+
+ for k, v = range l.RequestHeaders {
+ _, err = fmt.Fprintf(&cl.buf, ",%s=%s", k, v)
+ if err != nil {
+ return err
+ }
+ }
+
+ cl.buf.WriteByte(' ')
+
+ _, err = fmt.Fprintf(&cl.buf, influxdFields,
+ l.TimeReq, l.TimeWait, l.TimeConnect,
+ l.TimeRsp, l.TimeAll,
+ l.ConnActive, l.ConnFrontend, l.ConnBackend,
+ l.ConnServer, l.ConnRetries,
+ l.QueueServer, l.QueueBackend,
+ l.BytesRead,
+ )
+ if err != nil {
+ return err
+ }
+
+ _, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano())
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}