aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--_www/index.tmpl17
-rw-r--r--internal/internal.go2
-rw-r--r--reports.go2
-rw-r--r--server.go67
-rw-r--r--service.go3
-rw-r--r--worker.go54
6 files changed, 126 insertions, 19 deletions
diff --git a/_www/index.tmpl b/_www/index.tmpl
index 250843d..f7cebda 100644
--- a/_www/index.tmpl
+++ b/_www/index.tmpl
@@ -6,10 +6,21 @@
<title>{{.Title}}</title>
</head>
<body>
- {{range .Services}}
+ <h2>{{.Title}}</h2>
+ {{ range.Services }}
<div>
- <div>{{.Name}}</div>
+ <div>
+ <span>{{.Name}}</span>
+ {{if .Last.Success}}
+ <span class="success"> OK </span>
+ {{else}}
+ <span class="fail"> FAIL </span>
+ {{ end }}
+ </div>
+ {{if not .Last.Success}}
+ <div>{{.Last.At}}: {{ .Last.Error }}</div>
+ {{ end }}
</div>
- {{end}}
+ {{ end }}
</body>
</html>
diff --git a/internal/internal.go b/internal/internal.go
index ce6de02..46b9877 100644
--- a/internal/internal.go
+++ b/internal/internal.go
@@ -7,5 +7,5 @@ import "time"
// Now return the current time.
var Now = func() time.Time {
- return time.Now()
+ return time.Now().UTC().Round(time.Second)
}
diff --git a/reports.go b/reports.go
index 1ffbd43..91ff2df 100644
--- a/reports.go
+++ b/reports.go
@@ -5,6 +5,6 @@ package lilin
// Reports contains the report for all services.
type Reports struct {
- Services map[string]ServiceReport
+ Services map[string]*ServiceReport
Title string
}
diff --git a/server.go b/server.go
index 620c36f..0ff4e2a 100644
--- a/server.go
+++ b/server.go
@@ -10,6 +10,7 @@ import (
"html/template"
"log"
"net/http"
+ "sync"
"time"
)
@@ -24,11 +25,18 @@ type Server struct {
httpd *http.Server
worker *worker
+ // updateq channel to receive notification when the new report coming
+ // from worker.
+ updateq chan struct{}
+
Options ServerOptions
- Reports Reports
pageIndexTmpl *template.Template
pageIndexBody bytes.Buffer
+
+ // pageIndexMutex guard the pageIndexBody between changes from worker
+ // Reports (write) and when rendered to client (read).
+ pageIndexMutex sync.Mutex
}
// NewServer create new server to serve the content of _www and HTTP APIs.
@@ -37,9 +45,7 @@ func NewServer(opts ServerOptions) (srv *Server, err error) {
srv = &Server{
Options: opts,
- Reports: Reports{
- Services: map[string]ServiceReport{},
- },
+ updateq: make(chan struct{}, 1),
}
err = srv.Options.init()
@@ -52,17 +58,11 @@ func NewServer(opts ServerOptions) (srv *Server, err error) {
return nil, fmt.Errorf(`%s: %w`, logp, err)
}
- for name := range srv.worker.Services {
- srv.Reports.Services[name] = ServiceReport{
- Name: name,
- }
- }
-
srv.pageIndexTmpl, err = template.ParseFS(wwwFS, `_www/index.tmpl`)
if err != nil {
return nil, err
}
- err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.Reports)
+ err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports)
if err != nil {
return nil, err
}
@@ -81,6 +81,9 @@ func NewServer(opts ServerOptions) (srv *Server, err error) {
// ListenAndServe start handling request on incoming connections.
func (srv *Server) ListenAndServe() (err error) {
+ go srv.update()
+ go srv.worker.start(srv.updateq)
+
log.Printf(`lilin: starting HTTP server at %s`, srv.Options.Address)
err = srv.httpd.ListenAndServe()
@@ -97,6 +100,8 @@ func (srv *Server) Shutdown(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf(`Shutdown: %w`, err)
}
+ srv.worker.stop()
+ close(srv.updateq)
return nil
}
@@ -105,7 +110,9 @@ func (srv *Server) ServeHTTP(resw http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case `/`, `/index.html`:
+ srv.pageIndexMutex.Lock()
_, err = resw.Write(srv.pageIndexBody.Bytes())
+ srv.pageIndexMutex.Unlock()
if err != nil {
srv.internalError(resw, err)
}
@@ -122,3 +129,41 @@ func (srv *Server) internalError(resw http.ResponseWriter, err error) {
log.Println(`internalError:`, err.Error())
}
}
+
+// update the index page body periodically only when there is changes from
+// upstream worker.
+// Since services may have different scanned interval (and may longer than
+// default interval), it will takes times to update the index page body every
+// times we receive updates from worker.
+// Hence why we need to update only when changes happened.
+func (srv *Server) update() {
+ var ticker = time.NewTicker(defInterval)
+ var c int
+ var err error
+ for {
+ select {
+ case _, ok := <-srv.updateq:
+ if !ok {
+ return
+ }
+ c++
+ case <-ticker.C:
+ if c == 0 {
+ continue
+ }
+ c = 0
+
+ srv.pageIndexMutex.Lock()
+ srv.worker.reportsMutex.Lock()
+
+ srv.pageIndexBody.Reset()
+ err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports)
+ if err != nil {
+ log.Printf(`update: pageIndexTmpl: %s`, err)
+ }
+
+ srv.worker.reportsMutex.Unlock()
+ srv.pageIndexMutex.Unlock()
+ }
+ }
+}
diff --git a/service.go b/service.go
index b1bf11c..e8c1f8d 100644
--- a/service.go
+++ b/service.go
@@ -66,7 +66,7 @@ func (svc *Service) Scan() (report ScanReport) {
case serviceKindTCP, serviceKindUDP:
var conn net.Conn
- conn, err = svc.dialer.Dial(`udp`, svc.opts.scanURL.Host)
+ conn, err = svc.dialer.Dial(svc.opts.scanURL.Scheme, svc.opts.scanURL.Host)
if err != nil {
report.Error = err.Error()
return report
@@ -92,7 +92,6 @@ func (svc *Service) Start(reportq chan<- ScanReport) {
log.Printf("Service: %s not ok", svc.opts.Name)
return
}
- log.Printf(`Scan %s started`, svc.opts.Name)
reportq <- svc.Scan()
}
}
diff --git a/worker.go b/worker.go
index f58363c..f3348a7 100644
--- a/worker.go
+++ b/worker.go
@@ -16,17 +16,31 @@ import (
type worker struct {
// Internal service status, where key is service name.
Services map[string]*Service `ini:"service"`
- sync.Mutex
+
+ reportq chan ScanReport
+
+ Reports Reports
+ reportsMutex sync.Mutex
}
func newWorker(configDir string) (wrk *worker, err error) {
wrk = &worker{
Services: make(map[string]*Service),
}
+
err = wrk.loadServiceDir(configDir)
if err != nil {
return nil, err
}
+
+ wrk.Reports = Reports{
+ Services: make(map[string]*ServiceReport, len(wrk.Services)),
+ }
+ for name := range wrk.Services {
+ wrk.Reports.Services[name] = &ServiceReport{
+ Name: name,
+ }
+ }
return wrk, nil
}
@@ -83,3 +97,41 @@ func (wrk *worker) loadServiceDir(configDir string) (err error) {
}
return nil
}
+
+// start run all the services scanner in the background and consume the report
+// from it through channel.
+// Once new report coming, notify the upstream through the passed channel.
+func (wrk *worker) start(updateq chan<- struct{}) {
+ wrk.reportq = make(chan ScanReport, len(wrk.Services)+1)
+
+ var svc *Service
+ for _, svc = range wrk.Services {
+ go svc.Start(wrk.reportq)
+ }
+
+ var scanReport ScanReport
+ var ok bool
+ for {
+ scanReport, ok = <-wrk.reportq
+ if !ok {
+ break
+ }
+
+ wrk.reportsMutex.Lock()
+ var svcReport = wrk.Reports.Services[scanReport.Name]
+ svcReport.Last = scanReport
+ svcReport.History = append(svcReport.History, scanReport)
+ wrk.reportsMutex.Unlock()
+
+ updateq <- struct{}{}
+ }
+}
+
+func (wrk *worker) stop() {
+ close(wrk.reportq)
+
+ var svc *Service
+ for _, svc = range wrk.Services {
+ svc.Stop()
+ }
+}