aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2023-03-12 23:30:22 +0700
committerShulhan <ms@kilabit.info>2023-03-17 21:53:38 +0700
commitbb4cfe53505d1e7a89ab61524425fe6fdd2ed673 (patch)
treeb518f1a194d4bb795efad1cc2b96c6fbb2f7702e /lib
parente96899d74668cc0d711396482dfefefec10ab55d (diff)
downloadpakakeh.go-bb4cfe53505d1e7a89ab61524425fe6fdd2ed673.tar.xz
lib/telemetry: package for collecting and forwarding metrics
Package telemetry is a library for collecting various Metric, for example from standard runtime/metrics, and send or write it to one or more Forwarder. Each Forwarder has capability to format the Metric before sending or writing it using Formatter.
Diffstat (limited to 'lib')
-rw-r--r--lib/telemetry/agent.go269
-rw-r--r--lib/telemetry/agent_options.go57
-rw-r--r--lib/telemetry/buffer_forwarder.go57
-rw-r--r--lib/telemetry/buffer_forwarder_example_test.go70
-rw-r--r--lib/telemetry/collector.go11
-rw-r--r--lib/telemetry/context.go20
-rw-r--r--lib/telemetry/dsv_formatter.go105
-rw-r--r--lib/telemetry/file_forwarder.go48
-rw-r--r--lib/telemetry/formatter.go18
-rw-r--r--lib/telemetry/forwarder.go18
-rw-r--r--lib/telemetry/go_memstats_collector.go149
-rw-r--r--lib/telemetry/go_metrics_collector.go178
-rw-r--r--lib/telemetry/internal/cmd/agent-example/main.go77
-rw-r--r--lib/telemetry/metadata.go93
-rw-r--r--lib/telemetry/metadata_example_test.go41
-rw-r--r--lib/telemetry/metric.go12
-rw-r--r--lib/telemetry/stdout_forwarder.go44
-rw-r--r--lib/telemetry/telemetry.go10
-rw-r--r--lib/telemetry/timestamper.go33
19 files changed, 1310 insertions, 0 deletions
diff --git a/lib/telemetry/agent.go b/lib/telemetry/agent.go
new file mode 100644
index 00000000..26631cb0
--- /dev/null
+++ b/lib/telemetry/agent.go
@@ -0,0 +1,269 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import (
+ "context"
+ "fmt"
+ "log"
+ "time"
+)
+
+const (
+ defQueueSize = 512
+)
+
+// Agent is the one that responsible to collect and forward the metrics.
+type Agent struct {
+ bulkq chan []Metric
+ singleq chan Metric
+
+ running chan bool
+
+ opts AgentOptions
+}
+
+// NewAgent create, initalize, and run the new Agent.
+// The agent will start auto collecting the metrics in the background every
+// [AgentOptions.Interval] and forward it to each [Forwarder].
+func NewAgent(opts AgentOptions) (agent *Agent) {
+ opts.init()
+
+ agent = &Agent{
+ opts: opts,
+
+ singleq: make(chan Metric, defQueueSize),
+ bulkq: make(chan []Metric, defQueueSize),
+ running: make(chan bool, 1),
+ }
+
+ go agent.forwarder()
+ go agent.collector()
+
+ return agent
+}
+
+// BulkForward push list of Metric asynchronously.
+// If ctx contains ContextForwardWait, it will forward the metric
+// synchronously.
+func (agent *Agent) BulkForward(ctx context.Context, list []Metric) error {
+ if len(list) == 0 {
+ return nil
+ }
+ var ctxWait = ctx.Value(agentContextForwardWait)
+ if ctxWait == nil {
+ agent.bulkq <- list
+ return nil
+ }
+ return agent.forwardBulk(ctx, list)
+}
+
+func (agent *Agent) collect() (all []Metric) {
+ var (
+ ts = agent.opts.Timestamp()
+
+ col Collector
+ list []Metric
+ )
+
+ for _, col = range agent.opts.Collectors {
+ list = col.Collect(ts)
+ all = append(all, list...)
+ }
+ return all
+}
+
+// collector collect the metrics on each interval and forward it.
+func (agent *Agent) collector() {
+ var (
+ ticker = time.NewTicker(agent.opts.Interval)
+ metrics []Metric
+ )
+
+ for {
+ select {
+ case <-ticker.C:
+ metrics = agent.collect()
+ agent.BulkForward(context.Background(), metrics)
+
+ case <-agent.running:
+ ticker.Stop()
+ // ACK the Stop.
+ agent.running <- false
+ return
+ }
+ }
+}
+
+// Forward single metric to agent asynchronously.
+// If ctx contains ContextForwardWait, it will forward the metric
+// synchronously.
+func (agent *Agent) Forward(ctx context.Context, m Metric) (err error) {
+ var ctxv = ctx.Value(agentContextForwardWait)
+ if ctxv == nil {
+ agent.singleq <- m
+ return nil
+ }
+ return agent.forwardSingle(ctx, &m)
+}
+
+func (agent *Agent) forwardBulk(ctx context.Context, list []Metric) (err error) {
+ if len(list) == 0 {
+ return nil
+ }
+
+ var (
+ ts = agent.opts.Timestamp()
+ x int
+ )
+ for ; x < len(list); x++ {
+ if list[x].Timestamp <= 0 {
+ list[x].Timestamp = ts
+ }
+ }
+
+ var (
+ // Map of Formatter.Name with its format result.
+ fmtWire = map[string][]byte{}
+
+ fwd Forwarder
+ fmter Formatter
+ fmtName string
+ wire []byte
+ errfwd error
+ ok bool
+ )
+ for _, fwd = range agent.opts.Forwarders {
+ fmter = fwd.Formatter()
+ if fmter == nil {
+ continue
+ }
+
+ fmtName = fmter.Name()
+
+ // Check if we have format the metrics before using the same
+ // Formatter.
+ wire, ok = fmtWire[fmtName]
+ if !ok {
+ wire = fmter.BulkFormat(list, agent.opts.Metadata)
+ fmtWire[fmtName] = wire
+ }
+
+ select {
+ case <-ctx.Done():
+ return err
+ default:
+ _, errfwd = fwd.Write(wire)
+ if errfwd != nil {
+ if err == nil {
+ err = fmt.Errorf(`forwardBulk: %s`, errfwd)
+ } else {
+ err = fmt.Errorf(`%s: %s`, err, errfwd)
+ }
+ }
+ }
+ }
+ return err
+}
+
+func (agent *Agent) forwardSingle(ctx context.Context, m *Metric) (err error) {
+ if m == nil {
+ return nil
+ }
+
+ var (
+ // Map of Formatter.Name with its format result.
+ fmtWire = map[string][]byte{}
+
+ fwd Forwarder
+ fmter Formatter
+ fmtName string
+ wire []byte
+ errfwd error
+ ok bool
+ )
+
+ if m.Timestamp <= 0 {
+ m.Timestamp = agent.opts.Timestamp()
+ }
+
+ for _, fwd = range agent.opts.Forwarders {
+ fmter = fwd.Formatter()
+ if fmter == nil {
+ continue
+ }
+
+ fmtName = fmter.Name()
+
+ // Check if we have format the metrics before using the same
+ // Formatter.
+ wire, ok = fmtWire[fmtName]
+ if !ok {
+ wire = fmter.Format(*m, agent.opts.Metadata)
+ fmtWire[fmtName] = wire
+ }
+
+ select {
+ case <-ctx.Done():
+ // Request cancelled, timeout, or reach deadlines.
+ return err
+ default:
+ _, errfwd = fwd.Write(wire)
+ if errfwd != nil {
+ if err == nil {
+ err = fmt.Errorf(`forwardSingle: %s`, errfwd)
+ } else {
+ err = fmt.Errorf(`%s: %s`, err, errfwd)
+ }
+ }
+ }
+ }
+ return err
+}
+
+// forwarder the goroutine that queue and forward single or bulk of Metric.
+func (agent *Agent) forwarder() {
+ var (
+ m Metric
+ list []Metric
+ err error
+ )
+
+ for {
+ select {
+ case list = <-agent.bulkq:
+ err = agent.forwardBulk(context.Background(), list)
+ if err != nil {
+ log.Print(err)
+ }
+ case m = <-agent.singleq:
+ err = agent.forwardSingle(context.Background(), &m)
+ if err != nil {
+ log.Print(err)
+ }
+ case <-agent.running:
+ // ACK the Stop.
+ agent.running <- false
+ return
+ }
+ }
+}
+
+// Stop the agent and close all [Forwarder].
+func (agent *Agent) Stop() {
+ // Stop the the first goroutine.
+ agent.running <- false
+ <-agent.running
+
+ // Stop the the second goroutine.
+ agent.running <- false
+ <-agent.running
+
+ // Close all forwarders.
+ var fwd Forwarder
+ for _, fwd = range agent.opts.Forwarders {
+ _ = fwd.Close()
+ }
+}
diff --git a/lib/telemetry/agent_options.go b/lib/telemetry/agent_options.go
new file mode 100644
index 00000000..95e707d4
--- /dev/null
+++ b/lib/telemetry/agent_options.go
@@ -0,0 +1,57 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import "time"
+
+// List of default value or limit for AgentOptions.
+const (
+ defInterval = 1 * time.Minute
+ defIntervalMinimum = 10 * time.Second
+)
+
+// AgentOptions contains options to run the Agent.
+type AgentOptions struct {
+ // Name of the agent.
+ Name string
+
+ // Metadata provides static, additional information to be forwarded
+ // along with the collected metrics.
+ Metadata *Metadata
+
+ // Timestamp define the function to be called to set the
+ // [Metric.Timestamp].
+ // Default to NanoTimestamp.
+ Timestamp Timestamper
+
+ // Collectors contains list of Collector that provide the metrics to
+ // be forwarded.
+ // An empty Collectors means no metrics will be collected and
+ // forwarded.
+ Collectors []Collector
+
+ // Forwarders contains list of target where collected metrics will be
+ // forwarded.
+ Forwarders []Forwarder
+
+ // Interval for collecting metrics.
+ // Default value is one minutes with the minimium value is 10 seconds.
+ Interval time.Duration
+}
+
+// init initialize the AgentOptions default values.
+func (opts *AgentOptions) init() {
+ if opts.Metadata == nil {
+ opts.Metadata = NewMetadata()
+ }
+ if opts.Timestamp == nil {
+ opts.Timestamp = NanoTimestamp()
+ }
+ if opts.Interval <= 0 {
+ opts.Interval = defInterval
+ } else if opts.Interval < defIntervalMinimum {
+ opts.Interval = defIntervalMinimum
+ }
+}
diff --git a/lib/telemetry/buffer_forwarder.go b/lib/telemetry/buffer_forwarder.go
new file mode 100644
index 00000000..e0adca7c
--- /dev/null
+++ b/lib/telemetry/buffer_forwarder.go
@@ -0,0 +1,57 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import (
+ "bytes"
+ "fmt"
+ "sync"
+)
+
+// BufferForwarder write the metrics to underlying [bytes.Buffer].
+type BufferForwarder struct {
+ formatter Formatter
+ bb bytes.Buffer
+ sync.Mutex
+}
+
+// NewBufferForwarder create new BufferForwarder using f as Formatter.
+func NewBufferForwarder(f Formatter) *BufferForwarder {
+ return &BufferForwarder{
+ formatter: f,
+ }
+}
+
+// Bytes return the metrics that has been written to Buffer.
+// Once this method called the underlying Buffer will be resetted.
+func (buf *BufferForwarder) Bytes() (b []byte) {
+ buf.Lock()
+ b = buf.bb.Bytes()
+ buf.bb.Reset()
+ buf.Unlock()
+ return b
+}
+
+// Close on Buffer is a no-op.
+func (buf *BufferForwarder) Close() error {
+ return nil
+}
+
+// Formatter return the Formatter used by this BufferForwarder.
+func (buf *BufferForwarder) Formatter() Formatter {
+ return buf.formatter
+}
+
+// Write the raw metrics to Buffer.
+func (buf *BufferForwarder) Write(wire []byte) (n int, err error) {
+ buf.Lock()
+ defer buf.Unlock()
+
+ n, err = buf.bb.Write(wire)
+ if err != nil {
+ return n, fmt.Errorf(`BufferForwarder.Forward: %w`, err)
+ }
+ return n, nil
+}
diff --git a/lib/telemetry/buffer_forwarder_example_test.go b/lib/telemetry/buffer_forwarder_example_test.go
new file mode 100644
index 00000000..c7afc11f
--- /dev/null
+++ b/lib/telemetry/buffer_forwarder_example_test.go
@@ -0,0 +1,70 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry_test
+
+import (
+ "context"
+ "fmt"
+ "log"
+
+ "github.com/shuLhan/share/lib/telemetry"
+)
+
+func ExampleBufferForwarder() {
+ // Create the Formatter and Forwarder.
+ var (
+ dsvFmt = telemetry.NewDsvFormatter(';', telemetry.RuntimeMetricsAlias)
+ bufFwd = telemetry.NewBufferForwarder(dsvFmt)
+ )
+
+ // Create metadata.
+ var md = telemetry.NewMetadata()
+ md.Set(`name`, `BufferForwarder`)
+ md.Set(`version`, `0.1.0`)
+
+ // Create the Agent.
+ var (
+ agentOpts = telemetry.AgentOptions{
+ Metadata: md,
+ Forwarders: []telemetry.Forwarder{bufFwd},
+ Timestamp: telemetry.DummyTimestamp(),
+ }
+ agent = telemetry.NewAgent(agentOpts)
+ )
+ defer agent.Stop()
+
+ // Forward single metric and print the result.
+ var (
+ m = telemetry.Metric{
+ Name: `usage`,
+ Value: 0.5,
+ }
+ ctx = telemetry.ContextForwardWait(context.Background())
+
+ err error
+ )
+ err = agent.Forward(ctx, m)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ fmt.Printf(`%s`, bufFwd.Bytes())
+
+ // Forward list of Metric and print the result.
+ var list = []telemetry.Metric{
+ {Name: `usage`, Value: 0.4},
+ {Name: `usage`, Value: 0.3},
+ }
+ err = agent.BulkForward(ctx, list)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ fmt.Printf(`%s`, bufFwd.Bytes())
+ // Output:
+ // 1678606568;"usage";0.500000;"name=BufferForwarder,version=0.1.0"
+ // 1678606568;"usage";0.400000;"name=BufferForwarder,version=0.1.0"
+ // 1678606568;"usage";0.300000;"name=BufferForwarder,version=0.1.0"
+}
diff --git a/lib/telemetry/collector.go b/lib/telemetry/collector.go
new file mode 100644
index 00000000..75572f4a
--- /dev/null
+++ b/lib/telemetry/collector.go
@@ -0,0 +1,11 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+// Collector provides an interface to collect the metrics.
+type Collector interface {
+ // Collect the metrics with timestamp.
+ Collect(timestamp int64) []Metric
+}
diff --git a/lib/telemetry/context.go b/lib/telemetry/context.go
new file mode 100644
index 00000000..41f90a74
--- /dev/null
+++ b/lib/telemetry/context.go
@@ -0,0 +1,20 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import "context"
+
+type agentContext string
+
+// List of context for agent.
+const (
+ agentContextForwardWait agentContext = `agent_push_wait`
+)
+
+// ContextForwardWait wait for the [Agent.Forward] or [Agent.BulkForward] to be
+// finished.
+func ContextForwardWait(ctx context.Context) context.Context {
+ return context.WithValue(ctx, agentContextForwardWait, struct{}{})
+}
diff --git a/lib/telemetry/dsv_formatter.go b/lib/telemetry/dsv_formatter.go
new file mode 100644
index 00000000..589206d3
--- /dev/null
+++ b/lib/telemetry/dsv_formatter.go
@@ -0,0 +1,105 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import (
+ "bytes"
+ "fmt"
+
+ libbytes "github.com/shuLhan/share/lib/bytes"
+)
+
+// DsvFormatter format the [Metric] in single line where each value is separated
+// by single character.
+// The metric are formatted in the following order,
+//
+// Timestamp SEP Name SEP Value *(SEP metadata)
+// metadata = Metadata.Key "=" Metadata.Value *("," metadata)
+//
+// The Name, Value, and metadata are enclosed with double quoted.
+type DsvFormatter struct {
+ metricsAlias map[string]string
+ name string
+ mdRaw []byte
+ sep rune
+ mdVersion int // Latest version of metadata.
+}
+
+// NewDsvFormatter create new DsvFormatter using sep as separater and options
+// to change the metric output name using metricsAlias.
+// See [RuntimeMetricsAlias] for example.
+func NewDsvFormatter(sep rune, metricsAlias map[string]string) (dsv *DsvFormatter) {
+ dsv = &DsvFormatter{
+ name: `dsv`,
+ sep: sep,
+ metricsAlias: metricsAlias,
+ }
+ return dsv
+}
+
+// BulkFormat bulk format list of Metric with [Metadata].
+func (dsv *DsvFormatter) BulkFormat(listm []Metric, md *Metadata) []byte {
+ if len(listm) == 0 {
+ return nil
+ }
+
+ dsv.generateMetadata(md)
+
+ var (
+ bb bytes.Buffer
+ m Metric
+ )
+ for _, m = range listm {
+ dsv.formatMetric(&bb, m)
+ }
+ return libbytes.Copy(bb.Bytes())
+}
+
+// Format single Metric into single line DSV.
+func (dsv *DsvFormatter) Format(m Metric, md *Metadata) []byte {
+ return dsv.BulkFormat([]Metric{m}, md)
+}
+
+func (dsv *DsvFormatter) formatMetric(bb *bytes.Buffer, m Metric) {
+ if len(m.Name) == 0 {
+ return
+ }
+
+ var name = dsv.metricsAlias[m.Name]
+ if len(name) == 0 {
+ name = m.Name
+ }
+ fmt.Fprintf(bb, "%d%c%q%c%f%c%q\n", m.Timestamp, dsv.sep, name, dsv.sep, m.Value, dsv.sep, dsv.mdRaw)
+}
+
+func (dsv *DsvFormatter) generateMetadata(md *Metadata) {
+ var mdVersion = md.Version()
+ if dsv.mdVersion == mdVersion {
+ return
+ }
+
+ var (
+ keys, vals = md.KeysMap()
+
+ bb bytes.Buffer
+ k string
+ v string
+ x int
+ )
+ for x, k = range keys {
+ if x > 0 {
+ bb.WriteByte(',')
+ }
+ v = vals[k]
+ fmt.Fprintf(&bb, `%s=%s`, k, v)
+ }
+ dsv.mdRaw = libbytes.Copy(bb.Bytes())
+ dsv.mdVersion = mdVersion
+}
+
+// Name return the Name of DsvFormatter as "dsv".
+func (dsv *DsvFormatter) Name() string {
+ return dsv.name
+}
diff --git a/lib/telemetry/file_forwarder.go b/lib/telemetry/file_forwarder.go
new file mode 100644
index 00000000..d3b95d7f
--- /dev/null
+++ b/lib/telemetry/file_forwarder.go
@@ -0,0 +1,48 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import (
+ "fmt"
+ "io"
+)
+
+// FileForwarder forward the raw metrics into file.
+type FileForwarder struct {
+ fmt Formatter
+ file io.WriteCloser
+}
+
+// NewFileForwarder create new FileForwarder using fmt as the Formatter.
+func NewFileForwarder(fmt Formatter, file io.WriteCloser) (fwd *FileForwarder) {
+ fwd = &FileForwarder{
+ fmt: fmt,
+ file: file,
+ }
+ return fwd
+}
+
+// Close the underlying file.
+// Calling Forward after closing Forwarder may cause panic.
+func (fwd *FileForwarder) Close() (err error) {
+ if fwd.file != nil {
+ err = fwd.file.Close()
+ }
+ return err
+}
+
+// Formatter return the Formatter that is used by this FileForwarder.
+func (fwd *FileForwarder) Formatter() Formatter {
+ return fwd.fmt
+}
+
+// Forward write the formatted metrics into file.
+func (fwd *FileForwarder) Forward(wire []byte) (err error) {
+ _, err = fwd.file.Write(wire)
+ if err != nil {
+ return fmt.Errorf(`FileForwarder.Forward: %w`, err)
+ }
+ return nil
+}
diff --git a/lib/telemetry/formatter.go b/lib/telemetry/formatter.go
new file mode 100644
index 00000000..dc4bc5d0
--- /dev/null
+++ b/lib/telemetry/formatter.go
@@ -0,0 +1,18 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+// Formatter define the interface that responsible to convert single or bulk
+// of Metric into its wire format.
+type Formatter interface {
+ // BulkFormat format list of Metric with metadata for transfer.
+ BulkFormat(listm []Metric, md *Metadata) []byte
+
+ // Format the Metric m and metadata for transfer.
+ Format(m Metric, md *Metadata) []byte
+
+ // Name return the name of formatter.
+ Name() string
+}
diff --git a/lib/telemetry/forwarder.go b/lib/telemetry/forwarder.go
new file mode 100644
index 00000000..c958acc3
--- /dev/null
+++ b/lib/telemetry/forwarder.go
@@ -0,0 +1,18 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import "io"
+
+// Forwarder provide the interface to be implemented by forwarder in order to
+// store the collected metrics.
+type Forwarder interface {
+ // Implement the Close and Write from package [io].
+ // Calling Forward after Close may cause panic.
+ io.WriteCloser
+
+ // Formatter return the Formatter being used to format the metrics.
+ Formatter() Formatter
+}
diff --git a/lib/telemetry/go_memstats_collector.go b/lib/telemetry/go_memstats_collector.go
new file mode 100644
index 00000000..56923064
--- /dev/null
+++ b/lib/telemetry/go_memstats_collector.go
@@ -0,0 +1,149 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import (
+ "log"
+ "regexp"
+ "runtime"
+ "sort"
+)
+
+// GoMemStatsCollector collect Go statistics about memory allocator, as in
+// calling [runtime.ReadMemStats].
+//
+// This collector export the following metric names, with value from the field
+// of [runtime.MemStats]:
+//
+// - go_memstats_alloc_bytes: [runtime.MemStats.Alloc]
+// - go_memstats_total_alloc_bytes: [runtime.MemStats.TotalAlloc]
+// - go_memstats_sys_bytes: [runtime.MemStats.Sys]
+// - go_memstats_lookups: [runtime.MemStats.Lookups]
+// - go_memstats_mallocs_objects: [runtime.MemStats.Mallocs]
+// - go_memstats_frees_objects: [runtime.MemStats.Frees]
+// - go_memstats_heap_alloc_bytes: [runtime.MemStats.HeapAlloc]
+// - go_memstats_heap_sys_bytes: [runtime.MemStats.HeapSys]
+// - go_memstats_heap_idle_bytes: [runtime.MemStats.HeapIdle]
+// - go_memstats_heap_inuse_bytes: [runtime.MemStats.HeapInuse]
+// - go_memstats_heap_released_bytes: [runtime.MemStats.HeapReleased]
+// - go_memstats_heap_objects: [runtime.MemStats.HeapObjects]
+// - go_memstats_stack_inuse_bytes: [runtime.MemStats.StackInuse]
+// - go_memstats_stack_sys_bytes: [runtime.MemStats.StackSys]
+// - go_memstats_mspan_inuse_bytes: [runtime.MemStats.MSpanInuse]
+// - go_memstats_mspan_sys_bytes: [runtime.MemStats.MSpanSys]
+// - go_memstats_mcache_inuse_bytes: [runtime.MemStats.MCacheInuse]
+// - go_memstats_mcache_sys_bytes: [runtime.MemStats.MCacheSys]
+// - go_memstats_buck_hash_sys_bytes: [runtime.MemStats.BuckHashSys]
+// - go_memstats_gc_sys_bytes: [runtime.MemStats.GCSys]
+// - go_memstats_other_sys_bytes: [runtime.MemStats.OtherSys]
+// - go_memstats_gc_next_bytes: [runtime.MemStats.NextGC]
+// - go_memstats_gc_last: [runtime.MemStats.LastGC]
+// - go_memstats_pause_total_ns: [runtime.MemStats.PauseTotalNs]
+// - go_memstats_pause_ns: [runtime.MemStats.PauseNs]
+// - go_memstats_pause_end_ns: [runtime.MemStats.PauseEnd]
+// - go_memstats_gc_num: [runtime.MemStats.NumGC]
+// - go_memstats_gc_forced_num: [runtime.MemStats.NumForcedGC]
+// - go_memstats_gc_cpu_fraction: [runtime.MemStats.GCCPUFraction]
+type GoMemStatsCollector struct {
+ memstats runtime.MemStats
+
+ // names contains the filtered metric names.
+ names []string
+
+ // map of metric name with its pointer to its value.
+ nameValue map[string]any
+}
+
+// NewGoMemStatsCollector create new MemStats collector with options to filter
+// the metrics by its name using regular expression.
+//
+// If filter is nil, none of the metrics will be collected.
+func NewGoMemStatsCollector(filter *regexp.Regexp) (col *GoMemStatsCollector) {
+ col = &GoMemStatsCollector{}
+ if filter == nil {
+ return col
+ }
+
+ col.init(filter)
+
+ return col
+}
+
+func (col *GoMemStatsCollector) init(filter *regexp.Regexp) {
+ col.nameValue = map[string]any{
+ `go_memstats_alloc_bytes`: &col.memstats.Alloc,
+ `go_memstats_total_alloc_bytes`: &col.memstats.TotalAlloc,
+ `go_memstats_sys_bytes`: &col.memstats.Sys,
+ `go_memstats_lookups`: &col.memstats.Lookups,
+ `go_memstats_mallocs_objects`: &col.memstats.Mallocs,
+ `go_memstats_frees_objects`: &col.memstats.Frees,
+ `go_memstats_heap_alloc_bytes`: &col.memstats.HeapAlloc,
+ `go_memstats_heap_sys_bytes`: &col.memstats.HeapSys,
+ `go_memstats_heap_idle_bytes`: &col.memstats.HeapIdle,
+ `go_memstats_heap_inuse_bytes`: &col.memstats.HeapInuse,
+ `go_memstats_heap_released_bytes`: &col.memstats.HeapReleased,
+ `go_memstats_heap_objects`: &col.memstats.HeapObjects,
+ `go_memstats_stack_inuse_bytes`: &col.memstats.StackInuse,
+ `go_memstats_stack_sys_bytes`: &col.memstats.StackSys,
+ `go_memstats_mspan_inuse_bytes`: &col.memstats.MSpanInuse,
+ `go_memstats_mspan_sys_bytes`: &col.memstats.MSpanSys,
+ `go_memstats_mcache_inuse_bytes`: &col.memstats.MCacheInuse,
+ `go_memstats_mcache_sys_bytes`: &col.memstats.MCacheSys,
+ `go_memstats_buck_hash_sys_bytes`: &col.memstats.BuckHashSys,
+ `go_memstats_gc_sys_bytes`: &col.memstats.GCSys,
+ `go_memstats_other_sys_bytes`: &col.memstats.OtherSys,
+ `go_memstats_gc_next_bytes`: &col.memstats.NextGC,
+ `go_memstats_gc_last`: &col.memstats.LastGC,
+ `go_memstats_pause_total_ns`: &col.memstats.PauseTotalNs,
+ `go_memstats_pause_ns`: &col.memstats.PauseNs,
+ `go_memstats_pause_end_ns`: &col.memstats.PauseEnd,
+ `go_memstats_gc_num`: &col.memstats.NumGC,
+ `go_memstats_gc_forced_num`: &col.memstats.NumForcedGC,
+ `go_memstats_gc_cpu_fraction`: &col.memstats.GCCPUFraction,
+ }
+
+ var key string
+ for key = range col.nameValue {
+ if filter.MatchString(key) {
+ col.names = append(col.names, key)
+ }
+ }
+ sort.Strings(col.names)
+}
+
+// Collect the Go MemStats.
+func (col *GoMemStatsCollector) Collect(ts int64) (list []Metric) {
+ if len(col.names) == 0 {
+ return nil
+ }
+
+ runtime.ReadMemStats(&col.memstats)
+
+ var name string
+ for _, name = range col.names {
+ var m = Metric{
+ Timestamp: ts,
+ Name: name,
+ }
+
+ var val = col.nameValue[name]
+
+ switch v := val.(type) {
+ case *uint64:
+ m.Value = float64(*v)
+ case *uint32:
+ m.Value = float64(*v)
+ case *float64:
+ m.Value = *v
+ case *[256]uint64:
+ var last = v[(col.memstats.NumGC+255)%256]
+ m.Value = float64(last)
+ default:
+ log.Printf(`GoMemStatsCollector.Collect: unknown type: %T %v`, v, v)
+ }
+ list = append(list, m)
+ }
+ return list
+}
diff --git a/lib/telemetry/go_metrics_collector.go b/lib/telemetry/go_metrics_collector.go
new file mode 100644
index 00000000..f0f0dcd6
--- /dev/null
+++ b/lib/telemetry/go_metrics_collector.go
@@ -0,0 +1,178 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import (
+ "regexp"
+ "runtime/metrics"
+)
+
+// RuntimeMetricsAlias define an alias for [runtime/metrics.Name] to be
+// exported by GoMetricsCollector.
+var RuntimeMetricsAlias = map[string]string{
+ `/cgo/go-to-c-calls:calls`: `go_cgo_calls`,
+
+ `/cpu/classes/gc/mark/assist:cpu-seconds`: `go_cpu_gc_mark_assist_seconds`,
+ `/cpu/classes/gc/mark/dedicated:cpu-seconds`: `go_cpu_gc_mark_dedicated_seconds`,
+ `/cpu/classes/gc/mark/idle:cpu-seconds`: `go_cpu_gc_mark_idle_seconds`,
+ `/cpu/classes/gc/pause:cpu-seconds`: `go_cpu_gc_pause_seconds`,
+ `/cpu/classes/gc/total:cpu-seconds`: `go_cpu_gc_total_seconds`,
+
+ `/cpu/classes/idle:cpu-seconds`: `go_cpu_idle_seconds`,
+ `/cpu/classes/scavenge/assist:cpu-seconds`: `go_cpu_scavenge_assist_seconds`,
+ `/cpu/classes/scavenge/background:cpu-seconds`: `go_cpu_scavenge_background_seconds`,
+ `/cpu/classes/scavenge/total:cpu-seconds`: `go_cpu_scavenge_total_seconds`,
+ `/cpu/classes/total:cpu-seconds`: `go_cpu_total_seconds`,
+ `/cpu/classes/user:cpu-seconds`: `go_cpu_user_seconds`,
+
+ `/gc/cycles/automatic:gc-cycles`: `go_gc_cycles_automatic`,
+ `/gc/cycles/forced:gc-cycles`: `go_gc_cycles_forced`,
+ `/gc/cycles/total:gc-cycles`: `go_gc_cycles_total`,
+
+ `/gc/heap/allocs-by-size:bytes`: `go_gc_heap_alloc_by_size_bytes`,
+ `/gc/heap/allocs:bytes`: `go_gc_heap_allocs_bytes`,
+ `/gc/heap/allocs:objects`: `go_gc_heap_allocs_objects`,
+ `/gc/heap/frees-by-size:bytes`: `go_gc_heap_frees_by_size_bytes`,
+ `/gc/heap/frees:bytes`: `go_gc_heap_frees_bytes`,
+ `/gc/heap/frees:objects`: `go_gc_heap_frees_objects`,
+ `/gc/heap/goal:bytes`: `go_gc_heap_goal_bytes`,
+ `/gc/heap/objects:objects`: `go_gc_heap_objects`,
+ `/gc/heap/tiny/allocs:objects`: `go_gc_heap_tiny_allocs_objects`,
+
+ `/gc/limiter/last-enabled:gc-cycle`: `go_gc_limiter_last_enabled`,
+
+ `/gc/pauses:seconds`: `go_gc_pauses_seconds`,
+
+ `/gc/stack/starting-size:bytes`: `go_gc_stack_starting_size_bytes`,
+
+ `/godebug/non-default-behavior/execerrdot:events`: `go_godebug_execerrdot_events`,
+ `/godebug/non-default-behavior/http2client:events`: `go_godebug_http2client_events`,
+ `/godebug/non-default-behavior/http2server:events`: `go_godebug_http2server_events`,
+ `/godebug/non-default-behavior/installgoroot:events`: `go_godebug_installgoroot_events`,
+ `/godebug/non-default-behavior/panicnil:events`: `go_godebug_panicnil_events`,
+ `/godebug/non-default-behavior/randautoseed:events`: `go_godebug_randautoseed_events`,
+ `/godebug/non-default-behavior/tarinsecurepath:events`: `go_godebug_trainsecurepath_events`,
+ `/godebug/non-default-behavior/x509sha1:events`: `go_godebug_x509sha1_events`,
+ `/godebug/non-default-behavior/x509usefallbackroots:events`: `go_godebug_x509usefallbackroots_events`,
+ `/godebug/non-default-behavior/zipinsecurepath:events`: `go_godebug_zipinsecurepath_events`,
+
+ `/memory/classes/heap/free:bytes`: `go_memory_heap_free_bytes`,
+ `/memory/classes/heap/objects:bytes`: `go_memory_heap_objects_bytes`,
+ `/memory/classes/heap/released:bytes`: `go_memory_heap_released_bytes`,
+ `/memory/classes/heap/stacks:bytes`: `go_memory_heap_stacks_bytes`,
+ `/memory/classes/heap/unused:bytes`: `go_memory_heap_unused_bytes`,
+
+ `/memory/classes/metadata/mcache/free:bytes`: `go_memory_metadata_mcache_free_bytes`,
+ `/memory/classes/metadata/mcache/inuse:bytes`: `go_memory_metadata_mcache_inuse_bytes`,
+ `/memory/classes/metadata/mspan/free:bytes`: `go_memory_metadata_mspan_free_bytes`,
+ `/memory/classes/metadata/mspan/inuse:bytes`: `go_memory_metadata_mspan_inuse_bytes`,
+ `/memory/classes/metadata/other:bytes`: `go_memory_metadata_other_bytes`,
+
+ `/memory/classes/os-stacks:bytes`: `go_memory_os_stacks_bytes`,
+ `/memory/classes/other:bytes`: `go_memory_other_bytes`,
+ `/memory/classes/profiling/buckets:bytes`: `go_memory_profiling_buckets_bytes`,
+ `/memory/classes/total:bytes`: `go_memory_total_bytes`,
+
+ `/sched/gomaxprocs:threads`: `go_sched_gomaxprocs`,
+ `/sched/goroutines:goroutines`: `go_sched_goroutines`,
+ `/sched/latencies:seconds`: `go_sched_latencies_seconds`,
+
+ `/sync/mutex/wait/total:seconds`: `go_sync_mutex_wait_total_seconds`,
+}
+
+// GoMetricsCollector collect the metrics using [runtime/metrics.Read].
+type GoMetricsCollector struct {
+ // samples list of [runtime/metrics.Sample] that has been
+ // filtered using [AgentOptions.RuntimeMetrics].
+ samples []metrics.Sample
+}
+
+// NewGoMetricsCollector create new collector for [runtime/metrics] with
+// options to filter specific metric by alias name using regular expression.
+//
+// For example, to collect all metrics pass regex `^.*$`, to collect memory
+// only pass `^go_memory_.*$`.
+// A nil filter means no metrics will be collected.
+func NewGoMetricsCollector(filter *regexp.Regexp) (col *GoMetricsCollector) {
+ col = &GoMetricsCollector{}
+ if filter == nil {
+ // Nothing to collect.
+ return col
+ }
+
+ var (
+ org string
+ alias string
+ )
+ for org, alias = range RuntimeMetricsAlias {
+ if filter.MatchString(alias) {
+ var sample = metrics.Sample{
+ Name: org,
+ }
+ col.samples = append(col.samples, sample)
+ }
+ }
+ return col
+}
+
+// Collect the [runtime/metrics].
+func (col *GoMetricsCollector) Collect(timestamp int64) (list []Metric) {
+ if len(col.samples) == 0 {
+ return nil
+ }
+
+ metrics.Read(col.samples)
+
+ var sample metrics.Sample
+
+ list = make([]Metric, 0, len(col.samples))
+
+ for _, sample = range col.samples {
+ var m = Metric{
+ Timestamp: timestamp,
+ Name: RuntimeMetricsAlias[sample.Name],
+ }
+
+ switch sample.Value.Kind() {
+ case metrics.KindUint64:
+ m.Value = float64(sample.Value.Uint64())
+ case metrics.KindFloat64:
+ m.Value = sample.Value.Float64()
+ case metrics.KindFloat64Histogram:
+ var hist = sample.Value.Float64Histogram()
+ m.Value = medianBucket(hist)
+ }
+ list = append(list, m)
+ }
+ return list
+}
+
+// medianBucket get the median of the histogram values.
+func medianBucket(hist *metrics.Float64Histogram) float64 {
+ var (
+ total uint64
+ count uint64
+ )
+ for _, count = range hist.Counts {
+ total += count
+ }
+ if total == 0 {
+ return 0
+ }
+
+ var (
+ thresh = total / 2
+ x int
+ )
+
+ total = 0
+ for x, count = range hist.Counts {
+ total += count
+ if total >= thresh {
+ return hist.Buckets[x]
+ }
+ }
+ return 0
+}
diff --git a/lib/telemetry/internal/cmd/agent-example/main.go b/lib/telemetry/internal/cmd/agent-example/main.go
new file mode 100644
index 00000000..5b561ff9
--- /dev/null
+++ b/lib/telemetry/internal/cmd/agent-example/main.go
@@ -0,0 +1,77 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Program agent-example provide an example of how to create agent.
+package main
+
+import (
+ "log"
+ "os"
+ "os/signal"
+ "regexp"
+ "syscall"
+ "time"
+
+ "github.com/shuLhan/share/lib/telemetry"
+)
+
+func main() {
+ var (
+ col = createGoMemStatsCollector()
+ ilpFmt = telemetry.NewIlpFormatter(`rescached`)
+ stdoutFwd = telemetry.NewStdoutForwarder(ilpFmt)
+ )
+
+ // Create metadata.
+ var md = telemetry.NewMetadata()
+ md.Set(`name`, `agent-example`)
+ md.Set(`version`, `0.1.0`)
+
+ // Create the Agent.
+ var (
+ agentOpts = telemetry.AgentOptions{
+ Metadata: md,
+ Timestamp: telemetry.NanoTimestamp(),
+ Forwarders: []telemetry.Forwarder{
+ stdoutFwd,
+ },
+ Collectors: []telemetry.Collector{
+ col,
+ },
+ Interval: 10 * time.Second,
+ }
+ agent = telemetry.NewAgent(agentOpts)
+ )
+ defer agent.Stop()
+
+ var qsignal = make(chan os.Signal, 1)
+ signal.Notify(qsignal, syscall.SIGQUIT, syscall.SIGSEGV, syscall.SIGTERM, syscall.SIGINT)
+ <-qsignal
+}
+
+func createGoMetricsCollector() (col *telemetry.GoMetricsCollector) {
+ var (
+ metricsFilter *regexp.Regexp
+ err error
+ )
+ metricsFilter, err = regexp.Compile(`^go_(cpu|gc|memory|sched)_.*$`)
+ if err != nil {
+ log.Fatal(err)
+ }
+ col = telemetry.NewGoMetricsCollector(metricsFilter)
+ return col
+}
+
+func createGoMemStatsCollector() (col *telemetry.GoMemStatsCollector) {
+ var (
+ metricsFilter *regexp.Regexp
+ err error
+ )
+ metricsFilter, err = regexp.Compile(`^.*$`)
+ if err != nil {
+ log.Fatal(err)
+ }
+ col = telemetry.NewGoMemStatsCollector(metricsFilter)
+ return col
+}
diff --git a/lib/telemetry/metadata.go b/lib/telemetry/metadata.go
new file mode 100644
index 00000000..75742c77
--- /dev/null
+++ b/lib/telemetry/metadata.go
@@ -0,0 +1,93 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import (
+ "sort"
+ "strings"
+)
+
+// Metadata provides versioned Map with stable key order.
+type Metadata struct {
+ vals map[string]string
+ version int
+}
+
+// NewMetadata create and initialize new metadata.
+func NewMetadata() (md *Metadata) {
+ md = &Metadata{
+ vals: map[string]string{},
+ }
+ return md
+}
+
+// Delete Metadata by its key.
+// The versioning will be increased only if the key exist.
+func (md *Metadata) Delete(key string) {
+ var ok bool
+ _, ok = md.vals[key]
+ if ok {
+ delete(md.vals, key)
+ md.version++
+ }
+}
+
+// Get the Metadata value by its key.
+func (md *Metadata) Get(key string) string {
+ return md.vals[key]
+}
+
+// Keys return the Metadata keys sorted lexicographically.
+func (md *Metadata) Keys() (keys []string) {
+ var key string
+ for key = range md.vals {
+ keys = append(keys, key)
+ }
+ sort.Strings(keys)
+ return keys
+}
+
+// KeysMap return the Metadata keys sorted lexicographically and its map.
+func (md *Metadata) KeysMap() (keys []string, vals map[string]string) {
+ keys = md.Keys()
+ return keys, md.vals
+}
+
+// Set store the key with value into Metadata.
+// This method always increase the version.
+func (md *Metadata) Set(key, value string) {
+ md.version++
+ md.vals[key] = value
+}
+
+// String return the Metadata where each item separated by comma and the
+// key-value separated by equal character.
+func (md *Metadata) String() string {
+ var keys = md.Keys()
+
+ var (
+ sb strings.Builder
+ key string
+ val string
+ x int
+ )
+ for x, key = range keys {
+ if x > 0 {
+ sb.WriteByte(',')
+ }
+
+ val = md.vals[key]
+
+ sb.WriteString(key)
+ sb.WriteByte('=')
+ sb.WriteString(val)
+ }
+ return sb.String()
+}
+
+// Version return the current version of Metadata.
+func (md *Metadata) Version() int {
+ return md.version
+}
diff --git a/lib/telemetry/metadata_example_test.go b/lib/telemetry/metadata_example_test.go
new file mode 100644
index 00000000..a8f8d39a
--- /dev/null
+++ b/lib/telemetry/metadata_example_test.go
@@ -0,0 +1,41 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry_test
+
+import (
+ "fmt"
+
+ "github.com/shuLhan/share/lib/telemetry"
+)
+
+func ExampleMetadata() {
+ var md = telemetry.NewMetadata()
+
+ // The new Metadata has version=0.
+ fmt.Println(md.Version(), md.String(), ".")
+
+ // Setting a key increase the version to 1
+ md.Set(`host`, `localhost`)
+ fmt.Println(md.Version(), md.String(), ".")
+
+ // ... even if the key already exist.
+ md.Set(`host`, `my.localhost`)
+ fmt.Println(md.Version(), md.String(), ".")
+
+ // Deleting a key increase the version too.
+ md.Delete(`host`)
+ fmt.Println(md.Version(), md.String(), ".")
+
+ // But if the key is not exist, it will not increase the version.
+ md.Delete(`host`)
+ fmt.Println(md.Version(), md.String(), ".")
+
+ // Output:
+ // 0 .
+ // 1 host=localhost .
+ // 2 host=my.localhost .
+ // 3 .
+ // 3 .
+}
diff --git a/lib/telemetry/metric.go b/lib/telemetry/metric.go
new file mode 100644
index 00000000..8cab29a2
--- /dev/null
+++ b/lib/telemetry/metric.go
@@ -0,0 +1,12 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+// Metric contains name, value, and timestamp.
+type Metric struct {
+ Name string `json:"name"`
+ Value float64 `json:"value"`
+ Timestamp int64 `json:"timestamp"`
+}
diff --git a/lib/telemetry/stdout_forwarder.go b/lib/telemetry/stdout_forwarder.go
new file mode 100644
index 00000000..4e84898b
--- /dev/null
+++ b/lib/telemetry/stdout_forwarder.go
@@ -0,0 +1,44 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import (
+ "fmt"
+ "os"
+)
+
+// StdoutForwarder write the metrics to os.Stdout.
+// This type is used as example and to provide wrapper for os.Stdout, since
+// user should not call Close on os.Stdout.
+type StdoutForwarder struct {
+ formatter Formatter
+}
+
+// NewStdoutForwarder create new StdoutForwarder using f as Formatter.
+func NewStdoutForwarder(f Formatter) *StdoutForwarder {
+ return &StdoutForwarder{
+ formatter: f,
+ }
+}
+
+// Close on StdoutForwarder sync the Stdout.
+func (stdout *StdoutForwarder) Close() error {
+ os.Stdout.Sync()
+ return nil
+}
+
+// Formatter return the Formatter used by this StdoutForwarder.
+func (stdout *StdoutForwarder) Formatter() Formatter {
+ return stdout.formatter
+}
+
+// Write the raw metrics to stdout.
+func (stdout *StdoutForwarder) Write(wire []byte) (n int, err error) {
+ n, err = os.Stdout.Write(wire)
+ if err != nil {
+ return n, fmt.Errorf(`StdoutForwarder.Forward: %w`, err)
+ }
+ return n, nil
+}
diff --git a/lib/telemetry/telemetry.go b/lib/telemetry/telemetry.go
new file mode 100644
index 00000000..cdf654a0
--- /dev/null
+++ b/lib/telemetry/telemetry.go
@@ -0,0 +1,10 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package telemetry is a library for collecting various [Metric], for example
+// from standard [runtime/metrics], and send or write it to one or more
+// [Forwarder].
+// Each Forwarder has capability to format the Metric before sending or
+// writing it using [Formatter].
+package telemetry
diff --git a/lib/telemetry/timestamper.go b/lib/telemetry/timestamper.go
new file mode 100644
index 00000000..0d141f9c
--- /dev/null
+++ b/lib/telemetry/timestamper.go
@@ -0,0 +1,33 @@
+// Copyright 2023, Shulhan <ms@kilabit.info>. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package telemetry
+
+import "time"
+
+// Timestamper a type that return a function to generate timestamp.
+type Timestamper func() int64
+
+// SecondTimestamp return the number of seconds elapsed since January 1,
+// 1970 UTC
+func SecondTimestamp() Timestamper {
+ return func() int64 { return time.Now().Unix() }
+}
+
+// MilliTimestamp return the number of milliseconds elapsed since January 1,
+// 1970 UTC.
+func MilliTimestamp() Timestamper {
+ return func() int64 { return time.Now().UnixMilli() }
+}
+
+// NanoTimestamp return the number of nanoseconds elapsed since January 1,
+// 1970 UTC
+func NanoTimestamp() Timestamper {
+ return func() int64 { return time.Now().UnixNano() }
+}
+
+// DummyTimestamp return fixed epoch 1678606568, for testing only.
+func DummyTimestamp() Timestamper {
+ return func() int64 { return 1678606568 }
+}