aboutsummaryrefslogtreecommitdiff
path: root/questdb_client.go
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2022-08-17 17:53:28 +0700
committerShulhan <ms@kilabit.info>2022-08-17 17:55:09 +0700
commit364609ad3b03b97c5e25f4e8555b3a0267b548f2 (patch)
tree542b24c1ed48a92f5c182f674cf672a54c57bb16 /questdb_client.go
parenta3ea0c7bda6cae7a88af7dd005cd52ac40d42e57 (diff)
downloadhaminer-364609ad3b03b97c5e25f4e8555b3a0267b548f2.tar.xz
all: implement forwarder for questdb
Questdb [1] is one the time-series database. We experiment to forward the HTTP log using Influx Line Protocol (ILP). [1]: https://questdb.io/
Diffstat (limited to 'questdb_client.go')
-rw-r--r--questdb_client.go101
1 files changed, 101 insertions, 0 deletions
diff --git a/questdb_client.go b/questdb_client.go
new file mode 100644
index 0000000..acfc5bc
--- /dev/null
+++ b/questdb_client.go
@@ -0,0 +1,101 @@
+package haminer
+
+import (
+ "bytes"
+ "fmt"
+ "log"
+ "net"
+ "net/url"
+ "time"
+
+ libnet "github.com/shuLhan/share/lib/net"
+)
+
+const (
+ defQuestdbPort = 9009
+)
+
+// questdbClient client for questdb.
+type questdbClient struct {
+ buf bytes.Buffer
+ conn net.Conn
+}
+
+// newQuestdbClient create and initialize client connection using the Url in
+// the ConfigForwarder.
+func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) {
+ if cfg == nil || len(cfg.Url) == 0 {
+ return nil, nil
+ }
+
+ var (
+ logp = `newQuestdbClient`
+ timeout = 10 * time.Second
+
+ surl *url.URL
+ address string
+ ip net.IP
+ port uint16
+ )
+
+ surl, err = url.Parse(cfg.Url)
+ if err != nil {
+ return nil, fmt.Errorf(`%s: %w`, logp, err)
+ }
+
+ if len(surl.Scheme) == 0 {
+ surl.Scheme = "udp"
+ }
+
+ address, ip, port = libnet.ParseIPPort(surl.Host, defQuestdbPort)
+ if len(address) == 0 {
+ address = fmt.Sprintf(`%s:%d`, ip, port)
+ } else {
+ address = fmt.Sprintf(`%s:%d`, address, port)
+ }
+
+ questc = &questdbClient{}
+
+ questc.conn, err = net.DialTimeout(surl.Scheme, address, timeout)
+ if err != nil {
+ return nil, fmt.Errorf(`%s: %w`, logp, err)
+ }
+
+ return questc, nil
+}
+
+// Forwards implement the Forwarder interface.
+// It will write all logs to questdb.
+func (questc *questdbClient) Forwards(logs []*HttpLog) {
+ var (
+ logp = `questdbClient: Forwards`
+ now = time.Now()
+
+ httpLog *HttpLog
+ data []byte
+ err error
+ )
+
+ questc.buf.Reset()
+
+ for _, httpLog = range logs {
+ err = httpLog.writeIlp(&questc.buf)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ return
+ }
+ }
+
+ data = questc.buf.Bytes()
+
+ err = questc.conn.SetWriteDeadline(now.Add(5 * time.Second))
+ if err != nil {
+ log.Printf(`%s: SetWriteDeadline: %s`, logp, err)
+ return
+ }
+
+ _, err = questc.conn.Write(data)
+ if err != nil {
+ log.Printf(`%s: Write: %s`, logp, err)
+ }
+}