aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean Barkhuysen <jean.barkhuysen@gmail.com>2026-02-23 06:13:17 -0700
committerGopher Robot <gobot@golang.org>2026-02-25 06:09:15 -0800
commitd29b966ca794634ccd1758dcd59cd2c8baf23422 (patch)
treedbb5e7699d7ba11a6b8c657b50edcb373a157909
parentdfa517ef2f803e000bd058998099e76648f9e385 (diff)
downloadgo-x-pkgsite-d29b966ca794634ccd1758dcd59cd2c8baf23422.tar.xz
internal/queue: move InMemory queue to its own package
Currently, InMemory queue sits in queue, and is instantiated in gcpqueue for convenience. In preparation for a third queue type (Postgres), this CL separates the two more cleanly, making it more ergonomic for the new queue type to slot in next to the existing two. This CL doesn't change any logic: it just exists to make the next CL smaller and easier to review. I also took the liberty of adding some tests specific to the InMemory queue, since I didn't find any. Let me know if it's tested in another place, though, and if you'd prefer me to remove it. Updates golang/go#74027. Change-Id: I44bd92129f33bc7975fcd138c905e0b7ab49d257 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/747881 kokoro-CI: kokoro <noreply+kokoro@google.com> Auto-Submit: Jonathan Amsterdam <jba@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Ethan Lee <ethanalee@google.com> Reviewed-by: Jonathan Amsterdam <jba@google.com>
-rw-r--r--cmd/frontend/main.go23
-rw-r--r--cmd/worker/main.go34
-rw-r--r--internal/frontend/fetchserver/fetch_test.go4
-rw-r--r--internal/queue/gcpqueue/queue.go18
-rw-r--r--internal/queue/inmemqueue/queue.go90
-rw-r--r--internal/queue/inmemqueue/queue_test.go118
-rw-r--r--internal/queue/queue.go79
-rw-r--r--internal/testing/integration/frontend_test.go3
-rw-r--r--internal/testing/integration/worker_test.go6
-rw-r--r--internal/worker/server_test.go4
10 files changed, 263 insertions, 116 deletions
diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go
index b438c8be..303b9616 100644
--- a/cmd/frontend/main.go
+++ b/cmd/frontend/main.go
@@ -33,6 +33,7 @@ import (
"golang.org/x/pkgsite/internal/proxy"
"golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/static"
"golang.org/x/pkgsite/internal/trace"
@@ -119,12 +120,26 @@ func main() {
// The closure passed to queue.New is only used for testing and local
// execution, not in production. So it's okay that it doesn't use a
// per-request connection.
- fetchQueue, err = gcpqueue.New(ctx, cfg, queueName, *workers, expg,
- func(ctx context.Context, modulePath, version string) (int, error) {
+ if serverconfig.OnGCP() {
+ q, err := gcpqueue.New(ctx, cfg, queueName, *workers)
+ if err != nil {
+ log.Fatalf(ctx, "error creating GCP queue: %v", err)
+ }
+ fetchQueue = q
+ } else {
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
+ }
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db)
})
- if err != nil {
- log.Fatalf(ctx, "gcpqueue.New: %v", err)
}
}
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 06990b2b..712301ec 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -28,7 +28,9 @@ import (
"golang.org/x/pkgsite/internal/middleware"
mtimeout "golang.org/x/pkgsite/internal/middleware/timeout"
"golang.org/x/pkgsite/internal/proxy"
+ "golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/trace"
"golang.org/x/pkgsite/internal/worker"
@@ -96,18 +98,34 @@ func main() {
Timeout: config.SourceTimeout,
})
expg := cmdconfig.ExperimentGetter(ctx, cfg)
- fetchQueue, err := gcpqueue.New(ctx, cfg, queueName, *workers, expg,
- func(ctx context.Context, modulePath, version string) (int, error) {
- f := &worker.Fetcher{
- ProxyClient: proxyClient,
- SourceClient: sourceClient,
- DB: db,
+
+ var fetchQueue queue.Queue
+ if serverconfig.OnGCP() {
+ q, err := gcpqueue.New(ctx, cfg, queueName, *workers)
+ if err != nil {
+ log.Fatalf(ctx, "error creating GCP queue: %v", err)
+ }
+ fetchQueue = q
+ } else {
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
}
+ }
+ f := &worker.Fetcher{
+ ProxyClient: proxyClient,
+ SourceClient: sourceClient,
+ DB: db,
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
return code, err
})
- if err != nil {
- log.Fatalf(ctx, "gcpqueue.New: %v", err)
}
reporter := cmdconfig.Reporter(ctx, cfg)
diff --git a/internal/frontend/fetchserver/fetch_test.go b/internal/frontend/fetchserver/fetch_test.go
index 7f05fd56..91430a2d 100644
--- a/internal/frontend/fetchserver/fetch_test.go
+++ b/internal/frontend/fetchserver/fetch_test.go
@@ -18,7 +18,7 @@ import (
"golang.org/x/pkgsite/internal/frontend"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy/proxytest"
- "golang.org/x/pkgsite/internal/queue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/testing/sample"
"golang.org/x/pkgsite/internal/testing/testhelper"
@@ -50,7 +50,7 @@ func newTestServerWithFetch(t *testing.T, proxyModules []*proxytest.Module, cach
sourceClient := source.NewClient(http.DefaultClient)
ctx := context.Background()
- q := queue.NewInMemory(ctx, 1, nil,
+ q := inmemqueue.New(ctx, 1, nil,
func(ctx context.Context, mpath, version string) (int, error) {
return FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB)
})
diff --git a/internal/queue/gcpqueue/queue.go b/internal/queue/gcpqueue/queue.go
index e007bf0f..c4e82465 100644
--- a/internal/queue/gcpqueue/queue.go
+++ b/internal/queue/gcpqueue/queue.go
@@ -18,7 +18,6 @@ import (
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
- "golang.org/x/pkgsite/internal/config/serverconfig"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
@@ -27,27 +26,12 @@ import (
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/log"
- "golang.org/x/pkgsite/internal/middleware"
"golang.org/x/pkgsite/internal/queue"
)
// New creates a new Queue with name queueName based on the configuration
// in cfg. When running locally, Queue uses numWorkers concurrent workers.
-func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int, expGetter middleware.ExperimentGetter, processFunc queue.InMemoryProcessFunc) (queue.Queue, error) {
- if !serverconfig.OnGCP() {
- experiments, err := expGetter(ctx)
- if err != nil {
- return nil, err
- }
- var names []string
- for _, e := range experiments {
- if e.Rollout > 0 {
- names = append(names, e.Name)
- }
- }
- return queue.NewInMemory(ctx, numWorkers, names, processFunc), nil
- }
-
+func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int) (queue.Queue, error) {
client, err := cloudtasks.NewClient(ctx)
if err != nil {
return nil, err
diff --git a/internal/queue/inmemqueue/queue.go b/internal/queue/inmemqueue/queue.go
new file mode 100644
index 00000000..f930f5d5
--- /dev/null
+++ b/internal/queue/inmemqueue/queue.go
@@ -0,0 +1,90 @@
+// Copyright 2026 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package inmemqueue provides an in-memory queue implementation that can be
+// used for scheduling of fetch actions.
+package inmemqueue
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "golang.org/x/pkgsite/internal"
+ "golang.org/x/pkgsite/internal/experiment"
+ "golang.org/x/pkgsite/internal/log"
+ "golang.org/x/pkgsite/internal/queue"
+)
+
+// InMemory is a Queue implementation that schedules in-process fetch
+// operations. Unlike the GCP task queue, it will not automatically retry tasks
+// on failure.
+//
+// This should only be used for local development.
+type InMemory struct {
+ queue chan internal.Modver
+ done chan struct{}
+ experiments []string
+}
+
+type InMemoryProcessFunc func(context.Context, string, string) (int, error)
+
+// New creates a new InMemory that asynchronously fetches from proxyClient and
+// stores in db. It uses workerCount parallelism to execute these fetches.
+func New(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory {
+ q := &InMemory{
+ queue: make(chan internal.Modver, 1000),
+ experiments: experiments,
+ done: make(chan struct{}),
+ }
+ sem := make(chan struct{}, workerCount)
+ go func() {
+ for v := range q.queue {
+ select {
+ case <-ctx.Done():
+ return
+ case sem <- struct{}{}:
+ }
+
+ // If a worker is available, make a request to the fetch service inside a
+ // goroutine and wait for it to finish.
+ go func(v internal.Modver) {
+ defer func() { <-sem }()
+
+ log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem))
+
+ fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+ fetchCtx = experiment.NewContext(fetchCtx, experiments...)
+ defer cancel()
+
+ if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil {
+ log.Error(fetchCtx, err)
+ }
+ }(v)
+ }
+ for i := 0; i < cap(sem); i++ {
+ select {
+ case <-ctx.Done():
+ panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
+ case sem <- struct{}{}:
+ }
+ }
+ close(q.done)
+ }()
+ return q
+}
+
+// ScheduleFetch pushes a fetch task into the local queue to be processed
+// asynchronously.
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *queue.Options) (bool, error) {
+ q.queue <- internal.Modver{Path: modulePath, Version: version}
+ return true, nil
+}
+
+// WaitForTesting waits for all queued requests to finish. It should only be
+// used by test code.
+func (q *InMemory) WaitForTesting(ctx context.Context) {
+ close(q.queue)
+ <-q.done
+}
diff --git a/internal/queue/inmemqueue/queue_test.go b/internal/queue/inmemqueue/queue_test.go
new file mode 100644
index 00000000..79847140
--- /dev/null
+++ b/internal/queue/inmemqueue/queue_test.go
@@ -0,0 +1,118 @@
+// Copyright 2026 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package inmemqueue
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+)
+
+func TestInMemory_ScheduleAndProcess(t *testing.T) {
+ ctx := context.Background()
+
+ var processed int
+ var mu sync.Mutex
+
+ processFunc := func(ctx context.Context, path, version string) (int, error) {
+ mu.Lock()
+ processed++
+ mu.Unlock()
+ return 200, nil
+ }
+
+ q := New(ctx, 2, nil, processFunc)
+
+ tests := []struct {
+ path string
+ version string
+ }{
+ {"example.com/mod1", "v1.0.0"},
+ {"example.com/mod2", "v1.1.0"},
+ {"example.com/mod3", "v2.0.0"},
+ }
+
+ for _, tc := range tests {
+ ok, err := q.ScheduleFetch(ctx, tc.path, tc.version, nil)
+ if err != nil {
+ t.Fatalf("ScheduleFetch(%q, %q) returned error: %v", tc.path, tc.version, err)
+ }
+ if !ok {
+ t.Errorf("ScheduleFetch(%q, %q) = false, want true", tc.path, tc.version)
+ }
+ }
+
+ q.WaitForTesting(ctx)
+
+ if got, want := processed, len(tests); got != want {
+ t.Errorf("processed tasks = %d, want %d", got, want)
+ }
+}
+
+func TestInMemory_ProcessFuncError(t *testing.T) {
+ ctx := context.Background()
+
+ processFunc := func(ctx context.Context, path, version string) (int, error) {
+ return 500, errors.New("simulated fetch error")
+ }
+
+ q := New(ctx, 1, nil, processFunc)
+
+ _, err := q.ScheduleFetch(ctx, "example.com/error-mod", "v1.0.0", nil)
+ if err != nil {
+ t.Fatalf("ScheduleFetch returned error: %v", err)
+ }
+
+ // This should complete without panicking or deadlocking.
+ q.WaitForTesting(ctx)
+}
+
+func TestInMemory_WorkerConcurrency(t *testing.T) {
+ ctx := context.Background()
+ workerCount := 2
+
+ var active, max int
+ var mu sync.Mutex
+ var wg sync.WaitGroup
+
+ blockCh := make(chan struct{})
+
+ processFunc := func(ctx context.Context, path, version string) (int, error) {
+ mu.Lock()
+ active++
+ if active > max {
+ max = active
+ }
+ mu.Unlock()
+
+ <-blockCh
+
+ mu.Lock()
+ active--
+ mu.Unlock()
+ wg.Done()
+ return 200, nil
+ }
+
+ q := New(ctx, workerCount, nil, processFunc)
+
+ taskCount := 5
+ wg.Add(taskCount)
+ for range taskCount {
+ _, err := q.ScheduleFetch(ctx, "example.com/concurrent", "v1.0.0", nil)
+ if err != nil {
+ t.Fatalf("ScheduleFetch returned error: %v", err)
+ }
+ }
+
+ close(blockCh)
+ wg.Wait()
+ q.WaitForTesting(ctx)
+
+ if max > workerCount {
+ t.Errorf("max concurrent workers = %d, want <= %d", max, workerCount)
+ }
+}
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 00b580c7..58ebdf8a 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -8,12 +8,6 @@ package queue
import (
"context"
- "fmt"
- "time"
-
- "golang.org/x/pkgsite/internal"
- "golang.org/x/pkgsite/internal/experiment"
- "golang.org/x/pkgsite/internal/log"
)
// A Queue provides an interface for asynchronous scheduling of fetch actions.
@@ -43,76 +37,3 @@ const (
SourceFrontendValue = "frontend"
SourceWorkerValue = "worker"
)
-
-// InMemory is a Queue implementation that schedules in-process fetch
-// operations. Unlike the GCP task queue, it will not automatically retry tasks
-// on failure.
-//
-// This should only be used for local development.
-type InMemory struct {
- queue chan internal.Modver
- done chan struct{}
- experiments []string
-}
-
-type InMemoryProcessFunc func(context.Context, string, string) (int, error)
-
-// NewInMemory creates a new InMemory that asynchronously fetches
-// from proxyClient and stores in db. It uses workerCount parallelism to
-// execute these fetches.
-func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory {
- q := &InMemory{
- queue: make(chan internal.Modver, 1000),
- experiments: experiments,
- done: make(chan struct{}),
- }
- sem := make(chan struct{}, workerCount)
- go func() {
- for v := range q.queue {
- select {
- case <-ctx.Done():
- return
- case sem <- struct{}{}:
- }
-
- // If a worker is available, make a request to the fetch service inside a
- // goroutine and wait for it to finish.
- go func(v internal.Modver) {
- defer func() { <-sem }()
-
- log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem))
-
- fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
- fetchCtx = experiment.NewContext(fetchCtx, experiments...)
- defer cancel()
-
- if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil {
- log.Error(fetchCtx, err)
- }
- }(v)
- }
- for i := 0; i < cap(sem); i++ {
- select {
- case <-ctx.Done():
- panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
- case sem <- struct{}{}:
- }
- }
- close(q.done)
- }()
- return q
-}
-
-// ScheduleFetch pushes a fetch task into the local queue to be processed
-// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) {
- q.queue <- internal.Modver{Path: modulePath, Version: version}
- return true, nil
-}
-
-// WaitForTesting waits for all queued requests to finish. It should only be
-// used by test code.
-func (q *InMemory) WaitForTesting(ctx context.Context) {
- close(q.queue)
- <-q.done
-}
diff --git a/internal/testing/integration/frontend_test.go b/internal/testing/integration/frontend_test.go
index e9cbe997..66a6aa6e 100644
--- a/internal/testing/integration/frontend_test.go
+++ b/internal/testing/integration/frontend_test.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/pkgsite/internal/proxy"
"golang.org/x/pkgsite/internal/proxy/proxytest"
"golang.org/x/pkgsite/internal/queue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/testing/htmlcheck"
)
@@ -89,7 +90,7 @@ func setupQueue(ctx context.Context, t *testing.T, proxyModules []*proxytest.Mod
cctx, cancel := context.WithCancel(ctx)
proxyClient, teardown := proxytest.SetupTestClient(t, proxyModules)
sourceClient := source.NewClient(http.DefaultClient)
- q := queue.NewInMemory(cctx, 1, experimentNames,
+ q := inmemqueue.New(cctx, 1, experimentNames,
func(ctx context.Context, mpath, version string) (_ int, err error) {
return fetchserver.FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB)
})
diff --git a/internal/testing/integration/worker_test.go b/internal/testing/integration/worker_test.go
index d6f75766..1edf9620 100644
--- a/internal/testing/integration/worker_test.go
+++ b/internal/testing/integration/worker_test.go
@@ -22,13 +22,13 @@ import (
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/index"
"golang.org/x/pkgsite/internal/proxy"
- "golang.org/x/pkgsite/internal/queue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/worker"
)
func setupWorker(ctx context.Context, t *testing.T, proxyClient *proxy.Client, indexClient *index.Client,
- redisCacheClient *redis.Client) (*httptest.Server, *worker.Fetcher, *queue.InMemory) {
+ redisCacheClient *redis.Client) (*httptest.Server, *worker.Fetcher, *inmemqueue.InMemory) {
t.Helper()
fetcher := &worker.Fetcher{
@@ -39,7 +39,7 @@ func setupWorker(ctx context.Context, t *testing.T, proxyClient *proxy.Client, i
}
// TODO: it would be better if InMemory made http requests
// back to worker, rather than calling fetch itself.
- queue := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
+ queue := inmemqueue.New(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
code, _, err := fetcher.FetchAndUpdateState(ctx, mpath, version, "test")
return code, err
})
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 55108907..427dce9e 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -24,7 +24,7 @@ import (
"golang.org/x/pkgsite/internal/index"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy/proxytest"
- "golang.org/x/pkgsite/internal/queue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/testing/sample"
)
@@ -178,7 +178,7 @@ func TestWorker(t *testing.T) {
f := &Fetcher{proxyClient, source.NewClient(http.DefaultClient), testDB, nil, nil, ""}
// Use 10 workers to have parallelism consistent with the worker binary.
- q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
+ q := inmemqueue.New(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "")
return code, err
})