diff options
| author | Julie Qiu <julie@golang.org> | 2019-04-22 00:32:53 -0400 |
|---|---|---|
| committer | Julie Qiu <julie@golang.org> | 2020-03-27 16:46:36 -0400 |
| commit | d88ea7bb53ea92d4336cd6e90fdc90f0a4f72397 (patch) | |
| tree | 019c0cd8358ce2383327ac7846c4018dd208b46d /internal/postgres | |
| parent | cea7bbca7f632a05ad36a2c536f366e9d4eede85 (diff) | |
| download | go-x-pkgsite-d88ea7bb53ea92d4336cd6e90fdc90f0a4f72397.tar.xz | |
cmd/cron,internal/postgres: retry dropped versions
At the moment, there isn't a way to retry versions that have been
inserted into the version log if they are dropped after the
makeNewVersionsTimeout.
A /retry endpoint is added to the proxy index cron, which gets entries
from the version logs that are not present in the versions table and
have not failed, and retries fetching them.
The fetch service will now write errors to the error log.
This job is a temporary solution for populating the database, while the
go/go-discovery-etl design is being implemented.
Change-Id: I389c537068e05341ac79aac734d6df5766a1f278
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/452776
Reviewed-by: Robert Findley <rfindley@google.com>
Diffstat (limited to 'internal/postgres')
| -rw-r--r-- | internal/postgres/postgres.go | 46 | ||||
| -rw-r--r-- | internal/postgres/postgres_test.go | 157 |
2 files changed, 164 insertions, 39 deletions
diff --git a/internal/postgres/postgres.go b/internal/postgres/postgres.go index 30ea5383..817f7e4e 100644 --- a/internal/postgres/postgres.go +++ b/internal/postgres/postgres.go @@ -179,6 +179,52 @@ func (db *DB) InsertVersionLogs(ctx context.Context, logs []*internal.VersionLog }) } +// UpdateVersionLogError updates the version_log row for modulePath and version with fetchErr. +func (db *DB) UpdateVersionLogError(ctx context.Context, modulePath, version string, fetchErr error) error { + query := "UPDATE version_logs SET error=$1 WHERE module_path=$2 AND version=$3" + + if _, err := db.ExecContext(ctx, query, fetchErr.Error(), modulePath, version); err != nil { + return fmt.Errorf("tx.ExecContext(ctx %q, %q, %q, %q): %v", query, fetchErr.Error(), modulePath, version, err) + } + return nil +} + +// GetVersionsToRetry fetches all rows from the version_logs table that do not +// have an error and do not exist in the versions table. +func (db *DB) GetVersionsToRetry(ctx context.Context) ([]*internal.VersionLog, error) { + query := ` + SELECT + vl.module_path, + vl.version + FROM + version_logs vl + LEFT JOIN + versions v + ON + v.module_path=vl.module_path + AND v.version=vl.version + WHERE + v.version IS NULL + AND (vl.error IS NULL OR vl.error = ''); + ` + rows, err := db.QueryContext(ctx, query) + if err != nil { + return nil, fmt.Errorf("db.QueryContext(ctx, %q): %v", query, err) + } + + var ( + version, modulePath string + logs []*internal.VersionLog + ) + for rows.Next() { + if err := rows.Scan(&modulePath, &version); err != nil { + return nil, fmt.Errorf("row.Scan(): %v", err) + } + logs = append(logs, &internal.VersionLog{ModulePath: modulePath, Version: version}) + } + return logs, nil +} + func zipLicenseInfo(licenseTypes []string, licensePaths []string) ([]*internal.LicenseInfo, error) { if len(licenseTypes) != len(licensePaths) { return nil, fmt.Errorf("BUG: got %d license types and %d license paths", len(licenseTypes), len(licensePaths)) diff --git a/internal/postgres/postgres_test.go b/internal/postgres/postgres_test.go index be1f7d2f..830ec24a 100644 --- a/internal/postgres/postgres_test.go +++ b/internal/postgres/postgres_test.go @@ -7,6 +7,7 @@ package postgres import ( "context" "database/sql" + "errors" "fmt" "sort" "testing" @@ -158,19 +159,19 @@ func TestBulkInsert(t *testing.T) { PRIMARY KEY (colA) );`, table) if _, err := testDB.ExecContext(ctx, createQuery); err != nil { - t.Fatalf("db.ExecContext(ctx, %q): %v", createQuery, err) + t.Fatalf("testDB.ExecContext(ctx, %q): %v", createQuery, err) } defer func() { dropTableQuery := fmt.Sprintf("DROP TABLE %s;", table) if _, err := testDB.ExecContext(ctx, dropTableQuery); err != nil { - t.Fatalf("db.ExecContext(ctx, %q): %v", dropTableQuery, err) + t.Fatalf("testDB.ExecContext(ctx, %q): %v", dropTableQuery, err) } }() if err := testDB.Transact(func(tx *sql.Tx) error { return bulkInsert(ctx, tx, table, tc.columns, tc.values, tc.conflictNoAction) }); tc.wantErr && err == nil || !tc.wantErr && err != nil { - t.Errorf("db.Transact: %v | wantErr = %t", err, tc.wantErr) + t.Errorf("testDB.Transact: %v | wantErr = %t", err, tc.wantErr) } if tc.wantCount != 0 { @@ -179,16 +180,79 @@ func TestBulkInsert(t *testing.T) { row := testDB.QueryRow(query) err := row.Scan(&count) if err != nil { - t.Fatalf("db.QueryRow(%q): %v", query, err) + t.Fatalf("testDB.QueryRow(%q): %v", query, err) } if count != tc.wantCount { - t.Errorf("db.QueryRow(%q) = %d; want = %d", query, count, tc.wantCount) + t.Errorf("testDB.QueryRow(%q) = %d; want = %d", query, count, tc.wantCount) } } }) } } +func TestGetVersionsToRetry(t *testing.T) { + for _, tc := range []struct { + name string + versions []*internal.Version + logs, wantDropped []*internal.VersionLog + }{ + { + name: "insert version logs fo.o and ba.r and drop ba.r", + versions: []*internal.Version{ + sampleVersion(func(v *internal.Version) { + v.ModulePath = "fo.o" + v.Version = "v1.0.0" + }), + }, + logs: []*internal.VersionLog{ + { + ModulePath: "fo.o", + Version: "v1.0.0", + CreatedAt: NowTruncated(), + Source: internal.VersionSourceProxyIndex, + }, + { + ModulePath: "ba.r", + Version: "v1.0.0", + CreatedAt: NowTruncated(), + Source: internal.VersionSourceProxyIndex, + }, + }, + wantDropped: []*internal.VersionLog{ + { + ModulePath: "ba.r", + Version: "v1.0.0", + }, + }, + }, + } { + + t.Run(tc.name, func(t *testing.T) { + defer ResetTestDB(testDB, t) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + for _, v := range tc.versions { + if err := testDB.InsertVersion(ctx, v, sampleLicenses); err != nil { + t.Errorf("testDB.InsertVersion(ctx, %+v): %v", v, err) + } + } + if err := testDB.InsertVersionLogs(ctx, tc.logs); err != nil { + t.Errorf("testDB.InsertVersionLogs(ctx, %+v) error: %v", tc.logs, err) + } + + got, err := testDB.GetVersionsToRetry(ctx) + if err != nil { + t.Errorf("testDB.GetVersionsToRetry(ctx): %v", err) + } + if diff := cmp.Diff(tc.wantDropped, got); diff != "" { + t.Errorf("testDB.GetVersionsToRetry(ctx) mismatch (-want +got):\n%s", diff) + } + }) + } +} + func TestPostgres_ReadAndWriteVersionAndPackages(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() @@ -271,51 +335,51 @@ func TestPostgres_ReadAndWriteVersionAndPackages(t *testing.T) { defer ResetTestDB(testDB, t) if err := testDB.InsertVersion(ctx, tc.version, sampleLicenses); derrors.Type(err) != tc.wantWriteErrType { - t.Errorf("db.InsertVersion(ctx, %+v) error: %v, want write error: %v", tc.version, err, tc.wantWriteErrType) + t.Errorf("testDB.InsertVersion(ctx, %+v) error: %v, want write error: %v", tc.version, err, tc.wantWriteErrType) } // Test that insertion of duplicate primary key won't fail. if err := testDB.InsertVersion(ctx, tc.version, sampleLicenses); derrors.Type(err) != tc.wantWriteErrType { - t.Errorf("db.InsertVersion(ctx, %+v) second insert error: %v, want write error: %v", tc.version, err, tc.wantWriteErrType) + t.Errorf("testDB.InsertVersion(ctx, %+v) second insert error: %v, want write error: %v", tc.version, err, tc.wantWriteErrType) } got, err := testDB.GetVersion(ctx, tc.getModule, tc.getVersion) if tc.wantReadErr != (err != nil) { - t.Fatalf("db.GetVersion(ctx, %q, %q) error: %v, want read error: %t", tc.getModule, tc.getVersion, err, tc.wantReadErr) + t.Fatalf("testDB.GetVersion(ctx, %q, %q) error: %v, want read error: %t", tc.getModule, tc.getVersion, err, tc.wantReadErr) } if !tc.wantReadErr && got == nil { - t.Fatalf("db.GetVersion(ctx, %q, %q) = %v, want %v", tc.getModule, tc.getVersion, got, tc.version) + t.Fatalf("testDB.GetVersion(ctx, %q, %q) = %v, want %v", tc.getModule, tc.getVersion, got, tc.version) } if tc.version != nil { if diff := cmp.Diff(&tc.version.VersionInfo, got); !tc.wantReadErr && diff != "" { - t.Errorf("db.GetVersion(ctx, %q, %q) mismatch (-want +got):\n%s", tc.getModule, tc.getVersion, diff) + t.Errorf("testDB.GetVersion(ctx, %q, %q) mismatch (-want +got):\n%s", tc.getModule, tc.getVersion, diff) } } gotPkg, err := testDB.GetPackage(ctx, tc.getPkg, tc.getVersion) if tc.version == nil || tc.version.Packages == nil || tc.getPkg == "" { if tc.wantReadErr != (err != nil) { - t.Fatalf("db.GetPackage(ctx, %q, %q) = %v, want %v", tc.getPkg, tc.getVersion, err, sql.ErrNoRows) + t.Fatalf("testDB.GetPackage(ctx, %q, %q) = %v, want %v", tc.getPkg, tc.getVersion, err, sql.ErrNoRows) } return } if err != nil { - t.Errorf("db.GetPackage(ctx, %q, %q): %v", tc.getPkg, tc.getVersion, err) + t.Errorf("testDB.GetPackage(ctx, %q, %q): %v", tc.getPkg, tc.getVersion, err) } wantPkg := tc.version.Packages[0] if err != nil { - t.Fatalf("db.GetPackage(ctx, %q, %q) = %v, want %v", tc.getPkg, tc.getVersion, gotPkg, wantPkg) + t.Fatalf("testDB.GetPackage(ctx, %q, %q) = %v, want %v", tc.getPkg, tc.getVersion, gotPkg, wantPkg) } if gotPkg.VersionInfo.Version != tc.version.Version { - t.Errorf("db.GetPackage(ctx, %q, %q) version.version = %v, want %v", tc.getPkg, tc.getVersion, gotPkg.VersionInfo.Version, tc.version.Version) + t.Errorf("testDB.GetPackage(ctx, %q, %q) version.version = %v, want %v", tc.getPkg, tc.getVersion, gotPkg.VersionInfo.Version, tc.version.Version) } if diff := cmp.Diff(wantPkg, &gotPkg.Package, cmpopts.IgnoreFields(internal.Package{}, "Imports")); diff != "" { - t.Errorf("db.GetPackage(%q, %q) Package mismatch (-want +got):\n%s", tc.getPkg, tc.getVersion, diff) + t.Errorf("testDB.GetPackage(%q, %q) Package mismatch (-want +got):\n%s", tc.getPkg, tc.getVersion, diff) } }) } @@ -390,17 +454,17 @@ func TestPostgres_GetLatestPackage(t *testing.T) { t.Run(tc.name, func(t *testing.T) { for _, v := range tc.versions { if err := testDB.InsertVersion(ctx, v, sampleLicenses); err != nil { - t.Errorf("db.InsertVersion(ctx, %v): %v", v, err) + t.Errorf("testDB.InsertVersion(ctx, %v): %v", v, err) } } gotPkg, err := testDB.GetLatestPackage(ctx, tc.path) if (err != nil) != tc.wantReadErr { - t.Errorf("db.GetLatestPackage(ctx, %q): %v", tc.path, err) + t.Errorf("testDB.GetLatestPackage(ctx, %q): %v", tc.path, err) } if diff := cmp.Diff(gotPkg, tc.wantPkg); diff != "" { - t.Errorf("db.GetLatestPackage(ctx, %q) mismatch (-got +want):\n%s", + t.Errorf("testDB.GetLatestPackage(ctx, %q) mismatch (-got +want):\n%s", tc.path, diff) } }) @@ -514,13 +578,13 @@ func TestPostgres_GetImportsAndImportedBy(t *testing.T) { for _, v := range testVersions { if err := testDB.InsertVersion(ctx, v, sampleLicenses); err != nil { - t.Errorf("db.InsertVersion(%v): %v", v, err) + t.Errorf("testDB.InsertVersion(%v): %v", v, err) } } got, err := testDB.GetImports(ctx, tc.path, tc.version) if err != nil { - t.Fatalf("db.GetImports(%q, %q): %v", tc.path, tc.version, err) + t.Fatalf("testDB.GetImports(%q, %q): %v", tc.path, tc.version, err) } sort.Slice(got, func(i, j int) bool { @@ -530,27 +594,22 @@ func TestPostgres_GetImportsAndImportedBy(t *testing.T) { return tc.wantImports[i].Name > tc.wantImports[j].Name }) if diff := cmp.Diff(tc.wantImports, got); diff != "" { - t.Errorf("db.GetImports(%q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) + t.Errorf("testDB.GetImports(%q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) } gotImportedBy, err := testDB.GetImportedBy(ctx, tc.path) if err != nil { - t.Fatalf("db.GetImports(%q, %q): %v", tc.path, tc.version, err) + t.Fatalf("testDB.GetImports(%q, %q): %v", tc.path, tc.version, err) } if diff := cmp.Diff(tc.wantImportedBy, gotImportedBy); diff != "" { - t.Errorf("db.GetImportedBy(%q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) + t.Errorf("testDB.GetImportedBy(%q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) } }) } } -func TestPostgress_InsertVersionLogs(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), testTimeout) - defer cancel() - - defer ResetTestDB(testDB, t) - +func TestInsertAndUpdateVersionLogs(t *testing.T) { now := NowTruncated().UTC() newVersions := []*internal.VersionLog{ &internal.VersionLog{ @@ -573,19 +632,39 @@ func TestPostgress_InsertVersionLogs(t *testing.T) { }, } + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + defer ResetTestDB(testDB, t) + if err := testDB.InsertVersionLogs(ctx, newVersions); err != nil { - t.Errorf("db.InsertVersionLogs(ctx, newVersions) error: %v", err) + t.Errorf("testDB.InsertVersionLogs(ctx, newVersions) error: %v", err) } dbTime, err := testDB.LatestProxyIndexUpdate(ctx) if err != nil { - t.Errorf("db.LatestProxyIndexUpdate error: %v", err) + t.Errorf("testDB.LatestProxyIndexUpdate error: %v", err) } // Since now is already truncated to Microsecond precision, we should get // back the exact same time. if !dbTime.Equal(now) { - t.Errorf("db.LatestProxyIndexUpdate(ctx) = %v, want %v", dbTime, now) + t.Errorf("testDB.LatestProxyIndexUpdate(ctx) = %v, want %v", dbTime, now) + } + + for _, v := range newVersions { + if err := testDB.UpdateVersionLogError(ctx, v.ModulePath, v.Version, errors.New("test error")); err != nil { + t.Errorf("testDB.UpdateVersionLogError(ctx, %q, %q, errors.New(test error)): %v", v.ModulePath, v.Version, err) + } + } + + var count int + row := testDB.QueryRowContext(ctx, `SELECT COUNT(*) FROM version_logs WHERE error='test error';`) + if err := row.Scan(&count); err != nil { + t.Fatalf("row.Scan(&count): %v", err) + } + if count != len(newVersions) { + t.Errorf("Number of rows with error='test error' = %d, want = %d", count, len(newVersions)) } } @@ -901,7 +980,7 @@ func TestPostgres_GetTaggedAndPseudoVersionsForPackageSeries(t *testing.T) { for i, v := range got { if diff := cmp.Diff(tc.wantTaggedVersions[i], v); diff != "" { - t.Errorf("db.GetTaggedVersionsForPackageSeries(%q) mismatch (-want +got):\n%s", tc.path, diff) + t.Errorf("testDB.GetTaggedVersionsForPackageSeries(%q) mismatch (-want +got):\n%s", tc.path, diff) } } }) @@ -1007,15 +1086,15 @@ func TestGetVersionForPackage(t *testing.T) { defer cancel() if err := testDB.InsertVersion(ctx, tc.wantVersion, sampleLicenses); err != nil { - t.Errorf("db.InsertVersion(ctx, %q %q): %v", tc.path, tc.version, err) + t.Errorf("testDB.InsertVersion(ctx, %q %q): %v", tc.path, tc.version, err) } got, err := testDB.GetVersionForPackage(ctx, tc.path, tc.version) if err != nil { - t.Errorf("db.GetVersionForPackage(ctx, %q, %q): %v", tc.path, tc.version, err) + t.Errorf("testDB.GetVersionForPackage(ctx, %q, %q): %v", tc.path, tc.version, err) } if diff := cmp.Diff(tc.wantVersion, got); diff != "" { - t.Errorf("db.GetVersionForPackage(ctx, %q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) + t.Errorf("testDB.GetVersionForPackage(ctx, %q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) } }) } @@ -1071,17 +1150,17 @@ func TestGetLicenses(t *testing.T) { defer cancel() if err := testDB.InsertVersion(ctx, testVersion, sampleLicenses); err != nil { - t.Errorf("db.InsertVersion(ctx, %q, licenses): %v", testVersion.Version, err) + t.Errorf("testDB.InsertVersion(ctx, %q, licenses): %v", testVersion.Version, err) } for _, test := range tests { t.Run(test.label, func(t *testing.T) { got, err := testDB.GetLicenses(ctx, test.pkgPath, testVersion.Version) if err != nil { - t.Fatalf("db.GetLicenses(ctx, %q, %q): %v", test.pkgPath, testVersion.Version, err) + t.Fatalf("testDB.GetLicenses(ctx, %q, %q): %v", test.pkgPath, testVersion.Version, err) } if diff := cmp.Diff(got, test.wantLicenses); diff != "" { - t.Errorf("db.GetLicenses(ctx, %q, %q) mismatch (-got +want):\n%s", test.pkgPath, testVersion.Version, diff) + t.Errorf("testDB.GetLicenses(ctx, %q, %q) mismatch (-got +want):\n%s", test.pkgPath, testVersion.Version, diff) } }) } |
