diff options
| author | Shulhan <ms@kilabit.info> | 2022-08-17 17:53:28 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2022-08-17 17:55:09 +0700 |
| commit | 364609ad3b03b97c5e25f4e8555b3a0267b548f2 (patch) | |
| tree | 542b24c1ed48a92f5c182f674cf672a54c57bb16 /questdb_client.go | |
| parent | a3ea0c7bda6cae7a88af7dd005cd52ac40d42e57 (diff) | |
| download | haminer-364609ad3b03b97c5e25f4e8555b3a0267b548f2.tar.xz | |
all: implement forwarder for questdb
Questdb [1] is one the time-series database.
We experiment to forward the HTTP log using Influx Line Protocol (ILP).
[1]: https://questdb.io/
Diffstat (limited to 'questdb_client.go')
| -rw-r--r-- | questdb_client.go | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/questdb_client.go b/questdb_client.go new file mode 100644 index 0000000..acfc5bc --- /dev/null +++ b/questdb_client.go @@ -0,0 +1,101 @@ +package haminer + +import ( + "bytes" + "fmt" + "log" + "net" + "net/url" + "time" + + libnet "github.com/shuLhan/share/lib/net" +) + +const ( + defQuestdbPort = 9009 +) + +// questdbClient client for questdb. +type questdbClient struct { + buf bytes.Buffer + conn net.Conn +} + +// newQuestdbClient create and initialize client connection using the Url in +// the ConfigForwarder. +func newQuestdbClient(cfg *ConfigForwarder) (questc *questdbClient, err error) { + if cfg == nil || len(cfg.Url) == 0 { + return nil, nil + } + + var ( + logp = `newQuestdbClient` + timeout = 10 * time.Second + + surl *url.URL + address string + ip net.IP + port uint16 + ) + + surl, err = url.Parse(cfg.Url) + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + + if len(surl.Scheme) == 0 { + surl.Scheme = "udp" + } + + address, ip, port = libnet.ParseIPPort(surl.Host, defQuestdbPort) + if len(address) == 0 { + address = fmt.Sprintf(`%s:%d`, ip, port) + } else { + address = fmt.Sprintf(`%s:%d`, address, port) + } + + questc = &questdbClient{} + + questc.conn, err = net.DialTimeout(surl.Scheme, address, timeout) + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + + return questc, nil +} + +// Forwards implement the Forwarder interface. +// It will write all logs to questdb. +func (questc *questdbClient) Forwards(logs []*HttpLog) { + var ( + logp = `questdbClient: Forwards` + now = time.Now() + + httpLog *HttpLog + data []byte + err error + ) + + questc.buf.Reset() + + for _, httpLog = range logs { + err = httpLog.writeIlp(&questc.buf) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + } + + data = questc.buf.Bytes() + + err = questc.conn.SetWriteDeadline(now.Add(5 * time.Second)) + if err != nil { + log.Printf(`%s: SetWriteDeadline: %s`, logp, err) + return + } + + _, err = questc.conn.Write(data) + if err != nil { + log.Printf(`%s: Write: %s`, logp, err) + } +} |
