diff options
| -rw-r--r-- | content/static/html/worker/index.tmpl | 46 | ||||
| -rw-r--r-- | internal/postgres/versionstate.go | 2 | ||||
| -rw-r--r-- | internal/testing/integration/integration_test.go | 7 | ||||
| -rw-r--r-- | internal/worker/server.go | 89 | ||||
| -rw-r--r-- | internal/worker/server_test.go | 25 |
5 files changed, 114 insertions, 55 deletions
diff --git a/content/static/html/worker/index.tmpl b/content/static/html/worker/index.tmpl index a0a88666..b82b5c8b 100644 --- a/content/static/html/worker/index.tmpl +++ b/content/static/html/worker/index.tmpl @@ -109,29 +109,29 @@ td { </p> <div class="actions"> - <form action="/poll-and-queue" method="post" name="queueForm"> - <button title="Poll the module index for up to 2000 new versions, and enqueue them for processing." - onclick="submitForm('queueForm', false); return false">Enqueue From Module Index</button> - <input type="number" name="limit" value="10"></input> - <output name="result"></output> - </form> - <form action="/requeue" method="post" name="requeueForm"> - <button title="Query the discovery database for failed versions, and re-queue them for processing." - onclick="submitForm('requeueForm', true); return false">Requeue Failed Versions</button> - <input type="number" name="limit" value="10"> - <output name="result"></output> - </form> - <form action="/reprocess" method="post" name="reprocessForm"> - <button title="Mark all versions created before the specified app_version to be reprocessed." - onclick="submitForm('reprocessForm', true); return false">Reprocess Versions</button> - <input type="text" name="app_version"> - <output name="result"></output> - </form> - <form action="/populate-stdlib" method="post" name="populateStdlibForm"> - <button title="Populates the database with all supported versions of the Go standard library." - onclick="submitForm('populateStdlibForm', false); return false">Populate Standard Library</button> - <output name="result"></output> - </form> + <form action="/poll" method="post" name="pollForm"> + <button title="Poll the module index for up to 2000 new versions." 
+ onclick="submitForm('pollForm', false); return false">Poll Module Index</button> + <input type="number" name="limit" value="10"></input> + <output name="result"></output> + </form> + <form action="/enqueue" method="post" name="enqueueForm"> + <button title="Query the discovery database for new or failed versions, and enqueue them for processing." + onclick="submitForm('enqueueForm', true); return false">Enqueue New and Failed Versions</button> + <input type="number" name="limit" value="10"> + <output name="result"></output> + </form> + <form action="/reprocess" method="post" name="reprocessForm"> + <button title="Mark all versions created before the specified app_version to be reprocessed." + onclick="submitForm('reprocessForm', true); return false">Reprocess Versions</button> + <input type="text" name="app_version"> + <output name="result"></output> + </form> + <form action="/populate-stdlib" method="post" name="populateStdlibForm"> + <button title="Populates the database with all supported versions of the Go standard library." + onclick="submitForm('populateStdlibForm', false); return false">Populate Standard Library</button> + <output name="result"></output> + </form> </div> <div class="config"> diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go index f2bfc99d..064fd9cc 100644 --- a/internal/postgres/versionstate.go +++ b/internal/postgres/versionstate.go @@ -21,7 +21,7 @@ import ( ) // InsertIndexVersions inserts new versions into the module_version_states -// table. +// table with a status of zero. 
func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.IndexVersion) (err error) { defer derrors.Wrap(&err, "InsertIndexVersions(ctx, %v)", versions) diff --git a/internal/testing/integration/integration_test.go b/internal/testing/integration/integration_test.go index afb1327b..a185f40c 100644 --- a/internal/testing/integration/integration_test.go +++ b/internal/testing/integration/integration_test.go @@ -113,7 +113,12 @@ func TestEndToEndProcessing(t *testing.T) { frontendServer.Install(frontendMux.Handle, redisCacheClient) frontendHTTP := httptest.NewServer(frontendMux) - if _, err := doGet(workerHTTP.URL + "/poll-and-queue"); err != nil { + if _, err := doGet(workerHTTP.URL + "/poll"); err != nil { + t.Fatal(err) + } + // TODO: This should really be made deterministic. + time.Sleep(100 * time.Millisecond) + if _, err := doGet(workerHTTP.URL + "/enqueue"); err != nil { t.Fatal(err) } // TODO: This should really be made deterministic. diff --git a/internal/worker/server.go b/internal/worker/server.go index 70972910..264efd88 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -98,21 +98,30 @@ func (s *Server) Install(handle func(string, http.Handler)) { if s.reportingClient != nil { rmw = middleware.ErrorReporting(s.reportingClient.Report) } - // cloud-scheduler: poll-and-queue polls the Module Index for new versions + + // scheduled: poll polls the Module Index for new versions + // that have been published and inserts that metadata into + // module_version_states. + // This endpoint is intended to be invoked periodically by a scheduler. + // See the note about duplicate tasks for "/enqueue" below. + handle("/poll", rmw(s.errorHandler(s.handlePollIndex))) + + // TODO: remove after /poll is in production and the scheduler jobs have been changed. + // scheduled: poll-and-queue polls the Module Index for new versions // that have been published and inserts that metadata into // module_version_states. 
It also inserts the version into the task-queue // to to be fetched and processed. - // This endpoint is invoked by a Cloud Scheduler job. + // This endpoint is intended to be invoked periodically by a scheduler. // See the note about duplicate tasks for "/requeue" below. handle("/poll-and-queue", rmw(s.errorHandler(s.handleIndexAndQueue))) - // cloud-scheduler: update-imported-by-count updates the imported_by_count for packages - // in search_documents where imported_by_count_updated_at is null or - // imported_by_count_updated_at < version_updated_at. - // This endpoint is invoked by a Cloud Scheduler job. + // scheduled: update-imported-by-count updates the imported_by_count for + // packages in search_documents where imported_by_count_updated_at is null + // or imported_by_count_updated_at < version_updated_at. + // This endpoint is intended to be invoked periodically by a scheduler. handle("/update-imported-by-count", rmw(s.errorHandler(s.handleUpdateImportedByCount))) - // cloud-scheduler: download search document data and update the redis sorted + // scheduled: download search document data and update the redis sorted // set(s) used in auto-completion. handle("/update-redis-indexes", rmw(s.errorHandler(s.handleUpdateRedisIndexes))) @@ -121,18 +130,30 @@ func (s *Server) Install(handle func(string, http.Handler)) { // request fails for any reason other than an http.StatusInternalServerError, // it will return an http.StatusOK so that the task queue does not retry // fetching module versions that have a terminal error. - // This endpoint is invoked by a Cloud Tasks queue. + // This endpoint is intended to be invoked by a task queue with semantics like + // Google Cloud Task Queues. 
handle("/fetch/", http.StripPrefix("/fetch", http.HandlerFunc(s.handleFetch))) - // manual: requeue queries the module_version_states table for the next + // scheduled: enqueue queries the module_version_states table for the next + // batch of module versions to process, and enqueues them for processing. + // Normally this will not cause duplicate processing, because Cloud Tasks + // are de-duplicated. That does not apply after a task has been finished or + // deleted for Server.taskIDChangeInterval (see + // https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#createtaskrequest, + // under "Task De-duplication"). If you cannot wait, you can force + // duplicate tasks by providing any string as the "suffix" query parameter. + handle("/enqueue", rmw(s.errorHandler(s.handleEnqueue))) + + // TODO: remove after /enqueue is in production and the scheduler jobs have been changed. + // scheduled: requeue queries the module_version_states table for the next // batch of module versions to process, and enqueues them for processing. // Normally this will not cause duplicate processing, because Cloud Tasks // are de-duplicated. That does not apply after a task has been finished or - // deleted for one hour (see + // deleted for Server.taskIDChangeInterval (see // https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#createtaskrequest, - // under "Task De-duplication"). If you cannot wait an hour, you can force + // under "Task De-duplication"). If you cannot wait, you can force // duplicate tasks by providing any string as the "suffix" query parameter. 
- handle("/requeue", rmw(s.errorHandler(s.handleRequeue))) + handle("/requeue", rmw(s.errorHandler(s.handleEnqueue))) // manual: reprocess sets a reprocess status for all records in the // module_version_states table that were processed by an app_version that @@ -175,7 +196,7 @@ func (s *Server) handleUpdateImportedByCount(w http.ResponseWriter, r *http.Requ // handleRepopulateSearchDocuments repopulates every row in the search_documents table // that was last updated before the given time. func (s *Server) handleRepopulateSearchDocuments(w http.ResponseWriter, r *http.Request) error { - limit := parseIntParam(r, "limit", 100) + limit := parseLimitParam(r, 100) beforeParam := r.FormValue("before") if beforeParam == "" { return &serverError{ @@ -272,10 +293,29 @@ func parseModulePathAndVersion(requestPath string) (string, string, error) { return parts[0], parts[1], nil } +func (s *Server) handlePollIndex(w http.ResponseWriter, r *http.Request) (err error) { + defer derrors.Wrap(&err, "handlePollIndex(%q)", r.URL.Path) + ctx := r.Context() + limit := parseLimitParam(r, 10) + since, err := s.db.LatestIndexTimestamp(ctx) + if err != nil { + return err + } + versions, err := s.indexClient.GetVersions(ctx, since, limit) + if err != nil { + return err + } + if err := s.db.InsertIndexVersions(ctx, versions); err != nil { + return err + } + log.Infof(ctx, "Inserted %d modules from the index", len(versions)) + return nil +} + func (s *Server) handleIndexAndQueue(w http.ResponseWriter, r *http.Request) (err error) { defer derrors.Wrap(&err, "handleIndexAndQueue(%q)", r.URL.Path) ctx := r.Context() - limit := parseIntParam(r, "limit", 10) + limit := parseLimitParam(r, 10) suffixParam := r.FormValue("suffix") since, err := s.db.LatestIndexTimestamp(ctx) if err != nil { @@ -303,13 +343,13 @@ func (s *Server) handleIndexAndQueue(w http.ResponseWriter, r *http.Request) (er return nil } -// handleRequeue queries the module_version_states table for the next -// batch of module 
versions to process, and enqueues them for processing. Note -// that this may cause duplicate processing. -func (s *Server) handleRequeue(w http.ResponseWriter, r *http.Request) (err error) { - defer derrors.Wrap(&err, "handleRequeue(%q)", r.URL.Path) +// handleEnqueue queries the module_version_states table for the next batch of +// module versions to process, and enqueues them for processing. Note that this +// may cause duplicate processing. +func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err error) { + defer derrors.Wrap(&err, "handleEnqueue(%q)", r.URL.Path) ctx := r.Context() - limit := parseIntParam(r, "limit", 10) + limit := parseLimitParam(r, 10) suffixParam := r.FormValue("suffix") // append to task name to avoid deduplication span := trace.FromContext(r.Context()) span.Annotate([]trace.Attribute{trace.Int64Attribute("limit", int64(limit))}, "processed limit") @@ -320,13 +360,13 @@ func (s *Server) handleRequeue(w http.ResponseWriter, r *http.Request) (err erro span.Annotate([]trace.Attribute{trace.Int64Attribute("versions to fetch", int64(len(versions)))}, "processed limit") w.Header().Set("Content-Type", "text/plain") - log.Infof(ctx, "Scheduling modules to be fetched: requeuing %d modules", len(versions)) + log.Infof(ctx, "Scheduling modules to be fetched: queuing %d modules", len(versions)) for _, v := range versions { if err := s.queue.ScheduleFetch(ctx, v.ModulePath, v.Version, suffixParam, s.taskIDChangeInterval); err != nil { return err } } - log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules requeued", len(versions)) + log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules queued", len(versions)) return nil } @@ -573,10 +613,11 @@ func formatTime(t *time.Time) string { return t.In(locNewYork).Format("2006-01-02 15:04:05") } -// parseIntParam parses the query parameter with name as in integer. If the +// parseLimitParam parses the "limit" query parameter as an integer. 
If the // parameter is missing or there is a parse error, it is logged and the default // value is returned. -func parseIntParam(r *http.Request, name string, defaultValue int) int { +func parseLimitParam(r *http.Request, defaultValue int) int { + const name = "limit" param := r.FormValue(name) if param == "" { return defaultValue diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 372428ae..bc926307 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -89,11 +89,11 @@ func TestWorker(t *testing.T) { } state = func(version *internal.IndexVersion, code, tryCount, numPackages int) *internal.ModuleVersionState { goModPath := version.Path - if code >= 300 { + if code == 0 || code >= 300 { goModPath = "" } var n *int - if code != http.StatusNotFound { + if code != 0 && code != http.StatusNotFound { n = &numPackages } return &internal.ModuleVersionState{ @@ -123,11 +123,22 @@ func TestWorker(t *testing.T) { wantBar *internal.ModuleVersionState }{ { + label: "poll only", + index: []*internal.IndexVersion{fooIndex, barIndex}, + proxy: []*proxy.TestModule{fooProxy, barProxy}, + requests: []*http.Request{ + httptest.NewRequest("POST", "/poll", nil), + }, + wantFoo: fooState(0, 0), + wantBar: barState(0, 0), + }, + { label: "full fetch", index: []*internal.IndexVersion{fooIndex, barIndex}, proxy: []*proxy.TestModule{fooProxy, barProxy}, requests: []*http.Request{ - httptest.NewRequest("POST", "/poll-and-queue", nil), + httptest.NewRequest("POST", "/poll", nil), + httptest.NewRequest("POST", "/enqueue", nil), }, wantFoo: fooState(http.StatusOK, 1), wantBar: barState(http.StatusOK, 1), @@ -136,7 +147,8 @@ func TestWorker(t *testing.T) { index: []*internal.IndexVersion{fooIndex, barIndex}, proxy: []*proxy.TestModule{fooProxy, barProxy}, requests: []*http.Request{ - httptest.NewRequest("POST", "/poll-and-queue?limit=1", nil), + httptest.NewRequest("POST", "/poll?limit=1", nil), + httptest.NewRequest("POST", "/enqueue", 
nil), }, wantFoo: fooState(http.StatusOK, 1), }, { @@ -144,7 +156,8 @@ func TestWorker(t *testing.T) { index: []*internal.IndexVersion{fooIndex, barIndex}, proxy: []*proxy.TestModule{fooProxy}, requests: []*http.Request{ - httptest.NewRequest("POST", "/poll-and-queue", nil), + httptest.NewRequest("POST", "/poll", nil), + httptest.NewRequest("POST", "/enqueue", nil), }, wantFoo: fooState(http.StatusOK, 1), wantBar: barState(http.StatusNotFound, 1), @@ -238,7 +251,7 @@ func TestParseIntParam(t *testing.T) { {"312", 312}, {"bad", -1}, } { - got := parseIntParam(httptest.NewRequest("GET", fmt.Sprintf("/foo?x=%s", test.in), nil), "x", -1) + got := parseLimitParam(httptest.NewRequest("GET", fmt.Sprintf("/foo?limit=%s", test.in), nil), -1) if got != test.want { t.Errorf("%q: got %d, want %d", test.in, got, test.want) } |
