aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2025-07-31 20:00:28 +0700
committerShulhan <ms@kilabit.info>2025-07-31 20:00:28 +0700
commitbec99d46d65c451f18000a7247d3df06765a3894 (patch)
treead1d073ac7bcc2fc5d76c5c550c672f2a4552302
parent9998582575aa02dbd2206497953f207af1f117d2 (diff)
downloadlilin-bec99d46d65c451f18000a7247d3df06765a3894.tar.xz
all: implement worker start and stop
The start method run all the services scanner in the background and consume the report from it from channel. Each time new report coming it will notify the Server to update the index page body.
-rw-r--r--_www/index.tmpl17
-rw-r--r--internal/internal.go2
-rw-r--r--reports.go2
-rw-r--r--server.go67
-rw-r--r--service.go3
-rw-r--r--worker.go54
6 files changed, 126 insertions, 19 deletions
diff --git a/_www/index.tmpl b/_www/index.tmpl
index 250843d..f7cebda 100644
--- a/_www/index.tmpl
+++ b/_www/index.tmpl
@@ -6,10 +6,21 @@
<title>{{.Title}}</title>
</head>
<body>
- {{range .Services}}
+ <h2>{{.Title}}</h2>
+ {{ range.Services }}
<div>
- <div>{{.Name}}</div>
+ <div>
+ <span>{{.Name}}</span>
+ {{if .Last.Success}}
+ <span class="success"> OK </span>
+ {{else}}
+ <span class="fail"> FAIL </span>
+ {{ end }}
+ </div>
+ {{if not .Last.Success}}
+ <div>{{.Last.At}}: {{ .Last.Error }}</div>
+ {{ end }}
</div>
- {{end}}
+ {{ end }}
</body>
</html>
diff --git a/internal/internal.go b/internal/internal.go
index ce6de02..46b9877 100644
--- a/internal/internal.go
+++ b/internal/internal.go
@@ -7,5 +7,5 @@ import "time"
// Now return the current time.
var Now = func() time.Time {
- return time.Now()
+ return time.Now().UTC().Round(time.Second)
}
diff --git a/reports.go b/reports.go
index 1ffbd43..91ff2df 100644
--- a/reports.go
+++ b/reports.go
@@ -5,6 +5,6 @@ package lilin
// Reports contains the report for all services.
type Reports struct {
- Services map[string]ServiceReport
+ Services map[string]*ServiceReport
Title string
}
diff --git a/server.go b/server.go
index 620c36f..0ff4e2a 100644
--- a/server.go
+++ b/server.go
@@ -10,6 +10,7 @@ import (
"html/template"
"log"
"net/http"
+ "sync"
"time"
)
@@ -24,11 +25,18 @@ type Server struct {
httpd *http.Server
worker *worker
+ // updateq channel to receive notification when the new report coming
+ // from worker.
+ updateq chan struct{}
+
Options ServerOptions
- Reports Reports
pageIndexTmpl *template.Template
pageIndexBody bytes.Buffer
+
+ // pageIndexMutex guard the pageIndexBody between changes from worker
+ // Reports (write) and when rendered to client (read).
+ pageIndexMutex sync.Mutex
}
// NewServer create new server to serve the content of _www and HTTP APIs.
@@ -37,9 +45,7 @@ func NewServer(opts ServerOptions) (srv *Server, err error) {
srv = &Server{
Options: opts,
- Reports: Reports{
- Services: map[string]ServiceReport{},
- },
+ updateq: make(chan struct{}, 1),
}
err = srv.Options.init()
@@ -52,17 +58,11 @@ func NewServer(opts ServerOptions) (srv *Server, err error) {
return nil, fmt.Errorf(`%s: %w`, logp, err)
}
- for name := range srv.worker.Services {
- srv.Reports.Services[name] = ServiceReport{
- Name: name,
- }
- }
-
srv.pageIndexTmpl, err = template.ParseFS(wwwFS, `_www/index.tmpl`)
if err != nil {
return nil, err
}
- err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.Reports)
+ err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports)
if err != nil {
return nil, err
}
@@ -81,6 +81,9 @@ func NewServer(opts ServerOptions) (srv *Server, err error) {
// ListenAndServe start handling request on incoming connections.
func (srv *Server) ListenAndServe() (err error) {
+ go srv.update()
+ go srv.worker.start(srv.updateq)
+
log.Printf(`lilin: starting HTTP server at %s`, srv.Options.Address)
err = srv.httpd.ListenAndServe()
@@ -97,6 +100,8 @@ func (srv *Server) Shutdown(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf(`Shutdown: %w`, err)
}
+ srv.worker.stop()
+ close(srv.updateq)
return nil
}
@@ -105,7 +110,9 @@ func (srv *Server) ServeHTTP(resw http.ResponseWriter, req *http.Request) {
switch req.URL.Path {
case `/`, `/index.html`:
+ srv.pageIndexMutex.Lock()
_, err = resw.Write(srv.pageIndexBody.Bytes())
+ srv.pageIndexMutex.Unlock()
if err != nil {
srv.internalError(resw, err)
}
@@ -122,3 +129,41 @@ func (srv *Server) internalError(resw http.ResponseWriter, err error) {
log.Println(`internalError:`, err.Error())
}
}
+
+// update the index page body periodically only when there is changes from
+// upstream worker.
+// Since services may have different scanned interval (and may longer than
+// default interval), it will takes times to update the index page body every
+// times we receive updates from worker.
+// Hence why we need to update only when changes happened.
+func (srv *Server) update() {
+ var ticker = time.NewTicker(defInterval)
+ var c int
+ var err error
+ for {
+ select {
+ case _, ok := <-srv.updateq:
+ if !ok {
+ return
+ }
+ c++
+ case <-ticker.C:
+ if c == 0 {
+ continue
+ }
+ c = 0
+
+ srv.pageIndexMutex.Lock()
+ srv.worker.reportsMutex.Lock()
+
+ srv.pageIndexBody.Reset()
+ err = srv.pageIndexTmpl.Execute(&srv.pageIndexBody, &srv.worker.Reports)
+ if err != nil {
+ log.Printf(`update: pageIndexTmpl: %s`, err)
+ }
+
+ srv.worker.reportsMutex.Unlock()
+ srv.pageIndexMutex.Unlock()
+ }
+ }
+}
diff --git a/service.go b/service.go
index b1bf11c..e8c1f8d 100644
--- a/service.go
+++ b/service.go
@@ -66,7 +66,7 @@ func (svc *Service) Scan() (report ScanReport) {
case serviceKindTCP, serviceKindUDP:
var conn net.Conn
- conn, err = svc.dialer.Dial(`udp`, svc.opts.scanURL.Host)
+ conn, err = svc.dialer.Dial(svc.opts.scanURL.Scheme, svc.opts.scanURL.Host)
if err != nil {
report.Error = err.Error()
return report
@@ -92,7 +92,6 @@ func (svc *Service) Start(reportq chan<- ScanReport) {
log.Printf("Service: %s not ok", svc.opts.Name)
return
}
- log.Printf(`Scan %s started`, svc.opts.Name)
reportq <- svc.Scan()
}
}
diff --git a/worker.go b/worker.go
index f58363c..f3348a7 100644
--- a/worker.go
+++ b/worker.go
@@ -16,17 +16,31 @@ import (
type worker struct {
// Internal service status, where key is service name.
Services map[string]*Service `ini:"service"`
- sync.Mutex
+
+ reportq chan ScanReport
+
+ Reports Reports
+ reportsMutex sync.Mutex
}
func newWorker(configDir string) (wrk *worker, err error) {
wrk = &worker{
Services: make(map[string]*Service),
}
+
err = wrk.loadServiceDir(configDir)
if err != nil {
return nil, err
}
+
+ wrk.Reports = Reports{
+ Services: make(map[string]*ServiceReport, len(wrk.Services)),
+ }
+ for name := range wrk.Services {
+ wrk.Reports.Services[name] = &ServiceReport{
+ Name: name,
+ }
+ }
return wrk, nil
}
@@ -83,3 +97,41 @@ func (wrk *worker) loadServiceDir(configDir string) (err error) {
}
return nil
}
+
+// start run all the services scanner in the background and consume the report
+// from it through channel.
+// Once new report coming, notify the upstream through the passed channel.
+func (wrk *worker) start(updateq chan<- struct{}) {
+ wrk.reportq = make(chan ScanReport, len(wrk.Services)+1)
+
+ var svc *Service
+ for _, svc = range wrk.Services {
+ go svc.Start(wrk.reportq)
+ }
+
+ var scanReport ScanReport
+ var ok bool
+ for {
+ scanReport, ok = <-wrk.reportq
+ if !ok {
+ break
+ }
+
+ wrk.reportsMutex.Lock()
+ var svcReport = wrk.Reports.Services[scanReport.Name]
+ svcReport.Last = scanReport
+ svcReport.History = append(svcReport.History, scanReport)
+ wrk.reportsMutex.Unlock()
+
+ updateq <- struct{}{}
+ }
+}
+
+func (wrk *worker) stop() {
+ close(wrk.reportq)
+
+ var svc *Service
+ for _, svc = range wrk.Services {
+ svc.Stop()
+ }
+}