From ba7bb17a36b5611ecd2a22c1d7393343c44cc8d3 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Fri, 4 Jan 2019 16:07:58 +0700 Subject: haminer: change buffered mode from max size to time interval Previously, we forward the logs only if total collected logs in buffer is greater or equal to 10. This commit change the model into using time interval, where the logs will be send every N seconds (default to 15 seconds). --- cmd/haminer/haminer.conf | 8 +++++++ config.go | 36 +++++++++++++++++++++++++------ config_test.go | 56 +++++++++++++++++++++++++++++++++++++----------- haminer.go | 17 ++++++--------- testdata/haminer.conf | 3 +++ 5 files changed, 91 insertions(+), 29 deletions(-) diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf index 3dea780..1c2fb9a 100644 --- a/cmd/haminer/haminer.conf +++ b/cmd/haminer/haminer.conf @@ -55,6 +55,14 @@ #influxdb_api_write= +## +## Duration, in seconds, when the logs will be forwarded. +## +## Format: DIGIT "s" +## Default: "15s" +## +#forward_interval = 15s + ## ## Pre-process tag by substituting replacing their value using regular ## expression. diff --git a/config.go b/config.go index 4bdd57b..d8c66e3 100644 --- a/config.go +++ b/config.go @@ -8,6 +8,7 @@ import ( "log" "strconv" "strings" + "time" "github.com/shuLhan/share/lib/ini" ) @@ -16,6 +17,7 @@ import ( const ( ConfigKeyAcceptBackend = "accept_backend" ConfigKeyCaptureRequestHeader = "capture_request_header" + ConfigKeyForwardInterval = "forward_interval" ConfigKeyInfluxAPIWrite = "influxdb_api_write" ConfigKeyListen = "listen" ) @@ -24,7 +26,7 @@ const ( const ( defListenAddr = "127.0.0.1" defListenPort = 5140 - defMaxBufferedLogs = 10 + defForwardInterval = 15 * time.Second ) // @@ -39,6 +41,9 @@ type Config struct { // AcceptBackend list of backend to be filtered. AcceptBackend []string + // ForwardInterval define an interval where logs will be forwarded. + ForwardInterval time.Duration + // List of request headers to be parsed and mapped as keys in halog // output. RequestHeaders []string @@ -46,10 +51,6 @@ type Config struct { // InfluxAPIWrite define HTTP API to write to Influxdb. InfluxAPIWrite string - // MaxBufferedLogs define a number of logs that will be keep in buffer - // before being forwarded. - MaxBufferedLogs int - // retags contains list of pre-processing rules for tag. retags []*tagPreprocessor } @@ -62,7 +63,7 @@ func NewConfig() (cfg *Config) { return &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, } } @@ -94,11 +95,34 @@ func (cfg *Config) Load(path string) { cfg.InfluxAPIWrite = v } + v, _ = in.Get("haminer", "", ConfigKeyForwardInterval) + cfg.SetForwardInterval(v) + sec := in.GetSection("preprocess", "tag") cfg.parsePreprocessTag(sec) } +// +// SetForwardInterval set forward interval using string formatted, e.g. "20s" +// where "s" represent unit time in "second". +// +func (cfg *Config) SetForwardInterval(v string) { + if len(v) == 0 { + return + } + + var err error + + cfg.ForwardInterval, err = time.ParseDuration(v) + if err != nil { + log.Println("SetForwardInterval: ", err) + } + if cfg.ForwardInterval < defForwardInterval { + cfg.ForwardInterval = defForwardInterval + } +} + // // SetListen will parse `v` value as "addr:port", and set config address and // port based on it. diff --git a/config_test.go b/config_test.go index 53ff75f..478ec33 100644 --- a/config_test.go +++ b/config_test.go @@ -7,6 +7,7 @@ package haminer import ( "regexp" "testing" + "time" "github.com/shuLhan/share/lib/ini" "github.com/shuLhan/share/lib/test" @@ -21,7 +22,7 @@ func TestNewConfig(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }} @@ -44,7 +45,7 @@ func TestLoad(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With path not exist", @@ -52,7 +53,7 @@ func TestLoad(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With path exist", @@ -60,7 +61,7 @@ func TestLoad(t *testing.T) { exp: &Config{ ListenAddr: "0.0.0.0", ListenPort: 8080, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: time.Second * 20, AcceptBackend: []string{ "a", "b", @@ -106,7 +107,7 @@ func TestSetListen(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With empty port", @@ -114,7 +115,7 @@ func TestSetListen(t *testing.T) { exp: &Config{ ListenAddr: "127.0.0.2", ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With no port", @@ -122,7 +123,7 @@ func TestSetListen(t *testing.T) { exp: &Config{ ListenAddr: "127.0.0.3", ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }} @@ -146,7 +147,7 @@ func TestParseAcceptBackend(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With no separator", @@ -154,7 +155,7 @@ func TestParseAcceptBackend(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, AcceptBackend: []string{ "a ; b", }, @@ -165,7 +166,7 @@ func TestParseAcceptBackend(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, AcceptBackend: []string{ "a", "b", }, @@ -192,7 +193,7 @@ func TestParseCaptureRequestHeader(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With no separator", @@ -200,7 +201,7 @@ func TestParseCaptureRequestHeader(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, RequestHeaders: []string{ "a ; b", }, @@ -211,7 +212,7 @@ func TestParseCaptureRequestHeader(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, RequestHeaders: []string{ "a", "b", }, @@ -284,3 +285,32 @@ func TestParsePreprocessTag(t *testing.T) { test.Assert(t, "retags", c.exp, cfg.retags, true) } } + +func TestSetForwardInterval(t *testing.T) { + cfg := NewConfig() + + cases := []struct { + desc string + in string + exp time.Duration + }{{ + desc: "With empty string", + exp: defForwardInterval, + }, { + desc: "With no interval unit", + in: "20", + exp: defForwardInterval, + }, { + desc: "With minus", + in: "-20s", + exp: defForwardInterval, + }} + + for _, c := range cases { + t.Log(c.desc) + + cfg.SetForwardInterval(c.in) + + test.Assert(t, "ForwardInterval", c.exp, cfg.ForwardInterval, true) + } +} diff --git a/haminer.go b/haminer.go index 3f62024..62a4aa7 100644 --- a/haminer.go +++ b/haminer.go @@ -11,6 +11,7 @@ import ( "os" "os/signal" "syscall" + "time" ) // @@ -149,21 +150,17 @@ func (h *Haminer) preprocess(halog *Halog) { } func (h *Haminer) produce() { + ticker := time.NewTicker(h.cfg.ForwardInterval) halogs := make([]*Halog, 0) for h.isRunning { - halog, ok := <-h.chHalog - if !ok { - continue - } - - h.preprocess(halog) - - halogs = append(halogs, halog) + select { + case halog := <-h.chHalog: + h.preprocess(halog) - if len(halogs) >= h.cfg.MaxBufferedLogs { + halogs = append(halogs, halog) + case <-ticker.C: h.forwards(halogs) - halogs = make([]*Halog, 0) } } } diff --git a/testdata/haminer.conf b/testdata/haminer.conf index 4e3bf5b..6f5b84f 100644 --- a/testdata/haminer.conf +++ b/testdata/haminer.conf @@ -3,8 +3,11 @@ listen = 0.0.0.0:8080 accept_backend = ,a , b, capture_request_header = , host, referrer, influxdb_api_write = http://127.0.0.1:8086/write +forward_interval = 20s [preprocess "tag"] +# a comment http_url = /[0-9]+-\\w+-\\w+-\\w+-\\w+-\\w+ => /- + http_url = /\\w+-\\w+-\\w+-\\w+-\\w+ => /- http_url = /[0-9]+ => /- -- cgit v1.3