From dfa517ef2f803e000bd058998099e76648f9e385 Mon Sep 17 00:00:00 2001 From: Jean Barkhuysen Date: Sun, 22 Feb 2026 13:38:30 -0700 Subject: internal/worker: add an option for in-process periodic poll/enqueue-ing Fixes golang/go#77689. Change-Id: Id754eeafdc8b680acb1aa137225292c808d0bf04 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/747880 Reviewed-by: Jonathan Amsterdam LUCI-TryBot-Result: Go LUCI kokoro-CI: kokoro Reviewed-by: David Chase --- cmd/worker/main.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++ internal/worker/server.go | 27 ++++++++++++++++++++----- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 2917d499..06990b2b 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -41,6 +41,19 @@ var ( // flag used in call to safehtml/template.TrustedSourceFromFlag _ = flag.String("static", "static", "path to folder containing static files served") bypassLicenseCheck = flag.Bool("bypass_license_check", false, "insert all data into the DB, even for non-redistributable paths") + + // Ordinarily, index polling is initiated by a separate scheduler that calls + // /poll. But for convenience, you can instead have the worker periodically + // do the same. + pollIndexPeriod = flag.Duration("poll_index_period", 0, "when set >0, schedules an index poll at this period") + pollIndexLimit = flag.Int("poll_index_limit", 10, "the amount of modules to fetch from the index when periodically polling") + pollIndexHorizon = flag.Duration("poll_index_horizon", time.Hour, "the amount of time ago to request modules each iteration when periodically polling") + + // Ordinarily, module version process enqueueing is initiated by a separate + // scheduler that calls /enqueue. But for convenience, you can instead have + // the worker periodically do the same. + enqueuePeriod = flag.Duration("enqueue_period", 0, "when set >0, schedules the worker to periodically enqueue work from the module_version_states table for processing, at this period") + enqueueLimit = flag.Int("enqueue_limit", 10, "the amount of modules to enqueue when periodically enqueueing") ) func main() { @@ -117,6 +130,44 @@ func main() { if err != nil { log.Fatal(ctx, err) } + + if *pollIndexPeriod != 0 { + go func() { + log.Infof(ctx, "starting periodic index polling. period=%v, limit=%v, horizon=%v", *pollIndexPeriod, *pollIndexLimit, *pollIndexHorizon) + ticker := time.NewTicker(*pollIndexPeriod) + for { + select { + case <-ctx.Done(): + log.Warningf(ctx, "cancelling periodic index polling: %v", ctx.Err()) + return + case <-ticker.C: + since := time.Now().Add(-1 * *pollIndexHorizon) + if err := server.PollIndex(ctx, since, *pollIndexLimit); err != nil { + log.Warningf(ctx, "error during periodic index polling: %v", err) + } + } + } + }() + } + + if *enqueuePeriod != 0 { + go func() { + log.Infof(ctx, "starting periodic enqueueing. period=%v, limit=%v", *enqueuePeriod, *enqueueLimit) + ticker := time.NewTicker(*enqueuePeriod) + for { + select { + case <-ctx.Done(): + log.Warningf(ctx, "cancelling periodic enqueueing: %v", ctx.Err()) + return + case <-ticker.C: + if err := server.Enqueue(ctx, nil, *enqueueLimit, ""); err != nil { + log.Warningf(ctx, "error during periodic enqueueing: %v", err) + } + } + } + }() + } + router := dcensus.NewRouter(nil) server.Install(router.Handle) diff --git a/internal/worker/server.go b/internal/worker/server.go index d01e69fb..736e729f 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -429,6 +429,11 @@ func (s *Server) handlePollIndex(w http.ResponseWriter, r *http.Request) (err er return err } } + return s.PollIndex(ctx, since, limit) +} + +// Polls the module index and stores the results. +func (s *Server) PollIndex(ctx context.Context, since time.Time, limit int) error { log.Infof(ctx, "fetching %d versions since %v from the index", limit, since) modules, err := s.indexClient.GetVersions(ctx, since, limit) if err != nil { @@ -483,10 +488,20 @@ func (s *Server) computeUnprocessedModules(ctx context.Context) { // may cause duplicate processing. func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err error) { defer derrors.Wrap(&err, "handleEnqueue(%q)", r.URL.Path) - ctx := r.Context() limit := parseIntParam(r, "limit", 10) suffixParam := r.FormValue("suffix") // append to task name to avoid deduplication - span := trace.FromContext(r.Context()) + return s.Enqueue(r.Context(), w, limit, suffixParam) +} + +// Enqueues the next batch of module versions to process into the server's +// queue. +// +// w may be nil. +// limit limits the number of modules enqueued. +// suffix is used to force reprocessing of tasks that would normally be +// de-duplicated. It is appended to the task name. +func (s *Server) Enqueue(ctx context.Context, w http.ResponseWriter, limit int, suffix string) error { + span := trace.FromContext(ctx) span.Annotate([]trace.Attribute{trace.Int64Attribute("limit", int64(limit))}, "processed limit") modules, err := s.db.GetNextModulesToFetch(ctx, limit) if err != nil { @@ -494,7 +509,9 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro } span.Annotate([]trace.Attribute{trace.Int64Attribute("modules to fetch", int64(len(modules)))}, "processed limit") - w.Header().Set("Content-Type", "text/plain") + if w != nil { + w.Header().Set("Content-Type", "text/plain") + } log.Infof(ctx, "Scheduling modules to be fetched: queuing %d modules", len(modules)) // Enqueue concurrently, because sequentially takes a while. @@ -507,7 +524,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro for _, m := range modules { m := m opts := queue.Options{ - Suffix: suffixParam, + Suffix: suffix, DisableProxyFetch: shouldDisableProxyFetch(m), Source: queue.SourceWorkerValue, } @@ -521,7 +538,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro nErrors++ } else if enqueued { nEnqueued++ - recordEnqueue(r.Context(), m.Status) + recordEnqueue(ctx, m.Status) } mu.Unlock() }() -- cgit v1.3