diff options
| -rw-r--r-- | _www/index.tmpl | 17 | ||||
| -rw-r--r-- | internal/internal.go | 2 | ||||
| -rw-r--r-- | reports.go | 2 | ||||
| -rw-r--r-- | server.go | 67 | ||||
| -rw-r--r-- | service.go | 3 | ||||
| -rw-r--r-- | worker.go | 54 |
6 files changed, 126 insertions, 19 deletions
diff --git a/_www/index.tmpl b/_www/index.tmpl index 250843d..f7cebda 100644 --- a/_www/index.tmpl +++ b/_www/index.tmpl @@ -6,10 +6,21 @@ <title>{{.Title}}</title> </head> <body> - {{range .Services}} + <h2>{{.Title}}</h2> + {{ range.Services }} <div> - <div>{{.Name}}</div> + <div> + <span>{{.Name}}</span> + {{if .Last.Success}} + <span class="success"> OK </span> + {{else}} + <span class="fail"> FAIL </span> + {{ end }} + </div> + {{if not .Last.Success}} + <div>{{.Last.At}}: {{ .Last.Error }}</div> + {{ end }} </div> - {{end}} + {{ end }} </body> </html> diff --git a/internal/internal.go b/internal/internal.go index ce6de02..46b9877 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -7,5 +7,5 @@ import "time" // Now return the current time. var Now = func() time.Time { - return time.Now() + return time.Now().UTC().Round(time.Second) } @@ -5,6 +5,6 @@ package lilin // Reports contains the report for all services. type Reports struct { - Services map[string]ServiceReport + Services map[string]*ServiceReport Title string } @@ -10,6 +10,7 @@ import ( "html/template" "log" "net/http" + "sync" "time" ) @@ -24,11 +25,18 @@ type Server struct { httpd *http.Server worker *worker + // updateq channel to receive notification when the new report coming + // from worker. + updateq chan struct{} + Options ServerOptions - Reports Reports pageIndexTmpl *template.Template pageIndexBody bytes.Buffer + + // pageIndexMutex guard the pageIndexBody between changes from worker + // Reports (write) and when rendered to client (read). + pageIndexMutex sync.Mutex } // NewServer create new server to serve the content of _www and HTTP APIs. @@ -37,9 +45,7 @@ func NewServer(opts ServerOptions) (srv *Server, err error) { srv = &Server{ Options: opts, - Reports: Reports{ - Services: map[string]ServiceReport{}, - }, + updateq: make(chan struct{}, 1), } err = srv.Options.init() @@ -52,17 +58,11 @@ func NewServer(opts ServerOptions) (srv *Server, err error) { return nil, fmt.Errorf(`%s: %w`, logp, err) } - for name := range srv.worker.Services { - srv.Reports.Services[name] = ServiceReport{ - Name: name, - } - } - srv.pageIndexTmpl, err = template.ParseFS(wwwFS, `_www/index.tmpl`) if err != nil { return nil, err } - err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.Reports) + err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports) if err != nil { return nil, err } @@ -81,6 +81,9 @@ func NewServer(opts ServerOptions) (srv *Server, err error) { // ListenAndServe start handling request on incoming connections. func (srv *Server) ListenAndServe() (err error) { + go srv.update() + go srv.worker.start(srv.updateq) + log.Printf(`lilin: starting HTTP server at %s`, srv.Options.Address) err = srv.httpd.ListenAndServe() @@ -97,6 +100,8 @@ func (srv *Server) Shutdown(ctx context.Context) (err error) { if err != nil { return fmt.Errorf(`Shutdown: %w`, err) } + srv.worker.stop() + close(srv.updateq) return nil } @@ -105,7 +110,9 @@ func (srv *Server) ServeHTTP(resw http.ResponseWriter, req *http.Request) { switch req.URL.Path { case `/`, `/index.html`: + srv.pageIndexMutex.Lock() _, err = resw.Write(srv.pageIndexBody.Bytes()) + srv.pageIndexMutex.Unlock() if err != nil { srv.internalError(resw, err) } @@ -122,3 +129,41 @@ func (srv *Server) internalError(resw http.ResponseWriter, err error) { log.Println(`internalError:`, err.Error()) } } + +// update the index page body periodically only when there is changes from +// upstream worker. +// Since services may have different scanned interval (and may longer than +// default interval), it will takes times to update the index page body every +// times we receive updates from worker. +// Hence why we need to update only when changes happened. +func (srv *Server) update() { + var ticker = time.NewTicker(defInterval) + var c int + var err error + for { + select { + case _, ok := <-srv.updateq: + if !ok { + return + } + c++ + case <-ticker.C: + if c == 0 { + continue + } + c = 0 + + srv.pageIndexMutex.Lock() + srv.worker.reportsMutex.Lock() + + srv.pageIndexBody.Reset() + err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports) + if err != nil { + log.Printf(`update: pageIndexTmpl: %s`, err) + } + + srv.worker.reportsMutex.Unlock() + srv.pageIndexMutex.Unlock() + } + } +} @@ -66,7 +66,7 @@ func (svc *Service) Scan() (report ScanReport) { case serviceKindTCP, serviceKindUDP: var conn net.Conn - conn, err = svc.dialer.Dial(`udp`, svc.opts.scanURL.Host) + conn, err = svc.dialer.Dial(svc.opts.scanURL.Scheme, svc.opts.scanURL.Host) if err != nil { report.Error = err.Error() return report @@ -92,7 +92,6 @@ func (svc *Service) Start(reportq chan<- ScanReport) { log.Printf("Service: %s not ok", svc.opts.Name) return } - log.Printf(`Scan %s started`, svc.opts.Name) reportq <- svc.Scan() } } @@ -16,17 +16,31 @@ import ( type worker struct { // Internal service status, where key is service name. Services map[string]*Service `ini:"service"` - sync.Mutex + + reportq chan ScanReport + + Reports Reports + reportsMutex sync.Mutex } func newWorker(configDir string) (wrk *worker, err error) { wrk = &worker{ Services: make(map[string]*Service), } + err = wrk.loadServiceDir(configDir) if err != nil { return nil, err } + + wrk.Reports = Reports{ + Services: make(map[string]*ServiceReport, len(wrk.Services)), + } + for name := range wrk.Services { + wrk.Reports.Services[name] = &ServiceReport{ + Name: name, + } + } return wrk, nil } @@ -83,3 +97,41 @@ func (wrk *worker) loadServiceDir(configDir string) (err error) { } return nil } + +// start run all the services scanner in the background and consume the report +// from it through channel. +// Once new report coming, notify the upstream through the passed channel. +func (wrk *worker) start(updateq chan<- struct{}) { + wrk.reportq = make(chan ScanReport, len(wrk.Services)+1) + + var svc *Service + for _, svc = range wrk.Services { + go svc.Start(wrk.reportq) + } + + var scanReport ScanReport + var ok bool + for { + scanReport, ok = <-wrk.reportq + if !ok { + break + } + + wrk.reportsMutex.Lock() + var svcReport = wrk.Reports.Services[scanReport.Name] + svcReport.Last = scanReport + svcReport.History = append(svcReport.History, scanReport) + wrk.reportsMutex.Unlock() + + updateq <- struct{}{} + } +} + +func (wrk *worker) stop() { + close(wrk.reportq) + + var svc *Service + for _, svc = range wrk.Services { + svc.Stop() + } +} |
