diff options
| author | Jean Barkhuysen <jean.barkhuysen@gmail.com> | 2026-02-23 06:13:17 -0700 |
|---|---|---|
| committer | Gopher Robot <gobot@golang.org> | 2026-02-25 06:09:15 -0800 |
| commit | d29b966ca794634ccd1758dcd59cd2c8baf23422 (patch) | |
| tree | dbb5e7699d7ba11a6b8c657b50edcb373a157909 /cmd | |
| parent | dfa517ef2f803e000bd058998099e76648f9e385 (diff) | |
| download | go-x-pkgsite-d29b966ca794634ccd1758dcd59cd2c8baf23422.tar.xz | |
internal/queue: move InMemory queue to its own package
Currently, InMemory queue sits in queue, and is instantiated in gcpqueue for
convenience. In preparation for a third queue type (Postgres), this CL
separates the two more cleanly, making it more ergonomic for the new queue type
to slot in next to the existing two.
This CL doesn't change any logic: it just exists to make the next CL smaller
and easier to review.
I also took the liberty of adding some tests specific to the InMemory queue,
since I didn't find any. Let me know if it's tested in another place, though,
and if you'd prefer me to remove it.
Updates golang/go#74027.
Change-Id: I44bd92129f33bc7975fcd138c905e0b7ab49d257
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/747881
kokoro-CI: kokoro <noreply+kokoro@google.com>
Auto-Submit: Jonathan Amsterdam <jba@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Ethan Lee <ethanalee@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/frontend/main.go | 23 | ||||
| -rw-r--r-- | cmd/worker/main.go | 34 |
2 files changed, 45 insertions, 12 deletions
diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go index b438c8be..303b9616 100644 --- a/cmd/frontend/main.go +++ b/cmd/frontend/main.go @@ -33,6 +33,7 @@ import ( "golang.org/x/pkgsite/internal/proxy" "golang.org/x/pkgsite/internal/queue" "golang.org/x/pkgsite/internal/queue/gcpqueue" + "golang.org/x/pkgsite/internal/queue/inmemqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/static" "golang.org/x/pkgsite/internal/trace" @@ -119,12 +120,26 @@ func main() { // The closure passed to queue.New is only used for testing and local // execution, not in production. So it's okay that it doesn't use a // per-request connection. - fetchQueue, err = gcpqueue.New(ctx, cfg, queueName, *workers, expg, - func(ctx context.Context, modulePath, version string) (int, error) { + if serverconfig.OnGCP() { + q, err := gcpqueue.New(ctx, cfg, queueName, *workers) + if err != nil { + log.Fatalf(ctx, "error creating GCP queue: %v", err) + } + fetchQueue = q + } else { + experiments, err := expg(ctx) + if err != nil { + log.Fatalf(ctx, "error getting experiment: %v", err) + } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) + } + } + fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) { return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db) }) - if err != nil { - log.Fatalf(ctx, "gcpqueue.New: %v", err) } } diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 06990b2b..712301ec 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -28,7 +28,9 @@ import ( "golang.org/x/pkgsite/internal/middleware" mtimeout "golang.org/x/pkgsite/internal/middleware/timeout" "golang.org/x/pkgsite/internal/proxy" + "golang.org/x/pkgsite/internal/queue" "golang.org/x/pkgsite/internal/queue/gcpqueue" + "golang.org/x/pkgsite/internal/queue/inmemqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/trace" "golang.org/x/pkgsite/internal/worker" @@ -96,18 +98,34 @@ func main() { Timeout: config.SourceTimeout, }) expg := cmdconfig.ExperimentGetter(ctx, cfg) - fetchQueue, err := gcpqueue.New(ctx, cfg, queueName, *workers, expg, - func(ctx context.Context, modulePath, version string) (int, error) { - f := &worker.Fetcher{ - ProxyClient: proxyClient, - SourceClient: sourceClient, - DB: db, + + var fetchQueue queue.Queue + if serverconfig.OnGCP() { + q, err := gcpqueue.New(ctx, cfg, queueName, *workers) + if err != nil { + log.Fatalf(ctx, "error creating GCP queue: %v", err) + } + fetchQueue = q + } else { + experiments, err := expg(ctx) + if err != nil { + log.Fatalf(ctx, "error getting experiment: %v", err) + } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) } + } + f := &worker.Fetcher{ + ProxyClient: proxyClient, + SourceClient: sourceClient, + DB: db, + } + fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) { code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel()) return code, err }) - if err != nil { - log.Fatalf(ctx, "gcpqueue.New: %v", err) } reporter := cmdconfig.Reporter(ctx, cfg) |
