aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Jean Barkhuysen <jean.barkhuysen@gmail.com> 2026-02-22 13:38:30 -0700
committer Jonathan Amsterdam <jba@google.com> 2026-02-24 10:07:15 -0800
commit dfa517ef2f803e000bd058998099e76648f9e385 (patch)
tree 10b2bad23d2639ac76f19322520973f5bab1a77b
parent 1c7acd4bf511b868c084297235f6f38db2264b21 (diff)
download go-x-pkgsite-dfa517ef2f803e000bd058998099e76648f9e385.tar.xz
internal/worker: add an option for in-process periodic poll/enqueue-ing
Fixes golang/go#77689.

Change-Id: Id754eeafdc8b680acb1aa137225292c808d0bf04
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/747880
Reviewed-by: Jonathan Amsterdam <jba@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
kokoro-CI: kokoro <noreply+kokoro@google.com>
Reviewed-by: David Chase <drchase@google.com>
-rw-r--r-- cmd/worker/main.go 51
-rw-r--r-- internal/worker/server.go 27
2 files changed, 73 insertions, 5 deletions
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 2917d499..06990b2b 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -41,6 +41,19 @@ var (
// flag used in call to safehtml/template.TrustedSourceFromFlag
_ = flag.String("static", "static", "path to folder containing static files served")
bypassLicenseCheck = flag.Bool("bypass_license_check", false, "insert all data into the DB, even for non-redistributable paths")
+
+ // Ordinarily, index polling is initiated by a separate scheduler that calls
+ // /poll. But for convenience, you can instead have the worker periodically
+ // do the same.
+ pollIndexPeriod = flag.Duration("poll_index_period", 0, "when set >0, schedules an index poll at this period")
+ pollIndexLimit = flag.Int("poll_index_limit", 10, "the amount of modules to fetch from the index when periodically polling")
+ pollIndexHorizon = flag.Duration("poll_index_horizon", time.Hour, "the amount of time ago to request modules each iteration when periodically polling")
+
+ // Ordinarily, module version process enqueueing is initiated by a separate
+ // scheduler that calls /enqueue. But for convenience, you can instead have
+ // the worker periodically do the same.
+ enqueuePeriod = flag.Duration("enqueue_period", 0, "when set >0, schedules the worker to periodically enqueue work from the module_version_states table for processing, at this period")
+ enqueueLimit = flag.Int("enqueue_limit", 10, "the amount of modules to enqueue when periodically enqueueing")
)
func main() {
@@ -117,6 +130,44 @@ func main() {
if err != nil {
log.Fatal(ctx, err)
}
+
+	// Optional in-process replacement for the separate scheduler that
+	// ordinarily calls /poll. The flag is documented as active only when >0,
+	// and time.NewTicker panics on a non-positive period, so zero or negative
+	// values leave periodic polling disabled instead of crashing the worker.
+	if *pollIndexPeriod > 0 {
+		go func() {
+			log.Infof(ctx, "starting periodic index polling. period=%v, limit=%v, horizon=%v", *pollIndexPeriod, *pollIndexLimit, *pollIndexHorizon)
+			ticker := time.NewTicker(*pollIndexPeriod)
+			// Release the ticker once this goroutine exits on cancellation.
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ctx.Done():
+					log.Warningf(ctx, "cancelling periodic index polling: %v", ctx.Err())
+					return
+				case <-ticker.C:
+					// Request versions from the index starting one horizon
+					// before the current time.
+					since := time.Now().Add(-1 * *pollIndexHorizon)
+					if err := server.PollIndex(ctx, since, *pollIndexLimit); err != nil {
+						// A failed poll is only logged; the loop carries on
+						// and polls again at the next tick.
+						log.Warningf(ctx, "error during periodic index polling: %v", err)
+					}
+				}
+			}
+		}()
+	}
+
+	// Optional in-process replacement for the separate scheduler that
+	// ordinarily calls /enqueue. The flag is documented as active only when
+	// >0, and time.NewTicker panics on a non-positive period, so zero or
+	// negative values leave periodic enqueueing disabled instead of crashing
+	// the worker.
+	if *enqueuePeriod > 0 {
+		go func() {
+			log.Infof(ctx, "starting periodic enqueueing. period=%v, limit=%v", *enqueuePeriod, *enqueueLimit)
+			ticker := time.NewTicker(*enqueuePeriod)
+			// Release the ticker once this goroutine exits on cancellation.
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ctx.Done():
+					log.Warningf(ctx, "cancelling periodic enqueueing: %v", ctx.Err())
+					return
+				case <-ticker.C:
+					// There is no HTTP response to write to here, so a nil
+					// ResponseWriter is passed, along with an empty suffix.
+					if err := server.Enqueue(ctx, nil, *enqueueLimit, ""); err != nil {
+						// A failed enqueue is only logged; the loop carries
+						// on and tries again at the next tick.
+						log.Warningf(ctx, "error during periodic enqueueing: %v", err)
+					}
+				}
+			}
+		}()
+	}
+
router := dcensus.NewRouter(nil)
server.Install(router.Handle)
diff --git a/internal/worker/server.go b/internal/worker/server.go
index d01e69fb..736e729f 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -429,6 +429,11 @@ func (s *Server) handlePollIndex(w http.ResponseWriter, r *http.Request) (err er
return err
}
}
+ return s.PollIndex(ctx, since, limit)
+}
+
+// PollIndex polls the module index and stores the results.
+func (s *Server) PollIndex(ctx context.Context, since time.Time, limit int) error {
log.Infof(ctx, "fetching %d versions since %v from the index", limit, since)
modules, err := s.indexClient.GetVersions(ctx, since, limit)
if err != nil {
@@ -483,10 +488,20 @@ func (s *Server) computeUnprocessedModules(ctx context.Context) {
// may cause duplicate processing.
func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err error) {
defer derrors.Wrap(&err, "handleEnqueue(%q)", r.URL.Path)
- ctx := r.Context()
limit := parseIntParam(r, "limit", 10)
suffixParam := r.FormValue("suffix") // append to task name to avoid deduplication
- span := trace.FromContext(r.Context())
+ // The enqueueing work lives in Enqueue so that cmd/worker can also call it
+ // directly, without an HTTP request, when -enqueue_period is set.
+ return s.Enqueue(r.Context(), w, limit, suffixParam)
+}
+
+// Enqueue adds the next batch of module versions to process to the server's
+// queue.
+//
+// w may be nil.
+// limit limits the number of modules enqueued.
+// suffix is used to force reprocessing of tasks that would normally be
+// de-duplicated. It is appended to the task name.
+func (s *Server) Enqueue(ctx context.Context, w http.ResponseWriter, limit int, suffix string) error {
+ span := trace.FromContext(ctx)
span.Annotate([]trace.Attribute{trace.Int64Attribute("limit", int64(limit))}, "processed limit")
modules, err := s.db.GetNextModulesToFetch(ctx, limit)
if err != nil {
@@ -494,7 +509,9 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro
}
span.Annotate([]trace.Attribute{trace.Int64Attribute("modules to fetch", int64(len(modules)))}, "processed limit")
- w.Header().Set("Content-Type", "text/plain")
+ if w != nil {
+ w.Header().Set("Content-Type", "text/plain")
+ }
log.Infof(ctx, "Scheduling modules to be fetched: queuing %d modules", len(modules))
// Enqueue concurrently, because sequentially takes a while.
@@ -507,7 +524,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro
for _, m := range modules {
m := m
opts := queue.Options{
- Suffix: suffixParam,
+ Suffix: suffix,
DisableProxyFetch: shouldDisableProxyFetch(m),
Source: queue.SourceWorkerValue,
}
@@ -521,7 +538,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro
nErrors++
} else if enqueued {
nEnqueued++
- recordEnqueue(r.Context(), m.Status)
+ recordEnqueue(ctx, m.Status)
}
mu.Unlock()
}()