From 364609ad3b03b97c5e25f4e8555b3a0267b548f2 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Wed, 17 Aug 2022 17:53:28 +0700 Subject: all: implement forwarder for questdb Questdb [1] is one the time-series database. We experiment to forward the HTTP log using Influx Line Protocol (ILP). [1]: https://questdb.io/ --- cmd/haminer/haminer.conf | 19 +++++++++ config.go | 11 +++--- config_forwarder.go | 23 +++++++---- go.mod | 2 + go.sum | 2 + haminer.go | 53 ++++++++++++++++++++++--- http_log.go | 96 ++++++++++++++++++++++++++++++++++++++++++++ influxd_client.go | 33 +--------------- questdb_client.go | 101 +++++++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 290 insertions(+), 50 deletions(-) create mode 100644 questdb_client.go diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf index e355e44..c20f135 100644 --- a/cmd/haminer/haminer.conf +++ b/cmd/haminer/haminer.conf @@ -135,3 +135,22 @@ ## Authorization for v2. #token = + +## The questdb forwarder define configuration to forward the log to questdb +## instance. +## The log is forwarded using Influxb Line Protocol (ILP) [1] +## +## [1]: https://questdb.io/docs/reference/api/ilp/overview + +[forwarder "questdb"] + +## The URL of questdb server in the following format, +## +## $scheme "://" $ip_address [":" $port] +## +## The $scheme can be "udp" (default) or "tcp". +## The $ip_address is required, define the address where questdb running. +## The $port number is optional default to 9009. +## +## An empty url means the forwarder is disabled. +url = diff --git a/config.go b/config.go index 3635638..45bd023 100644 --- a/config.go +++ b/config.go @@ -67,8 +67,9 @@ func (cfg *Config) Load(path string) (err error) { var ( logp = `Load` - in *ini.Ini - fw *ConfigForwarder + in *ini.Ini + fwCfg *ConfigForwarder + fwName string ) in, err = ini.Open(path) @@ -90,10 +91,10 @@ func (cfg *Config) Load(path string) (err error) { return fmt.Errorf(`%s: %w`, logp, err) } - for _, fw = range cfg.Forwarders { - err = fw.init() + for fwName, fwCfg = range cfg.Forwarders { + err = fwCfg.init(fwName) if err != nil { - return fmt.Errorf(`%s: %w`, logp, err) + return fmt.Errorf(`%s: %s: %w`, logp, fwName, err) } } diff --git a/config_forwarder.go b/config_forwarder.go index 408d1bd..c7acc85 100644 --- a/config_forwarder.go +++ b/config_forwarder.go @@ -12,6 +12,7 @@ const ( influxdVersion2 = `v2` forwarderInfluxd = `influxd` + forwarderQuestdb = `questdb` ) // ConfigForwarder contains configuration for forwarding the logs. @@ -36,11 +37,19 @@ type ConfigForwarder struct { } // init check, validate, and initialize the configuration values. -func (cfg *ConfigForwarder) init() (err error) { +func (cfg *ConfigForwarder) init(fwName string) (err error) { if len(cfg.Url) == 0 { return } + if fwName == forwarderInfluxd { + return cfg.initInfluxd() + } + + return nil +} + +func (cfg *ConfigForwarder) initInfluxd() (err error) { switch cfg.Version { case influxdVersion1: case influxdVersion2: @@ -55,10 +64,10 @@ func (cfg *ConfigForwarder) init() (err error) { var ( q = url.Values{} - url *url.URL + surl *url.URL ) - url, err = url.Parse(cfg.Url) + surl, err = url.Parse(cfg.Url) if err != nil { return err } @@ -66,7 +75,7 @@ func (cfg *ConfigForwarder) init() (err error) { q.Set(`precision`, `ns`) if cfg.Version == influxdVersion1 { - url.Path = `/write` + surl.Path = `/write` q.Set(`db`, cfg.Bucket) if len(cfg.User) > 0 && len(cfg.Pass) > 0 { @@ -75,7 +84,7 @@ func (cfg *ConfigForwarder) init() (err error) { } } else { cfg.headerToken = `Token ` + cfg.Token - url.Path = `/api/v2/write` + surl.Path = `/api/v2/write` if len(cfg.Org) == 0 { return errors.New(`empty organization field`) @@ -85,9 +94,9 @@ func (cfg *ConfigForwarder) init() (err error) { q.Set(`bucket`, cfg.Bucket) } - url.RawQuery = q.Encode() + surl.RawQuery = q.Encode() - cfg.apiWrite = url.String() + cfg.apiWrite = surl.String() return nil } diff --git a/go.mod b/go.mod index 4963bb1..a5a675a 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,5 @@ module git.sr.ht/~shulhan/haminer go 1.18 require github.com/shuLhan/share v0.40.0 + +require golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect diff --git a/go.sum b/go.sum index 27da647..ebbff64 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/shuLhan/share v0.40.0 h1:C0c1lfKLzogUStIiYJecoTTP9TrEDMz64la1Y1l8Wl0= github.com/shuLhan/share v0.40.0/go.mod h1:hb3Kis5s4jPume4YD15JELE67naFybtuALshhh9TlOg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/haminer.go b/haminer.go index 9b1e080..07bbe91 100644 --- a/haminer.go +++ b/haminer.go @@ -8,9 +8,19 @@ import ( "fmt" "log" "net" + "os" "time" ) +const ( + defHostname = `localhost` + envHostname = `HOSTNAME` +) + +var ( + _hostname string +) + // Haminer define the log consumer and producer. type Haminer struct { cfg *Config @@ -20,6 +30,18 @@ type Haminer struct { isRunning bool } +func initHostname() { + var err error + + _hostname, err = os.Hostname() + if err != nil { + _hostname = os.Getenv(envHostname) + } + if len(_hostname) == 0 { + _hostname = defHostname + } +} + // NewHaminer create, initialize, and return new Haminer instance. If config // parameter is nil, it will use the default options. func NewHaminer(cfg *Config) (h *Haminer) { @@ -33,6 +55,8 @@ func NewHaminer(cfg *Config) (h *Haminer) { ff: make([]Forwarder, 0), } + initHostname() + h.createForwarder() return @@ -40,18 +64,35 @@ func NewHaminer(cfg *Config) (h *Haminer) { func (h *Haminer) createForwarder() { var ( - fwCfg *ConfigForwarder - influxdc *InfluxdClient - fwName string + logp = `createForwarder` + + fwCfg *ConfigForwarder + fwName string + err error ) for fwName, fwCfg = range h.cfg.Forwarders { + var influxc *InfluxdClient + switch fwName { case forwarderInfluxd: - influxdc = NewInfluxdClient(fwCfg) - if influxdc != nil { - h.ff = append(h.ff, influxdc) + influxc = NewInfluxdClient(fwCfg) + if influxc != nil { + h.ff = append(h.ff, influxc) + } + + case forwarderQuestdb: + var questc *questdbClient + + questc, err = newQuestdbClient(fwCfg) + if err != nil { + log.Printf(`%s: %s: %s`, logp, fwName, err) + continue + } + if questc == nil { + continue } + h.ff = append(h.ff, questc) } } } diff --git a/http_log.go b/http_log.go index bbaf225..f0fd7fd 100644 --- a/http_log.go +++ b/http_log.go @@ -6,11 +6,44 @@ package haminer import ( "bytes" + "fmt" + "io" "strconv" "strings" "time" ) +const ( + influxdMeasurement = `haproxy` + + influxdTags = `,host=%s` + + `,server=%s` + + `,backend=%s` + + `,frontend=%s` + + `,http_method=%s` + + `,http_url=%s` + + `,http_query=%q` + + `,http_proto=%s` + + `,http_status=%d` + + `,term_state=%s` + + `,client_ip=%s` + + `,client_port=%d` + + influxdFields = `time_req=%d,` + + `time_wait=%d,` + + `time_connect=%d,` + + `time_rsp=%d,` + + `time_all=%d,` + + `conn_active=%d,` + + `conn_frontend=%d,` + + `conn_backend=%d,` + + `conn_server=%d,` + + `conn_retries=%d,` + + `queue_server=%d,` + + `queue_backend=%d,` + + `bytes_read=%d` +) + // HttpLog contains the mapping of haproxy HTTP log format to Go struct. // // Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3 @@ -388,3 +421,66 @@ func (halog *HttpLog) ParseUDPPacket(packet []byte, reqHeaders []string) bool { return halog.Parse(in, reqHeaders) } + +// writeIlp write the HTTP log as Influxdb Line Protocol. +func (httpLog *HttpLog) writeIlp(out io.Writer) (err error) { + var ( + k string + v string + ) + + _, err = out.Write([]byte(influxdMeasurement)) + if err != nil { + return err + } + + _, err = fmt.Fprintf(out, influxdTags, + // tags + _hostname, + httpLog.ServerName, + httpLog.BackendName, + httpLog.FrontendName, + httpLog.HTTPMethod, + httpLog.HTTPURL, + httpLog.HTTPQuery, + httpLog.HTTPProto, + httpLog.HTTPStatus, + httpLog.TermState, + httpLog.ClientIP, + httpLog.ClientPort, + ) + if err != nil { + return err + } + + for k, v = range httpLog.RequestHeaders { + _, err = fmt.Fprintf(out, `,%s=%s`, k, v) + if err != nil { + return err + } + } + + _, err = out.Write([]byte(` `)) + if err != nil { + return err + } + + _, err = fmt.Fprintf(out, influxdFields, + httpLog.TimeReq, httpLog.TimeWait, httpLog.TimeConnect, + httpLog.TimeRsp, httpLog.TimeAll, + httpLog.ConnActive, httpLog.ConnFrontend, httpLog.ConnBackend, + httpLog.ConnServer, httpLog.ConnRetries, + httpLog.QueueServer, httpLog.QueueBackend, + httpLog.BytesRead, + ) + if err != nil { + return err + } + + _, err = fmt.Fprintf(out, " %d\n", httpLog.Timestamp.UnixNano()) + if err != nil { + return err + } + + return nil +} diff --git a/influxd_client.go b/influxd_client.go index 7c8a54a..3991042 100644 --- a/influxd_client.go +++ b/influxd_client.go @@ -10,38 +10,7 @@ import ( ) const ( - envHostname = "HOSTNAME" - defHostname = "localhost" defContentType = "application/octet-stream" - - influxdMeasurement = `haproxy` - - influxdTags = `,host=%s,` + - `server=%s,` + - `backend=%s,` + - `frontend=%s,` + - `http_method=%s,` + - `http_url=%s,` + - `http_query=%q,` + - `http_proto=%s,` + - `http_status=%d,` + - `term_state=%s,` + - `client_ip=%s,` + - `client_port=%d` - - influxdFields = `time_req=%d,` + - `time_wait=%d,` + - `time_connect=%d,` + - `time_rsp=%d,` + - `time_all=%d,` + - `conn_active=%d,` + - `conn_frontend=%d,` + - `conn_backend=%d,` + - `conn_server=%d,` + - `conn_retries=%d,` + - `queue_server=%d,` + - `queue_backend=%d,` + - `bytes_read=%d` ) // InfluxdClient contains HTTP connection for writing logs to Influxd. @@ -92,7 +61,7 @@ func (cl *InfluxdClient) initConn() { // Influxd. func (cl *InfluxdClient) Forwards(halogs []*HttpLog) { var ( - logp = `Forwards` + logp = `influxdClient: Forwards` httpReq *http.Request httpRes *http.Response diff --git a/questdb_client.go b/questdb_client.go new file mode 100644 index 0000000..acfc5bc --- /dev/null +++ b/questdb_client.go @@ -0,0 +1,101 @@ +package haminer + +import ( + "bytes" + "fmt" + "log" + "net" + "net/url" + "time" + + libnet "github.com/shuLhan/share/lib/net" +) + +const ( + defQuestdbPort = 9009 +) + +// questdbClient client for questdb. +type questdbClient struct { + buf bytes.Buffer + conn net.Conn +} + +// newQuestdbClient create and initialize client connection using the Url in +// the ConfigForwarder. +func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) { + if cfg == nil || len(cfg.Url) == 0 { + return nil, nil + } + + var ( + logp = `newQuestdbClient` + timeout = 10 * time.Second + + surl *url.URL + address string + ip net.IP + port uint16 + ) + + surl, err = url.Parse(cfg.Url) + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + + if len(surl.Scheme) == 0 { + surl.Scheme = "udp" + } + + address, ip, port = libnet.ParseIPPort(surl.Host, defQuestdbPort) + if len(address) == 0 { + address = fmt.Sprintf(`%s:%d`, ip, port) + } else { + address = fmt.Sprintf(`%s:%d`, address, port) + } + + questc = &questdbClient{} + + questc.conn, err = net.DialTimeout(surl.Scheme, address, timeout) + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + + return questc, nil +} + +// Forwards implement the Forwarder interface. +// It will write all logs to questdb. +func (questc *questdbClient) Forwards(logs []*HttpLog) { + var ( + logp = `questdbClient: Forwards` + now = time.Now() + + httpLog *HttpLog + data []byte + err error + ) + + questc.buf.Reset() + + for _, httpLog = range logs { + err = httpLog.writeIlp(&questc.buf) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + } + + data = questc.buf.Bytes() + + err = questc.conn.SetWriteDeadline(now.Add(5 * time.Second)) + if err != nil { + log.Printf(`%s: SetWriteDeadline: %s`, logp, err) + return + } + + _, err = questc.conn.Write(data) + if err != nil { + log.Printf(`%s: Write: %s`, logp, err) + } +} -- cgit v1.3