diff options
| author | Shulhan <ms@kilabit.info> | 2025-07-31 20:00:28 +0700 |
|---|---|---|
| committer | Shulhan <ms@kilabit.info> | 2025-07-31 20:00:28 +0700 |
| commit | bec99d46d65c451f18000a7247d3df06765a3894 (patch) | |
| tree | ad1d073ac7bcc2fc5d76c5c550c672f2a4552302 | |
| parent | 9998582575aa02dbd2206497953f207af1f117d2 (diff) | |
| download | lilin-bec99d46d65c451f18000a7247d3df06765a3894.tar.xz | |
all: implement worker start and stop
The start method run all the services scanner in the background and
consume the report from it from channel.
Each time new report coming it will notify the Server to update the
index page body.
| -rw-r--r-- | _www/index.tmpl | 17 | ||||
| -rw-r--r-- | internal/internal.go | 2 | ||||
| -rw-r--r-- | reports.go | 2 | ||||
| -rw-r--r-- | server.go | 67 | ||||
| -rw-r--r-- | service.go | 3 | ||||
| -rw-r--r-- | worker.go | 54 |
6 files changed, 126 insertions, 19 deletions
diff --git a/_www/index.tmpl b/_www/index.tmpl index 250843d..f7cebda 100644 --- a/_www/index.tmpl +++ b/_www/index.tmpl @@ -6,10 +6,21 @@ <title>{{.Title}}</title> </head> <body> - {{range .Services}} + <h2>{{.Title}}</h2> + {{ range.Services }} <div> - <div>{{.Name}}</div> + <div> + <span>{{.Name}}</span> + {{if .Last.Success}} + <span class="success"> OK </span> + {{else}} + <span class="fail"> FAIL </span> + {{ end }} + </div> + {{if not .Last.Success}} + <div>{{.Last.At}}: {{ .Last.Error }}</div> + {{ end }} </div> - {{end}} + {{ end }} </body> </html> diff --git a/internal/internal.go b/internal/internal.go index ce6de02..46b9877 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -7,5 +7,5 @@ import "time" // Now return the current time. var Now = func() time.Time { - return time.Now() + return time.Now().UTC().Round(time.Second) } @@ -5,6 +5,6 @@ package lilin // Reports contains the report for all services. type Reports struct { - Services map[string]ServiceReport + Services map[string]*ServiceReport Title string } @@ -10,6 +10,7 @@ import ( "html/template" "log" "net/http" + "sync" "time" ) @@ -24,11 +25,18 @@ type Server struct { httpd *http.Server worker *worker + // updateq channel to receive notification when the new report coming + // from worker. + updateq chan struct{} + Options ServerOptions - Reports Reports pageIndexTmpl *template.Template pageIndexBody bytes.Buffer + + // pageIndexMutex guard the pageIndexBody between changes from worker + // Reports (write) and when rendered to client (read). + pageIndexMutex sync.Mutex } // NewServer create new server to serve the content of _www and HTTP APIs. @@ -37,9 +45,7 @@ func NewServer(opts ServerOptions) (srv *Server, err error) { srv = &Server{ Options: opts, - Reports: Reports{ - Services: map[string]ServiceReport{}, - }, + updateq: make(chan struct{}, 1), } err = srv.Options.init() @@ -52,17 +58,11 @@ func NewServer(opts ServerOptions) (srv *Server, err error) { return nil, fmt.Errorf(`%s: %w`, logp, err) } - for name := range srv.worker.Services { - srv.Reports.Services[name] = ServiceReport{ - Name: name, - } - } - srv.pageIndexTmpl, err = template.ParseFS(wwwFS, `_www/index.tmpl`) if err != nil { return nil, err } - err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.Reports) + err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports) if err != nil { return nil, err } @@ -81,6 +81,9 @@ func NewServer(opts ServerOptions) (srv *Server, err error) { // ListenAndServe start handling request on incoming connections. func (srv *Server) ListenAndServe() (err error) { + go srv.update() + go srv.worker.start(srv.updateq) + log.Printf(`lilin: starting HTTP server at %s`, srv.Options.Address) err = srv.httpd.ListenAndServe() @@ -97,6 +100,8 @@ func (srv *Server) Shutdown(ctx context.Context) (err error) { if err != nil { return fmt.Errorf(`Shutdown: %w`, err) } + srv.worker.stop() + close(srv.updateq) return nil } @@ -105,7 +110,9 @@ func (srv *Server) ServeHTTP(resw http.ResponseWriter, req *http.Request) { switch req.URL.Path { case `/`, `/index.html`: + srv.pageIndexMutex.Lock() _, err = resw.Write(srv.pageIndexBody.Bytes()) + srv.pageIndexMutex.Unlock() if err != nil { srv.internalError(resw, err) } @@ -122,3 +129,41 @@ func (srv *Server) internalError(resw http.ResponseWriter, err error) { log.Println(`internalError:`, err.Error()) } } + +// update the index page body periodically only when there is changes from +// upstream worker. +// Since services may have different scanned interval (and may longer than +// default interval), it will takes times to update the index page body every +// times we receive updates from worker. +// Hence why we need to update only when changes happened. +func (srv *Server) update() { + var ticker = time.NewTicker(defInterval) + var c int + var err error + for { + select { + case _, ok := <-srv.updateq: + if !ok { + return + } + c++ + case <-ticker.C: + if c == 0 { + continue + } + c = 0 + + srv.pageIndexMutex.Lock() + srv.worker.reportsMutex.Lock() + + srv.pageIndexBody.Reset() + err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports) + if err != nil { + log.Printf(`update: pageIndexTmpl: %s`, err) + } + + srv.worker.reportsMutex.Unlock() + srv.pageIndexMutex.Unlock() + } + } +} @@ -66,7 +66,7 @@ func (svc *Service) Scan() (report ScanReport) { case serviceKindTCP, serviceKindUDP: var conn net.Conn - conn, err = svc.dialer.Dial(`udp`, svc.opts.scanURL.Host) + conn, err = svc.dialer.Dial(svc.opts.scanURL.Scheme, svc.opts.scanURL.Host) if err != nil { report.Error = err.Error() return report @@ -92,7 +92,6 @@ func (svc *Service) Start(reportq chan<- ScanReport) { log.Printf("Service: %s not ok", svc.opts.Name) return } - log.Printf(`Scan %s started`, svc.opts.Name) reportq <- svc.Scan() } } @@ -16,17 +16,31 @@ import ( type worker struct { // Internal service status, where key is service name. Services map[string]*Service `ini:"service"` - sync.Mutex + + reportq chan ScanReport + + Reports Reports + reportsMutex sync.Mutex } func newWorker(configDir string) (wrk *worker, err error) { wrk = &worker{ Services: make(map[string]*Service), } + err = wrk.loadServiceDir(configDir) if err != nil { return nil, err } + + wrk.Reports = Reports{ + Services: make(map[string]*ServiceReport, len(wrk.Services)), + } + for name := range wrk.Services { + wrk.Reports.Services[name] = &ServiceReport{ + Name: name, + } + } return wrk, nil } @@ -83,3 +97,41 @@ func (wrk *worker) loadServiceDir(configDir string) (err error) { } return nil } + +// start run all the services scanner in the background and consume the report +// from it through channel. +// Once new report coming, notify the upstream through the passed channel. +func (wrk *worker) start(updateq chan<- struct{}) { + wrk.reportq = make(chan ScanReport, len(wrk.Services)+1) + + var svc *Service + for _, svc = range wrk.Services { + go svc.Start(wrk.reportq) + } + + var scanReport ScanReport + var ok bool + for { + scanReport, ok = <-wrk.reportq + if !ok { + break + } + + wrk.reportsMutex.Lock() + var svcReport = wrk.Reports.Services[scanReport.Name] + svcReport.Last = scanReport + svcReport.History = append(svcReport.History, scanReport) + wrk.reportsMutex.Unlock() + + updateq <- struct{}{} + } +} + +func (wrk *worker) stop() { + close(wrk.reportq) + + var svc *Service + for _, svc = range wrk.Services { + svc.Stop() + } +} |
