From bec99d46d65c451f18000a7247d3df06765a3894 Mon Sep 17 00:00:00 2001 From: Shulhan Date: Thu, 31 Jul 2025 20:00:28 +0700 Subject: all: implement worker start and stop The start method run all the services scanner in the background and consume the report from it from channel. Each time new report coming it will notify the Server to update the index page body. --- _www/index.tmpl | 17 ++++++++++--- internal/internal.go | 2 +- reports.go | 2 +- server.go | 67 +++++++++++++++++++++++++++++++++++++++++++--------- service.go | 3 +-- worker.go | 54 +++++++++++++++++++++++++++++++++++++++++- 6 files changed, 126 insertions(+), 19 deletions(-) diff --git a/_www/index.tmpl b/_www/index.tmpl index 250843d..f7cebda 100644 --- a/_www/index.tmpl +++ b/_www/index.tmpl @@ -6,10 +6,21 @@ {{.Title}} - {{range .Services}} +

{{.Title}}

+ {{ range.Services }}
-
{{.Name}}
+
+ {{.Name}} + {{if .Last.Success}} + OK + {{else}} + FAIL + {{ end }} +
+ {{if not .Last.Success}} +
{{.Last.At}}: {{ .Last.Error }}
+ {{ end }}
- {{end}} + {{ end }} diff --git a/internal/internal.go b/internal/internal.go index ce6de02..46b9877 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -7,5 +7,5 @@ import "time" // Now return the current time. var Now = func() time.Time { - return time.Now() + return time.Now().UTC().Round(time.Second) } diff --git a/reports.go b/reports.go index 1ffbd43..91ff2df 100644 --- a/reports.go +++ b/reports.go @@ -5,6 +5,6 @@ package lilin // Reports contains the report for all services. type Reports struct { - Services map[string]ServiceReport + Services map[string]*ServiceReport Title string } diff --git a/server.go b/server.go index 620c36f..0ff4e2a 100644 --- a/server.go +++ b/server.go @@ -10,6 +10,7 @@ import ( "html/template" "log" "net/http" + "sync" "time" ) @@ -24,11 +25,18 @@ type Server struct { httpd *http.Server worker *worker + // updateq channel to receive notification when the new report coming + // from worker. + updateq chan struct{} + Options ServerOptions - Reports Reports pageIndexTmpl *template.Template pageIndexBody bytes.Buffer + + // pageIndexMutex guard the pageIndexBody between changes from worker + // Reports (write) and when rendered to client (read). + pageIndexMutex sync.Mutex } // NewServer create new server to serve the content of _www and HTTP APIs. @@ -37,9 +45,7 @@ func NewServer(opts ServerOptions) (srv *Server, err error) { srv = &Server{ Options: opts, - Reports: Reports{ - Services: map[string]ServiceReport{}, - }, + updateq: make(chan struct{}, 1), } err = srv.Options.init() @@ -52,17 +58,11 @@ func NewServer(opts ServerOptions) (srv *Server, err error) { return nil, fmt.Errorf(`%s: %w`, logp, err) } - for name := range srv.worker.Services { - srv.Reports.Services[name] = ServiceReport{ - Name: name, - } - } - srv.pageIndexTmpl, err = template.ParseFS(wwwFS, `_www/index.tmpl`) if err != nil { return nil, err } - err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.Reports) + err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports) if err != nil { return nil, err } @@ -81,6 +81,9 @@ func NewServer(opts ServerOptions) (srv *Server, err error) { // ListenAndServe start handling request on incoming connections. func (srv *Server) ListenAndServe() (err error) { + go srv.update() + go srv.worker.start(srv.updateq) + log.Printf(`lilin: starting HTTP server at %s`, srv.Options.Address) err = srv.httpd.ListenAndServe() @@ -97,6 +100,8 @@ func (srv *Server) Shutdown(ctx context.Context) (err error) { if err != nil { return fmt.Errorf(`Shutdown: %w`, err) } + srv.worker.stop() + close(srv.updateq) return nil } @@ -105,7 +110,9 @@ func (srv *Server) ServeHTTP(resw http.ResponseWriter, req *http.Request) { switch req.URL.Path { case `/`, `/index.html`: + srv.pageIndexMutex.Lock() _, err = resw.Write(srv.pageIndexBody.Bytes()) + srv.pageIndexMutex.Unlock() if err != nil { srv.internalError(resw, err) } @@ -122,3 +129,41 @@ func (srv *Server) internalError(resw http.ResponseWriter, err error) { log.Println(`internalError:`, err.Error()) } } + +// update the index page body periodically only when there is changes from +// upstream worker. +// Since services may have different scanned interval (and may longer than +// default interval), it will takes times to update the index page body every +// times we receive updates from worker. +// Hence why we need to update only when changes happened. +func (srv *Server) update() { + var ticker = time.NewTicker(defInterval) + var c int + var err error + for { + select { + case _, ok := <-srv.updateq: + if !ok { + return + } + c++ + case <-ticker.C: + if c == 0 { + continue + } + c = 0 + + srv.pageIndexMutex.Lock() + srv.worker.reportsMutex.Lock() + + srv.pageIndexBody.Reset() + err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports) + if err != nil { + log.Printf(`update: pageIndexTmpl: %s`, err) + } + + srv.worker.reportsMutex.Unlock() + srv.pageIndexMutex.Unlock() + } + } +} diff --git a/service.go b/service.go index b1bf11c..e8c1f8d 100644 --- a/service.go +++ b/service.go @@ -66,7 +66,7 @@ func (svc *Service) Scan() (report ScanReport) { case serviceKindTCP, serviceKindUDP: var conn net.Conn - conn, err = svc.dialer.Dial(`udp`, svc.opts.scanURL.Host) + conn, err = svc.dialer.Dial(svc.opts.scanURL.Scheme, svc.opts.scanURL.Host) if err != nil { report.Error = err.Error() return report @@ -92,7 +92,6 @@ func (svc *Service) Start(reportq chan<- ScanReport) { log.Printf("Service: %s not ok", svc.opts.Name) return } - log.Printf(`Scan %s started`, svc.opts.Name) reportq <- svc.Scan() } } diff --git a/worker.go b/worker.go index f58363c..f3348a7 100644 --- a/worker.go +++ b/worker.go @@ -16,17 +16,31 @@ import ( type worker struct { // Internal service status, where key is service name. Services map[string]*Service `ini:"service"` - sync.Mutex + + reportq chan ScanReport + + Reports Reports + reportsMutex sync.Mutex } func newWorker(configDir string) (wrk *worker, err error) { wrk = &worker{ Services: make(map[string]*Service), } + err = wrk.loadServiceDir(configDir) if err != nil { return nil, err } + + wrk.Reports = Reports{ + Services: make(map[string]*ServiceReport, len(wrk.Services)), + } + for name := range wrk.Services { + wrk.Reports.Services[name] = &ServiceReport{ + Name: name, + } + } return wrk, nil } @@ -83,3 +97,41 @@ func (wrk *worker) loadServiceDir(configDir string) (err error) { } return nil } + +// start run all the services scanner in the background and consume the report +// from it through channel. +// Once new report coming, notify the upstream through the passed channel. +func (wrk *worker) start(updateq chan<- struct{}) { + wrk.reportq = make(chan ScanReport, len(wrk.Services)+1) + + var svc *Service + for _, svc = range wrk.Services { + go svc.Start(wrk.reportq) + } + + var scanReport ScanReport + var ok bool + for { + scanReport, ok = <-wrk.reportq + if !ok { + break + } + + wrk.reportsMutex.Lock() + var svcReport = wrk.Reports.Services[scanReport.Name] + svcReport.Last = scanReport + svcReport.History = append(svcReport.History, scanReport) + wrk.reportsMutex.Unlock() + + updateq <- struct{}{} + } +} + +func (wrk *worker) stop() { + close(wrk.reportq) + + var svc *Service + for _, svc = range wrk.Services { + svc.Stop() + } +} -- cgit v1.3