aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/haminer/haminer.conf24
-rw-r--r--cmd/haminer/main.go29
-rw-r--r--config.go169
-rw-r--r--config_test.go253
-rw-r--r--forwarder.go2
-rw-r--r--go.mod4
-rw-r--r--go.sum5
-rw-r--r--halog.go9
-rw-r--r--haminer.go16
-rw-r--r--influxdb.go6
-rw-r--r--tagpreprocessor.go7
-rw-r--r--tagpreprocessor_test.go6
-rw-r--r--testdata/haminer.conf6
-rw-r--r--udppacket.go6
14 files changed, 173 insertions, 369 deletions
diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf
index 8be012c..5159194 100644
--- a/cmd/haminer/haminer.conf
+++ b/cmd/haminer/haminer.conf
@@ -4,7 +4,7 @@
##
## Format
##
-## ADDR [ ":" PORT ]
+## listen = ADDR [ ":" PORT ]
##
## Default: 127.0.0.1:5140
##
@@ -17,18 +17,19 @@
#listen=
##
-## List of HAProxy backend to be accepted and forwarded to Influxdb. Each
-## backend name is separated by comma.
+## List of HAProxy backend to be accepted and forwarded to Influxdb.
+## Each accept_backend can be listed multiple times.
##
## Format
##
-## [ name ] *[ "," name ]
+## accept_backend = <string>
##
## Default: "", no filter (all backend are accepted).
##
## Examples
##
-## accept_backend=api_01,api_02
+## accept_backend=api_01
+## accept_backend=api_02
##
#accept_backend=
@@ -39,7 +40,7 @@
##
## Format
##
-## [ name ] *["," name]
+## capture_request_header = <string>
##
## The name should contains only alphabets and underscore.
##
@@ -47,7 +48,8 @@
##
## Examples
##
-## capture_request_header = host , referrer
+## capture_request_header = host
+## capture_request_header = referrer
##
#capture_request_header=
@@ -58,7 +60,7 @@
##
## Format
##
-## http://<hostname|IP-address>:<port>/write?db=<influxdb-name>[&u=username][&p=password]
+## influxdb_api_write = http://<hostname|IP-address>:<port>/write?db=<influxdb-name>[&u=username][&p=password]
##
## Default: "", empty. If empty the log will not forwarded to Influxdb.
##
@@ -74,7 +76,7 @@
##
## Format
##
-## DIGIT "s"
+## forward_interval = DIGIT "s"
##
## Default: "15s"
##
@@ -97,8 +99,8 @@
## http_url = /uuid/\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12} => /uuid/-
## http_url = /id/[0-9]+ => /id/-
##
-## This will replace "/id/1000" with "/id/-" and/or
-## "/uuid/e7282bca-73b3-48fc-9793-6446ea6ebff3" into "/uuid/-"
+## This will replace "/id/1000" with "/id/-" and/or
+## "/uuid/e7282bca-73b3-48fc-9793-6446ea6ebff3" into "/uuid/-"
##
## If the order of key is wrong, you may get the unexpected output.
## For example,
diff --git a/cmd/haminer/main.go b/cmd/haminer/main.go
index 578ccd4..2583d20 100644
--- a/cmd/haminer/main.go
+++ b/cmd/haminer/main.go
@@ -18,10 +18,12 @@ const (
defConfig = "/etc/haminer.conf"
)
-func initConfig() (cfg *haminer.Config) {
+func initConfig() (cfg *haminer.Config, err error) {
var (
- flagConfig, flagListen, flagAcceptBackend string
- flagInfluxAPIWrite string
+ flagConfig string
+ flagListen string
+ flagAcceptBackend string
+ flagInfluxAPIWrite string
)
log.SetPrefix(defLogPrefix)
@@ -45,7 +47,10 @@ func initConfig() (cfg *haminer.Config) {
flag.Parse()
if len(flagConfig) > 0 {
- cfg.Load(flagConfig)
+ err = cfg.Load(flagConfig)
+ if err != nil {
+ return nil, err
+ }
}
if len(flagListen) > 0 {
cfg.SetListen(flagListen)
@@ -57,18 +62,26 @@ func initConfig() (cfg *haminer.Config) {
cfg.InfluxAPIWrite = flagInfluxAPIWrite
}
- return cfg
+ return cfg, nil
}
func main() {
- cfg := initConfig()
+ var (
+ cfg *haminer.Config
+ err error
+ )
+
+ cfg, err = initConfig()
+ if err != nil {
+ log.Fatal(err)
+ }
fmt.Printf("Starting Haminer with config: %+v\n", cfg)
h := haminer.NewHaminer(cfg)
- err := h.Start()
+ err = h.Start()
if err != nil {
- log.Println(err)
+ log.Fatal(err)
}
}
diff --git a/config.go b/config.go
index d8c66e3..79d08f5 100644
--- a/config.go
+++ b/config.go
@@ -5,7 +5,7 @@
package haminer
import (
- "log"
+ "fmt"
"strconv"
"strings"
"time"
@@ -29,104 +29,81 @@ const (
defForwardInterval = 15 * time.Second
)
-//
// Config define options to create and run Haminer instance.
-//
type Config struct {
- // ListenAddr is an IP address where Haminer will bind and receiving
+ // Listen is the address where Haminer will bind and receiving
// log from HAProxy.
- ListenAddr string
- ListenPort int
+ Listen string `ini:"haminer::listen"`
- // AcceptBackend list of backend to be filtered.
- AcceptBackend []string
+ listenAddr string
- // ForwardInterval define an interval where logs will be forwarded.
- ForwardInterval time.Duration
+ // AcceptBackend list of backend to be filtered.
+ AcceptBackend []string `ini:"haminer::accept_backend"`
// List of request headers to be parsed and mapped as keys in halog
// output.
- RequestHeaders []string
+ RequestHeaders []string `ini:"haminer::capture_request_header"`
// InfluxAPIWrite define HTTP API to write to Influxdb.
- InfluxAPIWrite string
+ InfluxAPIWrite string `ini:"haminer::influxdb_api_write"`
+
+ HttpUrl []string `ini:"preprocess:tag:http_url"`
// retags contains list of pre-processing rules for tag.
retags []*tagPreprocessor
+
+ // ForwardInterval define an interval where logs will be forwarded.
+ ForwardInterval time.Duration `ini:"haminer::forward_interval"`
+
+ listenPort int
}
-//
// NewConfig will create, initialize, and return new config with default
// values.
-//
func NewConfig() (cfg *Config) {
return &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
+ listenAddr: defListenAddr,
+ listenPort: defListenPort,
ForwardInterval: defForwardInterval,
}
}
-//
// Load configuration from file defined by `path`.
-//
-func (cfg *Config) Load(path string) {
+func (cfg *Config) Load(path string) (err error) {
if len(path) == 0 {
return
}
- in, err := ini.Open(path)
- if err != nil {
- log.Println(err)
- return
- }
+ var (
+ logp = `Load`
- v, _ := in.Get("haminer", "", ConfigKeyListen)
- cfg.SetListen(v)
+ in *ini.Ini
+ )
- v, _ = in.Get("haminer", "", ConfigKeyAcceptBackend)
- cfg.ParseAcceptBackend(v)
-
- v, _ = in.Get("haminer", "", ConfigKeyCaptureRequestHeader)
- cfg.ParseCaptureRequestHeader(v)
-
- v, _ = in.Get("haminer", "", ConfigKeyInfluxAPIWrite)
- if len(v) > 0 {
- cfg.InfluxAPIWrite = v
+ in, err = ini.Open(path)
+ if err != nil {
+ return fmt.Errorf(`%s: %w`, logp, err)
}
- v, _ = in.Get("haminer", "", ConfigKeyForwardInterval)
- cfg.SetForwardInterval(v)
-
- sec := in.GetSection("preprocess", "tag")
-
- cfg.parsePreprocessTag(sec)
-}
-
-//
-// SetForwardInterval set forward interval using string formatted, e.g. "20s"
-// where "s" represent unit time in "second".
-//
-func (cfg *Config) SetForwardInterval(v string) {
- if len(v) == 0 {
- return
+ err = in.Unmarshal(cfg)
+ if err != nil {
+ return fmt.Errorf(`%s: %w`, logp, err)
}
- var err error
+ if len(cfg.Listen) != 0 {
+ cfg.SetListen(cfg.Listen)
+ }
- cfg.ForwardInterval, err = time.ParseDuration(v)
+ err = cfg.parsePreprocessTag()
if err != nil {
- log.Println("SetForwardInterval: ", err)
- }
- if cfg.ForwardInterval < defForwardInterval {
- cfg.ForwardInterval = defForwardInterval
+ return fmt.Errorf(`%s: %w`, logp, err)
}
+
+ return nil
}
-//
// SetListen will parse `v` value as "addr:port", and set config address and
// port based on it.
-//
func (cfg *Config) SetListen(v string) {
if len(v) == 0 {
return
@@ -137,78 +114,38 @@ func (cfg *Config) SetListen(v string) {
addrPort := strings.Split(v, ":")
switch len(addrPort) {
case 1:
- cfg.ListenAddr = addrPort[0]
+ cfg.listenAddr = addrPort[0]
case 2:
- cfg.ListenAddr = addrPort[0]
- cfg.ListenPort, err = strconv.Atoi(addrPort[1])
+ cfg.listenAddr = addrPort[0]
+ cfg.listenPort, err = strconv.Atoi(addrPort[1])
if err != nil {
- cfg.ListenPort = defListenPort
- }
- }
-}
-
-func (cfg *Config) ParseAcceptBackend(v string) {
- v = strings.TrimSpace(v)
- if len(v) == 0 {
- return
- }
-
- for _, v = range strings.Split(v, ",") {
- if len(v) == 0 {
- continue
+ cfg.listenPort = defListenPort
}
- cfg.AcceptBackend = append(cfg.AcceptBackend, strings.TrimSpace(v))
}
}
-//
-// ParseCaptureRequestHeader parse request header names where each name is
-// separated by ",".
-//
-func (cfg *Config) ParseCaptureRequestHeader(v string) {
- v = strings.TrimSpace(v)
- if len(v) == 0 {
- return
- }
+func (cfg *Config) parsePreprocessTag() (err error) {
+ var (
+ logp = `parsePreprocessTag`
- headers := strings.Split(v, ",")
- for x := 0; x < len(headers); x++ {
- headers[x] = strings.TrimSpace(headers[x])
- if len(headers[x]) == 0 {
- continue
- }
- cfg.RequestHeaders = append(cfg.RequestHeaders, headers[x])
- }
-}
-
-func (cfg *Config) parsePreprocessTag(sec *ini.Section) {
- if sec == nil {
- return
- }
-
- for _, v := range sec.Vars {
- if len(v.KeyLower) == 0 {
- continue
- }
- if v.KeyLower != "http_url" {
- log.Printf("parsePreprocessTag: unknown tag %q\n",
- v.KeyLower)
- continue
- }
+ retag *tagPreprocessor
+ httpUrl string
+ vals []string
+ )
- rep := strings.Split(v.Value, "=>")
- if len(rep) != 2 {
- log.Printf("parsePreprocessTag: invalid format %q\n",
- v.Value)
+ for _, httpUrl = range cfg.HttpUrl {
+ vals = strings.Split(httpUrl, "=>")
+ if len(vals) != 2 {
continue
}
- retag, err := newTagPreprocessor(v.KeyLower, rep[0], rep[1])
+ retag, err = newTagPreprocessor(`http_url`, vals[0], vals[1])
if err != nil {
- log.Printf("parsePreprocessTag: %s\n", err)
- continue
+ return fmt.Errorf(`%s: %w`, logp, err)
}
cfg.retags = append(cfg.retags, retag)
}
+
+ return nil
}
diff --git a/config_test.go b/config_test.go
index 478ec33..775743b 100644
--- a/config_test.go
+++ b/config_test.go
@@ -9,19 +9,18 @@ import (
"testing"
"time"
- "github.com/shuLhan/share/lib/ini"
"github.com/shuLhan/share/lib/test"
)
func TestNewConfig(t *testing.T) {
cases := []struct {
- desc string
exp *Config
+ desc string
}{{
desc: "With default config",
exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
+ listenAddr: defListenAddr,
+ listenPort: defListenPort,
ForwardInterval: defForwardInterval,
},
}}
@@ -31,36 +30,39 @@ func TestNewConfig(t *testing.T) {
got := NewConfig()
- test.Assert(t, "Config", c.exp, got, true)
+ test.Assert(t, `Config`, c.exp, got)
}
}
func TestLoad(t *testing.T) {
- cases := []struct {
+ type testCase struct {
+ exp *Config
desc string
in string
- exp *Config
- }{{
+ }
+
+ var cases = []testCase{{
desc: "With empty path",
exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
+ listenAddr: defListenAddr,
+ listenPort: defListenPort,
ForwardInterval: defForwardInterval,
},
}, {
desc: "With path not exist",
in: "testdata/notexist.conf",
exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
+ listenAddr: defListenAddr,
+ listenPort: defListenPort,
ForwardInterval: defForwardInterval,
},
}, {
desc: "With path exist",
in: "testdata/haminer.conf",
exp: &Config{
- ListenAddr: "0.0.0.0",
- ListenPort: 8080,
+ Listen: `0.0.0.0:8080`,
+ listenAddr: `0.0.0.0`,
+ listenPort: 8080,
ForwardInterval: time.Second * 20,
AcceptBackend: []string{
"a",
@@ -71,6 +73,11 @@ func TestLoad(t *testing.T) {
"referrer",
},
InfluxAPIWrite: "http://127.0.0.1:8086/write",
+ HttpUrl: []string{
+ `/[0-9]+-\w+-\w+-\w+-\w+-\w+ => /-`,
+ `/\w+-\w+-\w+-\w+-\w+ => /-`,
+ `/[0-9]+ => /-`,
+ },
retags: []*tagPreprocessor{{
name: "http_url",
regex: regexp.MustCompile(`/[0-9]+-\w+-\w+-\w+-\w+-\w+`),
@@ -87,42 +94,51 @@ func TestLoad(t *testing.T) {
},
}}
- for _, c := range cases {
+ var (
+ c testCase
+ got *Config
+ err error
+ )
+
+ for _, c = range cases {
t.Log(c.desc)
- got := NewConfig()
- got.Load(c.in)
+ got = NewConfig()
+ err = got.Load(c.in)
+ if err != nil {
+ t.Fatal(err)
+ }
- test.Assert(t, "Config", c.exp, got, true)
+ test.Assert(t, `Config`, c.exp, got)
}
}
func TestSetListen(t *testing.T) {
cases := []struct {
+ exp *Config
desc string
in string
- exp *Config
}{{
desc: "With empty listen",
exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
+ listenAddr: defListenAddr,
+ listenPort: defListenPort,
ForwardInterval: defForwardInterval,
},
}, {
desc: "With empty port",
in: "127.0.0.2",
exp: &Config{
- ListenAddr: "127.0.0.2",
- ListenPort: defListenPort,
+ listenAddr: `127.0.0.2`,
+ listenPort: defListenPort,
ForwardInterval: defForwardInterval,
},
}, {
desc: "With no port",
in: "127.0.0.3:",
exp: &Config{
- ListenAddr: "127.0.0.3",
- ListenPort: defListenPort,
+ listenAddr: `127.0.0.3`,
+ listenPort: defListenPort,
ForwardInterval: defForwardInterval,
},
}}
@@ -133,141 +149,31 @@ func TestSetListen(t *testing.T) {
got := NewConfig()
got.SetListen(c.in)
- test.Assert(t, "Config", c.exp, got, true)
- }
-}
-
-func TestParseAcceptBackend(t *testing.T) {
- cases := []struct {
- desc string
- in string
- exp *Config
- }{{
- desc: "With empty value",
- exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
- ForwardInterval: defForwardInterval,
- },
- }, {
- desc: "With no separator",
- in: "a ; b",
- exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
- ForwardInterval: defForwardInterval,
- AcceptBackend: []string{
- "a ; b",
- },
- },
- }, {
- desc: "With comma at beginning and end",
- in: ",a,b,",
- exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
- ForwardInterval: defForwardInterval,
- AcceptBackend: []string{
- "a", "b",
- },
- },
- }}
-
- for _, c := range cases {
- t.Log(c.desc)
-
- got := NewConfig()
- got.ParseAcceptBackend(c.in)
-
- test.Assert(t, "Config", c.exp, got, true)
+ test.Assert(t, `Config`, c.exp, got)
}
}
-func TestParseCaptureRequestHeader(t *testing.T) {
- cases := []struct {
- desc string
- in string
- exp *Config
- }{{
- desc: "With empty value",
- exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
- ForwardInterval: defForwardInterval,
- },
- }, {
- desc: "With no separator",
- in: "a ; b",
- exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
- ForwardInterval: defForwardInterval,
- RequestHeaders: []string{
- "a ; b",
- },
- },
- }, {
- desc: "With separator at beginning and end",
- in: ",a,b,",
- exp: &Config{
- ListenAddr: defListenAddr,
- ListenPort: defListenPort,
- ForwardInterval: defForwardInterval,
- RequestHeaders: []string{
- "a", "b",
- },
- },
- }}
-
- for _, c := range cases {
- t.Log(c.desc)
-
- got := NewConfig()
- got.ParseCaptureRequestHeader(c.in)
-
- test.Assert(t, "Config", c.exp, got, true)
+func TestParsePreprocessTag(t *testing.T) {
+ type testCase struct {
+ desc string
+ httpUrl []string
+ exp []*tagPreprocessor
}
-}
-func TestParsePreprocessTag(t *testing.T) {
- cfg := NewConfig()
+ var (
+ cfg = NewConfig()
+ )
- cases := []struct {
- desc string
- in *ini.Section
- exp []*tagPreprocessor
- }{{
- desc: "With nil",
+ var cases = []testCase{{
+ desc: `With invalid format`,
+ httpUrl: []string{``},
}, {
- desc: "With unknown key",
- in: &ini.Section{
- Vars: []*ini.Variable{{
- KeyLower: "unknown",
- }},
- },
+ desc: `With empty regex`,
+ httpUrl: []string{`=>`},
}, {
- desc: "With invalid format",
- in: &ini.Section{
- Vars: []*ini.Variable{{
- KeyLower: "http_url",
- Value: "",
- }},
- },
- }, {
- desc: "With empty regex",
- in: &ini.Section{
- Vars: []*ini.Variable{{
- KeyLower: "http_url",
- Value: "=>",
- }},
- },
- }, {
- desc: "With valid value",
- in: &ini.Section{
- Vars: []*ini.Variable{{
- KeyLower: "http_url",
- Value: "/[0-9]+ => /-",
- }},
+ desc: `With valid value`,
+ httpUrl: []string{
+ `/[0-9]+ => /-`,
},
exp: []*tagPreprocessor{{
name: "http_url",
@@ -276,41 +182,22 @@ func TestParsePreprocessTag(t *testing.T) {
}},
}}
- for _, c := range cases {
+ var (
+ c testCase
+ err error
+ )
+
+ for _, c = range cases {
t.Log(c.desc)
cfg.retags = nil
- cfg.parsePreprocessTag(c.in)
-
- test.Assert(t, "retags", c.exp, cfg.retags, true)
- }
-}
-
-func TestSetForwardInterval(t *testing.T) {
- cfg := NewConfig()
-
- cases := []struct {
- desc string
- in string
- exp time.Duration
- }{{
- desc: "With empty string",
- exp: defForwardInterval,
- }, {
- desc: "With no interval unit",
- in: "20",
- exp: defForwardInterval,
- }, {
- desc: "With minus",
- in: "-20s",
- exp: defForwardInterval,
- }}
-
- for _, c := range cases {
- t.Log(c.desc)
+ cfg.HttpUrl = c.httpUrl
- cfg.SetForwardInterval(c.in)
+ err = cfg.parsePreprocessTag()
+ if err != nil {
+ t.Fatal(err)
+ }
- test.Assert(t, "ForwardInterval", c.exp, cfg.ForwardInterval, true)
+ test.Assert(t, `retags`, c.exp, cfg.retags)
}
}
diff --git a/forwarder.go b/forwarder.go
index 0d59156..79ab2bc 100644
--- a/forwarder.go
+++ b/forwarder.go
@@ -1,9 +1,7 @@
package haminer
-//
// Forwarder define an interface to forward parsed HAProxy log to storage
// engine.
-//
type Forwarder interface {
Forwards(halogs []*Halog)
}
diff --git a/go.mod b/go.mod
index 6ed0df5..a17af52 100644
--- a/go.mod
+++ b/go.mod
@@ -1,5 +1,5 @@
module github.com/shuLhan/haminer
-go 1.12
+go 1.18
-require github.com/shuLhan/share v0.2.0
+require github.com/shuLhan/share v0.40.0
diff --git a/go.sum b/go.sum
index 6da22fd..27da647 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,2 @@
-github.com/shuLhan/share v0.2.0 h1:xm12WiZ0L9Yj5fNUgxKXfpxl4SJ9VuRdRwbQ5cACeik=
-github.com/shuLhan/share v0.2.0/go.mod h1:SjjW0kmafz7R07V8GP8D9YvTj0jQ4A5LoLVYjC+920U=
-golang.org/x/sys v0.0.0-20181228144115-9a3f9b0469bb/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+github.com/shuLhan/share v0.40.0 h1:C0c1lfKLzogUStIiYJecoTTP9TrEDMz64la1Y1l8Wl0=
+github.com/shuLhan/share v0.40.0/go.mod h1:hb3Kis5s4jPume4YD15JELE67naFybtuALshhh9TlOg=
diff --git a/halog.go b/halog.go
index 555811f..fd3f432 100644
--- a/halog.go
+++ b/halog.go
@@ -21,11 +21,9 @@ var heartbeat = &Halog{ // nolint: gochecknoglobals
HTTPMethod: "-",
}
-//
// Halog contains the mapping of haproxy HTTP log format to Go struct.
//
// Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3
-//
type Halog struct { // nolint: maligned
Timestamp time.Time
@@ -69,10 +67,8 @@ type Halog struct { // nolint: maligned
tagHTTPURL string
}
-//
// cleanPrefix will remove `<date-time> <process-name>[pid]: ` prefix (which
// come from systemd/rsyslog) in input.
-//
func cleanPrefix(in []byte) bool {
start := bytes.IndexByte(in, '[')
if start < 0 {
@@ -204,11 +200,9 @@ func (halog *Halog) parseQueue(in []byte) (ok bool) {
return
}
-//
// parserRequestHeaders parse the request header values in log file.
// The request headers start with '{' and end with '}'.
// Each header is separated by '|'.
-//
func (halog *Halog) parseRequestHeaders(in []byte, reqHeaders []string) (ok bool) {
if in[0] != '{' {
// Skip if we did not find the beginning.
@@ -259,7 +253,6 @@ func (halog *Halog) parseHTTP(in []byte) (ok bool) {
return ok
}
-//
// Parse will parse one line of HAProxy log format into Halog.
//
// nolint: gocyclo
@@ -377,13 +370,11 @@ func (halog *Halog) Parse(in []byte, reqHeaders []string) (ok bool) {
return ok
}
-//
// ParseUDPPacket will convert UDP packet (in bytes) to instance of
// Halog.
//
// It will return nil and false if UDP packet is nil, have zero length, or
// cannot be parsed (rejected).
-//
func (halog *Halog) ParseUDPPacket(p *UDPPacket, reqHeaders []string) bool {
if p == nil {
return false
diff --git a/haminer.go b/haminer.go
index d9aa0d7..4864a97 100644
--- a/haminer.go
+++ b/haminer.go
@@ -14,22 +14,18 @@ import (
"time"
)
-//
// Haminer define the log consumer and producer.
-//
type Haminer struct {
cfg *Config
udpConn *net.UDPConn
- isRunning bool
chSignal chan os.Signal
chHalog chan *Halog
ff []Forwarder
+ isRunning bool
}
-//
// NewHaminer create, initialize, and return new Haminer instance. If config
// parameter is nil, it will use the default options.
-//
func NewHaminer(cfg *Config) (h *Haminer) {
if cfg == nil {
cfg = NewConfig()
@@ -58,14 +54,12 @@ func (h *Haminer) createForwarder() {
}
}
-//
// Start will listen for UDP packet and start consuming log, parse, and
// publish it to analytic server.
-//
func (h *Haminer) Start() (err error) {
udpAddr := &net.UDPAddr{
- IP: net.ParseIP(h.cfg.ListenAddr),
- Port: h.cfg.ListenPort,
+ IP: net.ParseIP(h.cfg.listenAddr),
+ Port: h.cfg.listenPort,
}
h.udpConn, err = net.ListenUDP("udp", udpAddr)
@@ -85,9 +79,7 @@ func (h *Haminer) Start() (err error) {
return
}
-//
// filter will return true if log is accepted; otherwise it will return false.
-//
func (h *Haminer) filter(halog *Halog) bool {
if halog == nil {
return false
@@ -171,9 +163,7 @@ func (h *Haminer) produce() {
}
}
-//
// Stop will close UDP server and clear all resources.
-//
func (h *Haminer) Stop() {
h.isRunning = false
diff --git a/influxdb.go b/influxdb.go
index 03e983e..c5c1622 100644
--- a/influxdb.go
+++ b/influxdb.go
@@ -34,9 +34,7 @@ const (
"bytes_read=%d"
)
-//
// InfluxdbClient contains HTTP connection for writing logs to Influxdb.
-//
type InfluxdbClient struct {
conn *http.Client
apiWrite string
@@ -44,9 +42,7 @@ type InfluxdbClient struct {
buf bytes.Buffer
}
-//
// NewInfluxdbClient will create, initialize, and return new Influxdb client.
-//
func NewInfluxdbClient(apiWrite string) (cl *InfluxdbClient) {
cl = &InfluxdbClient{
apiWrite: apiWrite,
@@ -78,10 +74,8 @@ func (cl *InfluxdbClient) initConn() {
}
}
-//
// Forwards implement the Forwarder interface. It will write all logs to
// Influxdb.
-//
func (cl *InfluxdbClient) Forwards(halogs []*Halog) {
lsrc := "InfluxdbClient.Forwards"
err := cl.write(halogs)
diff --git a/tagpreprocessor.go b/tagpreprocessor.go
index 2be8846..64a9bec 100644
--- a/tagpreprocessor.go
+++ b/tagpreprocessor.go
@@ -5,7 +5,6 @@
package haminer
import (
- "errors"
"regexp"
"strings"
)
@@ -16,11 +15,9 @@ type tagPreprocessor struct {
repl string
}
-//
// newTagPreprocessor create and initialize replace tag pre-processing.
// The regex and repl strings must be enclosed with double-quote, except for
// repl it can be empty.
-//
func newTagPreprocessor(name, regex, repl string) (
retag *tagPreprocessor, err error,
) {
@@ -29,10 +26,10 @@ func newTagPreprocessor(name, regex, repl string) (
repl = strings.TrimSpace(repl)
if len(name) == 0 {
- return nil, errors.New("newTagPreprocessor: empty name parameter")
+ return nil, nil
}
if len(regex) == 0 {
- return nil, errors.New("newTagPreprocessor: empty regex parameter")
+ return nil, nil
}
re, err := regexp.Compile(regex)
diff --git a/tagpreprocessor_test.go b/tagpreprocessor_test.go
index c5f59e9..2a5a785 100644
--- a/tagpreprocessor_test.go
+++ b/tagpreprocessor_test.go
@@ -48,11 +48,11 @@ func TestNewTagPreprocessor(t *testing.T) {
got, err := newTagPreprocessor(c.name, c.regex, c.repl)
if err != nil {
- test.Assert(t, "error", c.expErr, err.Error(), true)
+ test.Assert(t, `error`, c.expErr, err.Error())
continue
}
- test.Assert(t, "TagPreprocessor", c.exp, got, true)
+ test.Assert(t, `TagPreprocessor`, c.exp, got)
}
}
@@ -124,6 +124,6 @@ func TestPreprocess(t *testing.T) {
t.Log("got: ", got)
}
- test.Assert(t, "preprocess", c.exp, got, true)
+ test.Assert(t, `preprocess`, c.exp, got)
}
}
diff --git a/testdata/haminer.conf b/testdata/haminer.conf
index 6f5b84f..e0b1d70 100644
--- a/testdata/haminer.conf
+++ b/testdata/haminer.conf
@@ -1,7 +1,9 @@
[haminer]
listen = 0.0.0.0:8080
-accept_backend = ,a , b,
-capture_request_header = , host, referrer,
+accept_backend = a
+accept_backend = b
+capture_request_header = host
+capture_request_header = referrer
influxdb_api_write = http://127.0.0.1:8086/write
forward_interval = 20s
diff --git a/udppacket.go b/udppacket.go
index 8cecfb0..fd52a61 100644
--- a/udppacket.go
+++ b/udppacket.go
@@ -8,16 +8,12 @@ const (
defSize = 1024
)
-//
// UDPPacket wrap the slice of bytes for easy manipulation.
-//
type UDPPacket struct {
Bytes []byte
}
-//
// NewUDPPacket will create and initialize UDP packet.
-//
func NewUDPPacket(size uint32) (p *UDPPacket) {
if size <= 0 {
size = defSize
@@ -29,10 +25,8 @@ func NewUDPPacket(size uint32) (p *UDPPacket) {
return
}
-//
// Reset will set the content of packet data to zero, so it can be used
// against on Read().
-//
func (p *UDPPacket) Reset() {
p.Bytes[0] = 0
for x := 1; x < len(p.Bytes); x *= 2 {