diff options
| author | Jean Barkhuysen <jean.barkhuysen@gmail.com> | 2026-03-04 10:47:23 -0700 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2026-04-08 09:33:33 -0700 |
| commit | 372618454cdb62e4cbaab1fd14c58f2faf5db80a (patch) | |
| tree | 20af08dd7bde88b3ff344261368d953b98cf774f /internal/queue/pgqueue/queue_test.go | |
| parent | e09b3908a3542531bb1a84b28406102957861650 (diff) | |
| download | go-x-pkgsite-372618454cdb62e4cbaab1fd14c58f2faf5db80a.tar.xz | |
internal/queue: add postgres queue implementation, and use it in worker|frontend
Fixes golang/go#74027.
Change-Id: I916ac81093e782d4eda21fe11ef47eeff4f5f0b1
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/751480
Reviewed-by: Jonathan Amsterdam <jba@google.com>
kokoro-CI: kokoro <noreply+kokoro@google.com>
Reviewed-by: Ethan Lee <ethanalee@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Diffstat (limited to 'internal/queue/pgqueue/queue_test.go')
| -rw-r--r-- | internal/queue/pgqueue/queue_test.go | 289 |
1 files changed, 289 insertions, 0 deletions
diff --git a/internal/queue/pgqueue/queue_test.go b/internal/queue/pgqueue/queue_test.go new file mode 100644 index 00000000..b1621435 --- /dev/null +++ b/internal/queue/pgqueue/queue_test.go @@ -0,0 +1,289 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pgqueue + +import ( + "context" + "errors" + "log" + "os" + "sync" + "testing" + "time" + + _ "github.com/jackc/pgx/v4/stdlib" + + "golang.org/x/pkgsite/internal/database" + "golang.org/x/pkgsite/internal/derrors" + "golang.org/x/pkgsite/internal/queue" +) + +const testDBName = "discovery_pgqueue_test" + +var testDB *database.DB + +func TestMain(m *testing.M) { + if os.Getenv("GO_DISCOVERY_TESTDB") != "true" { + log.Printf("SKIPPING: GO_DISCOVERY_TESTDB is not set to true") + return + } + if err := database.CreateDBIfNotExists(testDBName); err != nil { + if errors.Is(err, derrors.NotFound) { + log.Printf("SKIPPING: could not connect to DB: %v", err) + return + } + log.Fatal(err) + } + db, err := database.Open("pgx", database.DBConnURI(testDBName), "test") + if err != nil { + log.Fatal(err) + } + testDB = db + os.Exit(m.Run()) +} + +func setup(t *testing.T) *Queue { + t.Helper() + ctx := context.Background() + if testDB == nil { + t.Skip("test database not available") + } + // Drop and recreate the table for a clean slate. + if _, err := testDB.Exec(ctx, `DROP TABLE IF EXISTS queue_tasks`); err != nil { + t.Fatal(err) + } + q, err := New(ctx, testDB) + if err != nil { + t.Fatal(err) + } + return q +} + +func TestScheduleFetch(t *testing.T) { + q := setup(t) + ctx := context.Background() + + enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil) + if err != nil { + t.Fatal(err) + } + if !enqueued { + t.Error("ScheduleFetch() = false, want true") + } + + // Same module@version should be deduplicated. + enqueued, err = q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil) + if err != nil { + t.Fatal(err) + } + if enqueued { + t.Error("ScheduleFetch() duplicate = true, want false") + } +} + +func TestScheduleFetchWithSuffix(t *testing.T) { + q := setup(t) + ctx := context.Background() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + // Same module@version with a suffix should not be deduplicated. + enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", &queue.Options{Suffix: "reprocess-1"}) + if err != nil { + t.Fatal(err) + } + if !enqueued { + t.Error("ScheduleFetch() with suffix = false, want true") + } +} + +func TestPollProcessesTasks(t *testing.T) { + q := setup(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + if _, err := q.ScheduleFetch(ctx, "golang.org/x/net", "v0.1.0", nil); err != nil { + t.Fatal(err) + } + + var mu sync.Mutex + var processed []string + + go q.Poll(ctx, 2, func(ctx context.Context, modulePath, version string) (int, error) { + mu.Lock() + processed = append(processed, modulePath+"@"+version) + mu.Unlock() + return 200, nil + }) + + // Wait for tasks to be processed. + deadline := time.After(30 * time.Second) + for { + select { + case <-deadline: + t.Fatal("timed out waiting for tasks to be processed") + case <-time.After(100 * time.Millisecond): + mu.Lock() + n := len(processed) + mu.Unlock() + if n == 2 { + cancel() + return + } + } + } +} + +func TestPollDeletesTaskOnError(t *testing.T) { + q := setup(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + go func() { + q.Poll(ctx, 1, func(ctx context.Context, modulePath, version string) (int, error) { + close(done) + return 500, errors.New("something went wrong") + }) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for task to be processed") + } + cancel() + + // Verify the task was deleted despite the error. + var count int + err := testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count) + if err != nil { + t.Fatal(err) + } + // Allow a brief moment for the delete to complete. + time.Sleep(100 * time.Millisecond) + err = testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count) + if err != nil { + t.Fatal(err) + } + if count != 0 { + t.Errorf("queue_tasks count = %d, want 0", count) + } +} + +func TestPollReclaimsStalledTasks(t *testing.T) { + q := setup(t) + ctx := context.Background() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + // Simulate a stalled task by setting started_at to the past. + if _, err := testDB.Exec(ctx, + `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil { + t.Fatal(err) + } + + pollCtx, cancel := context.WithCancel(ctx) + defer cancel() + + done := make(chan struct{}) + go func() { + q.Poll(pollCtx, 1, func(ctx context.Context, modulePath, version string) (int, error) { + close(done) + return 200, nil + }) + }() + + select { + case <-done: + // Task was reclaimed and processed. + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for stalled task to be reclaimed") + } + cancel() +} + +func TestStalledWorkerDeleteIsNoop(t *testing.T) { + // Worker1 claims a task, but stalls. Worker2 claims it later. Worker1 + // unstalls and completes, but does not delete the task from the queue: once + // worker2 has the task, only it may do the delete. It unstalls and finishes + // and we assert the task is deleted from the queue. + q := setup(t) + ctx := context.Background() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + worker1Claimed := make(chan struct{}) + worker1Stall := make(chan struct{}) + worker1Done := make(chan struct{}) + worker2Claimed := make(chan struct{}) + worker2Stall := make(chan struct{}) + worker2Done := make(chan struct{}) + + // Worker 1 claims the task and stalls. + go func() { + q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) { + close(worker1Claimed) + <-worker1Stall + return 200, nil + }) + close(worker1Done) + }() + + // Wait for worker 1 to enter processFunc, then backdate started_at so the + // task is eligible for reclaim. + <-worker1Claimed + if _, err := testDB.Exec(ctx, `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil { + t.Fatal(err) + } + + // Worker 2 reclaims the task and stalls. + go func() { + q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) { + close(worker2Claimed) + <-worker2Stall + return 200, nil + }) + close(worker2Done) + }() + <-worker2Claimed + + // Unstall worker 1. Its delete should be a no-op because worker 2 has a + // newer started_at. + close(worker1Stall) + <-worker1Done + + var count int + if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil { + t.Fatal(err) + } + if count != 1 { + t.Errorf("after worker 1: task count = %d, want 1 (worker 1 should not have deleted it)", count) + } + + // Unstall worker 2. Its delete should succeed. + close(worker2Stall) + <-worker2Done + + if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil { + t.Fatal(err) + } + if count != 0 { + t.Errorf("after worker 2: task count = %d, want 0", count) + } +} |
