aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--content/static/html/worker/index.tmpl46
-rw-r--r--internal/postgres/versionstate.go2
-rw-r--r--internal/testing/integration/integration_test.go7
-rw-r--r--internal/worker/server.go89
-rw-r--r--internal/worker/server_test.go25
5 files changed, 114 insertions, 55 deletions
diff --git a/content/static/html/worker/index.tmpl b/content/static/html/worker/index.tmpl
index a0a88666..b82b5c8b 100644
--- a/content/static/html/worker/index.tmpl
+++ b/content/static/html/worker/index.tmpl
@@ -109,29 +109,29 @@ td {
</p>
<div class="actions">
- <form action="/poll-and-queue" method="post" name="queueForm">
- <button title="Poll the module index for up to 2000 new versions, and enqueue them for processing."
- onclick="submitForm('queueForm', false); return false">Enqueue From Module Index</button>
- <input type="number" name="limit" value="10"></input>
- <output name="result"></output>
- </form>
- <form action="/requeue" method="post" name="requeueForm">
- <button title="Query the discovery database for failed versions, and re-queue them for processing."
- onclick="submitForm('requeueForm', true); return false">Requeue Failed Versions</button>
- <input type="number" name="limit" value="10">
- <output name="result"></output>
- </form>
- <form action="/reprocess" method="post" name="reprocessForm">
- <button title="Mark all versions created before the specified app_version to be reprocessed."
- onclick="submitForm('reprocessForm', true); return false">Reprocess Versions</button>
- <input type="text" name="app_version">
- <output name="result"></output>
- </form>
- <form action="/populate-stdlib" method="post" name="populateStdlibForm">
- <button title="Populates the database with all supported versions of the Go standard library."
- onclick="submitForm('populateStdlibForm', false); return false">Populate Standard Library</button>
- <output name="result"></output>
- </form>
+ <form action="/poll" method="post" name="pollForm">
+ <button title="Poll the module index for up to 2000 new versions."
+ onclick="submitForm('pollForm', false); return false">Poll Module Index</button>
+ <input type="number" name="limit" value="10"></input>
+ <output name="result"></output>
+ </form>
+ <form action="/enqueue" method="post" name="enqueueForm">
+ <button title="Query the discovery database for new or failed versions, and enqueue them for processing."
+ onclick="submitForm('enqueueForm', true); return false">Enqueue New and Failed Versions</button>
+ <input type="number" name="limit" value="10">
+ <output name="result"></output>
+ </form>
+ <form action="/reprocess" method="post" name="reprocessForm">
+ <button title="Mark all versions created before the specified app_version to be reprocessed."
+ onclick="submitForm('reprocessForm', true); return false">Reprocess Versions</button>
+ <input type="text" name="app_version">
+ <output name="result"></output>
+ </form>
+ <form action="/populate-stdlib" method="post" name="populateStdlibForm">
+ <button title="Populates the database with all supported versions of the Go standard library."
+ onclick="submitForm('populateStdlibForm', false); return false">Populate Standard Library</button>
+ <output name="result"></output>
+ </form>
</div>
<div class="config">
diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go
index f2bfc99d..064fd9cc 100644
--- a/internal/postgres/versionstate.go
+++ b/internal/postgres/versionstate.go
@@ -21,7 +21,7 @@ import (
)
// InsertIndexVersions inserts new versions into the module_version_states
-// table.
+// table with a status of zero.
func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.IndexVersion) (err error) {
defer derrors.Wrap(&err, "InsertIndexVersions(ctx, %v)", versions)
diff --git a/internal/testing/integration/integration_test.go b/internal/testing/integration/integration_test.go
index afb1327b..a185f40c 100644
--- a/internal/testing/integration/integration_test.go
+++ b/internal/testing/integration/integration_test.go
@@ -113,7 +113,12 @@ func TestEndToEndProcessing(t *testing.T) {
frontendServer.Install(frontendMux.Handle, redisCacheClient)
frontendHTTP := httptest.NewServer(frontendMux)
- if _, err := doGet(workerHTTP.URL + "/poll-and-queue"); err != nil {
+ if _, err := doGet(workerHTTP.URL + "/poll"); err != nil {
+ t.Fatal(err)
+ }
+ // TODO: This should really be made deterministic.
+ time.Sleep(100 * time.Millisecond)
+ if _, err := doGet(workerHTTP.URL + "/enqueue"); err != nil {
t.Fatal(err)
}
// TODO: This should really be made deterministic.
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 70972910..264efd88 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -98,21 +98,30 @@ func (s *Server) Install(handle func(string, http.Handler)) {
if s.reportingClient != nil {
rmw = middleware.ErrorReporting(s.reportingClient.Report)
}
- // cloud-scheduler: poll-and-queue polls the Module Index for new versions
+
+ // scheduled: poll polls the Module Index for new versions
+ // that have been published and inserts that metadata into
+ // module_version_states.
+ // This endpoint is intended to be invoked periodically by a scheduler.
+ // See the note about duplicate tasks for "/enqueue" below.
+ handle("/poll", rmw(s.errorHandler(s.handlePollIndex)))
+
+ // TODO: remove after /poll is in production and the scheduler jobs have been changed.
+ // scheduled: poll-and-queue polls the Module Index for new versions
// that have been published and inserts that metadata into
// module_version_states. It also inserts the version into the task-queue
// to to be fetched and processed.
- // This endpoint is invoked by a Cloud Scheduler job.
+ // This endpoint is intended to be invoked periodically by a scheduler.
// See the note about duplicate tasks for "/requeue" below.
handle("/poll-and-queue", rmw(s.errorHandler(s.handleIndexAndQueue)))
- // cloud-scheduler: update-imported-by-count updates the imported_by_count for packages
- // in search_documents where imported_by_count_updated_at is null or
- // imported_by_count_updated_at < version_updated_at.
- // This endpoint is invoked by a Cloud Scheduler job.
+ // scheduled: update-imported-by-count updates the imported_by_count for
+ // packages in search_documents where imported_by_count_updated_at is null
+ // or imported_by_count_updated_at < version_updated_at.
+ // This endpoint is intended to be invoked periodically by a scheduler.
handle("/update-imported-by-count", rmw(s.errorHandler(s.handleUpdateImportedByCount)))
- // cloud-scheduler: download search document data and update the redis sorted
+ // scheduled: download search document data and update the redis sorted
// set(s) used in auto-completion.
handle("/update-redis-indexes", rmw(s.errorHandler(s.handleUpdateRedisIndexes)))
@@ -121,18 +130,30 @@ func (s *Server) Install(handle func(string, http.Handler)) {
// request fails for any reason other than an http.StatusInternalServerError,
// it will return an http.StatusOK so that the task queue does not retry
// fetching module versions that have a terminal error.
- // This endpoint is invoked by a Cloud Tasks queue.
+ // This endpoint is intended to be invoked by a task queue with semantics like
+ // Google Cloud Task Queues.
handle("/fetch/", http.StripPrefix("/fetch", http.HandlerFunc(s.handleFetch)))
- // manual: requeue queries the module_version_states table for the next
+ // scheduled: enqueue queries the module_version_states table for the next
+ // batch of module versions to process, and enqueues them for processing.
+ // Normally this will not cause duplicate processing, because Cloud Tasks
+ // are de-duplicated. That does not apply after a task has been finished or
+ // deleted for Server.taskIDChangeInterval (see
+ // https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#createtaskrequest,
+ // under "Task De-duplication"). If you cannot wait, you can force
+ // duplicate tasks by providing any string as the "suffix" query parameter.
+ handle("/enqueue", rmw(s.errorHandler(s.handleEnqueue)))
+
+ // TODO: remove after /enqueue is in production and the scheduler jobs have been changed.
+ // scheduled: requeue queries the module_version_states table for the next
// batch of module versions to process, and enqueues them for processing.
// Normally this will not cause duplicate processing, because Cloud Tasks
// are de-duplicated. That does not apply after a task has been finished or
- // deleted for one hour (see
+ // deleted for Server.taskIDChangeInterval (see
// https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#createtaskrequest,
- // under "Task De-duplication"). If you cannot wait an hour, you can force
+ // under "Task De-duplication"). If you cannot wait, you can force
// duplicate tasks by providing any string as the "suffix" query parameter.
- handle("/requeue", rmw(s.errorHandler(s.handleRequeue)))
+ handle("/requeue", rmw(s.errorHandler(s.handleEnqueue)))
// manual: reprocess sets a reprocess status for all records in the
// module_version_states table that were processed by an app_version that
@@ -175,7 +196,7 @@ func (s *Server) handleUpdateImportedByCount(w http.ResponseWriter, r *http.Requ
// handleRepopulateSearchDocuments repopulates every row in the search_documents table
// that was last updated before the given time.
func (s *Server) handleRepopulateSearchDocuments(w http.ResponseWriter, r *http.Request) error {
- limit := parseIntParam(r, "limit", 100)
+ limit := parseLimitParam(r, 100)
beforeParam := r.FormValue("before")
if beforeParam == "" {
return &serverError{
@@ -272,10 +293,29 @@ func parseModulePathAndVersion(requestPath string) (string, string, error) {
return parts[0], parts[1], nil
}
+func (s *Server) handlePollIndex(w http.ResponseWriter, r *http.Request) (err error) {
+ defer derrors.Wrap(&err, "handlePollIndex(%q)", r.URL.Path)
+ ctx := r.Context()
+ limit := parseLimitParam(r, 10)
+ since, err := s.db.LatestIndexTimestamp(ctx)
+ if err != nil {
+ return err
+ }
+ versions, err := s.indexClient.GetVersions(ctx, since, limit)
+ if err != nil {
+ return err
+ }
+ if err := s.db.InsertIndexVersions(ctx, versions); err != nil {
+ return err
+ }
+ log.Infof(ctx, "Inserted %d modules from the index", len(versions))
+ return nil
+}
+
func (s *Server) handleIndexAndQueue(w http.ResponseWriter, r *http.Request) (err error) {
defer derrors.Wrap(&err, "handleIndexAndQueue(%q)", r.URL.Path)
ctx := r.Context()
- limit := parseIntParam(r, "limit", 10)
+ limit := parseLimitParam(r, 10)
suffixParam := r.FormValue("suffix")
since, err := s.db.LatestIndexTimestamp(ctx)
if err != nil {
@@ -303,13 +343,13 @@ func (s *Server) handleIndexAndQueue(w http.ResponseWriter, r *http.Request) (er
return nil
}
-// handleRequeue queries the module_version_states table for the next
-// batch of module versions to process, and enqueues them for processing. Note
-// that this may cause duplicate processing.
-func (s *Server) handleRequeue(w http.ResponseWriter, r *http.Request) (err error) {
- defer derrors.Wrap(&err, "handleRequeue(%q)", r.URL.Path)
+// handleEnqueue queries the module_version_states table for the next batch of
+// module versions to process, and enqueues them for processing. Note that this
+// may cause duplicate processing.
+func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err error) {
+ defer derrors.Wrap(&err, "handleEnqueue(%q)", r.URL.Path)
ctx := r.Context()
- limit := parseIntParam(r, "limit", 10)
+ limit := parseLimitParam(r, 10)
suffixParam := r.FormValue("suffix") // append to task name to avoid deduplication
span := trace.FromContext(r.Context())
span.Annotate([]trace.Attribute{trace.Int64Attribute("limit", int64(limit))}, "processed limit")
@@ -320,13 +360,13 @@ func (s *Server) handleRequeue(w http.ResponseWriter, r *http.Request) (err erro
span.Annotate([]trace.Attribute{trace.Int64Attribute("versions to fetch", int64(len(versions)))}, "processed limit")
w.Header().Set("Content-Type", "text/plain")
- log.Infof(ctx, "Scheduling modules to be fetched: requeuing %d modules", len(versions))
+ log.Infof(ctx, "Scheduling modules to be fetched: queuing %d modules", len(versions))
for _, v := range versions {
if err := s.queue.ScheduleFetch(ctx, v.ModulePath, v.Version, suffixParam, s.taskIDChangeInterval); err != nil {
return err
}
}
- log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules requeued", len(versions))
+ log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules queued", len(versions))
return nil
}
@@ -573,10 +613,11 @@ func formatTime(t *time.Time) string {
return t.In(locNewYork).Format("2006-01-02 15:04:05")
}
-// parseIntParam parses the query parameter with name as in integer. If the
+// parseLimitParam parses the "limit" query parameter as an integer. If the
// parameter is missing or there is a parse error, it is logged and the default
// value is returned.
-func parseIntParam(r *http.Request, name string, defaultValue int) int {
+func parseLimitParam(r *http.Request, defaultValue int) int {
+ const name = "limit"
param := r.FormValue(name)
if param == "" {
return defaultValue
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 372428ae..bc926307 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -89,11 +89,11 @@ func TestWorker(t *testing.T) {
}
state = func(version *internal.IndexVersion, code, tryCount, numPackages int) *internal.ModuleVersionState {
goModPath := version.Path
- if code >= 300 {
+ if code == 0 || code >= 300 {
goModPath = ""
}
var n *int
- if code != http.StatusNotFound {
+ if code != 0 && code != http.StatusNotFound {
n = &numPackages
}
return &internal.ModuleVersionState{
@@ -123,11 +123,22 @@ func TestWorker(t *testing.T) {
wantBar *internal.ModuleVersionState
}{
{
+ label: "poll only",
+ index: []*internal.IndexVersion{fooIndex, barIndex},
+ proxy: []*proxy.TestModule{fooProxy, barProxy},
+ requests: []*http.Request{
+ httptest.NewRequest("POST", "/poll", nil),
+ },
+ wantFoo: fooState(0, 0),
+ wantBar: barState(0, 0),
+ },
+ {
label: "full fetch",
index: []*internal.IndexVersion{fooIndex, barIndex},
proxy: []*proxy.TestModule{fooProxy, barProxy},
requests: []*http.Request{
- httptest.NewRequest("POST", "/poll-and-queue", nil),
+ httptest.NewRequest("POST", "/poll", nil),
+ httptest.NewRequest("POST", "/enqueue", nil),
},
wantFoo: fooState(http.StatusOK, 1),
wantBar: barState(http.StatusOK, 1),
@@ -136,7 +147,8 @@ func TestWorker(t *testing.T) {
index: []*internal.IndexVersion{fooIndex, barIndex},
proxy: []*proxy.TestModule{fooProxy, barProxy},
requests: []*http.Request{
- httptest.NewRequest("POST", "/poll-and-queue?limit=1", nil),
+ httptest.NewRequest("POST", "/poll?limit=1", nil),
+ httptest.NewRequest("POST", "/enqueue", nil),
},
wantFoo: fooState(http.StatusOK, 1),
}, {
@@ -144,7 +156,8 @@ func TestWorker(t *testing.T) {
index: []*internal.IndexVersion{fooIndex, barIndex},
proxy: []*proxy.TestModule{fooProxy},
requests: []*http.Request{
- httptest.NewRequest("POST", "/poll-and-queue", nil),
+ httptest.NewRequest("POST", "/poll", nil),
+ httptest.NewRequest("POST", "/enqueue", nil),
},
wantFoo: fooState(http.StatusOK, 1),
wantBar: barState(http.StatusNotFound, 1),
@@ -238,7 +251,7 @@ func TestParseIntParam(t *testing.T) {
{"312", 312},
{"bad", -1},
} {
- got := parseIntParam(httptest.NewRequest("GET", fmt.Sprintf("/foo?x=%s", test.in), nil), "x", -1)
+ got := parseLimitParam(httptest.NewRequest("GET", fmt.Sprintf("/foo?limit=%s", test.in), nil), -1)
if got != test.want {
t.Errorf("%q: got %d, want %d", test.in, got, test.want)
}