diff options
Diffstat (limited to 'cmd/worker/main.go')
| -rw-r--r-- | cmd/worker/main.go | 51 |
1 files changed, 51 insertions, 0 deletions
diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 2917d499..06990b2b 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -41,6 +41,19 @@ var ( // flag used in call to safehtml/template.TrustedSourceFromFlag _ = flag.String("static", "static", "path to folder containing static files served") bypassLicenseCheck = flag.Bool("bypass_license_check", false, "insert all data into the DB, even for non-redistributable paths") + + // Ordinarily, index polling is initiated by a separate scheduler that calls + // /poll. But for convenience, you can instead have the worker periodically + // do the same. + pollIndexPeriod = flag.Duration("poll_index_period", 0, "when set >0, schedules an index poll at this period") + pollIndexLimit = flag.Int("poll_index_limit", 10, "the amount of modules to fetch from the index when periodically polling") + pollIndexHorizon = flag.Duration("poll_index_horizon", time.Hour, "the amount of time ago to request modules each iteration when periodically polling") + + // Ordinarily, module version process enqueueing is initiated by a separate + // scheduler that calls /enqueue. But for convenience, you can instead have + // the worker periodically do the same. + enqueuePeriod = flag.Duration("enqueue_period", 0, "when set >0, schedules the worker to periodically enqueue work from the module_version_states table for processing, at this period") + enqueueLimit = flag.Int("enqueue_limit", 10, "the amount of modules to enqueue when periodically enqueueing") ) func main() { @@ -117,6 +130,44 @@ func main() { if err != nil { log.Fatal(ctx, err) } + + if *pollIndexPeriod != 0 { + go func() { + log.Infof(ctx, "starting periodic index polling. period=%v, limit=%v, horizon=%v", *pollIndexPeriod, *pollIndexLimit, *pollIndexHorizon) + ticker := time.NewTicker(*pollIndexPeriod) + for { + select { + case <-ctx.Done(): + log.Warningf(ctx, "cancelling periodic index polling: %v", ctx.Err()) + return + case <-ticker.C: + since := time.Now().Add(-1 * *pollIndexHorizon) + if err := server.PollIndex(ctx, since, *pollIndexLimit); err != nil { + log.Warningf(ctx, "error during periodic index polling: %v", err) + } + } + } + }() + } + + if *enqueuePeriod != 0 { + go func() { + log.Infof(ctx, "starting periodic enqueueing. period=%v, limit=%v", *enqueuePeriod, *enqueueLimit) + ticker := time.NewTicker(*enqueuePeriod) + for { + select { + case <-ctx.Done(): + log.Warningf(ctx, "cancelling periodic enqueueing: %v", ctx.Err()) + return + case <-ticker.C: + if err := server.Enqueue(ctx, nil, *enqueueLimit, ""); err != nil { + log.Warningf(ctx, "error during periodic enqueueing: %v", err) + } + } + } + }() + } + router := dcensus.NewRouter(nil) server.Install(router.Handle) |
