aboutsummaryrefslogtreecommitdiff
path: root/internal/postgres/versionstate.go
diff options
context:
space:
mode:
authorJulie Qiu <julie@golang.org>2019-07-11 19:16:38 -0400
committerJulie Qiu <julie@golang.org>2020-03-27 16:46:39 -0400
commitf1d57da5e2392e7511a3d2f5a3fd5664948b3efc (patch)
tree8993c8b237e0166b534d808ab31e4fa18395cc5d /internal/postgres/versionstate.go
parent4fa5b50006d3020c2bccb93e331759b9b57e260c (diff)
downloadgo-x-pkgsite-f1d57da5e2392e7511a3d2f5a3fd5664948b3efc.tar.xz
internal: add reprocess handler for outdated versions
A repocess handler is added to allow for reprocessing outdated module versions. The endpoint /reprocess?app_version=<app_version> will mark versions last processed before the specified app_version with HTTP Status 505. These versions will then be requeued using the existing /requeue handler. Existing references to "cron" are also updated to "etl". Update b/133518045 Change-Id: I12f0822340d0a3a30c2769a52fea2879409a669b Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/505099 Reviewed-by: Robert Findley <rfindley@google.com>
Diffstat (limited to 'internal/postgres/versionstate.go')
-rw-r--r--internal/postgres/versionstate.go113
1 files changed, 69 insertions, 44 deletions
diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go
index c8ada28c..503b6a96 100644
--- a/internal/postgres/versionstate.go
+++ b/internal/postgres/versionstate.go
@@ -8,6 +8,7 @@ import (
"context"
"database/sql"
"fmt"
+ "log"
"time"
"github.com/lib/pq"
@@ -35,6 +36,48 @@ func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.Inde
})
}
+// UpsertVersionState inserts or updates the module_version_state table with
+// the results of a fetch operation for a given module version.
+func (db *DB) UpsertVersionState(ctx context.Context, modulePath, version, appVersion string, timestamp time.Time, status int, fetchErr error) error {
+ ctx, span := trace.StartSpan(ctx, "UpsertVersionState")
+ defer span.End()
+ query := `
+ INSERT INTO module_version_states AS mvs (module_path, version, app_version, index_timestamp, status, error)
+ VALUES ($1, $2, $3, $4, $5, $6)
+ ON CONFLICT (module_path, version) DO UPDATE
+ SET
+ app_version=excluded.app_version,
+ status=excluded.status,
+ error=excluded.error,
+ try_count=mvs.try_count+1,
+ last_processed_at=CURRENT_TIMESTAMP,
+ next_processed_after=CASE
+ WHEN mvs.last_processed_at IS NULL THEN
+ CURRENT_TIMESTAMP + INTERVAL '1 minute'
+ WHEN 2*(mvs.next_processed_after - mvs.last_processed_at) < INTERVAL '1 hour' THEN
+ CURRENT_TIMESTAMP + 2*(mvs.next_processed_after - mvs.last_processed_at)
+ ELSE
+ CURRENT_TIMESTAMP + INTERVAL '1 hour'
+ END;`
+
+ var sqlErrorMsg sql.NullString
+ if fetchErr != nil {
+ sqlErrorMsg = sql.NullString{Valid: true, String: fetchErr.Error()}
+ }
+ result, err := db.ExecContext(ctx, query, modulePath, version, appVersion, timestamp, status, sqlErrorMsg)
+ if err != nil {
+ return fmt.Errorf("db.ExecContext(ctx, %q, %q, %q, %q, %q, %q, %v): %v", query, modulePath, version, appVersion, timestamp, status, sqlErrorMsg, err)
+ }
+ affected, err := result.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("result.RowsAffected(): %v", err)
+ }
+ if affected != 1 {
+ return fmt.Errorf("version state update affected %d rows, expected exactly 1", affected)
+ }
+ return nil
+}
+
// LatestIndexTimestamp returns the last timestamp successfully inserted into
// the module_version_states table.
func (db *DB) LatestIndexTimestamp(ctx context.Context) (time.Time, error) {
@@ -55,6 +98,27 @@ func (db *DB) LatestIndexTimestamp(ctx context.Context) (time.Time, error) {
}
}
+func (db *DB) UpdateVersionStatesForReprocessing(ctx context.Context, appVersion string) error {
+ query := `
+ UPDATE module_version_states
+ SET
+ status = 505,
+ next_processed_after = CURRENT_TIMESTAMP,
+ last_processed_at = NULL
+ WHERE
+ app_version <= $1;`
+ result, err := db.ExecContext(ctx, query, appVersion)
+ if err != nil {
+ return fmt.Errorf("db.ExecContext(ctx, %q, %q): %v", query, appVersion, err)
+ }
+ affected, err := result.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("result.RowsAffected(): %v", err)
+ }
+ log.Printf("Updated %d module version states to be reprocessed for app_version <= %q", affected, appVersion)
+ return nil
+}
+
const versionStateColumns = `
module_path,
version,
@@ -64,13 +128,14 @@ const versionStateColumns = `
error,
try_count,
last_processed_at,
- next_processed_after`
+ next_processed_after,
+ app_version`
// scanVersionState constructs an *internal.VersionState from the given
// scanner. It expects columns to be in the order of versionStateColumns.
func scanVersionState(scan func(dest ...interface{}) error) (*internal.VersionState, error) {
var (
- modulePath, version string
+ modulePath, version, appVersion string
indexTimestamp, createdAt, nextProcessedAfter time.Time
lastProcessedAt pq.NullTime
status sql.NullInt64
@@ -78,7 +143,7 @@ func scanVersionState(scan func(dest ...interface{}) error) (*internal.VersionSt
tryCount int
)
if err := scan(&modulePath, &version, &indexTimestamp, &createdAt, &status, &errorMsg,
- &tryCount, &lastProcessedAt, &nextProcessedAfter); err != nil {
+ &tryCount, &lastProcessedAt, &nextProcessedAfter, &appVersion); err != nil {
return nil, err
}
v := &internal.VersionState{
@@ -88,6 +153,7 @@ func scanVersionState(scan func(dest ...interface{}) error) (*internal.VersionSt
CreatedAt: createdAt,
TryCount: tryCount,
NextProcessedAfter: nextProcessedAfter,
+ AppVersion: appVersion,
}
if status.Valid {
s := int(status.Int64)
@@ -166,47 +232,6 @@ func (db *DB) GetRecentVersions(ctx context.Context, limit int) ([]*internal.Ver
return db.queryVersionStates(ctx, queryFormat, limit)
}
-// UpsertVersionState inserts or updates the module_version_state table with
-// the results of a fetch operation for a given module version.
-func (db *DB) UpsertVersionState(ctx context.Context, modulePath, version string, timestamp time.Time, status int, fetchErr error) error {
- ctx, span := trace.StartSpan(ctx, "UpsertVersionState")
- defer span.End()
- query := `
- INSERT INTO module_version_states AS mvs (module_path, version, index_timestamp, status, error)
- VALUES ($1, $2, $3, $4, $5)
- ON CONFLICT (module_path, version) DO UPDATE
- SET
- status=excluded.status,
- error=excluded.error,
- try_count=mvs.try_count+1,
- last_processed_at=CURRENT_TIMESTAMP,
- next_processed_after=CASE
- WHEN mvs.last_processed_at IS NULL THEN
- CURRENT_TIMESTAMP + INTERVAL '1 minute'
- WHEN 2*(mvs.next_processed_after - mvs.last_processed_at) < INTERVAL '1 hour' THEN
- CURRENT_TIMESTAMP + 2*(mvs.next_processed_after - mvs.last_processed_at)
- ELSE
- CURRENT_TIMESTAMP + INTERVAL '1 hour'
- END;`
-
- var sqlErrorMsg sql.NullString
- if fetchErr != nil {
- sqlErrorMsg = sql.NullString{Valid: true, String: fetchErr.Error()}
- }
- result, err := db.ExecContext(ctx, query, modulePath, version, timestamp, status, sqlErrorMsg)
- if err != nil {
- return fmt.Errorf("db.ExecContext(ctx, %q, %q, %q, %q, %q, %v): %v", query, modulePath, version, timestamp, status, sqlErrorMsg, err)
- }
- affected, err := result.RowsAffected()
- if err != nil {
- return fmt.Errorf("result.RowsAffected(): %v", err)
- }
- if affected != 1 {
- return fmt.Errorf("version state update affected %d rows, expected exactly 1", affected)
- }
- return nil
-}
-
// GetVersionState returns the current version state for modulePath and
// version.
func (db *DB) GetVersionState(ctx context.Context, modulePath, version string) (*internal.VersionState, error) {