From b86893f6dd315cb253ec5479501791c500d82876 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Sun, 17 Mar 2024 17:57:51 +0700 Subject: all: rename the forwarder files by using prefix "forwarder_" --- config_forwarder.go | 13 ++-- config_test.go | 3 +- forwarder_influxd.go | 182 +++++++++++++++++++++++++++++++++++++++++++++++++++ forwarder_questdb.go | 104 +++++++++++++++++++++++++++++ haminer.go | 12 ++-- influxd_client.go | 182 --------------------------------------------------- questdb_client.go | 104 ----------------------------- 7 files changed, 302 insertions(+), 298 deletions(-) create mode 100644 forwarder_influxd.go create mode 100644 forwarder_questdb.go delete mode 100644 influxd_client.go delete mode 100644 questdb_client.go diff --git a/config_forwarder.go b/config_forwarder.go index c53b807..0bc3cf9 100644 --- a/config_forwarder.go +++ b/config_forwarder.go @@ -14,12 +14,13 @@ const ( influxdVersion1 = `v1` influxdVersion2 = `v2` - forwarderInfluxd = `influxd` - forwarderQuestdb = `questdb` + forwarderKindInfluxd = `influxd` + forwarderKindQuestdb = `questdb` ) // ConfigForwarder contains configuration for forwarding the logs. type ConfigForwarder struct { + kind string Version string `ini:"::version"` URL string `ini:"::url"` @@ -28,12 +29,12 @@ type ConfigForwarder struct { Bucket string `ini:"::bucket"` - // Fields for HTTP API v1. + // Fields for Influxd HTTP API v1. User string `ini:"::user"` Pass string `ini:"::pass"` - // Fields for HTTP API v2. + // Fields for Influxd HTTP API v2. Org string `ini:"::org"` Token string `ini:"::token"` @@ -41,11 +42,13 @@ type ConfigForwarder struct { // init check, validate, and initialize the configuration values. func (cfg *ConfigForwarder) init(fwName string) (err error) { + cfg.kind = fwName + if len(cfg.URL) == 0 { return } - if fwName == forwarderInfluxd { + if fwName == forwarderKindInfluxd { return cfg.initInfluxd() } diff --git a/config_test.go b/config_test.go index f2b7e96..a54e18d 100644 --- a/config_test.go +++ b/config_test.go @@ -64,7 +64,8 @@ func TestLoad(t *testing.T) { in: "testdata/haminer.conf", exp: &Config{ Forwarders: map[string]*ConfigForwarder{ - `influxd`: &ConfigForwarder{ + forwarderKindInfluxd: &ConfigForwarder{ + kind: forwarderKindInfluxd, Version: `v2`, URL: `http://127.0.0.1:8086`, Org: `kilabit.info`, diff --git a/forwarder_influxd.go b/forwarder_influxd.go new file mode 100644 index 0000000..3ee358c --- /dev/null +++ b/forwarder_influxd.go @@ -0,0 +1,182 @@ +// SPDX-FileCopyrightText: 2018 M. Shulhan +// SPDX-License-Identifier: GPL-3.0-or-later + +package haminer + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "net/http" + "os" +) + +const ( + defContentType = "application/octet-stream" +) + +// forwarderInfluxd contains HTTP connection for writing logs to Influxd. +type forwarderInfluxd struct { + conn *http.Client + cfg *ConfigForwarder + hostname string + buf bytes.Buffer +} + +// newForwarderInfluxd will create, initialize, and return new Influxd client. +func newForwarderInfluxd(cfg *ConfigForwarder) (cl *forwarderInfluxd) { + if len(cfg.URL) == 0 { + return nil + } + + cl = &forwarderInfluxd{ + cfg: cfg, + } + + cl.initHostname() + cl.initConn() + + return +} + +func (cl *forwarderInfluxd) initHostname() { + var err error + + cl.hostname, err = os.Hostname() + if err != nil { + cl.hostname = os.Getenv(envHostname) + } + if len(cl.hostname) == 0 { + cl.hostname = defHostname + } +} + +func (cl *forwarderInfluxd) initConn() { + tr := &http.Transport{} + + cl.conn = &http.Client{ + Transport: tr, + } +} + +// Forwards implement the Forwarder interface. It will write all logs to +// Influxd. +func (cl *forwarderInfluxd) Forwards(halogs []*HTTPLog) { + var ( + logp = `influxdClient: Forwards` + + httpReq *http.Request + httpRes *http.Response + err error + ) + + err = cl.write(halogs) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + var ctx = context.Background() + + httpReq, err = http.NewRequestWithContext(ctx, http.MethodPost, cl.cfg.apiWrite, &cl.buf) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + httpReq.Header.Set(`Accept`, `application/json`) + + if cl.cfg.Version == influxdVersion1 { + httpReq.Header.Set(`Content-Type`, defContentType) + } else { + httpReq.Header.Set(`Authorization`, cl.cfg.headerToken) + httpReq.Header.Set(`Content-Type`, `text/plain; charset=utf-8`) + } + + httpRes, err = cl.conn.Do(httpReq) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + if httpRes.StatusCode >= 200 || httpRes.StatusCode <= 299 { + return + } + + defer func() { + err = httpRes.Body.Close() + if err != nil { + log.Printf(`%s: Body.Close: %s`, logp, err) + } + }() + + rspBody, err := io.ReadAll(httpRes.Body) + if err != nil { + log.Printf(`%s: %s`, logp, err) + } + + fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) +} + +func (cl *forwarderInfluxd) write(halogs []*HTTPLog) (err error) { + var ( + l *HTTPLog + k string + v string + ) + + cl.buf.Reset() + + for _, l = range halogs { + cl.buf.WriteString(influxdMeasurement) + + _, err = fmt.Fprintf(&cl.buf, influxdTags, + // tags + cl.hostname, + l.ServerName, + l.BackendName, + l.FrontendName, + l.HTTPMethod, + l.HTTPURL, + l.HTTPQuery, + l.HTTPProto, + l.HTTPStatus, + l.TermState, + l.ClientIP, + l.ClientPort, + ) + if err != nil { + return err + } + + for k, v = range l.RequestHeaders { + _, err = fmt.Fprintf(&cl.buf, ",%s=%s", k, v) + if err != nil { + return err + } + } + + cl.buf.WriteByte(' ') + + _, err = fmt.Fprintf(&cl.buf, influxdFields, + l.TimeReq, l.TimeWait, l.TimeConnect, + l.TimeRsp, l.TimeAll, + l.ConnActive, l.ConnFrontend, l.ConnBackend, + l.ConnServer, l.ConnRetries, + l.QueueServer, l.QueueBackend, + l.BytesRead, + ) + if err != nil { + return err + } + + _, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano()) + if err != nil { + return err + } + } + + return nil +} diff --git a/forwarder_questdb.go b/forwarder_questdb.go new file mode 100644 index 0000000..5903e1d --- /dev/null +++ b/forwarder_questdb.go @@ -0,0 +1,104 @@ +// SPDX-FileCopyrightText: 2022 M. Shulhan +// SPDX-License-Identifier: GPL-3.0-or-later + +package haminer + +import ( + "bytes" + "fmt" + "log" + "net" + "net/url" + "time" + + libnet "git.sr.ht/~shulhan/pakakeh.go/lib/net" +) + +const ( + defQuestdbPort = 9009 +) + +// forwarderQuestdb client for questdb. +type forwarderQuestdb struct { + conn net.Conn + buf bytes.Buffer +} + +// newForwarderQuestdb create and initialize client connection using the URL in +// the ConfigForwarder. +func newForwarderQuestdb(cfg *ConfigForwarder) (questc *forwarderQuestdb, err error) { + if cfg == nil || len(cfg.URL) == 0 { + return nil, nil + } + + var ( + logp = `newForwarderQuestdb` + timeout = 10 * time.Second + + surl *url.URL + address string + ip net.IP + port uint16 + ) + + surl, err = url.Parse(cfg.URL) + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + + if len(surl.Scheme) == 0 { + surl.Scheme = "udp" + } + + address, ip, port = libnet.ParseIPPort(surl.Host, defQuestdbPort) + if len(address) == 0 { + address = fmt.Sprintf(`%s:%d`, ip, port) + } else { + address = fmt.Sprintf(`%s:%d`, address, port) + } + + questc = &forwarderQuestdb{} + + questc.conn, err = net.DialTimeout(surl.Scheme, address, timeout) + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + + return questc, nil +} + +// Forwards implement the Forwarder interface. +// It will write all logs to questdb. +func (questc *forwarderQuestdb) Forwards(logs []*HTTPLog) { + var ( + logp = `forwarderQuestdb: Forwards` + now = time.Now() + + httpLog *HTTPLog + data []byte + err error + ) + + questc.buf.Reset() + + for _, httpLog = range logs { + err = httpLog.writeIlp(&questc.buf) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + } + + data = questc.buf.Bytes() + + err = questc.conn.SetWriteDeadline(now.Add(5 * time.Second)) + if err != nil { + log.Printf(`%s: SetWriteDeadline: %s`, logp, err) + return + } + + _, err = questc.conn.Write(data) + if err != nil { + log.Printf(`%s: Write: %s`, logp, err) + } +} diff --git a/haminer.go b/haminer.go index 7d866a5..b89b619 100644 --- a/haminer.go +++ b/haminer.go @@ -71,19 +71,19 @@ func (h *Haminer) createForwarder() { ) for fwName, fwCfg = range h.cfg.Forwarders { - var influxc *InfluxdClient + var influxc *forwarderInfluxd switch fwName { - case forwarderInfluxd: - influxc = NewInfluxdClient(fwCfg) + case forwarderKindInfluxd: + influxc = newForwarderInfluxd(fwCfg) if influxc != nil { h.ff = append(h.ff, influxc) } - case forwarderQuestdb: - var questc *questdbClient + case forwarderKindQuestdb: + var questc *forwarderQuestdb - questc, err = newQuestdbClient(fwCfg) + questc, err = newForwarderQuestdb(fwCfg) if err != nil { log.Printf(`%s: %s: %s`, logp, fwName, err) continue diff --git a/influxd_client.go b/influxd_client.go deleted file mode 100644 index 481f641..0000000 --- a/influxd_client.go +++ /dev/null @@ -1,182 +0,0 @@ -// SPDX-FileCopyrightText: 2018 M. Shulhan -// SPDX-License-Identifier: GPL-3.0-or-later - -package haminer - -import ( - "bytes" - "context" - "fmt" - "io" - "log" - "net/http" - "os" -) - -const ( - defContentType = "application/octet-stream" -) - -// InfluxdClient contains HTTP connection for writing logs to Influxd. -type InfluxdClient struct { - conn *http.Client - cfg *ConfigForwarder - hostname string - buf bytes.Buffer -} - -// NewInfluxdClient will create, initialize, and return new Influxd client. -func NewInfluxdClient(cfg *ConfigForwarder) (cl *InfluxdClient) { - if len(cfg.URL) == 0 { - return nil - } - - cl = &InfluxdClient{ - cfg: cfg, - } - - cl.initHostname() - cl.initConn() - - return -} - -func (cl *InfluxdClient) initHostname() { - var err error - - cl.hostname, err = os.Hostname() - if err != nil { - cl.hostname = os.Getenv(envHostname) - } - if len(cl.hostname) == 0 { - cl.hostname = defHostname - } -} - -func (cl *InfluxdClient) initConn() { - tr := &http.Transport{} - - cl.conn = &http.Client{ - Transport: tr, - } -} - -// Forwards implement the Forwarder interface. It will write all logs to -// Influxd. -func (cl *InfluxdClient) Forwards(halogs []*HTTPLog) { - var ( - logp = `influxdClient: Forwards` - - httpReq *http.Request - httpRes *http.Response - err error - ) - - err = cl.write(halogs) - if err != nil { - log.Printf(`%s: %s`, logp, err) - return - } - - var ctx = context.Background() - - httpReq, err = http.NewRequestWithContext(ctx, http.MethodPost, cl.cfg.apiWrite, &cl.buf) - if err != nil { - log.Printf(`%s: %s`, logp, err) - return - } - - httpReq.Header.Set(`Accept`, `application/json`) - - if cl.cfg.Version == influxdVersion1 { - httpReq.Header.Set(`Content-Type`, defContentType) - } else { - httpReq.Header.Set(`Authorization`, cl.cfg.headerToken) - httpReq.Header.Set(`Content-Type`, `text/plain; charset=utf-8`) - } - - httpRes, err = cl.conn.Do(httpReq) - if err != nil { - log.Printf(`%s: %s`, logp, err) - return - } - - if httpRes.StatusCode >= 200 || httpRes.StatusCode <= 299 { - return - } - - defer func() { - err = httpRes.Body.Close() - if err != nil { - log.Printf(`%s: Body.Close: %s`, logp, err) - } - }() - - rspBody, err := io.ReadAll(httpRes.Body) - if err != nil { - log.Printf(`%s: %s`, logp, err) - } - - fmt.Printf(`%s: response: %d %s\n`, logp, httpRes.StatusCode, rspBody) -} - -func (cl *InfluxdClient) write(halogs []*HTTPLog) (err error) { - var ( - l *HTTPLog - k string - v string - ) - - cl.buf.Reset() - - for _, l = range halogs { - cl.buf.WriteString(influxdMeasurement) - - _, err = fmt.Fprintf(&cl.buf, influxdTags, - // tags - cl.hostname, - l.ServerName, - l.BackendName, - l.FrontendName, - l.HTTPMethod, - l.HTTPURL, - l.HTTPQuery, - l.HTTPProto, - l.HTTPStatus, - l.TermState, - l.ClientIP, - l.ClientPort, - ) - if err != nil { - return err - } - - for k, v = range l.RequestHeaders { - _, err = fmt.Fprintf(&cl.buf, ",%s=%s", k, v) - if err != nil { - return err - } - } - - cl.buf.WriteByte(' ') - - _, err = fmt.Fprintf(&cl.buf, influxdFields, - l.TimeReq, l.TimeWait, l.TimeConnect, - l.TimeRsp, l.TimeAll, - l.ConnActive, l.ConnFrontend, l.ConnBackend, - l.ConnServer, l.ConnRetries, - l.QueueServer, l.QueueBackend, - l.BytesRead, - ) - if err != nil { - return err - } - - _, err = fmt.Fprintf(&cl.buf, " %d\n", l.Timestamp.UnixNano()) - if err != nil { - return err - } - } - - return nil -} diff --git a/questdb_client.go b/questdb_client.go deleted file mode 100644 index d185978..0000000 --- a/questdb_client.go +++ /dev/null @@ -1,104 +0,0 @@ -// SPDX-FileCopyrightText: 2022 M. Shulhan -// SPDX-License-Identifier: GPL-3.0-or-later - -package haminer - -import ( - "bytes" - "fmt" - "log" - "net" - "net/url" - "time" - - libnet "git.sr.ht/~shulhan/pakakeh.go/lib/net" -) - -const ( - defQuestdbPort = 9009 -) - -// questdbClient client for questdb. -type questdbClient struct { - conn net.Conn - buf bytes.Buffer -} - -// newQuestdbClient create and initialize client connection using the URL in -// the ConfigForwarder. -func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) { - if cfg == nil || len(cfg.URL) == 0 { - return nil, nil - } - - var ( - logp = `newQuestdbClient` - timeout = 10 * time.Second - - surl *url.URL - address string - ip net.IP - port uint16 - ) - - surl, err = url.Parse(cfg.URL) - if err != nil { - return nil, fmt.Errorf(`%s: %w`, logp, err) - } - - if len(surl.Scheme) == 0 { - surl.Scheme = "udp" - } - - address, ip, port = libnet.ParseIPPort(surl.Host, defQuestdbPort) - if len(address) == 0 { - address = fmt.Sprintf(`%s:%d`, ip, port) - } else { - address = fmt.Sprintf(`%s:%d`, address, port) - } - - questc = &questdbClient{} - - questc.conn, err = net.DialTimeout(surl.Scheme, address, timeout) - if err != nil { - return nil, fmt.Errorf(`%s: %w`, logp, err) - } - - return questc, nil -} - -// Forwards implement the Forwarder interface. -// It will write all logs to questdb. -func (questc *questdbClient) Forwards(logs []*HTTPLog) { - var ( - logp = `questdbClient: Forwards` - now = time.Now() - - httpLog *HTTPLog - data []byte - err error - ) - - questc.buf.Reset() - - for _, httpLog = range logs { - err = httpLog.writeIlp(&questc.buf) - if err != nil { - log.Printf(`%s: %s`, logp, err) - return - } - } - - data = questc.buf.Bytes() - - err = questc.conn.SetWriteDeadline(now.Add(5 * time.Second)) - if err != nil { - log.Printf(`%s: SetWriteDeadline: %s`, logp, err) - return - } - - _, err = questc.conn.Write(data) - if err != nil { - log.Printf(`%s: Write: %s`, logp, err) - } -} -- cgit v1.3