diff options
Diffstat (limited to 'internal/worker/server.go')
| -rw-r--r-- | internal/worker/server.go | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/internal/worker/server.go b/internal/worker/server.go index d01e69fb..736e729f 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -429,6 +429,11 @@ func (s *Server) handlePollIndex(w http.ResponseWriter, r *http.Request) (err er return err } } + return s.PollIndex(ctx, since, limit) +} + +// Polls the module index and stores the results. +func (s *Server) PollIndex(ctx context.Context, since time.Time, limit int) error { log.Infof(ctx, "fetching %d versions since %v from the index", limit, since) modules, err := s.indexClient.GetVersions(ctx, since, limit) if err != nil { @@ -483,10 +488,20 @@ func (s *Server) computeUnprocessedModules(ctx context.Context) { // may cause duplicate processing. func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err error) { defer derrors.Wrap(&err, "handleEnqueue(%q)", r.URL.Path) - ctx := r.Context() limit := parseIntParam(r, "limit", 10) suffixParam := r.FormValue("suffix") // append to task name to avoid deduplication - span := trace.FromContext(r.Context()) + return s.Enqueue(r.Context(), w, limit, suffixParam) +} + +// Enqueues the next batch of module versions to process into the server's +// queue. +// +// w may be nil. +// limit limits the number of modules enqueued. +// suffix is used to force reprocessing of tasks that would normally be +// de-duplicated. It is appended to the task name. +func (s *Server) Enqueue(ctx context.Context, w http.ResponseWriter, limit int, suffix string) error { + span := trace.FromContext(ctx) span.Annotate([]trace.Attribute{trace.Int64Attribute("limit", int64(limit))}, "processed limit") modules, err := s.db.GetNextModulesToFetch(ctx, limit) if err != nil { @@ -494,7 +509,9 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro } span.Annotate([]trace.Attribute{trace.Int64Attribute("modules to fetch", int64(len(modules)))}, "processed limit") - w.Header().Set("Content-Type", "text/plain") + if w != nil { + w.Header().Set("Content-Type", "text/plain") + } log.Infof(ctx, "Scheduling modules to be fetched: queuing %d modules", len(modules)) // Enqueue concurrently, because sequentially takes a while. @@ -507,7 +524,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro for _, m := range modules { m := m opts := queue.Options{ - Suffix: suffixParam, + Suffix: suffix, DisableProxyFetch: shouldDisableProxyFetch(m), Source: queue.SourceWorkerValue, } @@ -521,7 +538,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err erro nErrors++ } else if enqueued { nEnqueued++ - recordEnqueue(r.Context(), m.Status) + recordEnqueue(ctx, m.Status) } mu.Unlock() }() |
