aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/pgqueue/queue_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/queue/pgqueue/queue_test.go')
-rw-r--r--internal/queue/pgqueue/queue_test.go289
1 files changed, 289 insertions, 0 deletions
diff --git a/internal/queue/pgqueue/queue_test.go b/internal/queue/pgqueue/queue_test.go
new file mode 100644
index 00000000..b1621435
--- /dev/null
+++ b/internal/queue/pgqueue/queue_test.go
@@ -0,0 +1,289 @@
+// Copyright 2026 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pgqueue
+
+import (
+ "context"
+ "errors"
+ "log"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ _ "github.com/jackc/pgx/v4/stdlib"
+
+ "golang.org/x/pkgsite/internal/database"
+ "golang.org/x/pkgsite/internal/derrors"
+ "golang.org/x/pkgsite/internal/queue"
+)
+
+const testDBName = "discovery_pgqueue_test"
+
+var testDB *database.DB
+
+func TestMain(m *testing.M) {
+ if os.Getenv("GO_DISCOVERY_TESTDB") != "true" {
+ log.Printf("SKIPPING: GO_DISCOVERY_TESTDB is not set to true")
+ return
+ }
+ if err := database.CreateDBIfNotExists(testDBName); err != nil {
+ if errors.Is(err, derrors.NotFound) {
+ log.Printf("SKIPPING: could not connect to DB: %v", err)
+ return
+ }
+ log.Fatal(err)
+ }
+ db, err := database.Open("pgx", database.DBConnURI(testDBName), "test")
+ if err != nil {
+ log.Fatal(err)
+ }
+ testDB = db
+ os.Exit(m.Run())
+}
+
+func setup(t *testing.T) *Queue {
+ t.Helper()
+ ctx := context.Background()
+ if testDB == nil {
+ t.Skip("test database not available")
+ }
+ // Drop and recreate the table for a clean slate.
+ if _, err := testDB.Exec(ctx, `DROP TABLE IF EXISTS queue_tasks`); err != nil {
+ t.Fatal(err)
+ }
+ q, err := New(ctx, testDB)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return q
+}
+
+func TestScheduleFetch(t *testing.T) {
+ q := setup(t)
+ ctx := context.Background()
+
+ enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !enqueued {
+ t.Error("ScheduleFetch() = false, want true")
+ }
+
+ // Same module@version should be deduplicated.
+ enqueued, err = q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if enqueued {
+ t.Error("ScheduleFetch() duplicate = true, want false")
+ }
+}
+
+func TestScheduleFetchWithSuffix(t *testing.T) {
+ q := setup(t)
+ ctx := context.Background()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ // Same module@version with a suffix should not be deduplicated.
+ enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", &queue.Options{Suffix: "reprocess-1"})
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !enqueued {
+ t.Error("ScheduleFetch() with suffix = false, want true")
+ }
+}
+
+func TestPollProcessesTasks(t *testing.T) {
+ q := setup(t)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/net", "v0.1.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ var mu sync.Mutex
+ var processed []string
+
+ go q.Poll(ctx, 2, func(ctx context.Context, modulePath, version string) (int, error) {
+ mu.Lock()
+ processed = append(processed, modulePath+"@"+version)
+ mu.Unlock()
+ return 200, nil
+ })
+
+ // Wait for tasks to be processed.
+ deadline := time.After(30 * time.Second)
+ for {
+ select {
+ case <-deadline:
+ t.Fatal("timed out waiting for tasks to be processed")
+ case <-time.After(100 * time.Millisecond):
+ mu.Lock()
+ n := len(processed)
+ mu.Unlock()
+ if n == 2 {
+ cancel()
+ return
+ }
+ }
+ }
+}
+
+func TestPollDeletesTaskOnError(t *testing.T) {
+ q := setup(t)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ done := make(chan struct{})
+ go func() {
+ q.Poll(ctx, 1, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(done)
+ return 500, errors.New("something went wrong")
+ })
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(30 * time.Second):
+ t.Fatal("timed out waiting for task to be processed")
+ }
+ cancel()
+
+ // Verify the task was deleted despite the error.
+ var count int
+ err := testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Allow a brief moment for the delete to complete.
+ time.Sleep(100 * time.Millisecond)
+ err = testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if count != 0 {
+ t.Errorf("queue_tasks count = %d, want 0", count)
+ }
+}
+
+func TestPollReclaimsStalledTasks(t *testing.T) {
+ q := setup(t)
+ ctx := context.Background()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ // Simulate a stalled task by setting started_at to the past.
+ if _, err := testDB.Exec(ctx,
+ `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil {
+ t.Fatal(err)
+ }
+
+ pollCtx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ done := make(chan struct{})
+ go func() {
+ q.Poll(pollCtx, 1, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(done)
+ return 200, nil
+ })
+ }()
+
+ select {
+ case <-done:
+ // Task was reclaimed and processed.
+ case <-time.After(30 * time.Second):
+ t.Fatal("timed out waiting for stalled task to be reclaimed")
+ }
+ cancel()
+}
+
+func TestStalledWorkerDeleteIsNoop(t *testing.T) {
+ // Worker1 claims a task, but stalls. Worker2 claims it later. Worker1
+ // unstalls and completes, but does not delete the task from the queue: once
+ // worker2 has the task, only it may do the delete. It unstalls and finishes
+ // and we assert the task is deleted from the queue.
+ q := setup(t)
+ ctx := context.Background()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ worker1Claimed := make(chan struct{})
+ worker1Stall := make(chan struct{})
+ worker1Done := make(chan struct{})
+ worker2Claimed := make(chan struct{})
+ worker2Stall := make(chan struct{})
+ worker2Done := make(chan struct{})
+
+ // Worker 1 claims the task and stalls.
+ go func() {
+ q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(worker1Claimed)
+ <-worker1Stall
+ return 200, nil
+ })
+ close(worker1Done)
+ }()
+
+ // Wait for worker 1 to enter processFunc, then backdate started_at so the
+ // task is eligible for reclaim.
+ <-worker1Claimed
+ if _, err := testDB.Exec(ctx, `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil {
+ t.Fatal(err)
+ }
+
+ // Worker 2 reclaims the task and stalls.
+ go func() {
+ q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(worker2Claimed)
+ <-worker2Stall
+ return 200, nil
+ })
+ close(worker2Done)
+ }()
+ <-worker2Claimed
+
+ // Unstall worker 1. Its delete should be a no-op because worker 2 has a
+ // newer started_at.
+ close(worker1Stall)
+ <-worker1Done
+
+ var count int
+ if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil {
+ t.Fatal(err)
+ }
+ if count != 1 {
+ t.Errorf("after worker 1: task count = %d, want 1 (worker 1 should not have deleted it)", count)
+ }
+
+ // Unstall worker 2. Its delete should succeed.
+ close(worker2Stall)
+ <-worker2Done
+
+ if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil {
+ t.Fatal(err)
+ }
+ if count != 0 {
+ t.Errorf("after worker 2: task count = %d, want 0", count)
+ }
+}