From 0fdfc19fc5a91671d9d79ec9a1bf0c3707c852d4 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Wed, 15 Mar 2023 23:09:50 +0700 Subject: lib/telemetry: add formatter for Influxdata Line Protocol The IlpFormatter implement a formatter for Influxdata Line Protocol that can be used for prepare raw metric for Influxdata or QuestDB. --- lib/telemetry/ilp_formatter.go | 109 ++++++++++++++++++++++++++++++++++++ lib/telemetry/ilp_formatter_test.go | 77 +++++++++++++++++++++++++ 2 files changed, 186 insertions(+) create mode 100644 lib/telemetry/ilp_formatter.go create mode 100644 lib/telemetry/ilp_formatter_test.go diff --git a/lib/telemetry/ilp_formatter.go b/lib/telemetry/ilp_formatter.go new file mode 100644 index 00000000..3a8f556b --- /dev/null +++ b/lib/telemetry/ilp_formatter.go @@ -0,0 +1,109 @@ +// Copyright 2023, Shulhan . All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "bytes" + "fmt" + + libbytes "github.com/shuLhan/share/lib/bytes" +) + +const ( + ilpFormatterName = `ilp` +) + +// IlpFormatter format the Metric using the Influxdata Line Protocol, [ILP]. +// Syntax, +// +// ILP = measurement [METADATA] " " METRIC [" " timestamp] LF +// METADATA = *("," key "=" value) +// METRIC = key "=" value *("," METRIC) +// +// [ILP]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/ +type IlpFormatter struct { + name string + measurement string + mdRaw []byte + mdVersion int // Latest version of metadata. + +} + +// NewIlpFormatter create and initialize new IlpFormatter. +func NewIlpFormatter(measurement string) (ilp *IlpFormatter) { + ilp = &IlpFormatter{ + name: ilpFormatterName, + measurement: measurement, + } + return ilp +} + +// BulkFormat format list of Metric with metadata. +func (ilp *IlpFormatter) BulkFormat(list []Metric, md *Metadata) []byte { + if len(list) == 0 { + return nil + } + + ilp.generateMetadata(md) + + var ( + bb bytes.Buffer + m Metric + x int + ) + + bb.WriteString(ilp.measurement) + bb.Write(ilp.mdRaw) + bb.WriteByte(' ') + + for _, m = range list { + if len(m.Name) == 0 { + continue + } + if x > 0 { + bb.WriteByte(',') + } + fmt.Fprintf(&bb, `%s=%f`, m.Name, m.Value) + x++ + } + + fmt.Fprintf(&bb, " %d\n", m.Timestamp) + + return libbytes.Copy(bb.Bytes()) +} + +// Format single Metric. +func (ilp *IlpFormatter) Format(m Metric, md *Metadata) []byte { + if len(m.Name) == 0 { + return nil + } + return ilp.BulkFormat([]Metric{m}, md) +} + +func (ilp *IlpFormatter) generateMetadata(md *Metadata) { + var mdVersion = md.Version() + if ilp.mdVersion == mdVersion { + return + } + + var ( + keys, vals = md.KeysMap() + + bb bytes.Buffer + k string + v string + ) + for _, k = range keys { + v = vals[k] + fmt.Fprintf(&bb, `,%s=%s`, k, v) + } + ilp.mdRaw = libbytes.Copy(bb.Bytes()) + ilp.mdVersion = mdVersion +} + +// Name return the unique name of the formatter, "ilp". +func (ilp *IlpFormatter) Name() string { + return ilp.name +} diff --git a/lib/telemetry/ilp_formatter_test.go b/lib/telemetry/ilp_formatter_test.go new file mode 100644 index 00000000..4aca4b47 --- /dev/null +++ b/lib/telemetry/ilp_formatter_test.go @@ -0,0 +1,77 @@ +// Copyright 2023, Shulhan . All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "testing" + + "github.com/shuLhan/share/lib/test" +) + +func TestIlpFormatter_Format(t *testing.T) { + var md = NewMetadata() + md.Set(`host`, `localhost`) + md.Set(`version`, `0.1.0`) + + var ilp = NewIlpFormatter(`myapp`) + + test.Assert(t, `Name`, `ilp`, ilp.Name()) + + var ( + m Metric + got []byte + exp string + ) + + got = ilp.Format(m, md) + test.Assert(t, `Format: empty`, exp, string(got)) + + m = Metric{ + Timestamp: 1000, + Name: `go_gc_total`, + Value: 0.004, + } + got = ilp.Format(m, md) + + exp = `myapp,host=localhost,version=0.1.0 go_gc_total=0.004000 1000` + + test.Assert(t, `Format`, string(exp), string(got)) +} + +func TestIlpFormatter_BulkFormat(t *testing.T) { + var md = NewMetadata() + md.Set(`host`, `localhost`) + md.Set(`version`, `0.1.0`) + + var ilp = NewIlpFormatter(`myapp`) + + test.Assert(t, `Name`, `ilp`, ilp.Name()) + + var ( + list = []Metric{} + got = ilp.BulkFormat(list, md) + + exp string + ) + + test.Assert(t, `BulkFormat: empty`, exp, string(got)) + + list = append(list, Metric{ + Timestamp: 1000, + Name: `go_gc_total`, + Value: 0.004, + }) + list = append(list, Metric{ + Timestamp: 1000, + Name: `go_gc_pause_seconds`, + Value: 0.00001, + }) + + got = ilp.BulkFormat(list, md) + + exp = "myapp,host=localhost,version=0.1.0 go_gc_total=0.004000,go_gc_pause_seconds=0.000010 1000\n" + + test.Assert(t, `BulkFormat`, string(exp), string(got)) +} -- cgit v1.3