diff options
| author | Shulhan <ms@kilabit.info> | 2019-01-04 16:07:58 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2019-01-04 16:07:58 +0700 |
| commit | ba7bb17a36b5611ecd2a22c1d7393343c44cc8d3 (patch) | |
| tree | 348209deb31540f88a743fad69b3d789cdf5db81 | |
| parent | a5d1c2878a207b4fdc9203134df17539970f10b8 (diff) | |
| download | haminer-ba7bb17a36b5611ecd2a22c1d7393343c44cc8d3.tar.xz | |
haminer: change buffered mode from max size to time interval
Previously, we forward the logs only if total collected logs in buffer
is greater or equal to 10.
This commit change the model into using time interval, where the logs
will be send every N seconds (default to 15 seconds).
| -rw-r--r-- | cmd/haminer/haminer.conf | 8 | ||||
| -rw-r--r-- | config.go | 36 | ||||
| -rw-r--r-- | config_test.go | 56 | ||||
| -rw-r--r-- | haminer.go | 17 | ||||
| -rw-r--r-- | testdata/haminer.conf | 3 |
5 files changed, 91 insertions, 29 deletions
diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf index 3dea780..1c2fb9a 100644 --- a/cmd/haminer/haminer.conf +++ b/cmd/haminer/haminer.conf @@ -56,6 +56,14 @@ #influxdb_api_write= ## +## Duration, in seconds, when the logs will be forwarded. +## +## Format: DIGIT "s" +## Default: "15s" +## +#forward_interval = 15s + +## ## Pre-process tag by substituting replacing their value using regular ## expression. ## Each pre-process rules is run from top to bottom, which means if we have @@ -8,6 +8,7 @@ import ( "log" "strconv" "strings" + "time" "github.com/shuLhan/share/lib/ini" ) @@ -16,6 +17,7 @@ import ( const ( ConfigKeyAcceptBackend = "accept_backend" ConfigKeyCaptureRequestHeader = "capture_request_header" + ConfigKeyForwardInterval = "forward_interval" ConfigKeyInfluxAPIWrite = "influxdb_api_write" ConfigKeyListen = "listen" ) @@ -24,7 +26,7 @@ const ( const ( defListenAddr = "127.0.0.1" defListenPort = 5140 - defMaxBufferedLogs = 10 + defForwardInterval = 15 * time.Second ) // @@ -39,6 +41,9 @@ type Config struct { // AcceptBackend list of backend to be filtered. AcceptBackend []string + // ForwardInterval define an interval where logs will be forwarded. + ForwardInterval time.Duration + // List of request headers to be parsed and mapped as keys in halog // output. RequestHeaders []string @@ -46,10 +51,6 @@ type Config struct { // InfluxAPIWrite define HTTP API to write to Influxdb. InfluxAPIWrite string - // MaxBufferedLogs define a number of logs that will be keep in buffer - // before being forwarded. - MaxBufferedLogs int - // retags contains list of pre-processing rules for tag. retags []*tagPreprocessor } @@ -62,7 +63,7 @@ func NewConfig() (cfg *Config) { return &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, } } @@ -94,12 +95,35 @@ func (cfg *Config) Load(path string) { cfg.InfluxAPIWrite = v } + v, _ = in.Get("haminer", "", ConfigKeyForwardInterval) + cfg.SetForwardInterval(v) + sec := in.GetSection("preprocess", "tag") cfg.parsePreprocessTag(sec) } // +// SetForwardInterval set forward interval using string formatted, e.g. "20s" +// where "s" represent unit time in "second". +// +func (cfg *Config) SetForwardInterval(v string) { + if len(v) == 0 { + return + } + + var err error + + cfg.ForwardInterval, err = time.ParseDuration(v) + if err != nil { + log.Println("SetForwardInterval: ", err) + } + if cfg.ForwardInterval < defForwardInterval { + cfg.ForwardInterval = defForwardInterval + } +} + +// // SetListen will parse `v` value as "addr:port", and set config address and // port based on it. // diff --git a/config_test.go b/config_test.go index 53ff75f..478ec33 100644 --- a/config_test.go +++ b/config_test.go @@ -7,6 +7,7 @@ package haminer import ( "regexp" "testing" + "time" "github.com/shuLhan/share/lib/ini" "github.com/shuLhan/share/lib/test" @@ -21,7 +22,7 @@ func TestNewConfig(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }} @@ -44,7 +45,7 @@ func TestLoad(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With path not exist", @@ -52,7 +53,7 @@ func TestLoad(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With path exist", @@ -60,7 +61,7 @@ func TestLoad(t *testing.T) { exp: &Config{ ListenAddr: "0.0.0.0", ListenPort: 8080, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: time.Second * 20, AcceptBackend: []string{ "a", "b", @@ -106,7 +107,7 @@ func TestSetListen(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With empty port", @@ -114,7 +115,7 @@ func TestSetListen(t *testing.T) { exp: &Config{ ListenAddr: "127.0.0.2", ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With no port", @@ -122,7 +123,7 @@ func TestSetListen(t *testing.T) { exp: &Config{ ListenAddr: "127.0.0.3", ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }} @@ -146,7 +147,7 @@ func TestParseAcceptBackend(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With no separator", @@ -154,7 +155,7 @@ func TestParseAcceptBackend(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, AcceptBackend: []string{ "a ; b", }, @@ -165,7 +166,7 @@ func TestParseAcceptBackend(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, AcceptBackend: []string{ "a", "b", }, @@ -192,7 +193,7 @@ func TestParseCaptureRequestHeader(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, }, }, { desc: "With no separator", @@ -200,7 +201,7 @@ func TestParseCaptureRequestHeader(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, RequestHeaders: []string{ "a ; b", }, @@ -211,7 +212,7 @@ func TestParseCaptureRequestHeader(t *testing.T) { exp: &Config{ ListenAddr: defListenAddr, ListenPort: defListenPort, - MaxBufferedLogs: defMaxBufferedLogs, + ForwardInterval: defForwardInterval, RequestHeaders: []string{ "a", "b", }, @@ -284,3 +285,32 @@ func TestParsePreprocessTag(t *testing.T) { test.Assert(t, "retags", c.exp, cfg.retags, true) } } + +func TestSetForwardInterval(t *testing.T) { + cfg := NewConfig() + + cases := []struct { + desc string + in string + exp time.Duration + }{{ + desc: "With empty string", + exp: defForwardInterval, + }, { + desc: "With no interval unit", + in: "20", + exp: defForwardInterval, + }, { + desc: "With minus", + in: "-20s", + exp: defForwardInterval, + }} + + for _, c := range cases { + t.Log(c.desc) + + cfg.SetForwardInterval(c.in) + + test.Assert(t, "ForwardInterval", c.exp, cfg.ForwardInterval, true) + } +} @@ -11,6 +11,7 @@ import ( "os" "os/signal" "syscall" + "time" ) // @@ -149,21 +150,17 @@ func (h *Haminer) preprocess(halog *Halog) { } func (h *Haminer) produce() { + ticker := time.NewTicker(h.cfg.ForwardInterval) halogs := make([]*Halog, 0) for h.isRunning { - halog, ok := <-h.chHalog - if !ok { - continue - } - - h.preprocess(halog) - - halogs = append(halogs, halog) + select { + case halog := <-h.chHalog: + h.preprocess(halog) - if len(halogs) >= h.cfg.MaxBufferedLogs { + halogs = append(halogs, halog) + case <-ticker.C: h.forwards(halogs) - halogs = make([]*Halog, 0) } } } diff --git a/testdata/haminer.conf b/testdata/haminer.conf index 4e3bf5b..6f5b84f 100644 --- a/testdata/haminer.conf +++ b/testdata/haminer.conf @@ -3,8 +3,11 @@ listen = 0.0.0.0:8080 accept_backend = ,a , b, capture_request_header = , host, referrer, influxdb_api_write = http://127.0.0.1:8086/write +forward_interval = 20s [preprocess "tag"] +# a comment http_url = /[0-9]+-\\w+-\\w+-\\w+-\\w+-\\w+ => /- + http_url = /\\w+-\\w+-\\w+-\\w+-\\w+ => /- http_url = /[0-9]+ => /- |
