From e6fed3ef602c587602a7e1eb1de303a0aafdc527 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Mon, 18 Mar 2024 01:32:59 +0700 Subject: all: implement forwarder for Postgresql The Postgresql forwarder accept single option "URL", [forwarder "postgresql"] url = postgres://:@/?sslmode=<> The user and database must already created first, manually. --- Makefile | 18 +++- README.md | 29 ++++-- _doc/index.adoc | 4 +- cmd/haminer/haminer.conf | 9 ++ config_forwarder.go | 5 +- forwarder_postgresql.go | 114 +++++++++++++++++++++ forwarder_postgresql_test.go | 77 ++++++++++++++ go.mod | 7 +- go.sum | 2 + haminer.go | 10 ++ haminer_test.go | 20 ++++ http_log.go | 85 ++++++++++++++++ testdata/forwarderPostgresql_Forwards_test.txt | 133 +++++++++++++++++++++++++ 13 files changed, 497 insertions(+), 16 deletions(-) create mode 100644 forwarder_postgresql.go create mode 100644 forwarder_postgresql_test.go create mode 100644 haminer_test.go create mode 100644 testdata/forwarderPostgresql_Forwards_test.txt diff --git a/Makefile b/Makefile index dbf01a4..f840717 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,13 @@ ## SPDX-FileCopyrightText: 2018 M. Shulhan ## SPDX-License-Identifier: GPL-3.0-or-later -.PHONY: all build lint install serve-doc -all: install +.PHONY: all build lint serve-doc +all: build lint test build: go build -o ./_bin/ ./cmd/... -## Run all tests and generate coverage as HTML. +##---- Run all tests and generate coverage as HTML. COVER_OUT:=cover.out COVER_HTML:=cover.html @@ -18,6 +18,14 @@ test: -coverprofile=$(COVER_OUT) ./... go tool cover -html=$(COVER_OUT) -o $(COVER_HTML) +.PHONY: test-integration +test-integration: + CGO_ENABLED=1 go test -failfast -timeout=1m -race \ + -coverprofile=$(COVER_OUT) -integration ./... + go tool cover -html=$(COVER_OUT) -o $(COVER_HTML) + +##---- + lint: -fieldalignment ./... -shadow ./... @@ -28,13 +36,13 @@ lint: --disable bodyclose \ ./... -install: build test lint +install: go install -v ./cmd/haminer serve-doc: ciigo serve _doc -## Initialize local development by creating image using mkosi. +##---- Initialize local development by creating image using mkosi. ## NOTE: only works on GNU/Linux OS. MACHINE_NAME:=haminer-test diff --git a/README.md b/README.md index 1ca54ae..1a78ccf 100644 --- a/README.md +++ b/README.md @@ -14,12 +14,11 @@ readability): ``` See -[HTTP log format -documentation](https://www.haproxy.com/documentation/hapee/1-8r1/onepage/#8.2.3) +[HTTP log format documentation](https://www.haproxy.com/documentation/hapee/1-8r1/onepage/#8.2.3) for more information. -Currently, there are two supported database where haminer can forward the -parsed log: Influxdb and Questdb. +Currently, there are supported database where haminer can forward the +parsed log: Influxdb, Questdb, and Postgresql. Haminer support Influxdb v1 and v2. ``` @@ -98,8 +97,8 @@ for an example of possible configuration and their explanation. ### Forwarders -Currently, there are two supported database where haminer can forward the -parsed log: Influxdb and Questdb. +Currently, there are several database where haminer can forward the parsed +log: Influxdb, Questdb, and Postgresql. Haminer support Influxdb v1 and v2. #### Influxdb v1 @@ -156,6 +155,24 @@ url = udp://127.0.0.1:9009 We did not need to create the table, Questdb will handled that automatically. +#### Postgresql + +For Postgresql, you need to create the user and database first, for example, + +``` +postgres$ psql +postgres=> CREATE ROLE haminer PASSWORD 'haminer' CREATEDB INHERIT LOGIN; +postgres=> CREATE DATABASE haminer OWNER haminer; +postgres=> \q +``` + +The configuration only need the Data Source Name (DSN), + +``` +[forwarder "postgresql"] +url = postgres://:@/?sslmode= +``` + ## Deployment diff --git a/_doc/index.adoc b/_doc/index.adoc index 53f12f4..7ced570 100644 --- a/_doc/index.adoc +++ b/_doc/index.adoc @@ -17,8 +17,8 @@ See https://www.haproxy.com/documentation/hapee/1-8r1/onepage/#8.2.3[HTTP log format documentation] for more information. -Currently, there are two supported database where haminer can forward the -parsed log: Influxdb and Questdb. +Currently, there are several database where haminer can forward the parsed +log: Influxdb, Questdb, and Postgresql. Haminer support Influxdb v1 and v2. ---- diff --git a/cmd/haminer/haminer.conf b/cmd/haminer/haminer.conf index b2b51ec..20b7a3c 100644 --- a/cmd/haminer/haminer.conf +++ b/cmd/haminer/haminer.conf @@ -145,6 +145,15 @@ ## ## [1]: https://questdb.io/docs/reference/api/ilp/overview +[forwarder "postgresql"] + +## The Data Source Name of Postgresql server in the following format, +## +## "postgres://$user:$pass@$host/$database?sslmode= +## +## An empty url means the forwarder is disabled. +url = + [forwarder "questdb"] ## The URL of questdb server in the following format, diff --git a/config_forwarder.go b/config_forwarder.go index 0bc3cf9..e49e0c6 100644 --- a/config_forwarder.go +++ b/config_forwarder.go @@ -14,8 +14,9 @@ const ( influxdVersion1 = `v1` influxdVersion2 = `v2` - forwarderKindInfluxd = `influxd` - forwarderKindQuestdb = `questdb` + forwarderKindInfluxd = `influxd` + forwarderKindQuestdb = `questdb` + forwarderKindPostgresql = `postgresql` ) // ConfigForwarder contains configuration for forwarding the logs. diff --git a/forwarder_postgresql.go b/forwarder_postgresql.go new file mode 100644 index 0000000..9dbe957 --- /dev/null +++ b/forwarder_postgresql.go @@ -0,0 +1,114 @@ +// SPDX-FileCopyrightText: 2024 M. Shulhan +// SPDX-License-Identifier: GPL-3.0-or-later + +package haminer + +import ( + "database/sql" + "fmt" + + "git.sr.ht/~shulhan/pakakeh.go/lib/mlog" + libsql "git.sr.ht/~shulhan/pakakeh.go/lib/sql" + "github.com/lib/pq" +) + +// forwarderPostgresql the client to write logs to Postgresql database. +type forwarderPostgresql struct { + conn *libsql.Client +} + +// newForwarderPostgresql create new forwarder for Postgresql. +func newForwarderPostgresql(cfg ConfigForwarder) (fw *forwarderPostgresql, err error) { + var logp = `newForwarderPostgresql` + + fw = &forwarderPostgresql{} + + var opts = libsql.ClientOptions{ + DriverName: libsql.DriverNamePostgres, + DSN: cfg.URL, + } + + fw.conn, err = libsql.NewClient(opts) + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + + return fw, nil +} + +// Forwards insert the list of HTTP log into the Postgresql. +func (fw *forwarderPostgresql) Forwards(listLog []*HTTPLog) { + var ( + logp = `Forwards` + + sqltx *sql.Tx + err error + ) + + sqltx, err = fw.conn.Begin() + if err != nil { + mlog.Errf(`%s: %s`, logp, err) + return + } + + var ( + httpLog = HTTPLog{} + meta = httpLog.generateSQLMeta(libsql.DriverNamePostgres, libsql.DMLKindInsert) + ) + + var q = pq.CopyInSchema(`public`, `http_log`, meta.ListName...) + + var ( + stmt *sql.Stmt + alog *HTTPLog + ) + + stmt, err = sqltx.Prepare(q) + if err != nil { + goto failed + } + + for _, alog = range listLog { + httpLog = *alog + + _, err = stmt.Exec(meta.ListValue...) + if err != nil { + goto failed + } + } + + _, err = stmt.Exec() + if err != nil { + goto failed + } + + err = stmt.Close() + if err != nil { + mlog.Errf(`%s: %s`, logp, err) + _ = sqltx.Rollback() + return + } + + err = sqltx.Commit() + if err != nil { + mlog.Errf(`%s: %s`, logp, err) + return + } + + return + +failed: + mlog.Errf(`%s: %s`, logp, err) + + if stmt != nil { + err = stmt.Close() //nolint:sqlclosecheck + if err != nil { + mlog.Errf(`%s: %s`, logp, err) + } + } + + err = sqltx.Rollback() + if err != nil { + mlog.Errf(`%s: %s`, logp, err) + } +} diff --git a/forwarder_postgresql_test.go b/forwarder_postgresql_test.go new file mode 100644 index 0000000..1ada5d7 --- /dev/null +++ b/forwarder_postgresql_test.go @@ -0,0 +1,77 @@ +// SPDX-FileCopyrightText: 2024 M. Shulhan +// SPDX-License-Identifier: GPL-3.0-or-later + +package haminer + +import ( + "encoding/json" + "testing" + + "git.sr.ht/~shulhan/pakakeh.go/lib/test" +) + +func TestForwarderPostgresql_Forwards(t *testing.T) { + if !testIntegration { + t.Skip() + } + + var ( + logp = `TestForwarderPostgresql_Forwards` + + tdata *test.Data + err error + ) + + tdata, err = test.LoadData(`testdata/forwarderPostgresql_Forwards_test.txt`) + if err != nil { + t.Fatal(logp, err) + } + + var ( + fwdConfig = ConfigForwarder{ + URL: `postgres://haminer:haminer@169.254.194.180/haminer?sslmode=disable`, + } + + fwdpg *forwarderPostgresql + ) + + fwdpg, err = newForwarderPostgresql(fwdConfig) + if err != nil { + t.Fatal(logp, err) + } + + err = fwdpg.conn.TruncateTable(tableNameHTTPLog) + if err != nil { + t.Fatal(logp, err) + } + + var ( + tag = `http_log.json` + rawb = tdata.Input[tag] + + logs []*HTTPLog + ) + + err = json.Unmarshal(rawb, &logs) + if err != nil { + t.Fatal(logp, err) + } + + fwdpg.Forwards(logs) + + var listLog []HTTPLog + + listLog, err = listHTTPLog(fwdpg.conn) + if err != nil { + t.Fatal(logp, err) + } + + rawb, err = json.MarshalIndent(listLog, ``, ` `) + if err != nil { + t.Fatal(logp, err) + } + + var exp = tdata.Output[tag] + + test.Assert(t, `listHTTPLog`, string(exp), string(rawb)) +} diff --git a/go.mod b/go.mod index b920d38..168e5da 100644 --- a/go.mod +++ b/go.mod @@ -2,9 +2,14 @@ module git.sr.ht/~shulhan/haminer go 1.21 -require git.sr.ht/~shulhan/pakakeh.go v0.53.2-0.20240315075343-713d51e4792f +require ( + git.sr.ht/~shulhan/pakakeh.go v0.53.2-0.20240315075343-713d51e4792f + github.com/lib/pq v1.10.9 +) require ( golang.org/x/net v0.22.0 // indirect golang.org/x/sys v0.18.0 // indirect ) + +//replace git.sr.ht/~shulhan/pakakeh.go => ../pakakeh.go diff --git a/go.sum b/go.sum index 190ab4f..3174779 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ git.sr.ht/~shulhan/pakakeh.go v0.53.2-0.20240315075343-713d51e4792f h1:bP4msj5TVm+kQ6GUt6QvwEXOVOzUk2MQc5c8bSto8sc= git.sr.ht/~shulhan/pakakeh.go v0.53.2-0.20240315075343-713d51e4792f/go.mod h1:tTHoHDHuBxj5q1zwpLZGCKrdc6i0I3sP8kPp+JEs16c= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= diff --git a/haminer.go b/haminer.go index ca1f850..64f47a8 100644 --- a/haminer.go +++ b/haminer.go @@ -92,6 +92,16 @@ func (h *Haminer) createForwarder() { continue } h.ff = append(h.ff, questc) + + case forwarderKindPostgresql: + var pgc *forwarderPostgresql + + pgc, err = newForwarderPostgresql(*fwCfg) + if err != nil { + log.Printf(`%s: %s: %s`, logp, fwName, err) + continue + } + h.ff = append(h.ff, pgc) } } } diff --git a/haminer_test.go b/haminer_test.go new file mode 100644 index 0000000..39c4763 --- /dev/null +++ b/haminer_test.go @@ -0,0 +1,20 @@ +// SPDX-FileCopyrightText: 2024 M. Shulhan +// SPDX-License-Identifier: GPL-3.0-or-later + +package haminer + +import ( + "flag" + "os" + "testing" +) + +var testIntegration bool + +func TestMain(m *testing.M) { + flag.BoolVar(&testIntegration, `integration`, false, `Run integration tests`) + flag.Parse() + + var status = m.Run() + os.Exit(status) +} diff --git a/http_log.go b/http_log.go index b4ad0b2..8aed58b 100644 --- a/http_log.go +++ b/http_log.go @@ -5,12 +5,15 @@ package haminer import ( "bytes" + "database/sql" "fmt" "io" "math" "strconv" "strings" "time" + + libsql "git.sr.ht/~shulhan/pakakeh.go/lib/sql" ) const ( @@ -44,6 +47,8 @@ const ( `bytes_read=%d` ) +const tableNameHTTPLog = `http_log` + // HTTPLog contains the mapping of haproxy HTTP log format to Go struct. // // Reference: https://cbonte.github.io/haproxy-dconv/1.7/configuration.html#8.2.3 @@ -93,6 +98,42 @@ type HTTPLog struct { BackendQueue int32 } +// listHTTPLog fetch all HTTPLog record from database. +func listHTTPLog(dbc libsql.Session) (list []HTTPLog, err error) { + var ( + logp = `ListHTTPLog` + httpLog = HTTPLog{} + meta = httpLog.generateSQLMeta(libsql.DriverNamePostgres, libsql.DMLKindSelect) + ) + + var q = fmt.Sprintf(`SELECT %s FROM %s ORDER BY request_date DESC;`, + meta.Names(), tableNameHTTPLog) + + var rows *sql.Rows + + rows, err = dbc.Query(q) + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + defer rows.Close() + + for rows.Next() { + err = rows.Scan(meta.ListValue...) + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + + var dup = httpLog + list = append(list, dup) + } + err = rows.Err() + if err != nil { + return nil, fmt.Errorf(`%s: %w`, logp, err) + } + + return list, nil +} + // ParseUDPPacket convert UDP packet (in bytes) to instance of HTTPLog. // // It will return nil if UDP packet is nil, have zero length, or cannot be @@ -352,6 +393,50 @@ func (httpLog *HTTPLog) parseConns(in []byte) (ok bool) { return } +func (httpLog *HTTPLog) generateSQLMeta(driver string, kind libsql.DMLKind) (meta *libsql.Meta) { + meta = libsql.NewMeta(driver, kind) + + meta.Bind(`request_date`, &httpLog.RequestDate) + meta.Bind(`client_ip`, &httpLog.ClientIP) + + meta.Bind(`frontend_name`, &httpLog.FrontendName) + meta.Bind(`backend_name`, &httpLog.BackendName) + meta.Bind(`server_name`, &httpLog.ServerName) + + meta.Bind(`http_proto`, &httpLog.HTTPProto) + meta.Bind(`http_method`, &httpLog.HTTPMethod) + meta.Bind(`http_url`, &httpLog.HTTPURL) + meta.Bind(`http_query`, &httpLog.HTTPQuery) + + meta.Bind(`header_request`, &httpLog.rawHeaderRequest) + meta.Bind(`header_response`, &httpLog.rawHeaderResponse) + + meta.Bind(`cookie_request`, &httpLog.CookieRequest) + meta.Bind(`cookie_response`, &httpLog.CookieResponse) + meta.Bind(`termination_state`, &httpLog.TerminationState) + + meta.Bind(`bytes_read`, &httpLog.BytesRead) + meta.Bind(`status_code`, &httpLog.StatusCode) + meta.Bind(`client_port`, &httpLog.ClientPort) + + meta.Bind(`time_request`, &httpLog.TimeRequest) + meta.Bind(`time_wait`, &httpLog.TimeWait) + meta.Bind(`time_connect`, &httpLog.TimeConnect) + meta.Bind(`time_response`, &httpLog.TimeResponse) + meta.Bind(`time_all`, &httpLog.TimeAll) + + meta.Bind(`conn_active`, &httpLog.ConnActive) + meta.Bind(`conn_frontend`, &httpLog.ConnFrontend) + meta.Bind(`conn_backend`, &httpLog.ConnBackend) + meta.Bind(`conn_server`, &httpLog.ConnServer) + meta.Bind(`retries`, &httpLog.Retries) + + meta.Bind(`server_queue`, &httpLog.ServerQueue) + meta.Bind(`backend_queue`, &httpLog.BackendQueue) + + return meta +} + func (httpLog *HTTPLog) parseQueue(in []byte) (ok bool) { httpLog.ServerQueue, ok = parseToInt32(in, '/') if !ok { diff --git a/testdata/forwarderPostgresql_Forwards_test.txt b/testdata/forwarderPostgresql_Forwards_test.txt new file mode 100644 index 0000000..b936661 --- /dev/null +++ b/testdata/forwarderPostgresql_Forwards_test.txt @@ -0,0 +1,133 @@ +Test data for forwarding with Postgresql. + +>>> http_log.json +[ + { + "RequestDate": "2024-03-17T05:08:28.886Z", + "HeaderRequest": null, + "HeaderResponse": null, + "ClientIP": "169.254.63.64", + "FrontendName": "fe-http", + "BackendName": "be-http", + "ServerName": "be-http2", + "HTTPProto": "HTTP/1.1", + "HTTPMethod": "GET", + "HTTPURL": "/", + "HTTPQuery": "", + "CookieRequest": "-", + "CookieResponse": "-", + "TerminationState": "----", + "BytesRead": 149, + "StatusCode": 200, + "ClientPort": 52722, + "TimeRequest": 10, + "TimeWait": 20, + "TimeConnect": 30, + "TimeResponse": 40, + "TimeAll": 50, + "ConnActive": 1, + "ConnFrontend": 1, + "ConnBackend": 2, + "ConnServer": 3, + "Retries": 4, + "ServerQueue": 5, + "BackendQueue": 6 + }, + { + "RequestDate": "2024-03-17T05:09:00.006Z", + "HeaderRequest": null, + "HeaderResponse": null, + "ClientIP": "169.254.63.65", + "FrontendName": "fe-http", + "BackendName": "be-http", + "ServerName": "be-http1", + "HTTPProto": "HTTP/1.1", + "HTTPMethod": "GET", + "HTTPURL": "/", + "HTTPQuery": "", + "CookieRequest": "-", + "CookieResponse": "-", + "TerminationState": "----", + "BytesRead": 149, + "StatusCode": 200, + "ClientPort": 52723, + "TimeRequest": 11, + "TimeWait": 21, + "TimeConnect": 31, + "TimeResponse": 41, + "TimeAll": 51, + "ConnActive": 1, + "ConnFrontend": 1, + "ConnBackend": 2, + "ConnServer": 3, + "Retries": 4, + "ServerQueue": 5, + "BackendQueue": 6 + } +] + +<<< http_log.json +[ + { + "RequestDate": "2024-03-17T05:09:00.006Z", + "HeaderRequest": null, + "HeaderResponse": null, + "ClientIP": "169.254.63.65", + "FrontendName": "fe-http", + "BackendName": "be-http", + "ServerName": "be-http1", + "HTTPProto": "HTTP/1.1", + "HTTPMethod": "GET", + "HTTPURL": "/", + "HTTPQuery": "", + "CookieRequest": "-", + "CookieResponse": "-", + "TerminationState": "----", + "BytesRead": 149, + "StatusCode": 200, + "ClientPort": 52723, + "TimeRequest": 11, + "TimeWait": 21, + "TimeConnect": 31, + "TimeResponse": 41, + "TimeAll": 51, + "ConnActive": 1, + "ConnFrontend": 1, + "ConnBackend": 2, + "ConnServer": 3, + "Retries": 4, + "ServerQueue": 5, + "BackendQueue": 6 + }, + { + "RequestDate": "2024-03-17T05:08:28.886Z", + "HeaderRequest": null, + "HeaderResponse": null, + "ClientIP": "169.254.63.64", + "FrontendName": "fe-http", + "BackendName": "be-http", + "ServerName": "be-http2", + "HTTPProto": "HTTP/1.1", + "HTTPMethod": "GET", + "HTTPURL": "/", + "HTTPQuery": "", + "CookieRequest": "-", + "CookieResponse": "-", + "TerminationState": "----", + "BytesRead": 149, + "StatusCode": 200, + "ClientPort": 52722, + "TimeRequest": 10, + "TimeWait": 20, + "TimeConnect": 30, + "TimeResponse": 40, + "TimeAll": 50, + "ConnActive": 1, + "ConnFrontend": 1, + "ConnBackend": 2, + "ConnServer": 3, + "Retries": 4, + "ServerQueue": 5, + "BackendQueue": 6 + } +] -- cgit v1.3