From d88ea7bb53ea92d4336cd6e90fdc90f0a4f72397 Mon Sep 17 00:00:00 2001 From: Julie Qiu Date: Mon, 22 Apr 2019 00:32:53 -0400 Subject: cmd/cron,internal/postgres: retry dropped versions At the moment, there isn't a way to retry versions that have been inserted into the version log if they are dropped after the makeNewVersionsTimeout. A /retry endpoint is added to the proxy index cron, which gets entries from the version logs that are not present in the versions table and have not failed, and retries fetching them. The fetch service will now write errors to the error log. This job is a temporary solution for populating the database, while the go/go-discovery-etl design is being implemented. Change-Id: I389c537068e05341ac79aac734d6df5766a1f278 Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/452776 Reviewed-by: Robert Findley --- internal/postgres/postgres_test.go | 157 ++++++++++++++++++++++++++++--------- 1 file changed, 118 insertions(+), 39 deletions(-) (limited to 'internal/postgres/postgres_test.go') diff --git a/internal/postgres/postgres_test.go b/internal/postgres/postgres_test.go index be1f7d2f..830ec24a 100644 --- a/internal/postgres/postgres_test.go +++ b/internal/postgres/postgres_test.go @@ -7,6 +7,7 @@ package postgres import ( "context" "database/sql" + "errors" "fmt" "sort" "testing" @@ -158,19 +159,19 @@ func TestBulkInsert(t *testing.T) { PRIMARY KEY (colA) );`, table) if _, err := testDB.ExecContext(ctx, createQuery); err != nil { - t.Fatalf("db.ExecContext(ctx, %q): %v", createQuery, err) + t.Fatalf("testDB.ExecContext(ctx, %q): %v", createQuery, err) } defer func() { dropTableQuery := fmt.Sprintf("DROP TABLE %s;", table) if _, err := testDB.ExecContext(ctx, dropTableQuery); err != nil { - t.Fatalf("db.ExecContext(ctx, %q): %v", dropTableQuery, err) + t.Fatalf("testDB.ExecContext(ctx, %q): %v", dropTableQuery, err) } }() if err := testDB.Transact(func(tx *sql.Tx) error { return bulkInsert(ctx, tx, table, tc.columns, tc.values, tc.conflictNoAction) }); tc.wantErr && err == nil || !tc.wantErr && err != nil { - t.Errorf("db.Transact: %v | wantErr = %t", err, tc.wantErr) + t.Errorf("testDB.Transact: %v | wantErr = %t", err, tc.wantErr) } if tc.wantCount != 0 { @@ -179,16 +180,79 @@ func TestBulkInsert(t *testing.T) { row := testDB.QueryRow(query) err := row.Scan(&count) if err != nil { - t.Fatalf("db.QueryRow(%q): %v", query, err) + t.Fatalf("testDB.QueryRow(%q): %v", query, err) } if count != tc.wantCount { - t.Errorf("db.QueryRow(%q) = %d; want = %d", query, count, tc.wantCount) + t.Errorf("testDB.QueryRow(%q) = %d; want = %d", query, count, tc.wantCount) } } }) } } +func TestGetVersionsToRetry(t *testing.T) { + for _, tc := range []struct { + name string + versions []*internal.Version + logs, wantDropped []*internal.VersionLog + }{ + { + name: "insert version logs fo.o and ba.r and drop ba.r", + versions: []*internal.Version{ + sampleVersion(func(v *internal.Version) { + v.ModulePath = "fo.o" + v.Version = "v1.0.0" + }), + }, + logs: []*internal.VersionLog{ + { + ModulePath: "fo.o", + Version: "v1.0.0", + CreatedAt: NowTruncated(), + Source: internal.VersionSourceProxyIndex, + }, + { + ModulePath: "ba.r", + Version: "v1.0.0", + CreatedAt: NowTruncated(), + Source: internal.VersionSourceProxyIndex, + }, + }, + wantDropped: []*internal.VersionLog{ + { + ModulePath: "ba.r", + Version: "v1.0.0", + }, + }, + }, + } { + + t.Run(tc.name, func(t *testing.T) { + defer ResetTestDB(testDB, t) + + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + for _, v := range tc.versions { + if err := testDB.InsertVersion(ctx, v, sampleLicenses); err != nil { + t.Errorf("testDB.InsertVersion(ctx, %+v): %v", v, err) + } + } + if err := testDB.InsertVersionLogs(ctx, tc.logs); err != nil { + t.Errorf("testDB.InsertVersionLogs(ctx, %+v) error: %v", tc.logs, err) + } + + got, err := testDB.GetVersionsToRetry(ctx) + if err != nil { + t.Errorf("testDB.GetVersionsToRetry(ctx): %v", err) + } + if diff := cmp.Diff(tc.wantDropped, got); diff != "" { + t.Errorf("testDB.GetVersionsToRetry(ctx) mismatch (-want +got):\n%s", diff) + } + }) + } +} + func TestPostgres_ReadAndWriteVersionAndPackages(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() @@ -271,51 +335,51 @@ func TestPostgres_ReadAndWriteVersionAndPackages(t *testing.T) { defer ResetTestDB(testDB, t) if err := testDB.InsertVersion(ctx, tc.version, sampleLicenses); derrors.Type(err) != tc.wantWriteErrType { - t.Errorf("db.InsertVersion(ctx, %+v) error: %v, want write error: %v", tc.version, err, tc.wantWriteErrType) + t.Errorf("testDB.InsertVersion(ctx, %+v) error: %v, want write error: %v", tc.version, err, tc.wantWriteErrType) } // Test that insertion of duplicate primary key won't fail. if err := testDB.InsertVersion(ctx, tc.version, sampleLicenses); derrors.Type(err) != tc.wantWriteErrType { - t.Errorf("db.InsertVersion(ctx, %+v) second insert error: %v, want write error: %v", tc.version, err, tc.wantWriteErrType) + t.Errorf("testDB.InsertVersion(ctx, %+v) second insert error: %v, want write error: %v", tc.version, err, tc.wantWriteErrType) } got, err := testDB.GetVersion(ctx, tc.getModule, tc.getVersion) if tc.wantReadErr != (err != nil) { - t.Fatalf("db.GetVersion(ctx, %q, %q) error: %v, want read error: %t", tc.getModule, tc.getVersion, err, tc.wantReadErr) + t.Fatalf("testDB.GetVersion(ctx, %q, %q) error: %v, want read error: %t", tc.getModule, tc.getVersion, err, tc.wantReadErr) } if !tc.wantReadErr && got == nil { - t.Fatalf("db.GetVersion(ctx, %q, %q) = %v, want %v", tc.getModule, tc.getVersion, got, tc.version) + t.Fatalf("testDB.GetVersion(ctx, %q, %q) = %v, want %v", tc.getModule, tc.getVersion, got, tc.version) } if tc.version != nil { if diff := cmp.Diff(&tc.version.VersionInfo, got); !tc.wantReadErr && diff != "" { - t.Errorf("db.GetVersion(ctx, %q, %q) mismatch (-want +got):\n%s", tc.getModule, tc.getVersion, diff) + t.Errorf("testDB.GetVersion(ctx, %q, %q) mismatch (-want +got):\n%s", tc.getModule, tc.getVersion, diff) } } gotPkg, err := testDB.GetPackage(ctx, tc.getPkg, tc.getVersion) if tc.version == nil || tc.version.Packages == nil || tc.getPkg == "" { if tc.wantReadErr != (err != nil) { - t.Fatalf("db.GetPackage(ctx, %q, %q) = %v, want %v", tc.getPkg, tc.getVersion, err, sql.ErrNoRows) + t.Fatalf("testDB.GetPackage(ctx, %q, %q) = %v, want %v", tc.getPkg, tc.getVersion, err, sql.ErrNoRows) } return } if err != nil { - t.Errorf("db.GetPackage(ctx, %q, %q): %v", tc.getPkg, tc.getVersion, err) + t.Errorf("testDB.GetPackage(ctx, %q, %q): %v", tc.getPkg, tc.getVersion, err) } wantPkg := tc.version.Packages[0] if err != nil { - t.Fatalf("db.GetPackage(ctx, %q, %q) = %v, want %v", tc.getPkg, tc.getVersion, gotPkg, wantPkg) + t.Fatalf("testDB.GetPackage(ctx, %q, %q) = %v, want %v", tc.getPkg, tc.getVersion, gotPkg, wantPkg) } if gotPkg.VersionInfo.Version != tc.version.Version { - t.Errorf("db.GetPackage(ctx, %q, %q) version.version = %v, want %v", tc.getPkg, tc.getVersion, gotPkg.VersionInfo.Version, tc.version.Version) + t.Errorf("testDB.GetPackage(ctx, %q, %q) version.version = %v, want %v", tc.getPkg, tc.getVersion, gotPkg.VersionInfo.Version, tc.version.Version) } if diff := cmp.Diff(wantPkg, &gotPkg.Package, cmpopts.IgnoreFields(internal.Package{}, "Imports")); diff != "" { - t.Errorf("db.GetPackage(%q, %q) Package mismatch (-want +got):\n%s", tc.getPkg, tc.getVersion, diff) + t.Errorf("testDB.GetPackage(%q, %q) Package mismatch (-want +got):\n%s", tc.getPkg, tc.getVersion, diff) } }) } @@ -390,17 +454,17 @@ func TestPostgres_GetLatestPackage(t *testing.T) { t.Run(tc.name, func(t *testing.T) { for _, v := range tc.versions { if err := testDB.InsertVersion(ctx, v, sampleLicenses); err != nil { - t.Errorf("db.InsertVersion(ctx, %v): %v", v, err) + t.Errorf("testDB.InsertVersion(ctx, %v): %v", v, err) } } gotPkg, err := testDB.GetLatestPackage(ctx, tc.path) if (err != nil) != tc.wantReadErr { - t.Errorf("db.GetLatestPackage(ctx, %q): %v", tc.path, err) + t.Errorf("testDB.GetLatestPackage(ctx, %q): %v", tc.path, err) } if diff := cmp.Diff(gotPkg, tc.wantPkg); diff != "" { - t.Errorf("db.GetLatestPackage(ctx, %q) mismatch (-got +want):\n%s", + t.Errorf("testDB.GetLatestPackage(ctx, %q) mismatch (-got +want):\n%s", tc.path, diff) } }) @@ -514,13 +578,13 @@ func TestPostgres_GetImportsAndImportedBy(t *testing.T) { for _, v := range testVersions { if err := testDB.InsertVersion(ctx, v, sampleLicenses); err != nil { - t.Errorf("db.InsertVersion(%v): %v", v, err) + t.Errorf("testDB.InsertVersion(%v): %v", v, err) } } got, err := testDB.GetImports(ctx, tc.path, tc.version) if err != nil { - t.Fatalf("db.GetImports(%q, %q): %v", tc.path, tc.version, err) + t.Fatalf("testDB.GetImports(%q, %q): %v", tc.path, tc.version, err) } sort.Slice(got, func(i, j int) bool { @@ -530,27 +594,22 @@ func TestPostgres_GetImportsAndImportedBy(t *testing.T) { return tc.wantImports[i].Name > tc.wantImports[j].Name }) if diff := cmp.Diff(tc.wantImports, got); diff != "" { - t.Errorf("db.GetImports(%q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) + t.Errorf("testDB.GetImports(%q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) } gotImportedBy, err := testDB.GetImportedBy(ctx, tc.path) if err != nil { - t.Fatalf("db.GetImports(%q, %q): %v", tc.path, tc.version, err) + t.Fatalf("testDB.GetImports(%q, %q): %v", tc.path, tc.version, err) } if diff := cmp.Diff(tc.wantImportedBy, gotImportedBy); diff != "" { - t.Errorf("db.GetImportedBy(%q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) + t.Errorf("testDB.GetImportedBy(%q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) } }) } } -func TestPostgress_InsertVersionLogs(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), testTimeout) - defer cancel() - - defer ResetTestDB(testDB, t) - +func TestInsertAndUpdateVersionLogs(t *testing.T) { now := NowTruncated().UTC() newVersions := []*internal.VersionLog{ &internal.VersionLog{ @@ -573,19 +632,39 @@ func TestPostgress_InsertVersionLogs(t *testing.T) { }, } + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + defer ResetTestDB(testDB, t) + if err := testDB.InsertVersionLogs(ctx, newVersions); err != nil { - t.Errorf("db.InsertVersionLogs(ctx, newVersions) error: %v", err) + t.Errorf("testDB.InsertVersionLogs(ctx, newVersions) error: %v", err) } dbTime, err := testDB.LatestProxyIndexUpdate(ctx) if err != nil { - t.Errorf("db.LatestProxyIndexUpdate error: %v", err) + t.Errorf("testDB.LatestProxyIndexUpdate error: %v", err) } // Since now is already truncated to Microsecond precision, we should get // back the exact same time. if !dbTime.Equal(now) { - t.Errorf("db.LatestProxyIndexUpdate(ctx) = %v, want %v", dbTime, now) + t.Errorf("testDB.LatestProxyIndexUpdate(ctx) = %v, want %v", dbTime, now) + } + + for _, v := range newVersions { + if err := testDB.UpdateVersionLogError(ctx, v.ModulePath, v.Version, errors.New("test error")); err != nil { + t.Errorf("testDB.UpdateVersionLogError(ctx, %q, %q, errors.New(test error)): %v", v.ModulePath, v.Version, err) + } + } + + var count int + row := testDB.QueryRowContext(ctx, `SELECT COUNT(*) FROM version_logs WHERE error='test error';`) + if err := row.Scan(&count); err != nil { + t.Fatalf("row.Scan(&count): %v", err) + } + if count != len(newVersions) { + t.Errorf("Number of rows with error='test error' = %d, want = %d", count, len(newVersions)) } } @@ -901,7 +980,7 @@ func TestPostgres_GetTaggedAndPseudoVersionsForPackageSeries(t *testing.T) { for i, v := range got { if diff := cmp.Diff(tc.wantTaggedVersions[i], v); diff != "" { - t.Errorf("db.GetTaggedVersionsForPackageSeries(%q) mismatch (-want +got):\n%s", tc.path, diff) + t.Errorf("testDB.GetTaggedVersionsForPackageSeries(%q) mismatch (-want +got):\n%s", tc.path, diff) } } }) @@ -1007,15 +1086,15 @@ func TestGetVersionForPackage(t *testing.T) { defer cancel() if err := testDB.InsertVersion(ctx, tc.wantVersion, sampleLicenses); err != nil { - t.Errorf("db.InsertVersion(ctx, %q %q): %v", tc.path, tc.version, err) + t.Errorf("testDB.InsertVersion(ctx, %q %q): %v", tc.path, tc.version, err) } got, err := testDB.GetVersionForPackage(ctx, tc.path, tc.version) if err != nil { - t.Errorf("db.GetVersionForPackage(ctx, %q, %q): %v", tc.path, tc.version, err) + t.Errorf("testDB.GetVersionForPackage(ctx, %q, %q): %v", tc.path, tc.version, err) } if diff := cmp.Diff(tc.wantVersion, got); diff != "" { - t.Errorf("db.GetVersionForPackage(ctx, %q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) + t.Errorf("testDB.GetVersionForPackage(ctx, %q, %q) mismatch (-want +got):\n%s", tc.path, tc.version, diff) } }) } @@ -1071,17 +1150,17 @@ func TestGetLicenses(t *testing.T) { defer cancel() if err := testDB.InsertVersion(ctx, testVersion, sampleLicenses); err != nil { - t.Errorf("db.InsertVersion(ctx, %q, licenses): %v", testVersion.Version, err) + t.Errorf("testDB.InsertVersion(ctx, %q, licenses): %v", testVersion.Version, err) } for _, test := range tests { t.Run(test.label, func(t *testing.T) { got, err := testDB.GetLicenses(ctx, test.pkgPath, testVersion.Version) if err != nil { - t.Fatalf("db.GetLicenses(ctx, %q, %q): %v", test.pkgPath, testVersion.Version, err) + t.Fatalf("testDB.GetLicenses(ctx, %q, %q): %v", test.pkgPath, testVersion.Version, err) } if diff := cmp.Diff(got, test.wantLicenses); diff != "" { - t.Errorf("db.GetLicenses(ctx, %q, %q) mismatch (-got +want):\n%s", test.pkgPath, testVersion.Version, diff) + t.Errorf("testDB.GetLicenses(ctx, %q, %q) mismatch (-got +want):\n%s", test.pkgPath, testVersion.Version, diff) } }) } -- cgit v1.3-5-g9baa