diff options
| author | Jean Barkhuysen <jean.barkhuysen@gmail.com> | 2026-03-04 10:47:23 -0700 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2026-04-08 09:33:33 -0700 |
| commit | 372618454cdb62e4cbaab1fd14c58f2faf5db80a (patch) | |
| tree | 20af08dd7bde88b3ff344261368d953b98cf774f | |
| parent | e09b3908a3542531bb1a84b28406102957861650 (diff) | |
| download | go-x-pkgsite-372618454cdb62e4cbaab1fd14c58f2faf5db80a.tar.xz | |
internal/queue: add postgres queue implementation, and use it in worker|frontend
Fixes golang/go#74027.
Change-Id: I916ac81093e782d4eda21fe11ef47eeff4f5f0b1
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/751480
Reviewed-by: Jonathan Amsterdam <jba@google.com>
kokoro-CI: kokoro <noreply+kokoro@google.com>
Reviewed-by: Ethan Lee <ethanalee@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
| -rw-r--r-- | cmd/frontend/main.go | 33 | ||||
| -rw-r--r-- | cmd/worker/main.go | 37 | ||||
| -rw-r--r-- | go.mod | 6 | ||||
| -rw-r--r-- | go.sum | 10 | ||||
| -rw-r--r-- | internal/queue/pgqueue/queue.go | 140 | ||||
| -rw-r--r-- | internal/queue/pgqueue/queue_test.go | 289 |
6 files changed, 493 insertions, 22 deletions
diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go index 303b9616..1684981f 100644 --- a/cmd/frontend/main.go +++ b/cmd/frontend/main.go @@ -34,6 +34,7 @@ import ( "golang.org/x/pkgsite/internal/queue" "golang.org/x/pkgsite/internal/queue/gcpqueue" "golang.org/x/pkgsite/internal/queue/inmemqueue" + "golang.org/x/pkgsite/internal/queue/pgqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/static" "golang.org/x/pkgsite/internal/trace" @@ -54,6 +55,7 @@ var ( "as a direct backend, bypassing the database") bypassLicenseCheck = flag.Bool("bypass_license_check", false, "display all information, even for non-redistributable paths") hostAddr = flag.String("host", "localhost:8080", "Host address for the server") + queueType = flag.String("queue", "inmemory", `queue implementation when not on GCP: "inmemory" or "postgres"`) ) func main() { @@ -127,19 +129,30 @@ func main() { } fetchQueue = q } else { - experiments, err := expg(ctx) - if err != nil { - log.Fatalf(ctx, "error getting experiment: %v", err) + processFunc := func(ctx context.Context, modulePath, version string) (int, error) { + return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db) } - var names []string - for _, e := range experiments { - if e.Rollout > 0 { - names = append(names, e.Name) + switch *queueType { + case "postgres": + q, err := pgqueue.New(ctx, db.Underlying()) + if err != nil { + log.Fatalf(ctx, "error creating postgres queue: %v", err) + } + go q.Poll(ctx, *workers, processFunc) + fetchQueue = q + default: + experiments, err := expg(ctx) + if err != nil { + log.Fatalf(ctx, "error getting experiment: %v", err) } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) + } + } + fetchQueue = inmemqueue.New(ctx, *workers, names, processFunc) } - fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) { - return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db) - }) } } diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 8f178aa9..bf732bf7 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -31,6 +31,7 @@ import ( "golang.org/x/pkgsite/internal/queue" "golang.org/x/pkgsite/internal/queue/gcpqueue" "golang.org/x/pkgsite/internal/queue/inmemqueue" + "golang.org/x/pkgsite/internal/queue/pgqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/trace" "golang.org/x/pkgsite/internal/worker" @@ -43,6 +44,7 @@ var ( // flag used in call to safehtml/template.TrustedSourceFromFlag _ = flag.String("static", "static", "path to folder containing static files served") bypassLicenseCheck = flag.Bool("bypass_license_check", false, "insert all data into the DB, even for non-redistributable paths") + queueType = flag.String("queue", "inmemory", `queue implementation when not on GCP: "inmemory" or "postgres"`) // Ordinarily, index polling is initiated by a separate scheduler that calls // /poll. But for convenience, you can instead have the worker periodically @@ -107,25 +109,36 @@ func main() { } fetchQueue = q } else { - experiments, err := expg(ctx) - if err != nil { - log.Fatalf(ctx, "error getting experiment: %v", err) - } - var names []string - for _, e := range experiments { - if e.Rollout > 0 { - names = append(names, e.Name) - } - } f := &worker.Fetcher{ ProxyClient: proxyClient, SourceClient: sourceClient, DB: db, } - fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) { + processFunc := func(ctx context.Context, modulePath, version string) (int, error) { code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel()) return code, err - }) + } + switch *queueType { + case "postgres": + q, err := pgqueue.New(ctx, db.Underlying()) + if err != nil { + log.Fatalf(ctx, "error creating postgres queue: %v", err) + } + go q.Poll(ctx, *workers, processFunc) + fetchQueue = q + default: + experiments, err := expg(ctx) + if err != nil { + log.Fatalf(ctx, "error getting experiment: %v", err) + } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) + } + } + fetchQueue = inmemqueue.New(ctx, *workers, names, processFunc) + } } reporter := cmdconfig.Reporter(ctx, cfg) @@ -22,6 +22,7 @@ require ( github.com/google/go-replayers/httpreplay v1.0.0 github.com/google/licensecheck v0.3.1 github.com/google/safehtml v0.0.3-0.20211026203422-d6f0e11a5516 + github.com/jackc/pgx/v4 v4.10.1 github.com/jackc/pgx/v5 v5.9.1 github.com/jba/templatecheck v0.6.0 github.com/lib/pq v1.12.0 @@ -64,8 +65,13 @@ require ( github.com/googleapis/gax-go/v2 v2.11.0 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.0 // indirect + github.com/jackc/chunkreader/v2 v2.0.1 // indirect + github.com/jackc/pgconn v1.8.0 // indirect + github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgproto3/v2 v2.0.7 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgtype v1.6.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/text v0.2.0 // indirect @@ -224,6 +224,7 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/cockroach-go/v2 v2.1.1/go.mod h1:7NtUnP6eK+l6k483WSYNrq3Kb23bWV10IRV1TyeSpwM= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= @@ -465,6 +466,7 @@ github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6 github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v1.2.0/go.mod h1:Njal3psf3qN6dwBtQfUmBZh2ybovJ0tlu3o/AC7HYjU= github.com/gogo/googleapis v1.4.0/go.mod h1:5YRNX2z1oM5gXdAkurHa942MDgEJyk02w4OecKY87+c= @@ -632,6 +634,7 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/j-keck/arping v0.0.0-20160618110441-2cf9dc699c56/go.mod h1:ymszkNOg6tORTn+6F6j+Jc8TOr5osrynvN6ivFWZ2GA= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= @@ -639,9 +642,12 @@ github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsU github.com/jackc/pgconn v1.4.0/go.mod h1:Y2O3ZDF0q4mMacyWV3AstPJpeHXWGEetiFttmq5lahk= github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= +github.com/jackc/pgconn v1.8.0 h1:FmjZ0rOyXTr1wfWs45i4a9vjnjWUAGpMuQLD9OSs+lw= github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye4717ITLaNwV9mWbJx0dLCpcRzdA= github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= @@ -652,6 +658,7 @@ github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvW github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.0.7 h1:6Pwi1b3QdY65cuv6SyVO0FgPd5J3Bl7wf/nQQjinHMA= github.com/jackc/pgproto3/v2 v2.0.7/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= @@ -663,6 +670,7 @@ github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrU github.com/jackc/pgtype v1.2.0/go.mod h1:5m2OfMh1wTK7x+Fk952IDmI4nw3nPrvtQdM0ZT4WpC0= github.com/jackc/pgtype v1.3.1-0.20200510190516-8cd94a14c75a/go.mod h1:vaogEUkALtxZMCH411K+tKzNpwzCKU+AnPzBKZ+I+Po= github.com/jackc/pgtype v1.3.1-0.20200606141011-f6355165a91c/go.mod h1:cvk9Bgu/VzJ9/lxTO5R5sf80p0DiucVtN7ZxvaC4GmQ= +github.com/jackc/pgtype v1.6.2 h1:b3pDeuhbbzBYcg5kwNmNDun4pFUD/0AAr1kLXZLeNt8= github.com/jackc/pgtype v1.6.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig= github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= @@ -670,6 +678,7 @@ github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQ github.com/jackc/pgx/v4 v4.5.0/go.mod h1:EpAKPLdnTorwmPUUsqrPxy5fphV18j9q3wrfRXgo+kA= github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6fOLDxqtlyhe9UWgfIi9R8+8v8GKV5TRA/o= github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg= +github.com/jackc/pgx/v4 v4.10.1 h1:/6Q3ye4myIj6AaplUm+eRcz4OhK9HAvFf4ePsG40LJY= github.com/jackc/pgx/v4 v4.10.1/go.mod h1:QlrWebbs3kqEZPHCTGyxecvzG6tvIsYu+A5b1raylkA= github.com/jackc/pgx/v5 v5.9.1 h1:uwrxJXBnx76nyISkhr33kQLlUqjv7et7b9FjCen/tdc= github.com/jackc/pgx/v5 v5.9.1/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4= @@ -935,6 +944,7 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= diff --git a/internal/queue/pgqueue/queue.go b/internal/queue/pgqueue/queue.go new file mode 100644 index 00000000..e4e07ddc --- /dev/null +++ b/internal/queue/pgqueue/queue.go @@ -0,0 +1,140 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package pgqueue provides a Postgres-backed queue implementation for +// scheduling and processing fetch actions. It supports multiple concurrent +// workers (processes or goroutines) +package pgqueue + +import ( + "context" + "database/sql" + "errors" + "fmt" + "sync" + "time" + + "golang.org/x/pkgsite/internal/database" + "golang.org/x/pkgsite/internal/log" + "golang.org/x/pkgsite/internal/queue" +) + +// The frequency at which we poll for work. +const pollInterval = 5 * time.Second + +// ProcessFunc is the function signature for processing dequeued work. +type ProcessFunc func(ctx context.Context, modulePath, version string) (int, error) + +// Queue implements the Queue interface backed by a Postgres table. It is safe +// for concurrent use by multiple goroutines and processes. +type Queue struct { + db *database.DB +} + +// New creates the queue_tasks table if it doesn't exist and returns a Queue. +func New(ctx context.Context, db *database.DB) (*Queue, error) { + // TODO(jbarkhuysen): If we find it onerous to do table updates over time, we + // may want to consider alternatives to doing this here. + if _, err := db.Exec(ctx, createTableQuery); err != nil { + return nil, fmt.Errorf("pgqueue.New: creating table: %w", err) + } + return &Queue{db: db}, nil +} + +const createTableQuery = ` +CREATE TABLE IF NOT EXISTS queue_tasks ( + id BIGSERIAL PRIMARY KEY, + task_name TEXT UNIQUE NOT NULL, + module_path TEXT NOT NULL, + version TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + started_at TIMESTAMPTZ +); +CREATE INDEX IF NOT EXISTS idx_queue_tasks_started_created +ON queue_tasks (started_at, created_at);` + +// ScheduleFetch inserts a task into queue_tasks. It returns (true, nil) if the +// task was inserted, or (false, nil) if it was a duplicate. +func (q *Queue) ScheduleFetch(ctx context.Context, modulePath, version string, opts *queue.Options) (bool, error) { + taskName := modulePath + "@" + version + if opts != nil && opts.Suffix != "" { + taskName += "-" + opts.Suffix + } + n, err := q.db.Exec(ctx, + `INSERT INTO queue_tasks (task_name, module_path, version) VALUES ($1, $2, $3) ON CONFLICT (task_name) DO NOTHING`, + taskName, modulePath, version) + if err != nil { + return false, fmt.Errorf("pgqueue.ScheduleFetch(%q, %q): %w", modulePath, version, err) + } + return n == 1, nil +} + +// Poll starts background polling for work. It spawns the given number of worker +// goroutines, each of which periodically claims a task, runs processFunc, and +// deletes the task on completion. It blocks until ctx is cancelled. +func (q *Queue) Poll(ctx context.Context, workers int, processFunc ProcessFunc) { + wg := sync.WaitGroup{} + for range workers { + wg.Go(func() { + // Periodically claim work. + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + q.claimAndProcess(ctx, processFunc) + } + } + }) + } + wg.Wait() +} + +// TODO(jbarkhuysen): 5m stall timeout is baked in; we might want to make it +// variable in the future. +const dequeueQuery = ` +WITH next_task AS ( + SELECT id + FROM queue_tasks + WHERE started_at IS NULL + OR started_at + INTERVAL '5 minutes' < NOW() + ORDER BY created_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED +) +UPDATE queue_tasks +SET started_at = NOW() +WHERE id = (SELECT id FROM next_task) +RETURNING id, module_path, version, started_at` + +func (q *Queue) claimAndProcess(ctx context.Context, processFunc ProcessFunc) { + var id int64 + var modulePath, version string + var startedAt time.Time + err := q.db.QueryRow(ctx, dequeueQuery).Scan(&id, &modulePath, &version, &startedAt) + if errors.Is(err, sql.ErrNoRows) { + return // There's no work: no-op. + } + if err != nil { + log.Errorf(ctx, "pgqueue: dequeue: %v", err) + return + } + + log.Infof(ctx, "pgqueue: processing %s@%s (task %d)", modulePath, version, id) + code, err := processFunc(ctx, modulePath, version) + if err != nil { + log.Errorf(ctx, "pgqueue: processing %s@%s: status=%d err=%v", modulePath, version, code, err) + // This still gets removed (delete below) so that we don't endlessly + // fail the same work item. + } + + // Use a background context for cleanup so the delete succeeds even if + // the poll context has been cancelled. + delCtx := context.Background() + if _, err := q.db.Exec(delCtx, `DELETE FROM queue_tasks WHERE id = $1 AND started_at = $2`, id, startedAt); err != nil { + log.Errorf(delCtx, "pgqueue: deleting task %d: %v", id, err) + } +} diff --git a/internal/queue/pgqueue/queue_test.go b/internal/queue/pgqueue/queue_test.go new file mode 100644 index 00000000..b1621435 --- /dev/null +++ b/internal/queue/pgqueue/queue_test.go @@ -0,0 +1,289 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package pgqueue + +import ( + "context" + "errors" + "log" + "os" + "sync" + "testing" + "time" + + _ "github.com/jackc/pgx/v4/stdlib" + + "golang.org/x/pkgsite/internal/database" + "golang.org/x/pkgsite/internal/derrors" + "golang.org/x/pkgsite/internal/queue" +) + +const testDBName = "discovery_pgqueue_test" + +var testDB *database.DB + +func TestMain(m *testing.M) { + if os.Getenv("GO_DISCOVERY_TESTDB") != "true" { + log.Printf("SKIPPING: GO_DISCOVERY_TESTDB is not set to true") + return + } + if err := database.CreateDBIfNotExists(testDBName); err != nil { + if errors.Is(err, derrors.NotFound) { + log.Printf("SKIPPING: could not connect to DB: %v", err) + return + } + log.Fatal(err) + } + db, err := database.Open("pgx", database.DBConnURI(testDBName), "test") + if err != nil { + log.Fatal(err) + } + testDB = db + os.Exit(m.Run()) +} + +func setup(t *testing.T) *Queue { + t.Helper() + ctx := context.Background() + if testDB == nil { + t.Skip("test database not available") + } + // Drop and recreate the table for a clean slate. + if _, err := testDB.Exec(ctx, `DROP TABLE IF EXISTS queue_tasks`); err != nil { + t.Fatal(err) + } + q, err := New(ctx, testDB) + if err != nil { + t.Fatal(err) + } + return q +} + +func TestScheduleFetch(t *testing.T) { + q := setup(t) + ctx := context.Background() + + enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil) + if err != nil { + t.Fatal(err) + } + if !enqueued { + t.Error("ScheduleFetch() = false, want true") + } + + // Same module@version should be deduplicated. + enqueued, err = q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil) + if err != nil { + t.Fatal(err) + } + if enqueued { + t.Error("ScheduleFetch() duplicate = true, want false") + } +} + +func TestScheduleFetchWithSuffix(t *testing.T) { + q := setup(t) + ctx := context.Background() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + // Same module@version with a suffix should not be deduplicated. + enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", &queue.Options{Suffix: "reprocess-1"}) + if err != nil { + t.Fatal(err) + } + if !enqueued { + t.Error("ScheduleFetch() with suffix = false, want true") + } +} + +func TestPollProcessesTasks(t *testing.T) { + q := setup(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + if _, err := q.ScheduleFetch(ctx, "golang.org/x/net", "v0.1.0", nil); err != nil { + t.Fatal(err) + } + + var mu sync.Mutex + var processed []string + + go q.Poll(ctx, 2, func(ctx context.Context, modulePath, version string) (int, error) { + mu.Lock() + processed = append(processed, modulePath+"@"+version) + mu.Unlock() + return 200, nil + }) + + // Wait for tasks to be processed. + deadline := time.After(30 * time.Second) + for { + select { + case <-deadline: + t.Fatal("timed out waiting for tasks to be processed") + case <-time.After(100 * time.Millisecond): + mu.Lock() + n := len(processed) + mu.Unlock() + if n == 2 { + cancel() + return + } + } + } +} + +func TestPollDeletesTaskOnError(t *testing.T) { + q := setup(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + done := make(chan struct{}) + go func() { + q.Poll(ctx, 1, func(ctx context.Context, modulePath, version string) (int, error) { + close(done) + return 500, errors.New("something went wrong") + }) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for task to be processed") + } + cancel() + + // Verify the task was deleted despite the error. + var count int + err := testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count) + if err != nil { + t.Fatal(err) + } + // Allow a brief moment for the delete to complete. + time.Sleep(100 * time.Millisecond) + err = testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count) + if err != nil { + t.Fatal(err) + } + if count != 0 { + t.Errorf("queue_tasks count = %d, want 0", count) + } +} + +func TestPollReclaimsStalledTasks(t *testing.T) { + q := setup(t) + ctx := context.Background() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + // Simulate a stalled task by setting started_at to the past. + if _, err := testDB.Exec(ctx, + `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil { + t.Fatal(err) + } + + pollCtx, cancel := context.WithCancel(ctx) + defer cancel() + + done := make(chan struct{}) + go func() { + q.Poll(pollCtx, 1, func(ctx context.Context, modulePath, version string) (int, error) { + close(done) + return 200, nil + }) + }() + + select { + case <-done: + // Task was reclaimed and processed. + case <-time.After(30 * time.Second): + t.Fatal("timed out waiting for stalled task to be reclaimed") + } + cancel() +} + +func TestStalledWorkerDeleteIsNoop(t *testing.T) { + // Worker1 claims a task, but stalls. Worker2 claims it later. Worker1 + // unstalls and completes, but does not delete the task from the queue: once + // worker2 has the task, only it may do the delete. It unstalls and finishes + // and we assert the task is deleted from the queue. + q := setup(t) + ctx := context.Background() + + if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil { + t.Fatal(err) + } + + worker1Claimed := make(chan struct{}) + worker1Stall := make(chan struct{}) + worker1Done := make(chan struct{}) + worker2Claimed := make(chan struct{}) + worker2Stall := make(chan struct{}) + worker2Done := make(chan struct{}) + + // Worker 1 claims the task and stalls. + go func() { + q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) { + close(worker1Claimed) + <-worker1Stall + return 200, nil + }) + close(worker1Done) + }() + + // Wait for worker 1 to enter processFunc, then backdate started_at so the + // task is eligible for reclaim. + <-worker1Claimed + if _, err := testDB.Exec(ctx, `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil { + t.Fatal(err) + } + + // Worker 2 reclaims the task and stalls. + go func() { + q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) { + close(worker2Claimed) + <-worker2Stall + return 200, nil + }) + close(worker2Done) + }() + <-worker2Claimed + + // Unstall worker 1. Its delete should be a no-op because worker 2 has a + // newer started_at. + close(worker1Stall) + <-worker1Done + + var count int + if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil { + t.Fatal(err) + } + if count != 1 { + t.Errorf("after worker 1: task count = %d, want 1 (worker 1 should not have deleted it)", count) + } + + // Unstall worker 2. Its delete should succeed. + close(worker2Stall) + <-worker2Done + + if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil { + t.Fatal(err) + } + if count != 0 { + t.Errorf("after worker 2: task count = %d, want 0", count) + } +} |
