aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean Barkhuysen <jean.barkhuysen@gmail.com>2026-03-04 10:47:23 -0700
committerJonathan Amsterdam <jba@google.com>2026-04-08 09:33:33 -0700
commit372618454cdb62e4cbaab1fd14c58f2faf5db80a (patch)
tree20af08dd7bde88b3ff344261368d953b98cf774f
parente09b3908a3542531bb1a84b28406102957861650 (diff)
downloadgo-x-pkgsite-372618454cdb62e4cbaab1fd14c58f2faf5db80a.tar.xz
internal/queue: add postgres queue implementation, and use it in worker|frontend
Fixes golang/go#74027. Change-Id: I916ac81093e782d4eda21fe11ef47eeff4f5f0b1 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/751480 Reviewed-by: Jonathan Amsterdam <jba@google.com> kokoro-CI: kokoro <noreply+kokoro@google.com> Reviewed-by: Ethan Lee <ethanalee@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
-rw-r--r--cmd/frontend/main.go33
-rw-r--r--cmd/worker/main.go37
-rw-r--r--go.mod6
-rw-r--r--go.sum10
-rw-r--r--internal/queue/pgqueue/queue.go140
-rw-r--r--internal/queue/pgqueue/queue_test.go289
6 files changed, 493 insertions, 22 deletions
diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go
index 303b9616..1684981f 100644
--- a/cmd/frontend/main.go
+++ b/cmd/frontend/main.go
@@ -34,6 +34,7 @@ import (
"golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
"golang.org/x/pkgsite/internal/queue/inmemqueue"
+ "golang.org/x/pkgsite/internal/queue/pgqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/static"
"golang.org/x/pkgsite/internal/trace"
@@ -54,6 +55,7 @@ var (
"as a direct backend, bypassing the database")
bypassLicenseCheck = flag.Bool("bypass_license_check", false, "display all information, even for non-redistributable paths")
hostAddr = flag.String("host", "localhost:8080", "Host address for the server")
+ queueType = flag.String("queue", "inmemory", `queue implementation when not on GCP: "inmemory" or "postgres"`)
)
func main() {
@@ -127,19 +129,30 @@ func main() {
}
fetchQueue = q
} else {
- experiments, err := expg(ctx)
- if err != nil {
- log.Fatalf(ctx, "error getting experiment: %v", err)
+ processFunc := func(ctx context.Context, modulePath, version string) (int, error) {
+ return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db)
}
- var names []string
- for _, e := range experiments {
- if e.Rollout > 0 {
- names = append(names, e.Name)
+ switch *queueType {
+ case "postgres":
+ q, err := pgqueue.New(ctx, db.Underlying())
+ if err != nil {
+ log.Fatalf(ctx, "error creating postgres queue: %v", err)
+ }
+ go q.Poll(ctx, *workers, processFunc)
+ fetchQueue = q
+ default:
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
}
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
+ }
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, processFunc)
}
- fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
- return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db)
- })
}
}
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 8f178aa9..bf732bf7 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -31,6 +31,7 @@ import (
"golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
"golang.org/x/pkgsite/internal/queue/inmemqueue"
+ "golang.org/x/pkgsite/internal/queue/pgqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/trace"
"golang.org/x/pkgsite/internal/worker"
@@ -43,6 +44,7 @@ var (
// flag used in call to safehtml/template.TrustedSourceFromFlag
_ = flag.String("static", "static", "path to folder containing static files served")
bypassLicenseCheck = flag.Bool("bypass_license_check", false, "insert all data into the DB, even for non-redistributable paths")
+ queueType = flag.String("queue", "inmemory", `queue implementation when not on GCP: "inmemory" or "postgres"`)
// Ordinarily, index polling is initiated by a separate scheduler that calls
// /poll. But for convenience, you can instead have the worker periodically
@@ -107,25 +109,36 @@ func main() {
}
fetchQueue = q
} else {
- experiments, err := expg(ctx)
- if err != nil {
- log.Fatalf(ctx, "error getting experiment: %v", err)
- }
- var names []string
- for _, e := range experiments {
- if e.Rollout > 0 {
- names = append(names, e.Name)
- }
- }
f := &worker.Fetcher{
ProxyClient: proxyClient,
SourceClient: sourceClient,
DB: db,
}
- fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
+ processFunc := func(ctx context.Context, modulePath, version string) (int, error) {
code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
return code, err
- })
+ }
+ switch *queueType {
+ case "postgres":
+ q, err := pgqueue.New(ctx, db.Underlying())
+ if err != nil {
+ log.Fatalf(ctx, "error creating postgres queue: %v", err)
+ }
+ go q.Poll(ctx, *workers, processFunc)
+ fetchQueue = q
+ default:
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
+ }
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, processFunc)
+ }
}
reporter := cmdconfig.Reporter(ctx, cfg)
diff --git a/go.mod b/go.mod
index 069296b2..54fc3cc8 100644
--- a/go.mod
+++ b/go.mod
@@ -22,6 +22,7 @@ require (
github.com/google/go-replayers/httpreplay v1.0.0
github.com/google/licensecheck v0.3.1
github.com/google/safehtml v0.0.3-0.20211026203422-d6f0e11a5516
+ github.com/jackc/pgx/v4 v4.10.1
github.com/jackc/pgx/v5 v5.9.1
github.com/jba/templatecheck v0.6.0
github.com/lib/pq v1.12.0
@@ -64,8 +65,13 @@ require (
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect
+ github.com/jackc/chunkreader/v2 v2.0.1 // indirect
+ github.com/jackc/pgconn v1.8.0 // indirect
+ github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
+ github.com/jackc/pgproto3/v2 v2.0.7 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
+ github.com/jackc/pgtype v1.6.2 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/text v0.2.0 // indirect
diff --git a/go.sum b/go.sum
index ced6ae82..66f673c7 100644
--- a/go.sum
+++ b/go.sum
@@ -224,6 +224,7 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/cockroach-go/v2 v2.1.1/go.mod h1:7NtUnP6eK+l6k483WSYNrq3Kb23bWV10IRV1TyeSpwM=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
@@ -465,6 +466,7 @@ github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
+github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v1.2.0/go.mod h1:Njal3psf3qN6dwBtQfUmBZh2ybovJ0tlu3o/AC7HYjU=
github.com/gogo/googleapis v1.4.0/go.mod h1:5YRNX2z1oM5gXdAkurHa942MDgEJyk02w4OecKY87+c=
@@ -632,6 +634,7 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt
github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA=
github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
+github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA=
github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE=
@@ -639,9 +642,12 @@ github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsU
github.com/jackc/pgconn v1.4.0/go.mod h1:Y2O3ZDF0q4mMacyWV3AstPJpeHXWGEetiFttmq5lahk=
github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI=
github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI=
+github.com/jackc/pgconn v1.8.0 h1:FmjZ0rOyXTr1wfWs45i4a9vjnjWUAGpMuQLD9OSs+lw=
github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o=
github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
+github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
+github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye4717ITLaNwV9mWbJx0dLCpcRzdA=
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
@@ -652,6 +658,7 @@ github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvW
github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM=
github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
+github.com/jackc/pgproto3/v2 v2.0.7 h1:6Pwi1b3QdY65cuv6SyVO0FgPd5J3Bl7wf/nQQjinHMA=
github.com/jackc/pgproto3/v2 v2.0.7/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
@@ -663,6 +670,7 @@ github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrU
github.com/jackc/pgtype v1.2.0/go.mod h1:5m2OfMh1wTK7x+Fk952IDmI4nw3nPrvtQdM0ZT4WpC0=
github.com/jackc/pgtype v1.3.1-0.20200510190516-8cd94a14c75a/go.mod h1:vaogEUkALtxZMCH411K+tKzNpwzCKU+AnPzBKZ+I+Po=
github.com/jackc/pgtype v1.3.1-0.20200606141011-f6355165a91c/go.mod h1:cvk9Bgu/VzJ9/lxTO5R5sf80p0DiucVtN7ZxvaC4GmQ=
+github.com/jackc/pgtype v1.6.2 h1:b3pDeuhbbzBYcg5kwNmNDun4pFUD/0AAr1kLXZLeNt8=
github.com/jackc/pgtype v1.6.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig=
github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y=
github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM=
@@ -670,6 +678,7 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ
github.com/jackc/pgx/v4 v4.5.0/go.mod h1:EpAKPLdnTorwmPUUsqrPxy5fphV18j9q3wrfRXgo+kA=
github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6fOLDxqtlyhe9UWgfIi9R8+8v8GKV5TRA/o=
github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg=
+github.com/jackc/pgx/v4 v4.10.1 h1:/6Q3ye4myIj6AaplUm+eRcz4OhK9HAvFf4ePsG40LJY=
github.com/jackc/pgx/v4 v4.10.1/go.mod h1:QlrWebbs3kqEZPHCTGyxecvzG6tvIsYu+A5b1raylkA=
github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc=
github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
@@ -935,6 +944,7 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
+github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
diff --git a/internal/queue/pgqueue/queue.go b/internal/queue/pgqueue/queue.go
new file mode 100644
index 00000000..e4e07ddc
--- /dev/null
+++ b/internal/queue/pgqueue/queue.go
@@ -0,0 +1,140 @@
+// Copyright 2026 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package pgqueue provides a Postgres-backed queue implementation for
+// scheduling and processing fetch actions. It supports multiple concurrent
+// workers (processes or goroutines)
+package pgqueue
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "golang.org/x/pkgsite/internal/database"
+ "golang.org/x/pkgsite/internal/log"
+ "golang.org/x/pkgsite/internal/queue"
+)
+
+// The frequency at which we poll for work.
+const pollInterval = 5 * time.Second
+
+// ProcessFunc is the function signature for processing dequeued work.
+type ProcessFunc func(ctx context.Context, modulePath, version string) (int, error)
+
+// Queue implements the Queue interface backed by a Postgres table. It is safe
+// for concurrent use by multiple goroutines and processes.
+type Queue struct {
+ db *database.DB
+}
+
+// New creates the queue_tasks table if it doesn't exist and returns a Queue.
+func New(ctx context.Context, db *database.DB) (*Queue, error) {
+ // TODO(jbarkhuysen): If we find it onerous to do table updates over time, we
+ // may want to consider alternatives to doing this here.
+ if _, err := db.Exec(ctx, createTableQuery); err != nil {
+ return nil, fmt.Errorf("pgqueue.New: creating table: %w", err)
+ }
+ return &Queue{db: db}, nil
+}
+
+const createTableQuery = `
+CREATE TABLE IF NOT EXISTS queue_tasks (
+ id BIGSERIAL PRIMARY KEY,
+ task_name TEXT UNIQUE NOT NULL,
+ module_path TEXT NOT NULL,
+ version TEXT NOT NULL,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ started_at TIMESTAMPTZ
+);
+CREATE INDEX IF NOT EXISTS idx_queue_tasks_started_created
+ON queue_tasks (started_at, created_at);`
+
+// ScheduleFetch inserts a task into queue_tasks. It returns (true, nil) if the
+// task was inserted, or (false, nil) if it was a duplicate.
+func (q *Queue) ScheduleFetch(ctx context.Context, modulePath, version string, opts *queue.Options) (bool, error) {
+ taskName := modulePath + "@" + version
+ if opts != nil && opts.Suffix != "" {
+ taskName += "-" + opts.Suffix
+ }
+ n, err := q.db.Exec(ctx,
+ `INSERT INTO queue_tasks (task_name, module_path, version) VALUES ($1, $2, $3) ON CONFLICT (task_name) DO NOTHING`,
+ taskName, modulePath, version)
+ if err != nil {
+ return false, fmt.Errorf("pgqueue.ScheduleFetch(%q, %q): %w", modulePath, version, err)
+ }
+ return n == 1, nil
+}
+
+// Poll starts background polling for work. It spawns the given number of worker
+// goroutines, each of which periodically claims a task, runs processFunc, and
+// deletes the task on completion. It blocks until ctx is cancelled.
+func (q *Queue) Poll(ctx context.Context, workers int, processFunc ProcessFunc) {
+ wg := sync.WaitGroup{}
+ for range workers {
+ wg.Go(func() {
+ // Periodically claim work.
+ ticker := time.NewTicker(pollInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ q.claimAndProcess(ctx, processFunc)
+ }
+ }
+ })
+ }
+ wg.Wait()
+}
+
+// TODO(jbarkhuysen): 5m stall timeout is baked in; we might want to make it
+// variable in the future.
+const dequeueQuery = `
+WITH next_task AS (
+ SELECT id
+ FROM queue_tasks
+ WHERE started_at IS NULL
+ OR started_at + INTERVAL '5 minutes' < NOW()
+ ORDER BY created_at ASC
+ LIMIT 1
+ FOR UPDATE SKIP LOCKED
+)
+UPDATE queue_tasks
+SET started_at = NOW()
+WHERE id = (SELECT id FROM next_task)
+RETURNING id, module_path, version, started_at`
+
+func (q *Queue) claimAndProcess(ctx context.Context, processFunc ProcessFunc) {
+ var id int64
+ var modulePath, version string
+ var startedAt time.Time
+ err := q.db.QueryRow(ctx, dequeueQuery).Scan(&id, &modulePath, &version, &startedAt)
+ if errors.Is(err, sql.ErrNoRows) {
+ return // There's no work: no-op.
+ }
+ if err != nil {
+ log.Errorf(ctx, "pgqueue: dequeue: %v", err)
+ return
+ }
+
+ log.Infof(ctx, "pgqueue: processing %s@%s (task %d)", modulePath, version, id)
+ code, err := processFunc(ctx, modulePath, version)
+ if err != nil {
+ log.Errorf(ctx, "pgqueue: processing %s@%s: status=%d err=%v", modulePath, version, code, err)
+ // This still gets removed (delete below) so that we don't endlessly
+ // fail the same work item.
+ }
+
+ // Use a background context for cleanup so the delete succeeds even if
+ // the poll context has been cancelled.
+ delCtx := context.Background()
+ if _, err := q.db.Exec(delCtx, `DELETE FROM queue_tasks WHERE id = $1 AND started_at = $2`, id, startedAt); err != nil {
+ log.Errorf(delCtx, "pgqueue: deleting task %d: %v", id, err)
+ }
+}
diff --git a/internal/queue/pgqueue/queue_test.go b/internal/queue/pgqueue/queue_test.go
new file mode 100644
index 00000000..b1621435
--- /dev/null
+++ b/internal/queue/pgqueue/queue_test.go
@@ -0,0 +1,289 @@
+// Copyright 2026 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pgqueue
+
+import (
+ "context"
+ "errors"
+ "log"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ _ "github.com/jackc/pgx/v4/stdlib"
+
+ "golang.org/x/pkgsite/internal/database"
+ "golang.org/x/pkgsite/internal/derrors"
+ "golang.org/x/pkgsite/internal/queue"
+)
+
+const testDBName = "discovery_pgqueue_test"
+
+var testDB *database.DB
+
+func TestMain(m *testing.M) {
+ if os.Getenv("GO_DISCOVERY_TESTDB") != "true" {
+ log.Printf("SKIPPING: GO_DISCOVERY_TESTDB is not set to true")
+ return
+ }
+ if err := database.CreateDBIfNotExists(testDBName); err != nil {
+ if errors.Is(err, derrors.NotFound) {
+ log.Printf("SKIPPING: could not connect to DB: %v", err)
+ return
+ }
+ log.Fatal(err)
+ }
+ db, err := database.Open("pgx", database.DBConnURI(testDBName), "test")
+ if err != nil {
+ log.Fatal(err)
+ }
+ testDB = db
+ os.Exit(m.Run())
+}
+
+func setup(t *testing.T) *Queue {
+ t.Helper()
+ ctx := context.Background()
+ if testDB == nil {
+ t.Skip("test database not available")
+ }
+ // Drop and recreate the table for a clean slate.
+ if _, err := testDB.Exec(ctx, `DROP TABLE IF EXISTS queue_tasks`); err != nil {
+ t.Fatal(err)
+ }
+ q, err := New(ctx, testDB)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return q
+}
+
+func TestScheduleFetch(t *testing.T) {
+ q := setup(t)
+ ctx := context.Background()
+
+ enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !enqueued {
+ t.Error("ScheduleFetch() = false, want true")
+ }
+
+ // Same module@version should be deduplicated.
+ enqueued, err = q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if enqueued {
+ t.Error("ScheduleFetch() duplicate = true, want false")
+ }
+}
+
+func TestScheduleFetchWithSuffix(t *testing.T) {
+ q := setup(t)
+ ctx := context.Background()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ // Same module@version with a suffix should not be deduplicated.
+ enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", &queue.Options{Suffix: "reprocess-1"})
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !enqueued {
+ t.Error("ScheduleFetch() with suffix = false, want true")
+ }
+}
+
+func TestPollProcessesTasks(t *testing.T) {
+ q := setup(t)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/net", "v0.1.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ var mu sync.Mutex
+ var processed []string
+
+ go q.Poll(ctx, 2, func(ctx context.Context, modulePath, version string) (int, error) {
+ mu.Lock()
+ processed = append(processed, modulePath+"@"+version)
+ mu.Unlock()
+ return 200, nil
+ })
+
+ // Wait for tasks to be processed.
+ deadline := time.After(30 * time.Second)
+ for {
+ select {
+ case <-deadline:
+ t.Fatal("timed out waiting for tasks to be processed")
+ case <-time.After(100 * time.Millisecond):
+ mu.Lock()
+ n := len(processed)
+ mu.Unlock()
+ if n == 2 {
+ cancel()
+ return
+ }
+ }
+ }
+}
+
+func TestPollDeletesTaskOnError(t *testing.T) {
+ q := setup(t)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ done := make(chan struct{})
+ go func() {
+ q.Poll(ctx, 1, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(done)
+ return 500, errors.New("something went wrong")
+ })
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(30 * time.Second):
+ t.Fatal("timed out waiting for task to be processed")
+ }
+ cancel()
+
+ // Verify the task was deleted despite the error.
+ var count int
+ err := testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Allow a brief moment for the delete to complete.
+ time.Sleep(100 * time.Millisecond)
+ err = testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if count != 0 {
+ t.Errorf("queue_tasks count = %d, want 0", count)
+ }
+}
+
+func TestPollReclaimsStalledTasks(t *testing.T) {
+ q := setup(t)
+ ctx := context.Background()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ // Simulate a stalled task by setting started_at to the past.
+ if _, err := testDB.Exec(ctx,
+ `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil {
+ t.Fatal(err)
+ }
+
+ pollCtx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ done := make(chan struct{})
+ go func() {
+ q.Poll(pollCtx, 1, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(done)
+ return 200, nil
+ })
+ }()
+
+ select {
+ case <-done:
+ // Task was reclaimed and processed.
+ case <-time.After(30 * time.Second):
+ t.Fatal("timed out waiting for stalled task to be reclaimed")
+ }
+ cancel()
+}
+
+func TestStalledWorkerDeleteIsNoop(t *testing.T) {
+ // Worker1 claims a task, but stalls. Worker2 claims it later. Worker1
+ // unstalls and completes, but does not delete the task from the queue: once
+ // worker2 has the task, only it may do the delete. It unstalls and finishes
+ // and we assert the task is deleted from the queue.
+ q := setup(t)
+ ctx := context.Background()
+
+ if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
+ t.Fatal(err)
+ }
+
+ worker1Claimed := make(chan struct{})
+ worker1Stall := make(chan struct{})
+ worker1Done := make(chan struct{})
+ worker2Claimed := make(chan struct{})
+ worker2Stall := make(chan struct{})
+ worker2Done := make(chan struct{})
+
+ // Worker 1 claims the task and stalls.
+ go func() {
+ q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(worker1Claimed)
+ <-worker1Stall
+ return 200, nil
+ })
+ close(worker1Done)
+ }()
+
+ // Wait for worker 1 to enter processFunc, then backdate started_at so the
+ // task is eligible for reclaim.
+ <-worker1Claimed
+ if _, err := testDB.Exec(ctx, `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil {
+ t.Fatal(err)
+ }
+
+ // Worker 2 reclaims the task and stalls.
+ go func() {
+ q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) {
+ close(worker2Claimed)
+ <-worker2Stall
+ return 200, nil
+ })
+ close(worker2Done)
+ }()
+ <-worker2Claimed
+
+ // Unstall worker 1. Its delete should be a no-op because worker 2 has a
+ // newer started_at.
+ close(worker1Stall)
+ <-worker1Done
+
+ var count int
+ if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil {
+ t.Fatal(err)
+ }
+ if count != 1 {
+ t.Errorf("after worker 1: task count = %d, want 1 (worker 1 should not have deleted it)", count)
+ }
+
+ // Unstall worker 2. Its delete should succeed.
+ close(worker2Stall)
+ <-worker2Done
+
+ if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil {
+ t.Fatal(err)
+ }
+ if count != 0 {
+ t.Errorf("after worker 2: task count = %d, want 0", count)
+ }
+}