aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2019-01-04 16:07:58 +0700
committerShulhan <ms@kilabit.info>2019-01-04 16:07:58 +0700
commitba7bb17a36b5611ecd2a22c1d7393343c44cc8d3 (patch)
tree348209deb31540f88a743fad69b3d789cdf5db81
parenta5d1c2878a207b4fdc9203134df17539970f10b8 (diff)
downloadhaminer-ba7bb17a36b5611ecd2a22c1d7393343c44cc8d3.tar.xz
haminer: change buffered mode from max size to time interval
Previously, we forward the logs only if total collected logs in buffer is greater or equal to 10. This commit change the model into using time interval, where the logs will be send every N seconds (default to 15 seconds).
-rw-r--r--cmd/haminer/haminer.conf8
-rw-r--r--config.go36
-rw-r--r--config_test.go56
-rw-r--r--haminer.go17
-rw-r--r--testdata/haminer.conf3
5 files changed, 91 insertions, 29 deletions
diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf
index 3dea780..1c2fb9a 100644
--- a/cmd/haminer/haminer.conf
+++ b/cmd/haminer/haminer.conf
@@ -56,6 +56,14 @@
#influxdb_api_write=
##
+## Duration, in seconds, when the logs will be forwarded.
+##
+## Format: DIGIT "s"
+## Default: "15s"
+##
+#forward_interval = 15s
+
+##
## Pre-process tag by substituting replacing their value using regular
## expression.
## Each pre-process rules is run from top to bottom, which means if we have
diff --git a/config.go b/config.go
index 4bdd57b..d8c66e3 100644
--- a/config.go
+++ b/config.go
@@ -8,6 +8,7 @@ import (
"log"
"strconv"
"strings"
+ "time"
"github.com/shuLhan/share/lib/ini"
)
@@ -16,6 +17,7 @@ import (
const (
ConfigKeyAcceptBackend = "accept_backend"
ConfigKeyCaptureRequestHeader = "capture_request_header"
+ ConfigKeyForwardInterval = "forward_interval"
ConfigKeyInfluxAPIWrite = "influxdb_api_write"
ConfigKeyListen = "listen"
)
@@ -24,7 +26,7 @@ const (
const (
defListenAddr = "127.0.0.1"
defListenPort = 5140
- defMaxBufferedLogs = 10
+ defForwardInterval = 15 * time.Second
)
//
@@ -39,6 +41,9 @@ type Config struct {
// AcceptBackend list of backend to be filtered.
AcceptBackend []string
+ // ForwardInterval define an interval where logs will be forwarded.
+ ForwardInterval time.Duration
+
// List of request headers to be parsed and mapped as keys in halog
// output.
RequestHeaders []string
@@ -46,10 +51,6 @@ type Config struct {
// InfluxAPIWrite define HTTP API to write to Influxdb.
InfluxAPIWrite string
- // MaxBufferedLogs define a number of logs that will be keep in buffer
- // before being forwarded.
- MaxBufferedLogs int
-
// retags contains list of pre-processing rules for tag.
retags []*tagPreprocessor
}
@@ -62,7 +63,7 @@ func NewConfig() (cfg *Config) {
return &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
}
}
@@ -94,12 +95,35 @@ func (cfg *Config) Load(path string) {
cfg.InfluxAPIWrite = v
}
+ v, _ = in.Get("haminer", "", ConfigKeyForwardInterval)
+ cfg.SetForwardInterval(v)
+
sec := in.GetSection("preprocess", "tag")
cfg.parsePreprocessTag(sec)
}
//
+// SetForwardInterval set forward interval using string formatted, e.g. "20s"
+// where "s" represent unit time in "second".
+//
+func (cfg *Config) SetForwardInterval(v string) {
+ if len(v) == 0 {
+ return
+ }
+
+ var err error
+
+ cfg.ForwardInterval, err = time.ParseDuration(v)
+ if err != nil {
+ log.Println("SetForwardInterval: ", err)
+ }
+ if cfg.ForwardInterval < defForwardInterval {
+ cfg.ForwardInterval = defForwardInterval
+ }
+}
+
+//
// SetListen will parse `v` value as "addr:port", and set config address and
// port based on it.
//
diff --git a/config_test.go b/config_test.go
index 53ff75f..478ec33 100644
--- a/config_test.go
+++ b/config_test.go
@@ -7,6 +7,7 @@ package haminer
import (
"regexp"
"testing"
+ "time"
"github.com/shuLhan/share/lib/ini"
"github.com/shuLhan/share/lib/test"
@@ -21,7 +22,7 @@ func TestNewConfig(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
},
}}
@@ -44,7 +45,7 @@ func TestLoad(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
},
}, {
desc: "With path not exist",
@@ -52,7 +53,7 @@ func TestLoad(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
},
}, {
desc: "With path exist",
@@ -60,7 +61,7 @@ func TestLoad(t *testing.T) {
exp: &Config{
ListenAddr: "0.0.0.0",
ListenPort: 8080,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: time.Second * 20,
AcceptBackend: []string{
"a",
"b",
@@ -106,7 +107,7 @@ func TestSetListen(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
},
}, {
desc: "With empty port",
@@ -114,7 +115,7 @@ func TestSetListen(t *testing.T) {
exp: &Config{
ListenAddr: "127.0.0.2",
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
},
}, {
desc: "With no port",
@@ -122,7 +123,7 @@ func TestSetListen(t *testing.T) {
exp: &Config{
ListenAddr: "127.0.0.3",
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
},
}}
@@ -146,7 +147,7 @@ func TestParseAcceptBackend(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
},
}, {
desc: "With no separator",
@@ -154,7 +155,7 @@ func TestParseAcceptBackend(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
AcceptBackend: []string{
"a ; b",
},
@@ -165,7 +166,7 @@ func TestParseAcceptBackend(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
AcceptBackend: []string{
"a", "b",
},
@@ -192,7 +193,7 @@ func TestParseCaptureRequestHeader(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
},
}, {
desc: "With no separator",
@@ -200,7 +201,7 @@ func TestParseCaptureRequestHeader(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
RequestHeaders: []string{
"a ; b",
},
@@ -211,7 +212,7 @@ func TestParseCaptureRequestHeader(t *testing.T) {
exp: &Config{
ListenAddr: defListenAddr,
ListenPort: defListenPort,
- MaxBufferedLogs: defMaxBufferedLogs,
+ ForwardInterval: defForwardInterval,
RequestHeaders: []string{
"a", "b",
},
@@ -284,3 +285,32 @@ func TestParsePreprocessTag(t *testing.T) {
test.Assert(t, "retags", c.exp, cfg.retags, true)
}
}
+
+func TestSetForwardInterval(t *testing.T) {
+ cfg := NewConfig()
+
+ cases := []struct {
+ desc string
+ in string
+ exp time.Duration
+ }{{
+ desc: "With empty string",
+ exp: defForwardInterval,
+ }, {
+ desc: "With no interval unit",
+ in: "20",
+ exp: defForwardInterval,
+ }, {
+ desc: "With minus",
+ in: "-20s",
+ exp: defForwardInterval,
+ }}
+
+ for _, c := range cases {
+ t.Log(c.desc)
+
+ cfg.SetForwardInterval(c.in)
+
+ test.Assert(t, "ForwardInterval", c.exp, cfg.ForwardInterval, true)
+ }
+}
diff --git a/haminer.go b/haminer.go
index 3f62024..62a4aa7 100644
--- a/haminer.go
+++ b/haminer.go
@@ -11,6 +11,7 @@ import (
"os"
"os/signal"
"syscall"
+ "time"
)
//
@@ -149,21 +150,17 @@ func (h *Haminer) preprocess(halog *Halog) {
}
func (h *Haminer) produce() {
+ ticker := time.NewTicker(h.cfg.ForwardInterval)
halogs := make([]*Halog, 0)
for h.isRunning {
- halog, ok := <-h.chHalog
- if !ok {
- continue
- }
-
- h.preprocess(halog)
-
- halogs = append(halogs, halog)
+ select {
+ case halog := <-h.chHalog:
+ h.preprocess(halog)
- if len(halogs) >= h.cfg.MaxBufferedLogs {
+ halogs = append(halogs, halog)
+ case <-ticker.C:
h.forwards(halogs)
- halogs = make([]*Halog, 0)
}
}
}
diff --git a/testdata/haminer.conf b/testdata/haminer.conf
index 4e3bf5b..6f5b84f 100644
--- a/testdata/haminer.conf
+++ b/testdata/haminer.conf
@@ -3,8 +3,11 @@ listen = 0.0.0.0:8080
accept_backend = ,a , b,
capture_request_header = , host, referrer,
influxdb_api_write = http://127.0.0.1:8086/write
+forward_interval = 20s
[preprocess "tag"]
+# a comment
http_url = /[0-9]+-\\w+-\\w+-\\w+-\\w+-\\w+ => /-
+
http_url = /\\w+-\\w+-\\w+-\\w+-\\w+ => /-
http_url = /[0-9]+ => /-