From 62161730b5802c4206f0cc169a744b364576bb74 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Fri, 29 Aug 2025 01:37:28 +0700 Subject: all: implement notification using Mattermost incoming webhook In the server configuration, one can defined the following section to send the notification using Mattermost, [notif] kind = mattermost webhook_url = # The incoming webhook URL. channel = # The channel where the notification will be placed. down_template = # Message template when service is down. up_template = # Message template when service is up. --- README.md | 49 +++++++++ lilin_test.go | 100 ++++++++++++------ notif_config.go | 54 ++++++++++ server_config.go | 9 ++ server_config_test.go | 81 +++++++++++++++ service.go | 39 ++++++- service_test.go | 18 ++-- testdata/etc/lilin/lilin.cfg | 2 +- testdata/server_config/ok/etc/lilin/lilin.cfg | 19 ++++ .../worker/pushNotifMattermost/etc/lilin/lilin.cfg | 12 +++ .../etc/lilin/service.d/dummy_http.cfg | 6 ++ .../var/log/lilin/service.d/dummy_http.log | 0 worker.go | 114 ++++++++++++++++++++- worker_test.go | 29 ++++++ 14 files changed, 480 insertions(+), 52 deletions(-) create mode 100644 README.md create mode 100644 notif_config.go create mode 100644 server_config_test.go create mode 100644 testdata/server_config/ok/etc/lilin/lilin.cfg create mode 100644 testdata/worker/pushNotifMattermost/etc/lilin/lilin.cfg create mode 100644 testdata/worker/pushNotifMattermost/etc/lilin/service.d/dummy_http.cfg create mode 100644 testdata/worker/pushNotifMattermost/var/log/lilin/service.d/dummy_http.log diff --git a/README.md b/README.md new file mode 100644 index 0000000..27b26fa --- /dev/null +++ b/README.md @@ -0,0 +1,49 @@ +# lilin + +## Notification + +Lilin support sending notification to, + +- Mattermost using incoming webhook. + +See the next section on how to use the notification. + +### Mattermost incoming webhook + +In the server configuration, add the section "notif" with the following +format, + +``` +[notif] +kind = mattermost +webhook_url = # The incoming webhook URL. +channel = # The channel where the notification will be placed. +down_template = # Message template when service is down. +up_template = # Message template when service is up. +``` + +The `down_template` and `up_template` can contains the following variables, + +- .ID: the service ID +- .At: the time when service up or down +- .Error: the error message that cause the service marked as down + +For example, given the following scan report value, + +``` +.ID: http-server +.At: 2025-09-26 06:38:11 +0000 UTC +.Error: 503 Service Unavailable +``` + +The following `down_template` + +``` +{{.At}}: Service {{.ID}} is down: {{.Error}} +``` + +will be rendered as + +``` +2025-09-26 06:38:11 +0000 UTC: Service http-server is down: 503 Service Unavailable +``` diff --git a/lilin_test.go b/lilin_test.go index e4db531..091f7c7 100644 --- a/lilin_test.go +++ b/lilin_test.go @@ -1,37 +1,91 @@ // SPDX-FileCopyrightText: 2025 M. Shulhan // SPDX-License-Identifier: GPL-3.0-only -package lilin_test +package lilin import ( "context" "errors" + "io" "log" "net/http" "testing" "time" - "git.sr.ht/~shulhan/lilin" "git.sr.ht/~shulhan/lilin/internal" "git.sr.ht/~shulhan/pakakeh.go/lib/net" ) -var client *lilin.Client - const ( dummyHTTPAddress = `127.0.0.1:6330` ) +type dummyHTTPService struct { + mux *http.ServeMux + httpd *http.Server + reqbodyq chan []byte +} + +var dhs *dummyHTTPService +var client *Client + +func newDummyHTTPService() (dhs *dummyHTTPService) { + dhs = &dummyHTTPService{ + mux: http.NewServeMux(), + reqbodyq: make(chan []byte), + } + + dhs.mux.HandleFunc(`GET /health`, func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + dhs.mux.HandleFunc(`POST /mattermost`, func(w http.ResponseWriter, req *http.Request) { + var logp = `POST /mattermost` + var reqbody []byte + var err error + reqbody, err = io.ReadAll(req.Body) + if err != nil { + log.Fatalf(`%s: %s`, logp, err) + } + w.WriteHeader(http.StatusOK) + go func() { + dhs.reqbodyq <- reqbody + }() + }) + + dhs.httpd = &http.Server{ + Addr: dummyHTTPAddress, + Handler: dhs.mux, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + } + return dhs +} + +func (dhs *dummyHTTPService) start() { + var err = dhs.httpd.ListenAndServe() + if err != nil { + log.Fatal(err) + } +} + func TestMain(m *testing.M) { internal.Now = func() time.Time { return time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) } - var server *lilin.Server + dhs = newDummyHTTPService() + go dhs.start() + + var err = net.WaitAlive(`tcp`, dummyHTTPAddress, 5*time.Second) + if err != nil { + log.Fatal(err) + } + + var server *Server server = startServer() client = createClient(server) - go dummyHTTPService() m.Run() @@ -40,19 +94,19 @@ func TestMain(m *testing.M) { ctx, cancelfn = context.WithTimeout(context.Background(), 5*time.Second) defer cancelfn() - var err = server.Shutdown(ctx) + err = server.Shutdown(ctx) if err != nil { log.Fatal(err) } } -func startServer() (server *lilin.Server) { - var serverOpts = lilin.ServerConfig{ +func startServer() (server *Server) { + var serverOpts = ServerConfig{ BaseDir: `testdata`, } var err error - server, err = lilin.NewServer(serverOpts) + server, err = NewServer(serverOpts) if err != nil { log.Fatal(err) } @@ -71,35 +125,15 @@ func startServer() (server *lilin.Server) { return server } -func createClient(server *lilin.Server) (client *lilin.Client) { - var clientOpts = lilin.ClientConfig{ +func createClient(server *Server) (client *Client) { + var clientOpts = ClientConfig{ ServerURL: `http://` + server.Config.Address, } var err error - client, err = lilin.NewClient(clientOpts) + client, err = NewClient(clientOpts) if err != nil { log.Fatal(err) } return client } - -func dummyHTTPService() { - var mux = http.NewServeMux() - - mux.HandleFunc(`GET /health`, func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusOK) - }) - - var httpd = http.Server{ - Addr: dummyHTTPAddress, - Handler: mux, - ReadTimeout: 5 * time.Second, - WriteTimeout: 5 * time.Second, - } - - var err = httpd.ListenAndServe() - if err != nil { - log.Fatal(err) - } -} diff --git a/notif_config.go b/notif_config.go new file mode 100644 index 0000000..7fa7d2d --- /dev/null +++ b/notif_config.go @@ -0,0 +1,54 @@ +// SPDX-FileCopyrightText: 2025 M. Shulhan +// SPDX-License-Identifier: GPL-3.0-only + +package lilin + +import ( + "fmt" + "html/template" + "net/url" +) + +const ( + notifKindMattermost = `mattermost` +) + +type NotifConfig struct { + webhookURL *url.URL + downTmpl *template.Template + upTmpl *template.Template + + Kind string `ini:"notif::kind"` + WebhookURL string `ini:"notif::webhook_url"` + Channel string `ini:"notif::channel"` + DownTemplate string `ini:"notif::down_template"` + UpTemplate string `ini:"notif::up_template"` +} + +func (notifConfig *NotifConfig) init() (err error) { + switch notifConfig.Kind { + case notifKindMattermost: + notifConfig.webhookURL, err = url.Parse(notifConfig.WebhookURL) + if err != nil { + return fmt.Errorf(`invalid WebhookURL %q: %s`, + notifConfig.WebhookURL, err) + } + default: + return fmt.Errorf(`unknown notif kind: %s`, notifConfig.Kind) + } + if notifConfig.DownTemplate != "" { + notifConfig.downTmpl = template.New(`down`) + notifConfig.downTmpl, err = notifConfig.downTmpl.Parse(notifConfig.DownTemplate) + if err != nil { + return fmt.Errorf(`failed to parse down_template: %s`, err) + } + } + if notifConfig.UpTemplate != "" { + notifConfig.upTmpl = template.New(`up`) + notifConfig.upTmpl, err = notifConfig.upTmpl.Parse(notifConfig.UpTemplate) + if err != nil { + return fmt.Errorf(`failed to parse up_template: %s`, err) + } + } + return nil +} diff --git a/server_config.go b/server_config.go index 50d93ff..c9c553c 100644 --- a/server_config.go +++ b/server_config.go @@ -31,6 +31,8 @@ type ServerConfig struct { // The address to listen for HTTP server and APIs. Address string `ini:"server::address"` + Notifs []*NotifConfig `ini:"notif"` + // IsDevelopment run the server in development mode with direct access // to file system in _www instead of using [embed.FS]. IsDevelopment bool @@ -76,5 +78,12 @@ func (cfg *ServerConfig) init() (err error) { cfg.Address = defAddress } + for _, notifConfig := range cfg.Notifs { + err = notifConfig.init() + if err != nil { + return fmt.Errorf(`%s: %w`, logp, err) + } + } + return nil } diff --git a/server_config_test.go b/server_config_test.go new file mode 100644 index 0000000..bb30e5c --- /dev/null +++ b/server_config_test.go @@ -0,0 +1,81 @@ +// SPDX-FileCopyrightText: 2025 M. Shulhan +// SPDX-License-Identifier: GPL-3.0-only + +package lilin + +import ( + "net/url" + "testing" + + "git.sr.ht/~shulhan/pakakeh.go/lib/test" +) + +func TestServerConfig_init(t *testing.T) { + type testCase struct { + baseDir string + exp ServerConfig + } + + var webhookURL12000 *url.URL + var webhookURL12001 *url.URL + var err error + + webhookURL12000, err = url.Parse(`http://127.0.0.1:12000`) + if err != nil { + t.Fatal(err) + } + webhookURL12001, err = url.Parse(`http://127.0.0.1:12001`) + if err != nil { + t.Fatal(err) + } + + var listCase = []testCase{{ + baseDir: `testdata/server_config/ok/`, + exp: ServerConfig{ + BaseDir: `testdata/server_config/ok/`, + configDir: `testdata/server_config/ok/etc/lilin`, + configServiceDir: `testdata/server_config/ok/etc/lilin/service.d`, + logServiceDir: `testdata/server_config/ok/var/log/lilin/service.d`, + Address: `127.0.0.1:12121`, + Notifs: []*NotifConfig{{ + Kind: `mattermost`, + WebhookURL: `http://127.0.0.1:12000`, + Channel: `chan_1`, + UpTemplate: `Service {{.ID}} is up again!`, + DownTemplate: `Service {{.ID}} is down: {{.Error}}`, + webhookURL: webhookURL12000, + }, { + Kind: `mattermost`, + WebhookURL: `http://127.0.0.1:12001`, + Channel: `chan_2`, + UpTemplate: `Service {{.ID}} is alive!`, + DownTemplate: `Service {{.ID}} is down: {{.Error}}`, + webhookURL: webhookURL12001, + }}, + }, + }} + + for _, tcase := range listCase { + var cfg = ServerConfig{ + BaseDir: tcase.baseDir, + } + + var err = cfg.init() + if err != nil { + t.Fatal(err) + } + + for x, notifCfg := range cfg.Notifs { + if notifCfg.upTmpl == nil { + t.Fatalf(`ServerConfig.Notifs #%d: upTmpl is nil`, x) + } + if notifCfg.downTmpl == nil { + t.Fatalf(`ServerConfig.Notifs #%d: downTmpl is nil`, x) + } + notifCfg.upTmpl = nil + notifCfg.downTmpl = nil + } + + test.Assert(t, tcase.baseDir, tcase.exp, cfg) + } +} diff --git a/service.go b/service.go index 69e68a3..e9c412c 100644 --- a/service.go +++ b/service.go @@ -13,12 +13,19 @@ import ( "git.sr.ht/~shulhan/lilin/internal" ) +// errorCountNotif define the number of consecutive error to trigger +// notification. +const errorCountNotif = 3 + type Service struct { httpConn *http.Client dialer *net.Dialer ticker *time.Ticker cfg ServiceConfig - isReady bool + + errorCount int + + isReady bool } // NewService create and initialize the connection to service. @@ -81,10 +88,12 @@ func (svc *Service) Scan() (report ScanReport) { } // Start scanning periodically and send the report to reportq. -func (svc *Service) Start(reportq chan<- ScanReport) { +func (svc *Service) Start(reportq, notifq chan<- ScanReport) { svc.ticker = time.NewTicker(svc.cfg.interval) - reportq <- svc.Scan() + var report = svc.Scan() + svc.publishNotif(notifq, report) + reportq <- report for { _, ok := <-svc.ticker.C @@ -92,7 +101,9 @@ func (svc *Service) Start(reportq chan<- ScanReport) { log.Printf("Service: %s not ok", svc.cfg.ID) return } - reportq <- svc.Scan() + report = svc.Scan() + svc.publishNotif(notifq, report) + reportq <- report } } @@ -116,3 +127,23 @@ func (svc *Service) connect() (err error) { svc.isReady = true return nil } + +func (svc *Service) publishNotif(notifq chan<- ScanReport, report ScanReport) { + if report.Success { + if svc.errorCount >= errorCountNotif { + // Trigger the UP notification. + select { + case notifq <- report: + } + } + svc.errorCount = 0 + } else { + svc.errorCount++ + if svc.errorCount >= errorCountNotif { + // Trigger the DOWN notification. + select { + case notifq <- report: + } + } + } +} diff --git a/service_test.go b/service_test.go index ba31192..712ee54 100644 --- a/service_test.go +++ b/service_test.go @@ -1,42 +1,42 @@ // SPDX-FileCopyrightText: 2025 M. Shulhan // SPDX-License-Identifier: GPL-3.0-only -package lilin_test +package lilin import ( "testing" - "git.sr.ht/~shulhan/lilin" "git.sr.ht/~shulhan/lilin/internal" "git.sr.ht/~shulhan/pakakeh.go/lib/test" ) func TestServiceStart_HTTP(t *testing.T) { type testCase struct { - expReport lilin.ScanReport - cfg lilin.ServiceConfig + expReport ScanReport + cfg ServiceConfig } var listCase = []testCase{{ - cfg: lilin.ServiceConfig{ + cfg: ServiceConfig{ ID: `http_service`, Address: `http://` + dummyHTTPAddress + `/health`, }, - expReport: lilin.ScanReport{ + expReport: ScanReport{ ID: `http_service`, At: internal.Now(), Success: true, }, }} - var reportq = make(chan lilin.ScanReport, 1) + var reportq = make(chan ScanReport, 1) + var notifq = make(chan ScanReport, 1) for _, tcase := range listCase { - svc, err := lilin.NewService(tcase.cfg) + svc, err := NewService(tcase.cfg) if err != nil { t.Fatal(err) } - go svc.Start(reportq) + go svc.Start(reportq, notifq) var gotReport = <-reportq svc.Stop() diff --git a/testdata/etc/lilin/lilin.cfg b/testdata/etc/lilin/lilin.cfg index fd82299..c2f03c3 100644 --- a/testdata/etc/lilin/lilin.cfg +++ b/testdata/etc/lilin/lilin.cfg @@ -2,5 +2,5 @@ ## SPDX-License-Identifier: GPL-3.0-only [server] -address = 127.0.0.1:6100 +address = 127.0.0.1:6101 default_timeout = 5s diff --git a/testdata/server_config/ok/etc/lilin/lilin.cfg b/testdata/server_config/ok/etc/lilin/lilin.cfg new file mode 100644 index 0000000..ef179d4 --- /dev/null +++ b/testdata/server_config/ok/etc/lilin/lilin.cfg @@ -0,0 +1,19 @@ +## SPDX-FileCopyrightText: 2025 M. Shulhan +## SPDX-License-Identifier: GPL-3.0-only + +[server] +address = 127.0.0.1:12121 + +[notif] +kind = mattermost +webhook_url = http://127.0.0.1:12000 +channel = chan_1 +down_template = Service {{.ID}} is down: {{.Error}} +up_template = Service {{.ID}} is up again! + +[notif] +kind = mattermost +webhook_url = http://127.0.0.1:12001 +channel = chan_2 +down_template = Service {{.ID}} is down: {{.Error}} +up_template = Service {{.ID}} is alive! diff --git a/testdata/worker/pushNotifMattermost/etc/lilin/lilin.cfg b/testdata/worker/pushNotifMattermost/etc/lilin/lilin.cfg new file mode 100644 index 0000000..3c479f3 --- /dev/null +++ b/testdata/worker/pushNotifMattermost/etc/lilin/lilin.cfg @@ -0,0 +1,12 @@ +## SPDX-FileCopyrightText: 2025 M. Shulhan +## SPDX-License-Identifier: GPL-3.0-only + +[server] +address = 127.0.0.1:12121 + +[notif] +kind = mattermost +webhook_url = http://127.0.0.1:6330/mattermost +channel = test_webhook +down_template = Service {{.ID}} is down: {{.Error}} +up_template = Service {{.ID}} is up again! diff --git a/testdata/worker/pushNotifMattermost/etc/lilin/service.d/dummy_http.cfg b/testdata/worker/pushNotifMattermost/etc/lilin/service.d/dummy_http.cfg new file mode 100644 index 0000000..bb3a84f --- /dev/null +++ b/testdata/worker/pushNotifMattermost/etc/lilin/service.d/dummy_http.cfg @@ -0,0 +1,6 @@ +## SPDX-FileCopyrightText: 2025 M. Shulhan +## SPDX-License-Identifier: GPL-3.0-only + +[service] +name = DummyHTTPService +address = http://127.0.0.1:6330/health diff --git a/testdata/worker/pushNotifMattermost/var/log/lilin/service.d/dummy_http.log b/testdata/worker/pushNotifMattermost/var/log/lilin/service.d/dummy_http.log new file mode 100644 index 0000000..e69de29 diff --git a/worker.go b/worker.go index 004e6e2..007bc52 100644 --- a/worker.go +++ b/worker.go @@ -4,26 +4,45 @@ package lilin import ( + "bytes" + "encoding/json" + "io" "log" + "net/http" "os" "path/filepath" "strings" + "time" "git.sr.ht/~shulhan/pakakeh.go/lib/ini" ) // worker contains the report of all services. type worker struct { + // httpc client for notification for kind is mattermost. + httpc *http.Client + + cfg ServerConfig + // Internal service status, where key is service name. Services map[string]*Service `ini:"service"` reportq chan ScanReport + // notifq contains queue for notification. + notifq chan ScanReport + Reports Reports } +type mattermostWebhook struct { + Channel string `json:"channel"` + Text string `json:"text"` +} + func newWorker(cfg ServerConfig) (wrk *worker, err error) { wrk = &worker{ + cfg: cfg, Services: make(map[string]*Service), } @@ -41,11 +60,17 @@ func newWorker(cfg ServerConfig) (wrk *worker, err error) { return nil, err } } - return wrk, nil -} -type serviceConfigs struct { - Config map[string]ServiceConfig `ini:"service"` + for _, notifConfig := range cfg.Notifs { + switch notifConfig.Kind { + case notifKindMattermost: + wrk.httpc = &http.Client{ + Timeout: 5 * time.Second, + } + } + } + + return wrk, nil } // loadServiceDir Load all the service configurations. @@ -101,12 +126,15 @@ func (wrk *worker) loadServiceDir(cfg ServerConfig) (err error) { // Once new report coming, notify the upstream through the passed channel. func (wrk *worker) start(updateq chan<- struct{}) { wrk.reportq = make(chan ScanReport, len(wrk.Services)+1) + wrk.notifq = make(chan ScanReport, len(wrk.Services)+1) var svc *Service for _, svc = range wrk.Services { - go svc.Start(wrk.reportq) + go svc.Start(wrk.reportq, wrk.notifq) } + go wrk.handlePushNotif() + var scanReport ScanReport var ok bool for { @@ -120,6 +148,7 @@ func (wrk *worker) start(updateq chan<- struct{}) { } func (wrk *worker) stop() { + close(wrk.notifq) close(wrk.reportq) var svc *Service @@ -136,3 +165,78 @@ func (wrk *worker) stop() { } } } + +// handlePushNotif consume the failed ScanReport from notifq to be forwarded +// to the all notification defined in NotifConfig. +func (wrk *worker) handlePushNotif() { + for scanReport := range wrk.notifq { + for _, notifConfig := range wrk.cfg.Notifs { + switch notifConfig.Kind { + case notifKindMattermost: + wrk.pushNotifMattermost(notifConfig, &scanReport) + } + } + } +} + +func (wrk *worker) pushNotifMattermost( + notifConfig *NotifConfig, + scanReport *ScanReport, +) { + var logp = `pushNotifMattermost` + var text strings.Builder + var err error + if scanReport.Success { + err = notifConfig.upTmpl.Execute(&text, scanReport) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + } else { + err = notifConfig.downTmpl.Execute(&text, scanReport) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + } + + var msg = mattermostWebhook{ + Channel: notifConfig.Channel, + Text: text.String(), + } + var body []byte + body, err = json.Marshal(&msg) + if err != nil { + log.Printf(`%s: %s`, logp, err) + return + } + + var req = &http.Request{ + Method: http.MethodPost, + URL: notifConfig.webhookURL, + Header: http.Header{ + `Content-Type`: []string{ + `application/json`, + }, + }, + Body: io.NopCloser(bytes.NewReader(body)), + } + + var resp *http.Response + resp, err = wrk.httpc.Do(req) + if err != nil { + log.Printf(`%s: %s`, logp, err) + } + + if resp.StatusCode == 200 { + log.Printf(`%s: send %q`, logp, body) + return + } + + body, err = io.ReadAll(resp.Body) + if err != nil { + log.Printf(`%s: %s`, logp, err) + } + + log.Printf(`%s: fail with status code %d: %s`, logp, resp.StatusCode, body) +} diff --git a/worker_test.go b/worker_test.go index c00339c..833322c 100644 --- a/worker_test.go +++ b/worker_test.go @@ -94,3 +94,32 @@ func TestNewWorker(t *testing.T) { test.Assert(t, `worker.Services`, tcase.expServices, wrk.Services) } } + +func TestWorker_pushNotifMattermost(t *testing.T) { + var serverCfg = &ServerConfig{ + BaseDir: `testdata/worker/pushNotifMattermost`, + } + + var err = serverCfg.init() + if err != nil { + t.Fatal(err) + } + + var notifCfgMattermost = serverCfg.Notifs[0] + + var wrk *worker + wrk, err = newWorker(*serverCfg) + if err != nil { + t.Fatal(err) + } + + var scanReport = &ScanReport{ + ID: `dummy_http`, + Error: `503 internal server error`, + } + wrk.pushNotifMattermost(notifCfgMattermost, scanReport) + + var gotRequestBody = <-dhs.reqbodyq + var expRequestBody = `{"channel":"test_webhook","text":"Service dummy_http is down: 503 internal server error"}` + test.Assert(t, `request body`, expRequestBody, string(gotRequestBody)) +} -- cgit v1.3