diff options
| author | Shulhan <ms@kilabit.info> | 2023-03-12 23:30:22 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2023-03-17 21:53:38 +0700 |
| commit | bb4cfe53505d1e7a89ab61524425fe6fdd2ed673 (patch) | |
| tree | b518f1a194d4bb795efad1cc2b96c6fbb2f7702e /lib | |
| parent | e96899d74668cc0d711396482dfefefec10ab55d (diff) | |
| download | pakakeh.go-bb4cfe53505d1e7a89ab61524425fe6fdd2ed673.tar.xz | |
lib/telemetry: package for collecting and forwarding metrics
Package telemetry is a library for collecting various Metric, for example
from standard runtime/metrics, and send or write it to one or more
Forwarder.
Each Forwarder has capability to format the Metric before sending or
writing it using Formatter.
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/telemetry/agent.go | 269 | ||||
| -rw-r--r-- | lib/telemetry/agent_options.go | 57 | ||||
| -rw-r--r-- | lib/telemetry/buffer_forwarder.go | 57 | ||||
| -rw-r--r-- | lib/telemetry/buffer_forwarder_example_test.go | 70 | ||||
| -rw-r--r-- | lib/telemetry/collector.go | 11 | ||||
| -rw-r--r-- | lib/telemetry/context.go | 20 | ||||
| -rw-r--r-- | lib/telemetry/dsv_formatter.go | 105 | ||||
| -rw-r--r-- | lib/telemetry/file_forwarder.go | 48 | ||||
| -rw-r--r-- | lib/telemetry/formatter.go | 18 | ||||
| -rw-r--r-- | lib/telemetry/forwarder.go | 18 | ||||
| -rw-r--r-- | lib/telemetry/go_memstats_collector.go | 149 | ||||
| -rw-r--r-- | lib/telemetry/go_metrics_collector.go | 178 | ||||
| -rw-r--r-- | lib/telemetry/internal/cmd/agent-example/main.go | 77 | ||||
| -rw-r--r-- | lib/telemetry/metadata.go | 93 | ||||
| -rw-r--r-- | lib/telemetry/metadata_example_test.go | 41 | ||||
| -rw-r--r-- | lib/telemetry/metric.go | 12 | ||||
| -rw-r--r-- | lib/telemetry/stdout_forwarder.go | 44 | ||||
| -rw-r--r-- | lib/telemetry/telemetry.go | 10 | ||||
| -rw-r--r-- | lib/telemetry/timestamper.go | 33 |
19 files changed, 1310 insertions, 0 deletions
diff --git a/lib/telemetry/agent.go b/lib/telemetry/agent.go new file mode 100644 index 00000000..26631cb0 --- /dev/null +++ b/lib/telemetry/agent.go @@ -0,0 +1,269 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "context" + "fmt" + "log" + "time" +) + +const ( + defQueueSize = 512 +) + +// Agent is the one that responsible to collect and forward the metrics. +type Agent struct { + bulkq chan []Metric + singleq chan Metric + + running chan bool + + opts AgentOptions +} + +// NewAgent create, initalize, and run the new Agent. +// The agent will start auto collecting the metrics in the background every +// [AgentOptions.Interval] and forward it to each [Forwarder]. +func NewAgent(opts AgentOptions) (agent *Agent) { + opts.init() + + agent = &Agent{ + opts: opts, + + singleq: make(chan Metric, defQueueSize), + bulkq: make(chan []Metric, defQueueSize), + running: make(chan bool, 1), + } + + go agent.forwarder() + go agent.collector() + + return agent +} + +// BulkForward push list of Metric asynchronously. +// If ctx contains ContextForwardWait, it will forward the metric +// synchronously. +func (agent *Agent) BulkForward(ctx context.Context, list []Metric) error { + if len(list) == 0 { + return nil + } + var ctxWait = ctx.Value(agentContextForwardWait) + if ctxWait == nil { + agent.bulkq <- list + return nil + } + return agent.forwardBulk(ctx, list) +} + +func (agent *Agent) collect() (all []Metric) { + var ( + ts = agent.opts.Timestamp() + + col Collector + list []Metric + ) + + for _, col = range agent.opts.Collectors { + list = col.Collect(ts) + all = append(all, list...) + } + return all +} + +// collector collect the metrics on each interval and forward it. +func (agent *Agent) collector() { + var ( + ticker = time.NewTicker(agent.opts.Interval) + metrics []Metric + ) + + for { + select { + case <-ticker.C: + metrics = agent.collect() + agent.BulkForward(context.Background(), metrics) + + case <-agent.running: + ticker.Stop() + // ACK the Stop. + agent.running <- false + return + } + } +} + +// Forward single metric to agent asynchronously. +// If ctx contains ContextForwardWait, it will forward the metric +// synchronously. +func (agent *Agent) Forward(ctx context.Context, m Metric) (err error) { + var ctxv = ctx.Value(agentContextForwardWait) + if ctxv == nil { + agent.singleq <- m + return nil + } + return agent.forwardSingle(ctx, &m) +} + +func (agent *Agent) forwardBulk(ctx context.Context, list []Metric) (err error) { + if len(list) == 0 { + return nil + } + + var ( + ts = agent.opts.Timestamp() + x int + ) + for ; x < len(list); x++ { + if list[x].Timestamp <= 0 { + list[x].Timestamp = ts + } + } + + var ( + // Map of Formatter.Name with its format result. + fmtWire = map[string][]byte{} + + fwd Forwarder + fmter Formatter + fmtName string + wire []byte + errfwd error + ok bool + ) + for _, fwd = range agent.opts.Forwarders { + fmter = fwd.Formatter() + if fmter == nil { + continue + } + + fmtName = fmter.Name() + + // Check if we have format the metrics before using the same + // Formatter. + wire, ok = fmtWire[fmtName] + if !ok { + wire = fmter.BulkFormat(list, agent.opts.Metadata) + fmtWire[fmtName] = wire + } + + select { + case <-ctx.Done(): + return err + default: + _, errfwd = fwd.Write(wire) + if errfwd != nil { + if err == nil { + err = fmt.Errorf(`forwardBulk: %s`, errfwd) + } else { + err = fmt.Errorf(`%s: %s`, err, errfwd) + } + } + } + } + return err +} + +func (agent *Agent) forwardSingle(ctx context.Context, m *Metric) (err error) { + if m == nil { + return nil + } + + var ( + // Map of Formatter.Name with its format result. + fmtWire = map[string][]byte{} + + fwd Forwarder + fmter Formatter + fmtName string + wire []byte + errfwd error + ok bool + ) + + if m.Timestamp <= 0 { + m.Timestamp = agent.opts.Timestamp() + } + + for _, fwd = range agent.opts.Forwarders { + fmter = fwd.Formatter() + if fmter == nil { + continue + } + + fmtName = fmter.Name() + + // Check if we have format the metrics before using the same + // Formatter. + wire, ok = fmtWire[fmtName] + if !ok { + wire = fmter.Format(*m, agent.opts.Metadata) + fmtWire[fmtName] = wire + } + + select { + case <-ctx.Done(): + // Request cancelled, timeout, or reach deadlines. + return err + default: + _, errfwd = fwd.Write(wire) + if errfwd != nil { + if err == nil { + err = fmt.Errorf(`forwardSingle: %s`, errfwd) + } else { + err = fmt.Errorf(`%s: %s`, err, errfwd) + } + } + } + } + return err +} + +// forwarder the goroutine that queue and forward single or bulk of Metric. +func (agent *Agent) forwarder() { + var ( + m Metric + list []Metric + err error + ) + + for { + select { + case list = <-agent.bulkq: + err = agent.forwardBulk(context.Background(), list) + if err != nil { + log.Print(err) + } + case m = <-agent.singleq: + err = agent.forwardSingle(context.Background(), &m) + if err != nil { + log.Print(err) + } + case <-agent.running: + // ACK the Stop. + agent.running <- false + return + } + } +} + +// Stop the agent and close all [Forwarder]. +func (agent *Agent) Stop() { + // Stop the the first goroutine. + agent.running <- false + <-agent.running + + // Stop the the second goroutine. + agent.running <- false + <-agent.running + + // Close all forwarders. + var fwd Forwarder + for _, fwd = range agent.opts.Forwarders { + _ = fwd.Close() + } +} diff --git a/lib/telemetry/agent_options.go b/lib/telemetry/agent_options.go new file mode 100644 index 00000000..95e707d4 --- /dev/null +++ b/lib/telemetry/agent_options.go @@ -0,0 +1,57 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import "time" + +// List of default value or limit for AgentOptions. +const ( + defInterval = 1 * time.Minute + defIntervalMinimum = 10 * time.Second +) + +// AgentOptions contains options to run the Agent. +type AgentOptions struct { + // Name of the agent. + Name string + + // Metadata provides static, additional information to be forwarded + // along with the collected metrics. + Metadata *Metadata + + // Timestamp define the function to be called to set the + // [Metric.Timestamp]. + // Default to NanoTimestamp. + Timestamp Timestamper + + // Collectors contains list of Collector that provide the metrics to + // be forwarded. + // An empty Collectors means no metrics will be collected and + // forwarded. + Collectors []Collector + + // Forwarders contains list of target where collected metrics will be + // forwarded. + Forwarders []Forwarder + + // Interval for collecting metrics. + // Default value is one minutes with the minimium value is 10 seconds. + Interval time.Duration +} + +// init initialize the AgentOptions default values. +func (opts *AgentOptions) init() { + if opts.Metadata == nil { + opts.Metadata = NewMetadata() + } + if opts.Timestamp == nil { + opts.Timestamp = NanoTimestamp() + } + if opts.Interval <= 0 { + opts.Interval = defInterval + } else if opts.Interval < defIntervalMinimum { + opts.Interval = defIntervalMinimum + } +} diff --git a/lib/telemetry/buffer_forwarder.go b/lib/telemetry/buffer_forwarder.go new file mode 100644 index 00000000..e0adca7c --- /dev/null +++ b/lib/telemetry/buffer_forwarder.go @@ -0,0 +1,57 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "bytes" + "fmt" + "sync" +) + +// BufferForwarder write the metrics to underlying [bytes.Buffer]. +type BufferForwarder struct { + formatter Formatter + bb bytes.Buffer + sync.Mutex +} + +// NewBufferForwarder create new BufferForwarder using f as Formatter. +func NewBufferForwarder(f Formatter) *BufferForwarder { + return &BufferForwarder{ + formatter: f, + } +} + +// Bytes return the metrics that has been written to Buffer. +// Once this method called the underlying Buffer will be resetted. +func (buf *BufferForwarder) Bytes() (b []byte) { + buf.Lock() + b = buf.bb.Bytes() + buf.bb.Reset() + buf.Unlock() + return b +} + +// Close on Buffer is a no-op. +func (buf *BufferForwarder) Close() error { + return nil +} + +// Formatter return the Formatter used by this BufferForwarder. +func (buf *BufferForwarder) Formatter() Formatter { + return buf.formatter +} + +// Write the raw metrics to Buffer. +func (buf *BufferForwarder) Write(wire []byte) (n int, err error) { + buf.Lock() + defer buf.Unlock() + + n, err = buf.bb.Write(wire) + if err != nil { + return n, fmt.Errorf(`BufferForwarder.Forward: %w`, err) + } + return n, nil +} diff --git a/lib/telemetry/buffer_forwarder_example_test.go b/lib/telemetry/buffer_forwarder_example_test.go new file mode 100644 index 00000000..c7afc11f --- /dev/null +++ b/lib/telemetry/buffer_forwarder_example_test.go @@ -0,0 +1,70 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry_test + +import ( + "context" + "fmt" + "log" + + "github.com/shuLhan/share/lib/telemetry" +) + +func ExampleBufferForwarder() { + // Create the Formatter and Forwarder. + var ( + dsvFmt = telemetry.NewDsvFormatter(';', telemetry.RuntimeMetricsAlias) + bufFwd = telemetry.NewBufferForwarder(dsvFmt) + ) + + // Create metadata. + var md = telemetry.NewMetadata() + md.Set(`name`, `BufferForwarder`) + md.Set(`version`, `0.1.0`) + + // Create the Agent. + var ( + agentOpts = telemetry.AgentOptions{ + Metadata: md, + Forwarders: []telemetry.Forwarder{bufFwd}, + Timestamp: telemetry.DummyTimestamp(), + } + agent = telemetry.NewAgent(agentOpts) + ) + defer agent.Stop() + + // Forward single metric and print the result. + var ( + m = telemetry.Metric{ + Name: `usage`, + Value: 0.5, + } + ctx = telemetry.ContextForwardWait(context.Background()) + + err error + ) + err = agent.Forward(ctx, m) + if err != nil { + log.Fatal(err) + } + + fmt.Printf(`%s`, bufFwd.Bytes()) + + // Forward list of Metric and print the result. + var list = []telemetry.Metric{ + {Name: `usage`, Value: 0.4}, + {Name: `usage`, Value: 0.3}, + } + err = agent.BulkForward(ctx, list) + if err != nil { + log.Fatal(err) + } + + fmt.Printf(`%s`, bufFwd.Bytes()) + // Output: + // 1678606568;"usage";0.500000;"name=BufferForwarder,version=0.1.0" + // 1678606568;"usage";0.400000;"name=BufferForwarder,version=0.1.0" + // 1678606568;"usage";0.300000;"name=BufferForwarder,version=0.1.0" +} diff --git a/lib/telemetry/collector.go b/lib/telemetry/collector.go new file mode 100644 index 00000000..75572f4a --- /dev/null +++ b/lib/telemetry/collector.go @@ -0,0 +1,11 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +// Collector provides an interface to collect the metrics. +type Collector interface { + // Collect the metrics with timestamp. + Collect(timestamp int64) []Metric +} diff --git a/lib/telemetry/context.go b/lib/telemetry/context.go new file mode 100644 index 00000000..41f90a74 --- /dev/null +++ b/lib/telemetry/context.go @@ -0,0 +1,20 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import "context" + +type agentContext string + +// List of context for agent. +const ( + agentContextForwardWait agentContext = `agent_push_wait` +) + +// ContextForwardWait wait for the [Agent.Forward] or [Agent.BulkForward] to be +// finished. +func ContextForwardWait(ctx context.Context) context.Context { + return context.WithValue(ctx, agentContextForwardWait, struct{}{}) +} diff --git a/lib/telemetry/dsv_formatter.go b/lib/telemetry/dsv_formatter.go new file mode 100644 index 00000000..589206d3 --- /dev/null +++ b/lib/telemetry/dsv_formatter.go @@ -0,0 +1,105 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "bytes" + "fmt" + + libbytes "github.com/shuLhan/share/lib/bytes" +) + +// DsvFormatter format the [Metric] in single line where each value is separated +// by single character. +// The metric are formatted in the following order, +// +// Timestamp SEP Name SEP Value *(SEP metadata) +// metadata = Metadata.Key "=" Metadata.Value *("," metadata) +// +// The Name, Value, and metadata are enclosed with double quoted. +type DsvFormatter struct { + metricsAlias map[string]string + name string + mdRaw []byte + sep rune + mdVersion int // Latest version of metadata. +} + +// NewDsvFormatter create new DsvFormatter using sep as separater and options +// to change the metric output name using metricsAlias. +// See [RuntimeMetricsAlias] for example. +func NewDsvFormatter(sep rune, metricsAlias map[string]string) (dsv *DsvFormatter) { + dsv = &DsvFormatter{ + name: `dsv`, + sep: sep, + metricsAlias: metricsAlias, + } + return dsv +} + +// BulkFormat bulk format list of Metric with [Metadata]. +func (dsv *DsvFormatter) BulkFormat(listm []Metric, md *Metadata) []byte { + if len(listm) == 0 { + return nil + } + + dsv.generateMetadata(md) + + var ( + bb bytes.Buffer + m Metric + ) + for _, m = range listm { + dsv.formatMetric(&bb, m) + } + return libbytes.Copy(bb.Bytes()) +} + +// Format single Metric into single line DSV. +func (dsv *DsvFormatter) Format(m Metric, md *Metadata) []byte { + return dsv.BulkFormat([]Metric{m}, md) +} + +func (dsv *DsvFormatter) formatMetric(bb *bytes.Buffer, m Metric) { + if len(m.Name) == 0 { + return + } + + var name = dsv.metricsAlias[m.Name] + if len(name) == 0 { + name = m.Name + } + fmt.Fprintf(bb, "%d%c%q%c%f%c%q\n", m.Timestamp, dsv.sep, name, dsv.sep, m.Value, dsv.sep, dsv.mdRaw) +} + +func (dsv *DsvFormatter) generateMetadata(md *Metadata) { + var mdVersion = md.Version() + if dsv.mdVersion == mdVersion { + return + } + + var ( + keys, vals = md.KeysMap() + + bb bytes.Buffer + k string + v string + x int + ) + for x, k = range keys { + if x > 0 { + bb.WriteByte(',') + } + v = vals[k] + fmt.Fprintf(&bb, `%s=%s`, k, v) + } + dsv.mdRaw = libbytes.Copy(bb.Bytes()) + dsv.mdVersion = mdVersion +} + +// Name return the Name of DsvFormatter as "dsv". +func (dsv *DsvFormatter) Name() string { + return dsv.name +} diff --git a/lib/telemetry/file_forwarder.go b/lib/telemetry/file_forwarder.go new file mode 100644 index 00000000..d3b95d7f --- /dev/null +++ b/lib/telemetry/file_forwarder.go @@ -0,0 +1,48 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "fmt" + "io" +) + +// FileForwarder forward the raw metrics into file. +type FileForwarder struct { + fmt Formatter + file io.WriteCloser +} + +// NewFileForwarder create new FileForwarder using fmt as the Formatter. +func NewFileForwarder(fmt Formatter, file io.WriteCloser) (fwd *FileForwarder) { + fwd = &FileForwarder{ + fmt: fmt, + file: file, + } + return fwd +} + +// Close the underlying file. +// Calling Forward after closing Forwarder may cause panic. +func (fwd *FileForwarder) Close() (err error) { + if fwd.file != nil { + err = fwd.file.Close() + } + return err +} + +// Formatter return the Formatter that is used by this FileForwarder. +func (fwd *FileForwarder) Formatter() Formatter { + return fwd.fmt +} + +// Forward write the formatted metrics into file. +func (fwd *FileForwarder) Forward(wire []byte) (err error) { + _, err = fwd.file.Write(wire) + if err != nil { + return fmt.Errorf(`FileForwarder.Forward: %w`, err) + } + return nil +} diff --git a/lib/telemetry/formatter.go b/lib/telemetry/formatter.go new file mode 100644 index 00000000..dc4bc5d0 --- /dev/null +++ b/lib/telemetry/formatter.go @@ -0,0 +1,18 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +// Formatter define the interface that responsible to convert single or bulk +// of Metric into its wire format. +type Formatter interface { + // BulkFormat format list of Metric with metadata for transfer. + BulkFormat(listm []Metric, md *Metadata) []byte + + // Format the Metric m and metadata for transfer. + Format(m Metric, md *Metadata) []byte + + // Name return the name of formatter. + Name() string +} diff --git a/lib/telemetry/forwarder.go b/lib/telemetry/forwarder.go new file mode 100644 index 00000000..c958acc3 --- /dev/null +++ b/lib/telemetry/forwarder.go @@ -0,0 +1,18 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import "io" + +// Forwarder provide the interface to be implemented by forwarder in order to +// store the collected metrics. +type Forwarder interface { + // Implement the Close and Write from package [io]. + // Calling Forward after Close may cause panic. + io.WriteCloser + + // Formatter return the Formatter being used to format the metrics. + Formatter() Formatter +} diff --git a/lib/telemetry/go_memstats_collector.go b/lib/telemetry/go_memstats_collector.go new file mode 100644 index 00000000..56923064 --- /dev/null +++ b/lib/telemetry/go_memstats_collector.go @@ -0,0 +1,149 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "log" + "regexp" + "runtime" + "sort" +) + +// GoMemStatsCollector collect Go statistics about memory allocator, as in +// calling [runtime.ReadMemStats]. +// +// This collector export the following metric names, with value from the field +// of [runtime.MemStats]: +// +// - go_memstats_alloc_bytes: [runtime.MemStats.Alloc] +// - go_memstats_total_alloc_bytes: [runtime.MemStats.TotalAlloc] +// - go_memstats_sys_bytes: [runtime.MemStats.Sys] +// - go_memstats_lookups: [runtime.MemStats.Lookups] +// - go_memstats_mallocs_objects: [runtime.MemStats.Mallocs] +// - go_memstats_frees_objects: [runtime.MemStats.Frees] +// - go_memstats_heap_alloc_bytes: [runtime.MemStats.HeapAlloc] +// - go_memstats_heap_sys_bytes: [runtime.MemStats.HeapSys] +// - go_memstats_heap_idle_bytes: [runtime.MemStats.HeapIdle] +// - go_memstats_heap_inuse_bytes: [runtime.MemStats.HeapInuse] +// - go_memstats_heap_released_bytes: [runtime.MemStats.HeapReleased] +// - go_memstats_heap_objects: [runtime.MemStats.HeapObjects] +// - go_memstats_stack_inuse_bytes: [runtime.MemStats.StackInuse] +// - go_memstats_stack_sys_bytes: [runtime.MemStats.StackSys] +// - go_memstats_mspan_inuse_bytes: [runtime.MemStats.MSpanInuse] +// - go_memstats_mspan_sys_bytes: [runtime.MemStats.MSpanSys] +// - go_memstats_mcache_inuse_bytes: [runtime.MemStats.MCacheInuse] +// - go_memstats_mcache_sys_bytes: [runtime.MemStats.MCacheSys] +// - go_memstats_buck_hash_sys_bytes: [runtime.MemStats.BuckHashSys] +// - go_memstats_gc_sys_bytes: [runtime.MemStats.GCSys] +// - go_memstats_other_sys_bytes: [runtime.MemStats.OtherSys] +// - go_memstats_gc_next_bytes: [runtime.MemStats.NextGC] +// - go_memstats_gc_last: [runtime.MemStats.LastGC] +// - go_memstats_pause_total_ns: [runtime.MemStats.PauseTotalNs] +// - go_memstats_pause_ns: [runtime.MemStats.PauseNs] +// - go_memstats_pause_end_ns: [runtime.MemStats.PauseEnd] +// - go_memstats_gc_num: [runtime.MemStats.NumGC] +// - go_memstats_gc_forced_num: [runtime.MemStats.NumForcedGC] +// - go_memstats_gc_cpu_fraction: [runtime.MemStats.GCCPUFraction] +type GoMemStatsCollector struct { + memstats runtime.MemStats + + // names contains the filtered metric names. + names []string + + // map of metric name with its pointer to its value. + nameValue map[string]any +} + +// NewGoMemStatsCollector create new MemStats collector with options to filter +// the metrics by its name using regular expression. +// +// If filter is nil, none of the metrics will be collected. +func NewGoMemStatsCollector(filter *regexp.Regexp) (col *GoMemStatsCollector) { + col = &GoMemStatsCollector{} + if filter == nil { + return col + } + + col.init(filter) + + return col +} + +func (col *GoMemStatsCollector) init(filter *regexp.Regexp) { + col.nameValue = map[string]any{ + `go_memstats_alloc_bytes`: &col.memstats.Alloc, + `go_memstats_total_alloc_bytes`: &col.memstats.TotalAlloc, + `go_memstats_sys_bytes`: &col.memstats.Sys, + `go_memstats_lookups`: &col.memstats.Lookups, + `go_memstats_mallocs_objects`: &col.memstats.Mallocs, + `go_memstats_frees_objects`: &col.memstats.Frees, + `go_memstats_heap_alloc_bytes`: &col.memstats.HeapAlloc, + `go_memstats_heap_sys_bytes`: &col.memstats.HeapSys, + `go_memstats_heap_idle_bytes`: &col.memstats.HeapIdle, + `go_memstats_heap_inuse_bytes`: &col.memstats.HeapInuse, + `go_memstats_heap_released_bytes`: &col.memstats.HeapReleased, + `go_memstats_heap_objects`: &col.memstats.HeapObjects, + `go_memstats_stack_inuse_bytes`: &col.memstats.StackInuse, + `go_memstats_stack_sys_bytes`: &col.memstats.StackSys, + `go_memstats_mspan_inuse_bytes`: &col.memstats.MSpanInuse, + `go_memstats_mspan_sys_bytes`: &col.memstats.MSpanSys, + `go_memstats_mcache_inuse_bytes`: &col.memstats.MCacheInuse, + `go_memstats_mcache_sys_bytes`: &col.memstats.MCacheSys, + `go_memstats_buck_hash_sys_bytes`: &col.memstats.BuckHashSys, + `go_memstats_gc_sys_bytes`: &col.memstats.GCSys, + `go_memstats_other_sys_bytes`: &col.memstats.OtherSys, + `go_memstats_gc_next_bytes`: &col.memstats.NextGC, + `go_memstats_gc_last`: &col.memstats.LastGC, + `go_memstats_pause_total_ns`: &col.memstats.PauseTotalNs, + `go_memstats_pause_ns`: &col.memstats.PauseNs, + `go_memstats_pause_end_ns`: &col.memstats.PauseEnd, + `go_memstats_gc_num`: &col.memstats.NumGC, + `go_memstats_gc_forced_num`: &col.memstats.NumForcedGC, + `go_memstats_gc_cpu_fraction`: &col.memstats.GCCPUFraction, + } + + var key string + for key = range col.nameValue { + if filter.MatchString(key) { + col.names = append(col.names, key) + } + } + sort.Strings(col.names) +} + +// Collect the Go MemStats. +func (col *GoMemStatsCollector) Collect(ts int64) (list []Metric) { + if len(col.names) == 0 { + return nil + } + + runtime.ReadMemStats(&col.memstats) + + var name string + for _, name = range col.names { + var m = Metric{ + Timestamp: ts, + Name: name, + } + + var val = col.nameValue[name] + + switch v := val.(type) { + case *uint64: + m.Value = float64(*v) + case *uint32: + m.Value = float64(*v) + case *float64: + m.Value = *v + case *[256]uint64: + var last = v[(col.memstats.NumGC+255)%256] + m.Value = float64(last) + default: + log.Printf(`GoMemStatsCollector.Collect: unknown type: %T %v`, v, v) + } + list = append(list, m) + } + return list +} diff --git a/lib/telemetry/go_metrics_collector.go b/lib/telemetry/go_metrics_collector.go new file mode 100644 index 00000000..f0f0dcd6 --- /dev/null +++ b/lib/telemetry/go_metrics_collector.go @@ -0,0 +1,178 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "regexp" + "runtime/metrics" +) + +// RuntimeMetricsAlias define an alias for [runtime/metrics.Name] to be +// exported by GoMetricsCollector. +var RuntimeMetricsAlias = map[string]string{ + `/cgo/go-to-c-calls:calls`: `go_cgo_calls`, + + `/cpu/classes/gc/mark/assist:cpu-seconds`: `go_cpu_gc_mark_assist_seconds`, + `/cpu/classes/gc/mark/dedicated:cpu-seconds`: `go_cpu_gc_mark_dedicated_seconds`, + `/cpu/classes/gc/mark/idle:cpu-seconds`: `go_cpu_gc_mark_idle_seconds`, + `/cpu/classes/gc/pause:cpu-seconds`: `go_cpu_gc_pause_seconds`, + `/cpu/classes/gc/total:cpu-seconds`: `go_cpu_gc_total_seconds`, + + `/cpu/classes/idle:cpu-seconds`: `go_cpu_idle_seconds`, + `/cpu/classes/scavenge/assist:cpu-seconds`: `go_cpu_scavenge_assist_seconds`, + `/cpu/classes/scavenge/background:cpu-seconds`: `go_cpu_scavenge_background_seconds`, + `/cpu/classes/scavenge/total:cpu-seconds`: `go_cpu_scavenge_total_seconds`, + `/cpu/classes/total:cpu-seconds`: `go_cpu_total_seconds`, + `/cpu/classes/user:cpu-seconds`: `go_cpu_user_seconds`, + + `/gc/cycles/automatic:gc-cycles`: `go_gc_cycles_automatic`, + `/gc/cycles/forced:gc-cycles`: `go_gc_cycles_forced`, + `/gc/cycles/total:gc-cycles`: `go_gc_cycles_total`, + + `/gc/heap/allocs-by-size:bytes`: `go_gc_heap_alloc_by_size_bytes`, + `/gc/heap/allocs:bytes`: `go_gc_heap_allocs_bytes`, + `/gc/heap/allocs:objects`: `go_gc_heap_allocs_objects`, + `/gc/heap/frees-by-size:bytes`: `go_gc_heap_frees_by_size_bytes`, + `/gc/heap/frees:bytes`: `go_gc_heap_frees_bytes`, + `/gc/heap/frees:objects`: `go_gc_heap_frees_objects`, + `/gc/heap/goal:bytes`: `go_gc_heap_goal_bytes`, + `/gc/heap/objects:objects`: `go_gc_heap_objects`, + `/gc/heap/tiny/allocs:objects`: `go_gc_heap_tiny_allocs_objects`, + + `/gc/limiter/last-enabled:gc-cycle`: `go_gc_limiter_last_enabled`, + + `/gc/pauses:seconds`: `go_gc_pauses_seconds`, + + `/gc/stack/starting-size:bytes`: `go_gc_stack_starting_size_bytes`, + + `/godebug/non-default-behavior/execerrdot:events`: `go_godebug_execerrdot_events`, + `/godebug/non-default-behavior/http2client:events`: `go_godebug_http2client_events`, + `/godebug/non-default-behavior/http2server:events`: `go_godebug_http2server_events`, + `/godebug/non-default-behavior/installgoroot:events`: `go_godebug_installgoroot_events`, + `/godebug/non-default-behavior/panicnil:events`: `go_godebug_panicnil_events`, + `/godebug/non-default-behavior/randautoseed:events`: `go_godebug_randautoseed_events`, + `/godebug/non-default-behavior/tarinsecurepath:events`: `go_godebug_trainsecurepath_events`, + `/godebug/non-default-behavior/x509sha1:events`: `go_godebug_x509sha1_events`, + `/godebug/non-default-behavior/x509usefallbackroots:events`: `go_godebug_x509usefallbackroots_events`, + `/godebug/non-default-behavior/zipinsecurepath:events`: `go_godebug_zipinsecurepath_events`, + + `/memory/classes/heap/free:bytes`: `go_memory_heap_free_bytes`, + `/memory/classes/heap/objects:bytes`: `go_memory_heap_objects_bytes`, + `/memory/classes/heap/released:bytes`: `go_memory_heap_released_bytes`, + `/memory/classes/heap/stacks:bytes`: `go_memory_heap_stacks_bytes`, + `/memory/classes/heap/unused:bytes`: `go_memory_heap_unused_bytes`, + + `/memory/classes/metadata/mcache/free:bytes`: `go_memory_metadata_mcache_free_bytes`, + `/memory/classes/metadata/mcache/inuse:bytes`: `go_memory_metadata_mcache_inuse_bytes`, + `/memory/classes/metadata/mspan/free:bytes`: `go_memory_metadata_mspan_free_bytes`, + `/memory/classes/metadata/mspan/inuse:bytes`: `go_memory_metadata_mspan_inuse_bytes`, + `/memory/classes/metadata/other:bytes`: `go_memory_metadata_other_bytes`, + + `/memory/classes/os-stacks:bytes`: `go_memory_os_stacks_bytes`, + `/memory/classes/other:bytes`: `go_memory_other_bytes`, + `/memory/classes/profiling/buckets:bytes`: `go_memory_profiling_buckets_bytes`, + `/memory/classes/total:bytes`: `go_memory_total_bytes`, + + `/sched/gomaxprocs:threads`: `go_sched_gomaxprocs`, + `/sched/goroutines:goroutines`: `go_sched_goroutines`, + `/sched/latencies:seconds`: `go_sched_latencies_seconds`, + + `/sync/mutex/wait/total:seconds`: `go_sync_mutex_wait_total_seconds`, +} + +// GoMetricsCollector collect the metrics using [runtime/metrics.Read]. +type GoMetricsCollector struct { + // samples list of [runtime/metrics.Sample] that has been + // filtered using [AgentOptions.RuntimeMetrics]. + samples []metrics.Sample +} + +// NewGoMetricsCollector create new collector for [runtime/metrics] with +// options to filter specific metric by alias name using regular expression. +// +// For example, to collect all metrics pass regex `^.*$`, to collect memory +// only pass `^go_memory_.*$`. +// A nil filter means no metrics will be collected. +func NewGoMetricsCollector(filter *regexp.Regexp) (col *GoMetricsCollector) { + col = &GoMetricsCollector{} + if filter == nil { + // Nothing to collect. + return col + } + + var ( + org string + alias string + ) + for org, alias = range RuntimeMetricsAlias { + if filter.MatchString(alias) { + var sample = metrics.Sample{ + Name: org, + } + col.samples = append(col.samples, sample) + } + } + return col +} + +// Collect the [runtime/metrics]. +func (col *GoMetricsCollector) Collect(timestamp int64) (list []Metric) { + if len(col.samples) == 0 { + return nil + } + + metrics.Read(col.samples) + + var sample metrics.Sample + + list = make([]Metric, 0, len(col.samples)) + + for _, sample = range col.samples { + var m = Metric{ + Timestamp: timestamp, + Name: RuntimeMetricsAlias[sample.Name], + } + + switch sample.Value.Kind() { + case metrics.KindUint64: + m.Value = float64(sample.Value.Uint64()) + case metrics.KindFloat64: + m.Value = sample.Value.Float64() + case metrics.KindFloat64Histogram: + var hist = sample.Value.Float64Histogram() + m.Value = medianBucket(hist) + } + list = append(list, m) + } + return list +} + +// medianBucket get the median of the histogram values. +func medianBucket(hist *metrics.Float64Histogram) float64 { + var ( + total uint64 + count uint64 + ) + for _, count = range hist.Counts { + total += count + } + if total == 0 { + return 0 + } + + var ( + thresh = total / 2 + x int + ) + + total = 0 + for x, count = range hist.Counts { + total += count + if total >= thresh { + return hist.Buckets[x] + } + } + return 0 +} diff --git a/lib/telemetry/internal/cmd/agent-example/main.go b/lib/telemetry/internal/cmd/agent-example/main.go new file mode 100644 index 00000000..5b561ff9 --- /dev/null +++ b/lib/telemetry/internal/cmd/agent-example/main.go @@ -0,0 +1,77 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Program agent-example provide an example of how to create agent. +package main + +import ( + "log" + "os" + "os/signal" + "regexp" + "syscall" + "time" + + "github.com/shuLhan/share/lib/telemetry" +) + +func main() { + var ( + col = createGoMemStatsCollector() + ilpFmt = telemetry.NewIlpFormatter(`rescached`) + stdoutFwd = telemetry.NewStdoutForwarder(ilpFmt) + ) + + // Create metadata. + var md = telemetry.NewMetadata() + md.Set(`name`, `agent-example`) + md.Set(`version`, `0.1.0`) + + // Create the Agent. + var ( + agentOpts = telemetry.AgentOptions{ + Metadata: md, + Timestamp: telemetry.NanoTimestamp(), + Forwarders: []telemetry.Forwarder{ + stdoutFwd, + }, + Collectors: []telemetry.Collector{ + col, + }, + Interval: 10 * time.Second, + } + agent = telemetry.NewAgent(agentOpts) + ) + defer agent.Stop() + + var qsignal = make(chan os.Signal, 1) + signal.Notify(qsignal, syscall.SIGQUIT, syscall.SIGSEGV, syscall.SIGTERM, syscall.SIGINT) + <-qsignal +} + +func createGoMetricsCollector() (col *telemetry.GoMetricsCollector) { + var ( + metricsFilter *regexp.Regexp + err error + ) + metricsFilter, err = regexp.Compile(`^go_(cpu|gc|memory|sched)_.*$`) + if err != nil { + log.Fatal(err) + } + col = telemetry.NewGoMetricsCollector(metricsFilter) + return col +} + +func createGoMemStatsCollector() (col *telemetry.GoMemStatsCollector) { + var ( + metricsFilter *regexp.Regexp + err error + ) + metricsFilter, err = regexp.Compile(`^.*$`) + if err != nil { + log.Fatal(err) + } + col = telemetry.NewGoMemStatsCollector(metricsFilter) + return col +} diff --git a/lib/telemetry/metadata.go b/lib/telemetry/metadata.go new file mode 100644 index 00000000..75742c77 --- /dev/null +++ b/lib/telemetry/metadata.go @@ -0,0 +1,93 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "sort" + "strings" +) + +// Metadata provides versioned Map with stable key order. +type Metadata struct { + vals map[string]string + version int +} + +// NewMetadata create and initialize new metadata. +func NewMetadata() (md *Metadata) { + md = &Metadata{ + vals: map[string]string{}, + } + return md +} + +// Delete Metadata by its key. +// The versioning will be increased only if the key exist. +func (md *Metadata) Delete(key string) { + var ok bool + _, ok = md.vals[key] + if ok { + delete(md.vals, key) + md.version++ + } +} + +// Get the Metadata value by its key. +func (md *Metadata) Get(key string) string { + return md.vals[key] +} + +// Keys return the Metadata keys sorted lexicographically. +func (md *Metadata) Keys() (keys []string) { + var key string + for key = range md.vals { + keys = append(keys, key) + } + sort.Strings(keys) + return keys +} + +// KeysMap return the Metadata keys sorted lexicographically and its map. +func (md *Metadata) KeysMap() (keys []string, vals map[string]string) { + keys = md.Keys() + return keys, md.vals +} + +// Set store the key with value into Metadata. +// This method always increase the version. +func (md *Metadata) Set(key, value string) { + md.version++ + md.vals[key] = value +} + +// String return the Metadata where each item separated by comma and the +// key-value separated by equal character. +func (md *Metadata) String() string { + var keys = md.Keys() + + var ( + sb strings.Builder + key string + val string + x int + ) + for x, key = range keys { + if x > 0 { + sb.WriteByte(',') + } + + val = md.vals[key] + + sb.WriteString(key) + sb.WriteByte('=') + sb.WriteString(val) + } + return sb.String() +} + +// Version return the current version of Metadata. +func (md *Metadata) Version() int { + return md.version +} diff --git a/lib/telemetry/metadata_example_test.go b/lib/telemetry/metadata_example_test.go new file mode 100644 index 00000000..a8f8d39a --- /dev/null +++ b/lib/telemetry/metadata_example_test.go @@ -0,0 +1,41 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry_test + +import ( + "fmt" + + "github.com/shuLhan/share/lib/telemetry" +) + +func ExampleMetadata() { + var md = telemetry.NewMetadata() + + // The new Metadata has version=0. + fmt.Println(md.Version(), md.String(), ".") + + // Setting a key increase the version to 1 + md.Set(`host`, `localhost`) + fmt.Println(md.Version(), md.String(), ".") + + // ... even if the key already exist. + md.Set(`host`, `my.localhost`) + fmt.Println(md.Version(), md.String(), ".") + + // Deleting a key increase the version too. + md.Delete(`host`) + fmt.Println(md.Version(), md.String(), ".") + + // But if the key is not exist, it will not increase the version. + md.Delete(`host`) + fmt.Println(md.Version(), md.String(), ".") + + // Output: + // 0 . + // 1 host=localhost . + // 2 host=my.localhost . + // 3 . + // 3 . +} diff --git a/lib/telemetry/metric.go b/lib/telemetry/metric.go new file mode 100644 index 00000000..8cab29a2 --- /dev/null +++ b/lib/telemetry/metric.go @@ -0,0 +1,12 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +// Metric contains name, value, and timestamp. +type Metric struct { + Name string `json:"name"` + Value float64 `json:"value"` + Timestamp int64 `json:"timestamp"` +} diff --git a/lib/telemetry/stdout_forwarder.go b/lib/telemetry/stdout_forwarder.go new file mode 100644 index 00000000..4e84898b --- /dev/null +++ b/lib/telemetry/stdout_forwarder.go @@ -0,0 +1,44 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import ( + "fmt" + "os" +) + +// StdoutForwarder write the metrics to os.Stdout. +// This type is used as example and to provide wrapper for os.Stdout, since +// user should not call Close on os.Stdout. +type StdoutForwarder struct { + formatter Formatter +} + +// NewStdoutForwarder create new StdoutForwarder using f as Formatter. +func NewStdoutForwarder(f Formatter) *StdoutForwarder { + return &StdoutForwarder{ + formatter: f, + } +} + +// Close on StdoutForwarder sync the Stdout. +func (stdout *StdoutForwarder) Close() error { + os.Stdout.Sync() + return nil +} + +// Formatter return the Formatter used by this StdoutForwarder. +func (stdout *StdoutForwarder) Formatter() Formatter { + return stdout.formatter +} + +// Write the raw metrics to stdout. +func (stdout *StdoutForwarder) Write(wire []byte) (n int, err error) { + n, err = os.Stdout.Write(wire) + if err != nil { + return n, fmt.Errorf(`StdoutForwarder.Forward: %w`, err) + } + return n, nil +} diff --git a/lib/telemetry/telemetry.go b/lib/telemetry/telemetry.go new file mode 100644 index 00000000..cdf654a0 --- /dev/null +++ b/lib/telemetry/telemetry.go @@ -0,0 +1,10 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package telemetry is a library for collecting various [Metric], for example +// from standard [runtime/metrics], and send or write it to one or more +// [Forwarder]. +// Each Forwarder has capability to format the Metric before sending or +// writing it using [Formatter]. +package telemetry diff --git a/lib/telemetry/timestamper.go b/lib/telemetry/timestamper.go new file mode 100644 index 00000000..0d141f9c --- /dev/null +++ b/lib/telemetry/timestamper.go @@ -0,0 +1,33 @@ +// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package telemetry + +import "time" + +// Timestamper a type that return a function to generate timestamp. +type Timestamper func() int64 + +// SecondTimestamp return the number of seconds elapsed since January 1, +// 1970 UTC +func SecondTimestamp() Timestamper { + return func() int64 { return time.Now().Unix() } +} + +// MilliTimestamp return the number of milliseconds elapsed since January 1, +// 1970 UTC. +func MilliTimestamp() Timestamper { + return func() int64 { return time.Now().UnixMilli() } +} + +// NanoTimestamp return the number of nanoseconds elapsed since January 1, +// 1970 UTC +func NanoTimestamp() Timestamper { + return func() int64 { return time.Now().UnixNano() } +} + +// DummyTimestamp return fixed epoch 1678606568, for testing only. +func DummyTimestamp() Timestamper { + return func() int64 { return 1678606568 } +} |
