diff options
| author | Jean Barkhuysen <jean.barkhuysen@gmail.com> | 2026-03-04 10:47:23 -0700 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2026-04-08 09:33:33 -0700 |
| commit | 372618454cdb62e4cbaab1fd14c58f2faf5db80a (patch) | |
| tree | 20af08dd7bde88b3ff344261368d953b98cf774f /internal/queue/pgqueue | |
| parent | e09b3908a3542531bb1a84b28406102957861650 (diff) | |
| download | go-x-pkgsite-372618454cdb62e4cbaab1fd14c58f2faf5db80a.tar.xz | |
internal/queue: add postgres queue implementation, and use it in worker|frontend
Fixes golang/go#74027.
Change-Id: I916ac81093e782d4eda21fe11ef47eeff4f5f0b1
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/751480
Reviewed-by: Jonathan Amsterdam <jba@google.com>
kokoro-CI: kokoro <noreply+kokoro@google.com>
Reviewed-by: Ethan Lee <ethanalee@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Diffstat (limited to 'internal/queue/pgqueue')
| -rw-r--r-- | internal/queue/pgqueue/queue.go | 140 | ||||
| -rw-r--r-- | internal/queue/pgqueue/queue_test.go | 289 |
2 files changed, 429 insertions, 0 deletions
diff --git a/internal/queue/pgqueue/queue.go b/internal/queue/pgqueue/queue.go new file mode 100644 index 00000000..e4e07ddc --- /dev/null +++ b/internal/queue/pgqueue/queue.go @@ -0,0 +1,140 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package pgqueue provides a Postgres-backed queue implementation for +// scheduling and processing fetch actions. It supports multiple concurrent +// workers (processes or goroutines) +package pgqueue + +import ( + "context" + "database/sql" + "errors" + "fmt" + "sync" + "time" + + "golang.org/x/pkgsite/internal/database" + "golang.org/x/pkgsite/internal/log" + "golang.org/x/pkgsite/internal/queue" +) + +// The frequency at which we poll for work. +const pollInterval = 5 * time.Second + +// ProcessFunc is the function signature for processing dequeued work. +type ProcessFunc func(ctx context.Context, modulePath, version string) (int, error) + +// Queue implements the Queue interface backed by a Postgres table. It is safe +// for concurrent use by multiple goroutines and processes. +type Queue struct { + db *database.DB +} + +// New creates the queue_tasks table if it doesn't exist and returns a Queue. +func New(ctx context.Context, db *database.DB) (*Queue, error) { + // TODO(jbarkhuysen): If we find it onerous to do table updates over time, we + // may want to consider alternatives to doing this here. + if _, err := db.Exec(ctx, createTableQuery); err != nil { + return nil, fmt.Errorf("pgqueue.New: creating table: %w", err) + } + return &Queue{db: db}, nil +} + +const createTableQuery = ` +CREATE TABLE IF NOT EXISTS queue_tasks ( + id BIGSERIAL PRIMARY KEY, + task_name TEXT UNIQUE NOT NULL, + module_path TEXT NOT NULL, + version TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + started_at TIMESTAMPTZ +); +CREATE INDEX IF NOT EXISTS idx_queue_tasks_started_created +ON queue_tasks (started_at, created_at);` + +// ScheduleFetch inserts a task into queue_tasks. It returns (true, nil) if the +// task was inserted, or (false, nil) if it was a duplicate. +func (q *Queue) ScheduleFetch(ctx context.Context, modulePath, version string, opts *queue.Options) (bool, error) { + taskName := modulePath + "@" + version + if opts != nil && opts.Suffix != "" { + taskName += "-" + opts.Suffix + } + n, err := q.db.Exec(ctx, + `INSERT INTO queue_tasks (task_name, module_path, version) VALUES ($1, $2, $3) ON CONFLICT (task_name) DO NOTHING`, + taskName, modulePath, version) + if err != nil { + return false, fmt.Errorf("pgqueue.ScheduleFetch(%q, %q): %w", modulePath, version, err) + } + return n == 1, nil +} + +// Poll starts background polling for work. It spawns the given number of worker +// goroutines, each of which periodically claims a task, runs processFunc, and +// deletes the task on completion. It blocks until ctx is cancelled. +func (q *Queue) Poll(ctx context.Context, workers int, processFunc ProcessFunc) { + wg := sync.WaitGroup{} + for range workers { + wg.Go(func() { + // Periodically claim work. + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + q.claimAndProcess(ctx, processFunc) + } + } + }) + } + wg.Wait() +} + +// TODO(jbarkhuysen): 5m stall timeout is baked in; we might want to make it +// variable in the future. +const dequeueQuery = ` +WITH next_task AS ( + SELECT id + FROM queue_tasks + WHERE started_at IS NULL + OR started_at + INTERVAL '5 minutes' < NOW() + ORDER BY created_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED +) +UPDATE queue_tasks +SET started_at = NOW() +WHERE id = (SELECT id FROM next_task) +RETURNING id, module_path, version, started_at` + +func (q *Queue) claimAndProcess(ctx context.Context, processFunc ProcessFunc) { + var id int64 + var modulePath, version string + var startedAt time.Time + err := q.db.QueryRow(ctx, dequeueQuery).Scan(&id, &modulePath, &version, &startedAt) + if errors.Is(err, sql.ErrNoRows) { + return // There's no work: no-op. + } + if err != nil { + log.Errorf(ctx, "pgqueue: dequeue: %v", err) + return + } + + log.Infof(ctx, "pgqueue: processing %s@%s (task %d)", modulePath, version, id) + code, err := processFunc(ctx, modulePath, version) + if err != nil { + log.Errorf(ctx, "pgqueue: processing %s@%s: status=%d err=%v", modulePath, version, code, err) + // This still gets removed (delete below) so that we don't endlessly + // fail the same work item. + } + + // Use a background context for cleanup so the delete succeeds even if + // the poll context has been cancelled. + delCtx := context.Background() + if _, err := q.db.Exec(delCtx, `DELETE FROM queue_tasks WHERE id = $1 AND started_at = $2`, id, startedAt); err != nil { + log.Errorf(delCtx, "pgqueue: deleting task %d: %v", id, err) + } +} diff --git a/internal/queue/pgqueue/queue_test.go b/internal/queue/pgqueue/queue_test.go new file mode 100644 index 00000000..b1621435 --- /dev/null +++ b/internal/queue/pgqueue/queue_test.go @@ -0,0 +1,289 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pgqueue + +import ( + "context" + "errors" + "log" + "os" + "sync" + "testing" + "time" + + _ "github.com/jackc/pgx/v4/stdlib" + + "golang.org/x/pkgsite/internal/database" + "golang.org/x/pkgsite/internal/derrors" + "golang.org/x/pkgsite/internal/queue" +) + +const testDBName = "discovery_pgqueue_test" + +var testDB *database.DB + +func TestMain(m *testing.M) { + if os.Getenv("GO_DISCOVERY_TESTDB") != "true" { + log.Printf("SKIPPING: GO_DISCOVERY_TESTDB is not set to true") + return + } + if err := database.CreateDBIfNotExists(testDBName); err != nil { + if errors.Is(err, derrors.NotFound) { + log.Printf("SKIPPING: could not connect to DB: %v", err) + return + } + log.Fatal(err) + } + db, err := database.Open("pgx", database.DBConnURI(testDBName), "test") + if err != nil { + log.Fatal(err) + } + testDB = db + os.Exit(m.Run()) +} + +func setup(t *testing.T) *Queue { + t.Helper() + ctx := context.Background() + if testDB == nil { + t.Skip("test database not available") + } + // Drop and recreate the table for a clean slate. + if _, err := testDB.Exec(ctx, `DROP TABLE IF EXISTS queue_tasks`); err != nil { + t.Fatal(err) + } + q, err := New(ctx, testDB) + if err != nil { + t.Fatal(err) + } + return q +} + +func TestScheduleFetch(t *testing.T) { + q := setup(t) + ctx := context.Background() + + enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil) + if err != nil { + t.Fatal(err) + } + if !enqueued { + t.Error("ScheduleFetch() = false, want true") + } + + // Same module@version should be deduplicated. + enqueued, err = q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil) + if err != nil { + t.Fatal(err) + } + if enqueued { + t.Error("ScheduleFetch() duplicate = true, want false") + } +} + +func TestScheduleFetchWithSuffix(t *testing.T) { + q := setup(t) + ctx := context.Background() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + // Same module@version with a suffix should not be deduplicated. + enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", &queue.Options{Suffix: "reprocess-1"}) + if err != nil { + t.Fatal(err) + } + if !enqueued { + t.Error("ScheduleFetch() with suffix = false, want true") + } +} + +func TestPollProcessesTasks(t *testing.T) { + q := setup(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + if _, err := q.ScheduleFetch(ctx, "golang.org/x/net", "v0.1.0", nil); err != nil { + t.Fatal(err) + } + + var mu sync.Mutex + var processed []string + + go q.Poll(ctx, 2, func(ctx context.Context, modulePath, version string) (int, error) { + mu.Lock() + processed = append(processed, modulePath+"@"+version) + mu.Unlock() + return 200, nil + }) + + // Wait for tasks to be processed. + deadline := time.After(30 * time.Second) + for { + select { + case <-deadline: + t.Fatal("timed out waiting for tasks to be processed") + case <-time.After(100 * time.Millisecond): + mu.Lock() + n := len(processed) + mu.Unlock() + if n == 2 { + cancel() + return + } + } + } +} + +func TestPollDeletesTaskOnError(t *testing.T) { + q := setup(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + go func() { + q.Poll(ctx, 1, func(ctx context.Context, modulePath, version string) (int, error) { + close(done) + return 500, errors.New("something went wrong") + }) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for task to be processed") + } + cancel() + + // Verify the task was deleted despite the error. + var count int + err := testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count) + if err != nil { + t.Fatal(err) + } + // Allow a brief moment for the delete to complete. + time.Sleep(100 * time.Millisecond) + err = testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count) + if err != nil { + t.Fatal(err) + } + if count != 0 { + t.Errorf("queue_tasks count = %d, want 0", count) + } +} + +func TestPollReclaimsStalledTasks(t *testing.T) { + q := setup(t) + ctx := context.Background() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + // Simulate a stalled task by setting started_at to the past. + if _, err := testDB.Exec(ctx, + `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil { + t.Fatal(err) + } + + pollCtx, cancel := context.WithCancel(ctx) + defer cancel() + + done := make(chan struct{}) + go func() { + q.Poll(pollCtx, 1, func(ctx context.Context, modulePath, version string) (int, error) { + close(done) + return 200, nil + }) + }() + + select { + case <-done: + // Task was reclaimed and processed. + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for stalled task to be reclaimed") + } + cancel() +} + +func TestStalledWorkerDeleteIsNoop(t *testing.T) { + // Worker1 claims a task, but stalls. Worker2 claims it later. Worker1 + // unstalls and completes, but does not delete the task from the queue: once + // worker2 has the task, only it may do the delete. It unstalls and finishes + // and we assert the task is deleted from the queue. + q := setup(t) + ctx := context.Background() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + worker1Claimed := make(chan struct{}) + worker1Stall := make(chan struct{}) + worker1Done := make(chan struct{}) + worker2Claimed := make(chan struct{}) + worker2Stall := make(chan struct{}) + worker2Done := make(chan struct{}) + + // Worker 1 claims the task and stalls. + go func() { + q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) { + close(worker1Claimed) + <-worker1Stall + return 200, nil + }) + close(worker1Done) + }() + + // Wait for worker 1 to enter processFunc, then backdate started_at so the + // task is eligible for reclaim. + <-worker1Claimed + if _, err := testDB.Exec(ctx, `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil { + t.Fatal(err) + } + + // Worker 2 reclaims the task and stalls. + go func() { + q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) { + close(worker2Claimed) + <-worker2Stall + return 200, nil + }) + close(worker2Done) + }() + <-worker2Claimed + + // Unstall worker 1. Its delete should be a no-op because worker 2 has a + // newer started_at. + close(worker1Stall) + <-worker1Done + + var count int + if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil { + t.Fatal(err) + } + if count != 1 { + t.Errorf("after worker 1: task count = %d, want 1 (worker 1 should not have deleted it)", count) + } + + // Unstall worker 2. Its delete should succeed. + close(worker2Stall) + <-worker2Done + + if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil { + t.Fatal(err) + } + if count != 0 { + t.Errorf("after worker 2: task count = %d, want 0", count) + } +} |
