diff options
| author | Shulhan <ms@kilabit.info> | 2022-08-17 01:47:45 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2022-08-17 01:47:45 +0700 |
| commit | d4d756000a90053bbbd46710daf35c8b20523965 (patch) | |
| tree | 551b28c4cb9542956c1432dfbed1cd8c6d82523c /influxd_client.go | |
| parent | c0420ca7fb513f8ee8b224f512124819bd2af34e (diff) | |
| download | haminer-d4d756000a90053bbbd46710daf35c8b20523965.tar.xz | |
all: rename influxdb.go to influxd_client.go
Diffstat (limited to 'influxd_client.go')
| -rw-r--r-- | influxd_client.go | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/influxd_client.go b/influxd_client.go new file mode 100644 index 0000000..639167e --- /dev/null +++ b/influxd_client.go @@ -0,0 +1,203 @@ +package haminer + +import ( + "bytes" + "fmt" + "io" + "log" + "net/http" + "os" +) + +const ( + envHostname = "HOSTNAME" + defHostname = "localhost" + defContentType = "application/octet-stream" + + influxdMeasurement = `haproxy` + + influxdTags = `,host=%s,` + + `server=%s,` + + `backend=%s,` + + `frontend=%s,` + + `http_method=%s,` + + `http_url=%s,` + + `http_query=%q,` + + `http_proto=%s,` + + `http_status=%d,` + + `term_state=%s,` + + `client_ip=%s,` + + `client_port=%d` + + influxdFields = `time_req=%d,` + + `time_wait=%d,` + + `time_connect=%d,` + + `time_rsp=%d,` + + `time_all=%d,` + + `conn_active=%d,` + + `conn_frontend=%d,` + + `conn_backend=%d,` + + `conn_server=%d,` + + `conn_retries=%d,` + + `queue_server=%d,` + + `queue_backend=%d,` + + `bytes_read=%d` +) + +// InfluxdClient contains HTTP connection for writing logs to Influxd. +type InfluxdClient struct { + conn *http.Client + cfg *InfluxdConfig + hostname string + buf bytes.Buffer +} + +// NewInfluxdClient will create, initialize, and return new Influxd client. +func NewInfluxdClient(cfg *InfluxdConfig) (cl *InfluxdClient) { + cl = &InfluxdClient{ + cfg: cfg, + } + + cl.initHostname() + cl.initConn() + + return +} + +func (cl *InfluxdClient) initHostname() { + var err error + + cl.hostname, err = os.Hostname() + if err != nil { + cl.hostname = os.Getenv(envHostname) + } + if len(cl.hostname) == 0 { + cl.hostname = defHostname + } +} + +func (cl *InfluxdClient) initConn() { + tr := &http.Transport{} + + cl.conn = &http.Client{ + Transport: tr, + } +} + +// Forwards implement the Forwarder interface. It will write all logs to +// Influxd. +func (cl *InfluxdClient) Forwards(halogs []*Halog) { + var ( + logp = `Forwards` + + httpReq *http.Request + httpRes *http.Response + err error + ) + + err = cl.write(halogs) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + httpReq, err = http.NewRequest(http.MethodPost, cl.cfg.apiWrite, &cl.buf) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + httpReq.Header.Set(`Accept`, `application/json`) + + if cl.cfg.Version == influxdVersion1 { + httpReq.Header.Set(`Content-Type`, defContentType) + } else { + httpReq.Header.Set(`Authorization`, cl.cfg.headerToken) + httpReq.Header.Set(`Content-Type`, `text/plain; charset=utf-8`) + } + + httpRes, err = cl.conn.Do(httpReq) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + if httpRes.StatusCode >= 200 || httpRes.StatusCode <= 299 { + return + } + + defer func() { + err = httpRes.Body.Close() + if err != nil { + log.Printf(`%s: Body.Close: %s`, logp, err) + } + }() + + rspBody, err := io.ReadAll(httpRes.Body) + if err != nil { + log.Printf(`%s: %s`, logp, err) + } + + fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) +} + +func (cl *InfluxdClient) write(halogs []*Halog) (err error) { + var ( + l *Halog + k string + v string + ) + + cl.buf.Reset() + + for _, l = range halogs { + cl.buf.WriteString(influxdMeasurement) + + _, err = fmt.Fprintf(&cl.buf, influxdTags, + // tags + cl.hostname, + l.ServerName, + l.BackendName, + l.FrontendName, + l.HTTPMethod, + l.HTTPURL, + l.HTTPQuery, + l.HTTPProto, + l.HTTPStatus, + l.TermState, + l.ClientIP, + l.ClientPort, + ) + if err != nil { + return err + } + + for k, v = range l.RequestHeaders { + _, err = fmt.Fprintf(&cl.buf, ",%s=%s", k, v) + if err != nil { + return err + } + } + + cl.buf.WriteByte(' ') + + _, err = fmt.Fprintf(&cl.buf, influxdFields, + l.TimeReq, l.TimeWait, l.TimeConnect, + l.TimeRsp, l.TimeAll, + l.ConnActive, l.ConnFrontend, l.ConnBackend, + l.ConnServer, l.ConnRetries, + l.QueueServer, l.QueueBackend, + l.BytesRead, + ) + if err != nil { + return err + } + + _, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano()) + if err != nil { + return err + } + } + + return nil +} |
