aboutsummaryrefslogtreecommitdiff
path: root/internal/queue
diff options
context:
space:
mode:
Diffstat (limited to 'internal/queue')
-rw-r--r--internal/queue/pgqueue/queue.go140
-rw-r--r--internal/queue/pgqueue/queue_test.go289
2 files changed, 429 insertions, 0 deletions
diff --git a/internal/queue/pgqueue/queue.go b/internal/queue/pgqueue/queue.go
new file mode 100644
index 00000000..e4e07ddc
--- /dev/null
+++ b/internal/queue/pgqueue/queue.go
@@ -0,0 +1,140 @@
+// Copyright 2026 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package pgqueue provides a Postgres-backed queue implementation for
+// scheduling and processing fetch actions. It supports multiple concurrent
+// workers (processes or goroutines)
+package pgqueue
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "golang.org/x/pkgsite/internal/database"
+ "golang.org/x/pkgsite/internal/log"
+ "golang.org/x/pkgsite/internal/queue"
+)
+
+// The frequency at which we poll for work.
+const pollInterval = 5 * time.Second
+
+// ProcessFunc is the function signature for processing dequeued work.
+type ProcessFunc func(ctx context.Context, modulePath, version string) (int, error)
+
+// Queue implements the Queue interface backed by a Postgres table. It is safe
+// for concurrent use by multiple goroutines and processes.
+type Queue struct {
+ db *database.DB
+}
+
+// New creates the queue_tasks table if it doesn't exist and returns a Queue.
+func New(ctx context.Context, db *database.DB) (*Queue, error) {
+ // TODO(jbarkhuysen): If we find it onerous to do table updates over time, we
+ // may want to consider alternatives to doing this here.
+ if _, err := db.Exec(ctx, createTableQuery); err != nil {
+ return nil, fmt.Errorf("pgqueue.New: creating table: %w", err)
+ }
+ return &Queue{db: db}, nil
+}
+
+const createTableQuery = `
+CREATE TABLE IF NOT EXISTS queue_tasks (
+ id BIGSERIAL PRIMARY KEY,
+ task_name TEXT UNIQUE NOT NULL,
+ module_path TEXT NOT NULL,
+ version TEXT NOT NULL,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ started_at TIMESTAMPTZ
+);
+CREATE INDEX IF NOT EXISTS idx_queue_tasks_started_created
+ON queue_tasks (started_at, created_at);`
+
+// ScheduleFetch inserts a task into queue_tasks. It returns (true, nil) if the
+// task was inserted, or (false, nil) if it was a duplicate.
+func (q *Queue) ScheduleFetch(ctx context.Context, modulePath, version string, opts *queue.Options) (bool, error) {
+ taskName := modulePath + "@" + version
+ if opts != nil && opts.Suffix != "" {
+ taskName += "-" + opts.Suffix
+ }
+ n, err := q.db.Exec(ctx,
+ `INSERT INTO queue_tasks (task_name, module_path, version) VALUES ($1, $2, $3) ON CONFLICT (task_name) DO NOTHING`,
+ taskName, modulePath, version)
+ if err != nil {
+ return false, fmt.Errorf("pgqueue.ScheduleFetch(%q, %q): %w", modulePath, version, err)
+ }
+ return n == 1, nil
+}
+
+// Poll starts background polling for work. It spawns the given number of worker
+// goroutines, each of which periodically claims a task, runs processFunc, and
+// deletes the task on completion. It blocks until ctx is cancelled.
+func (q *Queue) Poll(ctx context.Context, workers int, processFunc ProcessFunc) {
+ wg := sync.WaitGroup{}
+ for range workers {
+ wg.Go(func() {
+ // Periodically claim work.
+ ticker := time.NewTicker(pollInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ q.claimAndProcess(ctx, processFunc)
+ }
+ }
+ })
+ }
+ wg.Wait()
+}
+
+// TODO(jbarkhuysen): 5m stall timeout is baked in; we might want to make it
+// variable in the future.
+const dequeueQuery = `
+WITH next_task AS (
+ SELECT id
+ FROM queue_tasks
+ WHERE started_at IS NULL
+ OR started_at + INTERVAL '5 minutes' < NOW()
+ ORDER BY created_at ASC
+ LIMIT 1
+ FOR UPDATE SKIP LOCKED
+)
+UPDATE queue_tasks
+SET started_at = NOW()
+WHERE id = (SELECT id FROM next_task)
+RETURNING id, module_path, version, started_at`
+
+func (q *Queue) claimAndProcess(ctx context.Context, processFunc ProcessFunc) {
+ var id int64
+ var modulePath, version string
+ var startedAt time.Time
+ err := q.db.QueryRow(ctx, dequeueQuery).Scan(&id, &modulePath, &version, &startedAt)
+ if errors.Is(err, sql.ErrNoRows) {
+ return // There's no work: no-op.
+ }
+ if err != nil {
+ log.Errorf(ctx, "pgqueue: dequeue: %v", err)
+ return
+ }
+
+ log.Infof(ctx, "pgqueue: processing %s@%s (task %d)", modulePath, version, id)
+ code, err := processFunc(ctx, modulePath, version)
+ if err != nil {
+ log.Errorf(ctx, "pgqueue: processing %s@%s: status=%d err=%v", modulePath, version, code, err)
+ // This still gets removed (delete below) so that we don't endlessly
+ // fail the same work item.
+ }
+
+ // Use a background context for cleanup so the delete succeeds even if
+ // the poll context has been cancelled.
+ delCtx := context.Background()
+ if _, err := q.db.Exec(delCtx, `DELETE FROM queue_tasks WHERE id = $1 AND started_at = $2`, id, startedAt); err != nil {
+ log.Errorf(delCtx, "pgqueue: deleting task %d: %v", id, err)
+ }
+}
diff --git a/internal/queue/pgqueue/queue_test.go b/internal/queue/pgqueue/queue_test.go
new file mode 100644
index 00000000..b1621435
--- /dev/null
+++ b/internal/queue/pgqueue/queue_test.go
@@ -0,0 +1,289 @@
+// Copyright 2026 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pgqueue
+
+import (
+ "context"
+ "errors"
+ "log"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ _ "github.com/jackc/pgx/v4/stdlib"
+
+ "golang.org/x/pkgsite/internal/database"
+ "golang.org/x/pkgsite/internal/derrors"
+ "golang.org/x/pkgsite/internal/queue"
+)
+
+const testDBName = "discovery_pgqueue_test"
+
+var testDB *database.DB
+
+func TestMain(m *testing.M) {
+ if os.Getenv("GO_DISCOVERY_TESTDB") != "true" {
+ log.Printf("SKIPPING: GO_DISCOVERY_TESTDB is not set to true")
+ return
+ }
+ if err := database.CreateDBIfNotExists(testDBName); err != nil {
+ if errors.Is(err, derrors.NotFound) {
+ log.Printf("SKIPPING: could not connect to DB: %v", err)
+ return
+ }
+ log.Fatal(err)
+ }
+ db, err := database.Open("pgx", database.DBConnURI(testDBName), "test")
+ if err != nil {
+ log.Fatal(err)
+ }
+ testDB = db
+ os.Exit(m.Run())
+}
+
+func setup(t *testing.T) *Queue {
+ t.Helper()
+ ctx := context.Background()
+ if testDB == nil {
+ t.Skip("test database not available")
+ }
+ // Drop and recreate the table for a clean slate.
+ if _, err := testDB.Exec(ctx, `DROP TABLE IF EXISTS queue_tasks`); err != nil {
+ t.Fatal(err)
+ }
+ q, err := New(ctx, testDB)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return q
+}
+
+func TestScheduleFetch(t *testing.T) {
+ q := setup(t)
+ ctx := context.Background()
+
+ enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !enqueued {
+ t.Error("ScheduleFetch() = false, want true")
+ }
+
+ // Same module@version should be deduplicated.
+ enqueued, err = q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if enqueued {
+ t.Error("ScheduleFetch() duplicate = true, want false")
+ }
+}
+
+func TestScheduleFetchWithSuffix(t *testing.T) {
+ q := setup(t)
+ ctx := context.Background()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ // Same module@version with a suffix should not be deduplicated.
+ enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", &queue.Options{Suffix: "reprocess-1"})
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !enqueued {
+ t.Error("ScheduleFetch() with suffix = false, want true")
+ }
+}
+
+func TestPollProcessesTasks(t *testing.T) {
+ q := setup(t)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/net", "v0.1.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ var mu sync.Mutex
+ var processed []string
+
+ go q.Poll(ctx, 2, func(ctx context.Context, modulePath, version string) (int, error) {
+ mu.Lock()
+ processed = append(processed, modulePath+"@"+version)
+ mu.Unlock()
+ return 200, nil
+ })
+
+ // Wait for tasks to be processed.
+ deadline := time.After(30 * time.Second)
+ for {
+ select {
+ case <-deadline:
+ t.Fatal("timed out waiting for tasks to be processed")
+ case <-time.After(100 * time.Millisecond):
+ mu.Lock()
+ n := len(processed)
+ mu.Unlock()
+ if n == 2 {
+ cancel()
+ return
+ }
+ }
+ }
+}
+
+func TestPollDeletesTaskOnError(t *testing.T) {
+ q := setup(t)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ done := make(chan struct{})
+ go func() {
+ q.Poll(ctx, 1, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(done)
+ return 500, errors.New("something went wrong")
+ })
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(30 * time.Second):
+ t.Fatal("timed out waiting for task to be processed")
+ }
+ cancel()
+
+ // Verify the task was deleted despite the error.
+ var count int
+ err := testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Allow a brief moment for the delete to complete.
+ time.Sleep(100 * time.Millisecond)
+ err = testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if count != 0 {
+ t.Errorf("queue_tasks count = %d, want 0", count)
+ }
+}
+
+func TestPollReclaimsStalledTasks(t *testing.T) {
+ q := setup(t)
+ ctx := context.Background()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ // Simulate a stalled task by setting started_at to the past.
+ if _, err := testDB.Exec(ctx,
+ `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil {
+ t.Fatal(err)
+ }
+
+ pollCtx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ done := make(chan struct{})
+ go func() {
+ q.Poll(pollCtx, 1, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(done)
+ return 200, nil
+ })
+ }()
+
+ select {
+ case <-done:
+ // Task was reclaimed and processed.
+ case <-time.After(30 * time.Second):
+ t.Fatal("timed out waiting for stalled task to be reclaimed")
+ }
+ cancel()
+}
+
+func TestStalledWorkerDeleteIsNoop(t *testing.T) {
+ // Worker1 claims a task, but stalls. Worker2 claims it later. Worker1
+ // unstalls and completes, but does not delete the task from the queue: once
+ // worker2 has the task, only it may do the delete. It unstalls and finishes
+ // and we assert the task is deleted from the queue.
+ q := setup(t)
+ ctx := context.Background()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ worker1Claimed := make(chan struct{})
+ worker1Stall := make(chan struct{})
+ worker1Done := make(chan struct{})
+ worker2Claimed := make(chan struct{})
+ worker2Stall := make(chan struct{})
+ worker2Done := make(chan struct{})
+
+ // Worker 1 claims the task and stalls.
+ go func() {
+ q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(worker1Claimed)
+ <-worker1Stall
+ return 200, nil
+ })
+ close(worker1Done)
+ }()
+
+ // Wait for worker 1 to enter processFunc, then backdate started_at so the
+ // task is eligible for reclaim.
+ <-worker1Claimed
+ if _, err := testDB.Exec(ctx, `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil {
+ t.Fatal(err)
+ }
+
+ // Worker 2 reclaims the task and stalls.
+ go func() {
+ q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(worker2Claimed)
+ <-worker2Stall
+ return 200, nil
+ })
+ close(worker2Done)
+ }()
+ <-worker2Claimed
+
+ // Unstall worker 1. Its delete should be a no-op because worker 2 has a
+ // newer started_at.
+ close(worker1Stall)
+ <-worker1Done
+
+ var count int
+ if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil {
+ t.Fatal(err)
+ }
+ if count != 1 {
+ t.Errorf("after worker 1: task count = %d, want 1 (worker 1 should not have deleted it)", count)
+ }
+
+ // Unstall worker 2. Its delete should succeed.
+ close(worker2Stall)
+ <-worker2Done
+
+ if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil {
+ t.Fatal(err)
+ }
+ if count != 0 {
+ t.Errorf("after worker 2: task count = %d, want 0", count)
+ }
+}