From d4d756000a90053bbbd46710daf35c8b20523965 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Wed, 17 Aug 2022 01:47:45 +0700 Subject: all: rename influxdb.go to influxd_client.go --- haminer.go | 2 +- influxd_client.go | 203 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ influxdb.go | 203 ------------------------------------------------------ 3 files changed, 204 insertions(+), 204 deletions(-) create mode 100644 influxd_client.go delete mode 100644 influxdb.go diff --git a/haminer.go b/haminer.go index 95dfe62..8b0799f 100644 --- a/haminer.go +++ b/haminer.go @@ -52,7 +52,7 @@ func (h *Haminer) createForwarder() { } var ( - fwder = NewInfluxdbClient(&h.cfg.Influxd) + fwder = NewInfluxdClient(&h.cfg.Influxd) ) h.ff = append(h.ff, fwder) diff --git a/influxd_client.go b/influxd_client.go new file mode 100644 index 0000000..639167e --- /dev/null +++ b/influxd_client.go @@ -0,0 +1,203 @@ +package haminer + +import ( + "bytes" + "fmt" + "io" + "log" + "net/http" + "os" +) + +const ( + envHostname = "HOSTNAME" + defHostname = "localhost" + defContentType = "application/octet-stream" + + influxdMeasurement = `haproxy` + + influxdTags = `,host=%s,` + + `server=%s,` + + `backend=%s,` + + `frontend=%s,` + + `http_method=%s,` + + `http_url=%s,` + + `http_query=%q,` + + `http_proto=%s,` + + `http_status=%d,` + + `term_state=%s,` + + `client_ip=%s,` + + `client_port=%d` + + influxdFields = `time_req=%d,` + + `time_wait=%d,` + + `time_connect=%d,` + + `time_rsp=%d,` + + `time_all=%d,` + + `conn_active=%d,` + + `conn_frontend=%d,` + + `conn_backend=%d,` + + `conn_server=%d,` + + `conn_retries=%d,` + + `queue_server=%d,` + + `queue_backend=%d,` + + `bytes_read=%d` +) + +// InfluxdClient contains HTTP connection for writing logs to Influxd. +type InfluxdClient struct { + conn *http.Client + cfg *InfluxdConfig + hostname string + buf bytes.Buffer +} + +// NewInfluxdClient will create, initialize, and return new Influxd client. +func NewInfluxdClient(cfg *InfluxdConfig) (cl *InfluxdClient) { + cl = &InfluxdClient{ + cfg: cfg, + } + + cl.initHostname() + cl.initConn() + + return +} + +func (cl *InfluxdClient) initHostname() { + var err error + + cl.hostname, err = os.Hostname() + if err != nil { + cl.hostname = os.Getenv(envHostname) + } + if len(cl.hostname) == 0 { + cl.hostname = defHostname + } +} + +func (cl *InfluxdClient) initConn() { + tr := &http.Transport{} + + cl.conn = &http.Client{ + Transport: tr, + } +} + +// Forwards implement the Forwarder interface. It will write all logs to +// Influxd. +func (cl *InfluxdClient) Forwards(halogs []*Halog) { + var ( + logp = `Forwards` + + httpReq *http.Request + httpRes *http.Response + err error + ) + + err = cl.write(halogs) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + httpReq, err = http.NewRequest(http.MethodPost, cl.cfg.apiWrite, &cl.buf) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + httpReq.Header.Set(`Accept`, `application/json`) + + if cl.cfg.Version == influxdVersion1 { + httpReq.Header.Set(`Content-Type`, defContentType) + } else { + httpReq.Header.Set(`Authorization`, cl.cfg.headerToken) + httpReq.Header.Set(`Content-Type`, `text/plain; charset=utf-8`) + } + + httpRes, err = cl.conn.Do(httpReq) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + if httpRes.StatusCode >= 200 || httpRes.StatusCode <= 299 { + return + } + + defer func() { + err = httpRes.Body.Close() + if err != nil { + log.Printf(`%s: Body.Close: %s`, logp, err) + } + }() + + rspBody, err := io.ReadAll(httpRes.Body) + if err != nil { + log.Printf(`%s: %s`, logp, err) + } + + fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) +} + +func (cl *InfluxdClient) write(halogs []*Halog) (err error) { + var ( + l *Halog + k string + v string + ) + + cl.buf.Reset() + + for _, l = range halogs { + cl.buf.WriteString(influxdMeasurement) + + _, err = fmt.Fprintf(&cl.buf, influxdTags, + // tags + cl.hostname, + l.ServerName, + l.BackendName, + l.FrontendName, + l.HTTPMethod, + l.HTTPURL, + l.HTTPQuery, + l.HTTPProto, + l.HTTPStatus, + l.TermState, + l.ClientIP, + l.ClientPort, + ) + if err != nil { + return err + } + + for k, v = range l.RequestHeaders { + _, err = fmt.Fprintf(&cl.buf, ",%s=%s", k, v) + if err != nil { + return err + } + } + + cl.buf.WriteByte(' ') + + _, err = fmt.Fprintf(&cl.buf, influxdFields, + l.TimeReq, l.TimeWait, l.TimeConnect, + l.TimeRsp, l.TimeAll, + l.ConnActive, l.ConnFrontend, l.ConnBackend, + l.ConnServer, l.ConnRetries, + l.QueueServer, l.QueueBackend, + l.BytesRead, + ) + if err != nil { + return err + } + + _, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano()) + if err != nil { + return err + } + } + + return nil +} diff --git a/influxdb.go b/influxdb.go deleted file mode 100644 index 3868574..0000000 --- a/influxdb.go +++ /dev/null @@ -1,203 +0,0 @@ -package haminer - -import ( - "bytes" - "fmt" - "io" - "log" - "net/http" - "os" -) - -const ( - envHostname = "HOSTNAME" - defHostname = "localhost" - defContentType = "application/octet-stream" - - influxdMeasurement = `haproxy` - - influxdTags = `,host=%s,` + - `server=%s,` + - `backend=%s,` + - `frontend=%s,` + - `http_method=%s,` + - `http_url=%s,` + - `http_query=%q,` + - `http_proto=%s,` + - `http_status=%d,` + - `term_state=%s,` + - `client_ip=%s,` + - `client_port=%d` - - influxdFields = `time_req=%d,` + - `time_wait=%d,` + - `time_connect=%d,` + - `time_rsp=%d,` + - `time_all=%d,` + - `conn_active=%d,` + - `conn_frontend=%d,` + - `conn_backend=%d,` + - `conn_server=%d,` + - `conn_retries=%d,` + - `queue_server=%d,` + - `queue_backend=%d,` + - `bytes_read=%d` -) - -// InfluxdbClient contains HTTP connection for writing logs to Influxdb. -type InfluxdbClient struct { - conn *http.Client - cfg *InfluxdConfig - hostname string - buf bytes.Buffer -} - -// NewInfluxdbClient will create, initialize, and return new Influxdb client. -func NewInfluxdbClient(cfg *InfluxdConfig) (cl *InfluxdbClient) { - cl = &InfluxdbClient{ - cfg: cfg, - } - - cl.initHostname() - cl.initConn() - - return -} - -func (cl *InfluxdbClient) initHostname() { - var err error - - cl.hostname, err = os.Hostname() - if err != nil { - cl.hostname = os.Getenv(envHostname) - } - if len(cl.hostname) == 0 { - cl.hostname = defHostname - } -} - -func (cl *InfluxdbClient) initConn() { - tr := &http.Transport{} - - cl.conn = &http.Client{ - Transport: tr, - } -} - -// Forwards implement the Forwarder interface. It will write all logs to -// Influxdb. -func (cl *InfluxdbClient) Forwards(halogs []*Halog) { - var ( - logp = `Forwards` - - httpReq *http.Request - httpRes *http.Response - err error - ) - - err = cl.write(halogs) - if err != nil { - log.Printf(`%s: %s`, logp, err) - return - } - - httpReq, err = http.NewRequest(http.MethodPost, cl.cfg.apiWrite, &cl.buf) - if err != nil { - log.Printf(`%s: %s`, logp, err) - return - } - - httpReq.Header.Set(`Accept`, `application/json`) - - if cl.cfg.Version == influxdVersion1 { - httpReq.Header.Set(`Content-Type`, defContentType) - } else { - httpReq.Header.Set(`Authorization`, cl.cfg.headerToken) - httpReq.Header.Set(`Content-Type`, `text/plain; charset=utf-8`) - } - - httpRes, err = cl.conn.Do(httpReq) - if err != nil { - log.Printf(`%s: %s`, logp, err) - return - } - - if httpRes.StatusCode >= 200 || httpRes.StatusCode <= 299 { - return - } - - defer func() { - err = httpRes.Body.Close() - if err != nil { - log.Printf(`%s: Body.Close: %s`, logp, err) - } - }() - - rspBody, err := io.ReadAll(httpRes.Body) - if err != nil { - log.Printf(`%s: %s`, logp, err) - } - - fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) -} - -func (cl *InfluxdbClient) write(halogs []*Halog) (err error) { - var ( - l *Halog - k string - v string - ) - - cl.buf.Reset() - - for _, l = range halogs { - cl.buf.WriteString(influxdMeasurement) - - _, err = fmt.Fprintf(&cl.buf, influxdTags, - // tags - cl.hostname, - l.ServerName, - l.BackendName, - l.FrontendName, - l.HTTPMethod, - l.HTTPURL, - l.HTTPQuery, - l.HTTPProto, - l.HTTPStatus, - l.TermState, - l.ClientIP, - l.ClientPort, - ) - if err != nil { - return err - } - - for k, v = range l.RequestHeaders { - _, err = fmt.Fprintf(&cl.buf, ",%s=%s", k, v) - if err != nil { - return err - } - } - - cl.buf.WriteByte(' ') - - _, err = fmt.Fprintf(&cl.buf, influxdFields, - l.TimeReq, l.TimeWait, l.TimeConnect, - l.TimeRsp, l.TimeAll, - l.ConnActive, l.ConnFrontend, l.ConnBackend, - l.ConnServer, l.ConnRetries, - l.QueueServer, l.QueueBackend, - l.BytesRead, - ) - if err != nil { - return err - } - - _, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano()) - if err != nil { - return err - } - } - - return nil -} -- cgit v1.3