aboutsummaryrefslogtreecommitdiff
path: root/internal/postgres
diff options
context:
space:
mode:
authorJulie Qiu <julie@golang.org>2019-07-11 19:16:38 -0400
committerJulie Qiu <julie@golang.org>2020-03-27 16:46:39 -0400
commitf1d57da5e2392e7511a3d2f5a3fd5664948b3efc (patch)
tree8993c8b237e0166b534d808ab31e4fa18395cc5d /internal/postgres
parent4fa5b50006d3020c2bccb93e331759b9b57e260c (diff)
downloadgo-x-pkgsite-f1d57da5e2392e7511a3d2f5a3fd5664948b3efc.tar.xz
internal: add reprocess handler for outdated versions
A repocess handler is added to allow for reprocessing outdated module versions. The endpoint /reprocess?app_version=<app_version> will mark versions last processed before the specified app_version with HTTP Status 505. These versions will then be requeued using the existing /requeue handler. Existing references to "cron" are also updated to "etl". Update b/133518045 Change-Id: I12f0822340d0a3a30c2769a52fea2879409a669b Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/505099 Reviewed-by: Robert Findley <rfindley@google.com>
Diffstat (limited to 'internal/postgres')
-rw-r--r--internal/postgres/versionstate.go113
-rw-r--r--internal/postgres/versionstate_test.go57
2 files changed, 123 insertions, 47 deletions
diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go
index c8ada28c..503b6a96 100644
--- a/internal/postgres/versionstate.go
+++ b/internal/postgres/versionstate.go
@@ -8,6 +8,7 @@ import (
"context"
"database/sql"
"fmt"
+ "log"
"time"
"github.com/lib/pq"
@@ -35,6 +36,48 @@ func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.Inde
})
}
+// UpsertVersionState inserts or updates the module_version_state table with
+// the results of a fetch operation for a given module version.
+func (db *DB) UpsertVersionState(ctx context.Context, modulePath, version, appVersion string, timestamp time.Time, status int, fetchErr error) error {
+ ctx, span := trace.StartSpan(ctx, "UpsertVersionState")
+ defer span.End()
+ query := `
+ INSERT INTO module_version_states AS mvs (module_path, version, app_version, index_timestamp, status, error)
+ VALUES ($1, $2, $3, $4, $5, $6)
+ ON CONFLICT (module_path, version) DO UPDATE
+ SET
+ app_version=excluded.app_version,
+ status=excluded.status,
+ error=excluded.error,
+ try_count=mvs.try_count+1,
+ last_processed_at=CURRENT_TIMESTAMP,
+ next_processed_after=CASE
+ WHEN mvs.last_processed_at IS NULL THEN
+ CURRENT_TIMESTAMP + INTERVAL '1 minute'
+ WHEN 2*(mvs.next_processed_after - mvs.last_processed_at) < INTERVAL '1 hour' THEN
+ CURRENT_TIMESTAMP + 2*(mvs.next_processed_after - mvs.last_processed_at)
+ ELSE
+ CURRENT_TIMESTAMP + INTERVAL '1 hour'
+ END;`
+
+ var sqlErrorMsg sql.NullString
+ if fetchErr != nil {
+ sqlErrorMsg = sql.NullString{Valid: true, String: fetchErr.Error()}
+ }
+ result, err := db.ExecContext(ctx, query, modulePath, version, appVersion, timestamp, status, sqlErrorMsg)
+ if err != nil {
+ return fmt.Errorf("db.ExecContext(ctx, %q, %q, %q, %q, %q, %q, %v): %v", query, modulePath, version, appVersion, timestamp, status, sqlErrorMsg, err)
+ }
+ affected, err := result.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("result.RowsAffected(): %v", err)
+ }
+ if affected != 1 {
+ return fmt.Errorf("version state update affected %d rows, expected exactly 1", affected)
+ }
+ return nil
+}
+
// LatestIndexTimestamp returns the last timestamp successfully inserted into
// the module_version_states table.
func (db *DB) LatestIndexTimestamp(ctx context.Context) (time.Time, error) {
@@ -55,6 +98,27 @@ func (db *DB) LatestIndexTimestamp(ctx context.Context) (time.Time, error) {
}
}
+func (db *DB) UpdateVersionStatesForReprocessing(ctx context.Context, appVersion string) error {
+ query := `
+ UPDATE module_version_states
+ SET
+ status = 505,
+ next_processed_after = CURRENT_TIMESTAMP,
+ last_processed_at = NULL
+ WHERE
+ app_version <= $1;`
+ result, err := db.ExecContext(ctx, query, appVersion)
+ if err != nil {
+ return fmt.Errorf("db.ExecContext(ctx, %q, %q): %v", query, appVersion, err)
+ }
+ affected, err := result.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("result.RowsAffected(): %v", err)
+ }
+ log.Printf("Updated %d module version states to be reprocessed for app_version <= %q", affected, appVersion)
+ return nil
+}
+
const versionStateColumns = `
module_path,
version,
@@ -64,13 +128,14 @@ const versionStateColumns = `
error,
try_count,
last_processed_at,
- next_processed_after`
+ next_processed_after,
+ app_version`
// scanVersionState constructs an *internal.VersionState from the given
// scanner. It expects columns to be in the order of versionStateColumns.
func scanVersionState(scan func(dest ...interface{}) error) (*internal.VersionState, error) {
var (
- modulePath, version string
+ modulePath, version, appVersion string
indexTimestamp, createdAt, nextProcessedAfter time.Time
lastProcessedAt pq.NullTime
status sql.NullInt64
@@ -78,7 +143,7 @@ func scanVersionState(scan func(dest ...interface{}) error) (*internal.VersionSt
tryCount int
)
if err := scan(&modulePath, &version, &indexTimestamp, &createdAt, &status, &errorMsg,
- &tryCount, &lastProcessedAt, &nextProcessedAfter); err != nil {
+ &tryCount, &lastProcessedAt, &nextProcessedAfter, &appVersion); err != nil {
return nil, err
}
v := &internal.VersionState{
@@ -88,6 +153,7 @@ func scanVersionState(scan func(dest ...interface{}) error) (*internal.VersionSt
CreatedAt: createdAt,
TryCount: tryCount,
NextProcessedAfter: nextProcessedAfter,
+ AppVersion: appVersion,
}
if status.Valid {
s := int(status.Int64)
@@ -166,47 +232,6 @@ func (db *DB) GetRecentVersions(ctx context.Context, limit int) ([]*internal.Ver
return db.queryVersionStates(ctx, queryFormat, limit)
}
-// UpsertVersionState inserts or updates the module_version_state table with
-// the results of a fetch operation for a given module version.
-func (db *DB) UpsertVersionState(ctx context.Context, modulePath, version string, timestamp time.Time, status int, fetchErr error) error {
- ctx, span := trace.StartSpan(ctx, "UpsertVersionState")
- defer span.End()
- query := `
- INSERT INTO module_version_states AS mvs (module_path, version, index_timestamp, status, error)
- VALUES ($1, $2, $3, $4, $5)
- ON CONFLICT (module_path, version) DO UPDATE
- SET
- status=excluded.status,
- error=excluded.error,
- try_count=mvs.try_count+1,
- last_processed_at=CURRENT_TIMESTAMP,
- next_processed_after=CASE
- WHEN mvs.last_processed_at IS NULL THEN
- CURRENT_TIMESTAMP + INTERVAL '1 minute'
- WHEN 2*(mvs.next_processed_after - mvs.last_processed_at) < INTERVAL '1 hour' THEN
- CURRENT_TIMESTAMP + 2*(mvs.next_processed_after - mvs.last_processed_at)
- ELSE
- CURRENT_TIMESTAMP + INTERVAL '1 hour'
- END;`
-
- var sqlErrorMsg sql.NullString
- if fetchErr != nil {
- sqlErrorMsg = sql.NullString{Valid: true, String: fetchErr.Error()}
- }
- result, err := db.ExecContext(ctx, query, modulePath, version, timestamp, status, sqlErrorMsg)
- if err != nil {
- return fmt.Errorf("db.ExecContext(ctx, %q, %q, %q, %q, %q, %v): %v", query, modulePath, version, timestamp, status, sqlErrorMsg, err)
- }
- affected, err := result.RowsAffected()
- if err != nil {
- return fmt.Errorf("result.RowsAffected(): %v", err)
- }
- if affected != 1 {
- return fmt.Errorf("version state update affected %d rows, expected exactly 1", affected)
- }
- return nil
-}
-
// GetVersionState returns the current version state for modulePath and
// version.
func (db *DB) GetVersionState(ctx context.Context, modulePath, version string) (*internal.VersionState, error) {
diff --git a/internal/postgres/versionstate_test.go b/internal/postgres/versionstate_test.go
index 034ca6a0..3e265296 100644
--- a/internal/postgres/versionstate_test.go
+++ b/internal/postgres/versionstate_test.go
@@ -7,6 +7,7 @@ package postgres
import (
"context"
"errors"
+ "net/http"
"testing"
"time"
@@ -75,9 +76,9 @@ func TestVersionState(t *testing.T) {
statusCode = 500
fetchErr = errors.New("bad request")
)
- if err := testDB.UpsertVersionState(ctx, fooVersion.Path, fooVersion.Version, fooVersion.Timestamp, statusCode, fetchErr); err != nil {
- t.Fatalf("testDB.UpsertVersionState(ctx, %q, %q, %d, %v): %v", fooVersion.Path,
- versions[0].Version, statusCode, fetchErr, err)
+ if err := testDB.UpsertVersionState(ctx, fooVersion.Path, fooVersion.Version, "", fooVersion.Timestamp, statusCode, fetchErr); err != nil {
+ t.Fatalf("testDB.UpsertVersionState(ctx, %q, %q, %q, %d, %v): %v", fooVersion.Path,
+ versions[0].Version, "", statusCode, fetchErr, err)
}
errString := fetchErr.Error()
wantFooState := &internal.VersionState{
@@ -112,3 +113,53 @@ func TestVersionState(t *testing.T) {
t.Errorf("testDB.GetVersionStats(ctx) mismatch (-want +got):\n%s", diff)
}
}
+
+func TestUpdateVersionStatesForReprocessing(t *testing.T) {
+ defer ResetTestDB(testDB, t)
+ ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
+ defer cancel()
+
+ now := sample.NowTruncated()
+ for _, v := range []*internal.IndexVersion{
+ &internal.IndexVersion{
+ Path: "foo.com/bar",
+ Version: "v1.0.0",
+ Timestamp: now,
+ },
+ &internal.IndexVersion{
+ Path: "baz.com/quux",
+ Version: "v2.0.1",
+ Timestamp: now,
+ },
+ } {
+ if err := testDB.UpsertVersionState(ctx, v.Path, v.Version, "", v.Timestamp, http.StatusOK, nil); err != nil {
+ t.Fatalf("testDB.UpsertVersionState(ctx, %q, %q, %v, %d, nil): %v", v.Path, v.Version, v.Timestamp, http.StatusOK, err)
+ }
+ }
+
+ gotVersions, err := testDB.GetNextVersionsToFetch(ctx, 10)
+ if err != nil {
+ t.Fatalf("testDB.GetVersionsToFetch(ctx, 10): %v", err)
+ }
+ if len(gotVersions) != 0 {
+ t.Fatalf("testDB.GetVersionsToFetch(ctx, 10) = %v; wanted 0 versions", gotVersions)
+ }
+ if err := testDB.UpdateVersionStatesForReprocessing(ctx, "20190709t112655"); err != nil {
+ t.Fatalf("testDB.UpdateVersionStatesForReprocessing(ctx, 20190709t112655): %v", err)
+ }
+
+ gotVersions, err = testDB.GetNextVersionsToFetch(ctx, 10)
+ if err != nil {
+ t.Fatalf("testDB.GetVersionsToFetch(ctx, 10): %v", err)
+ }
+
+ code := http.StatusHTTPVersionNotSupported
+ wantVersions := []*internal.VersionState{
+ {ModulePath: "foo.com/bar", Version: "v1.0.0", IndexTimestamp: now, Status: &code},
+ {ModulePath: "baz.com/quux", Version: "v2.0.1", IndexTimestamp: now, Status: &code},
+ }
+ ignore := cmpopts.IgnoreFields(internal.VersionState{}, "CreatedAt", "LastProcessedAt", "NextProcessedAfter")
+ if diff := cmp.Diff(wantVersions, gotVersions, ignore); diff != "" {
+ t.Fatalf("testDB.GetVersionsToFetch(ctx, 10) mismatch (-want +got):\n%s", diff)
+ }
+}