aboutsummaryrefslogtreecommitdiff
path: root/internal/worker/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/worker/server.go')
-rw-r--r--internal/worker/server.go27
1 files changed, 22 insertions, 5 deletions
diff --git a/internal/worker/server.go b/internal/worker/server.go
index d01e69fb..736e729f 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -429,6 +429,11 @@ func (s *Server) handlePollIndex(w http.ResponseWriter, r *http.Request) (err er
return err
}
}
+ return s.PollIndex(ctx, since, limit)
+}
+
+// Polls the module index and stores the results.
+func (s *Server) PollIndex(ctx context.Context, since time.Time, limit int) error {
log.Infof(ctx, "fetching %d versions since %v from the index", limit, since)
modules, err := s.indexClient.GetVersions(ctx, since, limit)
if err != nil {
@@ -483,10 +488,20 @@ func (s *Server) computeUnprocessedModules(ctx context.Context) {
// may cause duplicate processing.
func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err error) {
defer derrors.Wrap(&err, "handleEnqueue(%q)", r.URL.Path)
- ctx := r.Context()
limit := parseIntParam(r, "limit", 10)
suffixParam := r.FormValue("suffix") // append to task name to avoid deduplication
- span := trace.FromContext(r.Context())
+ return s.Enqueue(r.Context(), w, limit, suffixParam)
+}
+
+// Enqueues the next batch of module versions to process into the server's
+// queue.
+//
+// w may be nil.
+// limit limits the number of modules enqueued.
+// suffix is used to force reprocessing of tasks that would normally be
+// de-duplicated. It is appended to the task name.
+func (s *Server) Enqueue(ctx context.Context, w http.ResponseWriter, limit int, suffix string) error {
+ span := trace.FromContext(ctx)
span.Annotate([]trace.Attribute{trace.Int64Attribute("limit", int64(limit))}, "processed limit")
modules, err := s.db.GetNextModulesToFetch(ctx, limit)
if err != nil {
@@ -494,7 +509,9 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro
}
span.Annotate([]trace.Attribute{trace.Int64Attribute("modules to fetch", int64(len(modules)))}, "processed limit")
- w.Header().Set("Content-Type", "text/plain")
+ if w != nil {
+ w.Header().Set("Content-Type", "text/plain")
+ }
log.Infof(ctx, "Scheduling modules to be fetched: queuing %d modules", len(modules))
// Enqueue concurrently, because sequentially takes a while.
@@ -507,7 +524,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro
for _, m := range modules {
m := m
opts := queue.Options{
- Suffix: suffixParam,
+ Suffix: suffix,
DisableProxyFetch: shouldDisableProxyFetch(m),
Source: queue.SourceWorkerValue,
}
@@ -521,7 +538,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro
nErrors++
} else if enqueued {
nEnqueued++
- recordEnqueue(r.Context(), m.Status)
+ recordEnqueue(ctx, m.Status)
}
mu.Unlock()
}()