diff options
| author | Julie Qiu <julie@golang.org> | 2019-07-11 19:16:38 -0400 |
|---|---|---|
| committer | Julie Qiu <julie@golang.org> | 2020-03-27 16:46:39 -0400 |
| commit | f1d57da5e2392e7511a3d2f5a3fd5664948b3efc (patch) | |
| tree | 8993c8b237e0166b534d808ab31e4fa18395cc5d /internal/postgres | |
| parent | 4fa5b50006d3020c2bccb93e331759b9b57e260c (diff) | |
| download | go-x-pkgsite-f1d57da5e2392e7511a3d2f5a3fd5664948b3efc.tar.xz | |
internal: add reprocess handler for outdated versions
A repocess handler is added to allow for reprocessing outdated module
versions.
The endpoint /reprocess?app_version=<app_version> will mark versions
last processed before the specified app_version with HTTP Status 505.
These versions will then be requeued using the existing /requeue
handler.
Existing references to "cron" are also updated to "etl".
Update b/133518045
Change-Id: I12f0822340d0a3a30c2769a52fea2879409a669b
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/505099
Reviewed-by: Robert Findley <rfindley@google.com>
Diffstat (limited to 'internal/postgres')
| -rw-r--r-- | internal/postgres/versionstate.go | 113 | ||||
| -rw-r--r-- | internal/postgres/versionstate_test.go | 57 |
2 files changed, 123 insertions, 47 deletions
diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go index c8ada28c..503b6a96 100644 --- a/internal/postgres/versionstate.go +++ b/internal/postgres/versionstate.go @@ -8,6 +8,7 @@ import ( "context" "database/sql" "fmt" + "log" "time" "github.com/lib/pq" @@ -35,6 +36,48 @@ func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.Inde }) } +// UpsertVersionState inserts or updates the module_version_state table with +// the results of a fetch operation for a given module version. +func (db *DB) UpsertVersionState(ctx context.Context, modulePath, version, appVersion string, timestamp time.Time, status int, fetchErr error) error { + ctx, span := trace.StartSpan(ctx, "UpsertVersionState") + defer span.End() + query := ` + INSERT INTO module_version_states AS mvs (module_path, version, app_version, index_timestamp, status, error) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (module_path, version) DO UPDATE + SET + app_version=excluded.app_version, + status=excluded.status, + error=excluded.error, + try_count=mvs.try_count+1, + last_processed_at=CURRENT_TIMESTAMP, + next_processed_after=CASE + WHEN mvs.last_processed_at IS NULL THEN + CURRENT_TIMESTAMP + INTERVAL '1 minute' + WHEN 2*(mvs.next_processed_after - mvs.last_processed_at) < INTERVAL '1 hour' THEN + CURRENT_TIMESTAMP + 2*(mvs.next_processed_after - mvs.last_processed_at) + ELSE + CURRENT_TIMESTAMP + INTERVAL '1 hour' + END;` + + var sqlErrorMsg sql.NullString + if fetchErr != nil { + sqlErrorMsg = sql.NullString{Valid: true, String: fetchErr.Error()} + } + result, err := db.ExecContext(ctx, query, modulePath, version, appVersion, timestamp, status, sqlErrorMsg) + if err != nil { + return fmt.Errorf("db.ExecContext(ctx, %q, %q, %q, %q, %q, %q, %v): %v", query, modulePath, version, appVersion, timestamp, status, sqlErrorMsg, err) + } + affected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("result.RowsAffected(): %v", err) + } + if affected != 1 { + return fmt.Errorf("version state update affected %d rows, expected exactly 1", affected) + } + return nil +} + // LatestIndexTimestamp returns the last timestamp successfully inserted into // the module_version_states table. func (db *DB) LatestIndexTimestamp(ctx context.Context) (time.Time, error) { @@ -55,6 +98,27 @@ func (db *DB) LatestIndexTimestamp(ctx context.Context) (time.Time, error) { } } +func (db *DB) UpdateVersionStatesForReprocessing(ctx context.Context, appVersion string) error { + query := ` + UPDATE module_version_states + SET + status = 505, + next_processed_after = CURRENT_TIMESTAMP, + last_processed_at = NULL + WHERE + app_version <= $1;` + result, err := db.ExecContext(ctx, query, appVersion) + if err != nil { + return fmt.Errorf("db.ExecContext(ctx, %q, %q): %v", query, appVersion, err) + } + affected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("result.RowsAffected(): %v", err) + } + log.Printf("Updated %d module version states to be reprocessed for app_version <= %q", affected, appVersion) + return nil +} + const versionStateColumns = ` module_path, version, @@ -64,13 +128,14 @@ const versionStateColumns = ` error, try_count, last_processed_at, - next_processed_after` + next_processed_after, + app_version` // scanVersionState constructs an *internal.VersionState from the given // scanner. It expects columns to be in the order of versionStateColumns. func scanVersionState(scan func(dest ...interface{}) error) (*internal.VersionState, error) { var ( - modulePath, version string + modulePath, version, appVersion string indexTimestamp, createdAt, nextProcessedAfter time.Time lastProcessedAt pq.NullTime status sql.NullInt64 @@ -78,7 +143,7 @@ func scanVersionState(scan func(dest ...interface{}) error) (*internal.VersionSt tryCount int ) if err := scan(&modulePath, &version, &indexTimestamp, &createdAt, &status, &errorMsg, - &tryCount, &lastProcessedAt, &nextProcessedAfter); err != nil { + &tryCount, &lastProcessedAt, &nextProcessedAfter, &appVersion); err != nil { return nil, err } v := &internal.VersionState{ @@ -88,6 +153,7 @@ func scanVersionState(scan func(dest ...interface{}) error) (*internal.VersionSt CreatedAt: createdAt, TryCount: tryCount, NextProcessedAfter: nextProcessedAfter, + AppVersion: appVersion, } if status.Valid { s := int(status.Int64) @@ -166,47 +232,6 @@ func (db *DB) GetRecentVersions(ctx context.Context, limit int) ([]*internal.Ver return db.queryVersionStates(ctx, queryFormat, limit) } -// UpsertVersionState inserts or updates the module_version_state table with -// the results of a fetch operation for a given module version. -func (db *DB) UpsertVersionState(ctx context.Context, modulePath, version string, timestamp time.Time, status int, fetchErr error) error { - ctx, span := trace.StartSpan(ctx, "UpsertVersionState") - defer span.End() - query := ` - INSERT INTO module_version_states AS mvs (module_path, version, index_timestamp, status, error) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (module_path, version) DO UPDATE - SET - status=excluded.status, - error=excluded.error, - try_count=mvs.try_count+1, - last_processed_at=CURRENT_TIMESTAMP, - next_processed_after=CASE - WHEN mvs.last_processed_at IS NULL THEN - CURRENT_TIMESTAMP + INTERVAL '1 minute' - WHEN 2*(mvs.next_processed_after - mvs.last_processed_at) < INTERVAL '1 hour' THEN - CURRENT_TIMESTAMP + 2*(mvs.next_processed_after - mvs.last_processed_at) - ELSE - CURRENT_TIMESTAMP + INTERVAL '1 hour' - END;` - - var sqlErrorMsg sql.NullString - if fetchErr != nil { - sqlErrorMsg = sql.NullString{Valid: true, String: fetchErr.Error()} - } - result, err := db.ExecContext(ctx, query, modulePath, version, timestamp, status, sqlErrorMsg) - if err != nil { - return fmt.Errorf("db.ExecContext(ctx, %q, %q, %q, %q, %q, %v): %v", query, modulePath, version, timestamp, status, sqlErrorMsg, err) - } - affected, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("result.RowsAffected(): %v", err) - } - if affected != 1 { - return fmt.Errorf("version state update affected %d rows, expected exactly 1", affected) - } - return nil -} - // GetVersionState returns the current version state for modulePath and // version. func (db *DB) GetVersionState(ctx context.Context, modulePath, version string) (*internal.VersionState, error) { diff --git a/internal/postgres/versionstate_test.go b/internal/postgres/versionstate_test.go index 034ca6a0..3e265296 100644 --- a/internal/postgres/versionstate_test.go +++ b/internal/postgres/versionstate_test.go @@ -7,6 +7,7 @@ package postgres import ( "context" "errors" + "net/http" "testing" "time" @@ -75,9 +76,9 @@ func TestVersionState(t *testing.T) { statusCode = 500 fetchErr = errors.New("bad request") ) - if err := testDB.UpsertVersionState(ctx, fooVersion.Path, fooVersion.Version, fooVersion.Timestamp, statusCode, fetchErr); err != nil { - t.Fatalf("testDB.UpsertVersionState(ctx, %q, %q, %d, %v): %v", fooVersion.Path, - versions[0].Version, statusCode, fetchErr, err) + if err := testDB.UpsertVersionState(ctx, fooVersion.Path, fooVersion.Version, "", fooVersion.Timestamp, statusCode, fetchErr); err != nil { + t.Fatalf("testDB.UpsertVersionState(ctx, %q, %q, %q, %d, %v): %v", fooVersion.Path, + versions[0].Version, "", statusCode, fetchErr, err) } errString := fetchErr.Error() wantFooState := &internal.VersionState{ @@ -112,3 +113,53 @@ func TestVersionState(t *testing.T) { t.Errorf("testDB.GetVersionStats(ctx) mismatch (-want +got):\n%s", diff) } } + +func TestUpdateVersionStatesForReprocessing(t *testing.T) { + defer ResetTestDB(testDB, t) + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + now := sample.NowTruncated() + for _, v := range []*internal.IndexVersion{ + &internal.IndexVersion{ + Path: "foo.com/bar", + Version: "v1.0.0", + Timestamp: now, + }, + &internal.IndexVersion{ + Path: "baz.com/quux", + Version: "v2.0.1", + Timestamp: now, + }, + } { + if err := testDB.UpsertVersionState(ctx, v.Path, v.Version, "", v.Timestamp, http.StatusOK, nil); err != nil { + t.Fatalf("testDB.UpsertVersionState(ctx, %q, %q, %v, %d, nil): %v", v.Path, v.Version, v.Timestamp, http.StatusOK, err) + } + } + + gotVersions, err := testDB.GetNextVersionsToFetch(ctx, 10) + if err != nil { + t.Fatalf("testDB.GetVersionsToFetch(ctx, 10): %v", err) + } + if len(gotVersions) != 0 { + t.Fatalf("testDB.GetVersionsToFetch(ctx, 10) = %v; wanted 0 versions", gotVersions) + } + if err := testDB.UpdateVersionStatesForReprocessing(ctx, "20190709t112655"); err != nil { + t.Fatalf("testDB.UpdateVersionStatesForReprocessing(ctx, 20190709t112655): %v", err) + } + + gotVersions, err = testDB.GetNextVersionsToFetch(ctx, 10) + if err != nil { + t.Fatalf("testDB.GetVersionsToFetch(ctx, 10): %v", err) + } + + code := http.StatusHTTPVersionNotSupported + wantVersions := []*internal.VersionState{ + {ModulePath: "foo.com/bar", Version: "v1.0.0", IndexTimestamp: now, Status: &code}, + {ModulePath: "baz.com/quux", Version: "v2.0.1", IndexTimestamp: now, Status: &code}, + } + ignore := cmpopts.IgnoreFields(internal.VersionState{}, "CreatedAt", "LastProcessedAt", "NextProcessedAfter") + if diff := cmp.Diff(wantVersions, gotVersions, ignore); diff != "" { + t.Fatalf("testDB.GetVersionsToFetch(ctx, 10) mismatch (-want +got):\n%s", diff) + } +} |
