aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2022-08-17 17:53:28 +0700
committerShulhan <ms@kilabit.info>2022-08-17 17:55:09 +0700
commit364609ad3b03b97c5e25f4e8555b3a0267b548f2 (patch)
tree542b24c1ed48a92f5c182f674cf672a54c57bb16
parenta3ea0c7bda6cae7a88af7dd005cd52ac40d42e57 (diff)
downloadhaminer-364609ad3b03b97c5e25f4e8555b3a0267b548f2.tar.xz
all: implement forwarder for questdb
Questdb [1] is one the time-series database. We experiment to forward the HTTP log using Influx Line Protocol (ILP). [1]: https://questdb.io/
-rw-r--r--cmd/haminer/haminer.conf19
-rw-r--r--config.go11
-rw-r--r--config_forwarder.go23
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--haminer.go53
-rw-r--r--http_log.go96
-rw-r--r--influxd_client.go33
-rw-r--r--questdb_client.go101
9 files changed, 290 insertions, 50 deletions
diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf
index e355e44..c20f135 100644
--- a/cmd/haminer/haminer.conf
+++ b/cmd/haminer/haminer.conf
@@ -135,3 +135,22 @@
## Authorization for v2.
#token =
+
+## The questdb forwarder define configuration to forward the log to questdb
+## instance.
+## The log is forwarded using Influxb Line Protocol (ILP) [1]
+##
+## [1]: https://questdb.io/docs/reference/api/ilp/overview
+
+[forwarder "questdb"]
+
+## The URL of questdb server in the following format,
+##
+## $scheme "://" $ip_address [":" $port]
+##
+## The $scheme can be "udp" (default) or "tcp".
+## The $ip_address is required, define the address where questdb running.
+## The $port number is optional default to 9009.
+##
+## An empty url means the forwarder is disabled.
+url =
diff --git a/config.go b/config.go
index 3635638..45bd023 100644
--- a/config.go
+++ b/config.go
@@ -67,8 +67,9 @@ func (cfg *Config) Load(path string) (err error) {
var (
logp = `Load`
- in *ini.Ini
- fw *ConfigForwarder
+ in *ini.Ini
+ fwCfg *ConfigForwarder
+ fwName string
)
in, err = ini.Open(path)
@@ -90,10 +91,10 @@ func (cfg *Config) Load(path string) (err error) {
return fmt.Errorf(`%s: %w`, logp, err)
}
- for _, fw = range cfg.Forwarders {
- err = fw.init()
+ for fwName, fwCfg = range cfg.Forwarders {
+ err = fwCfg.init(fwName)
if err != nil {
- return fmt.Errorf(`%s: %w`, logp, err)
+ return fmt.Errorf(`%s: %s: %w`, logp, fwName, err)
}
}
diff --git a/config_forwarder.go b/config_forwarder.go
index 408d1bd..c7acc85 100644
--- a/config_forwarder.go
+++ b/config_forwarder.go
@@ -12,6 +12,7 @@ const (
influxdVersion2 = `v2`
forwarderInfluxd = `influxd`
+ forwarderQuestdb = `questdb`
)
// ConfigForwarder contains configuration for forwarding the logs.
@@ -36,11 +37,19 @@ type ConfigForwarder struct {
}
// init check, validate, and initialize the configuration values.
-func (cfg *ConfigForwarder) init() (err error) {
+func (cfg *ConfigForwarder) init(fwName string) (err error) {
if len(cfg.Url) == 0 {
return
}
+ if fwName == forwarderInfluxd {
+ return cfg.initInfluxd()
+ }
+
+ return nil
+}
+
+func (cfg *ConfigForwarder) initInfluxd() (err error) {
switch cfg.Version {
case influxdVersion1:
case influxdVersion2:
@@ -55,10 +64,10 @@ func (cfg *ConfigForwarder) init() (err error) {
var (
q = url.Values{}
- url *url.URL
+ surl *url.URL
)
- url, err = url.Parse(cfg.Url)
+ surl, err = url.Parse(cfg.Url)
if err != nil {
return err
}
@@ -66,7 +75,7 @@ func (cfg *ConfigForwarder) init() (err error) {
q.Set(`precision`, `ns`)
if cfg.Version == influxdVersion1 {
- url.Path = `/write`
+ surl.Path = `/write`
q.Set(`db`, cfg.Bucket)
if len(cfg.User) > 0 && len(cfg.Pass) > 0 {
@@ -75,7 +84,7 @@ func (cfg *ConfigForwarder) init() (err error) {
}
} else {
cfg.headerToken = `Token ` + cfg.Token
- url.Path = `/api/v2/write`
+ surl.Path = `/api/v2/write`
if len(cfg.Org) == 0 {
return errors.New(`empty organization field`)
@@ -85,9 +94,9 @@ func (cfg *ConfigForwarder) init() (err error) {
q.Set(`bucket`, cfg.Bucket)
}
- url.RawQuery = q.Encode()
+ surl.RawQuery = q.Encode()
- cfg.apiWrite = url.String()
+ cfg.apiWrite = surl.String()
return nil
}
diff --git a/go.mod b/go.mod
index 4963bb1..a5a675a 100644
--- a/go.mod
+++ b/go.mod
@@ -3,3 +3,5 @@ module git.sr.ht/~shulhan/haminer
go 1.18
require github.com/shuLhan/share v0.40.0
+
+require golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
diff --git a/go.sum b/go.sum
index 27da647..ebbff64 100644
--- a/go.sum
+++ b/go.sum
@@ -1,2 +1,4 @@
github.com/shuLhan/share v0.40.0 h1:C0c1lfKLzogUStIiYJecoTTP9TrEDMz64la1Y1l8Wl0=
github.com/shuLhan/share v0.40.0/go.mod h1:hb3Kis5s4jPume4YD15JELE67naFybtuALshhh9TlOg=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
diff --git a/haminer.go b/haminer.go
index 9b1e080..07bbe91 100644
--- a/haminer.go
+++ b/haminer.go
@@ -8,9 +8,19 @@ import (
"fmt"
"log"
"net"
+ "os"
"time"
)
+const (
+ defHostname = `localhost`
+ envHostname = `HOSTNAME`
+)
+
+var (
+ _hostname string
+)
+
// Haminer define the log consumer and producer.
type Haminer struct {
cfg *Config
@@ -20,6 +30,18 @@ type Haminer struct {
isRunning bool
}
+func initHostname() {
+ var err error
+
+ _hostname, err = os.Hostname()
+ if err != nil {
+ _hostname = os.Getenv(envHostname)
+ }
+ if len(_hostname) == 0 {
+ _hostname = defHostname
+ }
+}
+
// NewHaminer create, initialize, and return new Haminer instance. If config
// parameter is nil, it will use the default options.
func NewHaminer(cfg *Config) (h *Haminer) {
@@ -33,6 +55,8 @@ func NewHaminer(cfg *Config) (h *Haminer) {
ff: make([]Forwarder, 0),
}
+ initHostname()
+
h.createForwarder()
return
@@ -40,18 +64,35 @@ func NewHaminer(cfg *Config) (h *Haminer) {
func (h *Haminer) createForwarder() {
var (
- fwCfg *ConfigForwarder
- influxdc *InfluxdClient
- fwName string
+ logp = `createForwarder`
+
+ fwCfg *ConfigForwarder
+ fwName string
+ err error
)
for fwName, fwCfg = range h.cfg.Forwarders {
+ var influxc *InfluxdClient
+
switch fwName {
case forwarderInfluxd:
- influxdc = NewInfluxdClient(fwCfg)
- if influxdc != nil {
- h.ff = append(h.ff, influxdc)
+ influxc = NewInfluxdClient(fwCfg)
+ if influxc != nil {
+ h.ff = append(h.ff, influxc)
+ }
+
+ case forwarderQuestdb:
+ var questc *questdbClient
+
+ questc, err = newQuestdbClient(fwCfg)
+ if err != nil {
+ log.Printf(`%s: %s: %s`, logp, fwName, err)
+ continue
+ }
+ if questc == nil {
+ continue
}
+ h.ff = append(h.ff, questc)
}
}
}
diff --git a/http_log.go b/http_log.go
index bbaf225..f0fd7fd 100644
--- a/http_log.go
+++ b/http_log.go
@@ -6,11 +6,44 @@ package haminer
import (
"bytes"
+ "fmt"
+ "io"
"strconv"
"strings"
"time"
)
+const (
+ influxdMeasurement = `haproxy`
+
+ influxdTags = `,host=%s` +
+ `,server=%s` +
+ `,backend=%s` +
+ `,frontend=%s` +
+ `,http_method=%s` +
+ `,http_url=%s` +
+ `,http_query=%q` +
+ `,http_proto=%s` +
+ `,http_status=%d` +
+ `,term_state=%s` +
+ `,client_ip=%s` +
+ `,client_port=%d`
+
+ influxdFields = `time_req=%d,` +
+ `time_wait=%d,` +
+ `time_connect=%d,` +
+ `time_rsp=%d,` +
+ `time_all=%d,` +
+ `conn_active=%d,` +
+ `conn_frontend=%d,` +
+ `conn_backend=%d,` +
+ `conn_server=%d,` +
+ `conn_retries=%d,` +
+ `queue_server=%d,` +
+ `queue_backend=%d,` +
+ `bytes_read=%d`
+)
+
// HttpLog contains the mapping of haproxy HTTP log format to Go struct.
//
// Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3
@@ -388,3 +421,66 @@ func (halog *HttpLog) ParseUDPPacket(packet []byte, reqHeaders []string) bool {
return halog.Parse(in, reqHeaders)
}
+
+// writeIlp write the HTTP log as Influxdb Line Protocol.
+func (httpLog *HttpLog) writeIlp(out io.Writer) (err error) {
+ var (
+ k string
+ v string
+ )
+
+ _, err = out.Write([]byte(influxdMeasurement))
+ if err != nil {
+ return err
+ }
+
+ _, err = fmt.Fprintf(out, influxdTags,
+ // tags
+ _hostname,
+ httpLog.ServerName,
+ httpLog.BackendName,
+ httpLog.FrontendName,
+ httpLog.HTTPMethod,
+ httpLog.HTTPURL,
+ httpLog.HTTPQuery,
+ httpLog.HTTPProto,
+ httpLog.HTTPStatus,
+ httpLog.TermState,
+ httpLog.ClientIP,
+ httpLog.ClientPort,
+ )
+ if err != nil {
+ return err
+ }
+
+ for k, v = range httpLog.RequestHeaders {
+ _, err = fmt.Fprintf(out, `,%s=%s`, k, v)
+ if err != nil {
+ return err
+ }
+ }
+
+ _, err = out.Write([]byte(` `))
+ if err != nil {
+ return err
+ }
+
+ _, err = fmt.Fprintf(out, influxdFields,
+ httpLog.TimeReq, httpLog.TimeWait, httpLog.TimeConnect,
+ httpLog.TimeRsp, httpLog.TimeAll,
+ httpLog.ConnActive, httpLog.ConnFrontend, httpLog.ConnBackend,
+ httpLog.ConnServer, httpLog.ConnRetries,
+ httpLog.QueueServer, httpLog.QueueBackend,
+ httpLog.BytesRead,
+ )
+ if err != nil {
+ return err
+ }
+
+ _, err = fmt.Fprintf(out, " %d\n", httpLog.Timestamp.UnixNano())
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/influxd_client.go b/influxd_client.go
index 7c8a54a..3991042 100644
--- a/influxd_client.go
+++ b/influxd_client.go
@@ -10,38 +10,7 @@ import (
)
const (
- envHostname = "HOSTNAME"
- defHostname = "localhost"
defContentType = "application/octet-stream"
-
- influxdMeasurement = `haproxy`
-
- influxdTags = `,host=%s,` +
- `server=%s,` +
- `backend=%s,` +
- `frontend=%s,` +
- `http_method=%s,` +
- `http_url=%s,` +
- `http_query=%q,` +
- `http_proto=%s,` +
- `http_status=%d,` +
- `term_state=%s,` +
- `client_ip=%s,` +
- `client_port=%d`
-
- influxdFields = `time_req=%d,` +
- `time_wait=%d,` +
- `time_connect=%d,` +
- `time_rsp=%d,` +
- `time_all=%d,` +
- `conn_active=%d,` +
- `conn_frontend=%d,` +
- `conn_backend=%d,` +
- `conn_server=%d,` +
- `conn_retries=%d,` +
- `queue_server=%d,` +
- `queue_backend=%d,` +
- `bytes_read=%d`
)
// InfluxdClient contains HTTP connection for writing logs to Influxd.
@@ -92,7 +61,7 @@ func (cl *InfluxdClient) initConn() {
// Influxd.
func (cl *InfluxdClient) Forwards(halogs []*HttpLog) {
var (
- logp = `Forwards`
+ logp = `influxdClient: Forwards`
httpReq *http.Request
httpRes *http.Response
diff --git a/questdb_client.go b/questdb_client.go
new file mode 100644
index 0000000..acfc5bc
--- /dev/null
+++ b/questdb_client.go
@@ -0,0 +1,101 @@
+package haminer
+
+import (
+ "bytes"
+ "fmt"
+ "log"
+ "net"
+ "net/url"
+ "time"
+
+ libnet "github.com/shuLhan/share/lib/net"
+)
+
+const (
+ defQuestdbPort = 9009
+)
+
+// questdbClient client for questdb.
+type questdbClient struct {
+ buf bytes.Buffer
+ conn net.Conn
+}
+
+// newQuestdbClient create and initialize client connection using the Url in
+// the ConfigForwarder.
+func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) {
+ if cfg == nil || len(cfg.Url) == 0 {
+ return nil, nil
+ }
+
+ var (
+ logp = `newQuestdbClient`
+ timeout = 10 * time.Second
+
+ surl *url.URL
+ address string
+ ip net.IP
+ port uint16
+ )
+
+ surl, err = url.Parse(cfg.Url)
+ if err != nil {
+ return nil, fmt.Errorf(`%s: %w`, logp, err)
+ }
+
+ if len(surl.Scheme) == 0 {
+ surl.Scheme = "udp"
+ }
+
+ address, ip, port = libnet.ParseIPPort(surl.Host, defQuestdbPort)
+ if len(address) == 0 {
+ address = fmt.Sprintf(`%s:%d`, ip, port)
+ } else {
+ address = fmt.Sprintf(`%s:%d`, address, port)
+ }
+
+ questc = &questdbClient{}
+
+ questc.conn, err = net.DialTimeout(surl.Scheme, address, timeout)
+ if err != nil {
+ return nil, fmt.Errorf(`%s: %w`, logp, err)
+ }
+
+ return questc, nil
+}
+
+// Forwards implement the Forwarder interface.
+// It will write all logs to questdb.
+func (questc *questdbClient) Forwards(logs []*HttpLog) {
+ var (
+ logp = `questdbClient: Forwards`
+ now = time.Now()
+
+ httpLog *HttpLog
+ data []byte
+ err error
+ )
+
+ questc.buf.Reset()
+
+ for _, httpLog = range logs {
+ err = httpLog.writeIlp(&questc.buf)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ return
+ }
+ }
+
+ data = questc.buf.Bytes()
+
+ err = questc.conn.SetWriteDeadline(now.Add(5 * time.Second))
+ if err != nil {
+ log.Printf(`%s: SetWriteDeadline: %s`, logp, err)
+ return
+ }
+
+ _, err = questc.conn.Write(data)
+ if err != nil {
+ log.Printf(`%s: Write: %s`, logp, err)
+ }
+}