aboutsummaryrefslogtreecommitdiff
path: root/cmd/worker/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/worker/main.go')
-rw-r--r--cmd/worker/main.go51
1 files changed, 51 insertions, 0 deletions
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 2917d499..06990b2b 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -41,6 +41,19 @@ var (
// flag used in call to safehtml/template.TrustedSourceFromFlag
_ = flag.String("static", "static", "path to folder containing static files served")
bypassLicenseCheck = flag.Bool("bypass_license_check", false, "insert all data into the DB, even for non-redistributable paths")
+
+ // Ordinarily, index polling is initiated by a separate scheduler that calls
+ // /poll. But for convenience, you can instead have the worker periodically
+ // do the same.
+ pollIndexPeriod = flag.Duration("poll_index_period", 0, "when set >0, schedules an index poll at this period")
+ pollIndexLimit = flag.Int("poll_index_limit", 10, "the amount of modules to fetch from the index when periodically polling")
+ pollIndexHorizon = flag.Duration("poll_index_horizon", time.Hour, "the amount of time ago to request modules each iteration when periodically polling")
+
+ // Ordinarily, module version process enqueueing is initiated by a separate
+ // scheduler that calls /enqueue. But for convenience, you can instead have
+ // the worker periodically do the same.
+ enqueuePeriod = flag.Duration("enqueue_period", 0, "when set >0, schedules the worker to periodically enqueue work from the module_version_states table for processing, at this period")
+ enqueueLimit = flag.Int("enqueue_limit", 10, "the amount of modules to enqueue when periodically enqueueing")
)
func main() {
@@ -117,6 +130,44 @@ func main() {
if err != nil {
log.Fatal(ctx, err)
}
+
+ if *pollIndexPeriod != 0 {
+ go func() {
+ log.Infof(ctx, "starting periodic index polling. period=%v, limit=%v, horizon=%v", *pollIndexPeriod, *pollIndexLimit, *pollIndexHorizon)
+ ticker := time.NewTicker(*pollIndexPeriod)
+ for {
+ select {
+ case <-ctx.Done():
+ log.Warningf(ctx, "cancelling periodic index polling: %v", ctx.Err())
+ return
+ case <-ticker.C:
+ since := time.Now().Add(-1 * *pollIndexHorizon)
+ if err := server.PollIndex(ctx, since, *pollIndexLimit); err != nil {
+ log.Warningf(ctx, "error during periodic index polling: %v", err)
+ }
+ }
+ }
+ }()
+ }
+
+ if *enqueuePeriod != 0 {
+ go func() {
+ log.Infof(ctx, "starting periodic enqueueing. period=%v, limit=%v", *enqueuePeriod, *enqueueLimit)
+ ticker := time.NewTicker(*enqueuePeriod)
+ for {
+ select {
+ case <-ctx.Done():
+ log.Warningf(ctx, "cancelling periodic enqueueing: %v", ctx.Err())
+ return
+ case <-ticker.C:
+ if err := server.Enqueue(ctx, nil, *enqueueLimit, ""); err != nil {
+ log.Warningf(ctx, "error during periodic enqueueing: %v", err)
+ }
+ }
+ }
+ }()
+ }
+
router := dcensus.NewRouter(nil)
server.Install(router.Handle)