aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/frontend/main.go23
-rw-r--r--cmd/worker/main.go34
-rw-r--r--internal/frontend/fetchserver/fetch_test.go4
-rw-r--r--internal/queue/gcpqueue/queue.go18
-rw-r--r--internal/queue/inmemqueue/queue.go90
-rw-r--r--internal/queue/inmemqueue/queue_test.go118
-rw-r--r--internal/queue/queue.go79
-rw-r--r--internal/testing/integration/frontend_test.go3
-rw-r--r--internal/testing/integration/worker_test.go6
-rw-r--r--internal/worker/server_test.go4
10 files changed, 263 insertions, 116 deletions
diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go
index b438c8be..303b9616 100644
--- a/cmd/frontend/main.go
+++ b/cmd/frontend/main.go
@@ -33,6 +33,7 @@ import (
"golang.org/x/pkgsite/internal/proxy"
"golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/static"
"golang.org/x/pkgsite/internal/trace"
@@ -119,12 +120,26 @@ func main() {
// The closure passed to queue.New is only used for testing and local
// execution, not in production. So it's okay that it doesn't use a
// per-request connection.
- fetchQueue, err = gcpqueue.New(ctx, cfg, queueName, *workers, expg,
- func(ctx context.Context, modulePath, version string) (int, error) {
+ if serverconfig.OnGCP() {
+ q, err := gcpqueue.New(ctx, cfg, queueName, *workers)
+ if err != nil {
+ log.Fatalf(ctx, "error creating GCP queue: %v", err)
+ }
+ fetchQueue = q
+ } else {
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
+ }
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db)
})
- if err != nil {
- log.Fatalf(ctx, "gcpqueue.New: %v", err)
}
}
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 06990b2b..712301ec 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -28,7 +28,9 @@ import (
"golang.org/x/pkgsite/internal/middleware"
mtimeout "golang.org/x/pkgsite/internal/middleware/timeout"
"golang.org/x/pkgsite/internal/proxy"
+ "golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/queue/gcpqueue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/trace"
"golang.org/x/pkgsite/internal/worker"
@@ -96,18 +98,34 @@ func main() {
Timeout: config.SourceTimeout,
})
expg := cmdconfig.ExperimentGetter(ctx, cfg)
- fetchQueue, err := gcpqueue.New(ctx, cfg, queueName, *workers, expg,
- func(ctx context.Context, modulePath, version string) (int, error) {
- f := &worker.Fetcher{
- ProxyClient: proxyClient,
- SourceClient: sourceClient,
- DB: db,
+
+ var fetchQueue queue.Queue
+ if serverconfig.OnGCP() {
+ q, err := gcpqueue.New(ctx, cfg, queueName, *workers)
+ if err != nil {
+ log.Fatalf(ctx, "error creating GCP queue: %v", err)
+ }
+ fetchQueue = q
+ } else {
+ experiments, err := expg(ctx)
+ if err != nil {
+ log.Fatalf(ctx, "error getting experiment: %v", err)
+ }
+ var names []string
+ for _, e := range experiments {
+ if e.Rollout > 0 {
+ names = append(names, e.Name)
}
+ }
+ f := &worker.Fetcher{
+ ProxyClient: proxyClient,
+ SourceClient: sourceClient,
+ DB: db,
+ }
+ fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) {
code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
return code, err
})
- if err != nil {
- log.Fatalf(ctx, "gcpqueue.New: %v", err)
}
reporter := cmdconfig.Reporter(ctx, cfg)
diff --git a/internal/frontend/fetchserver/fetch_test.go b/internal/frontend/fetchserver/fetch_test.go
index 7f05fd56..91430a2d 100644
--- a/internal/frontend/fetchserver/fetch_test.go
+++ b/internal/frontend/fetchserver/fetch_test.go
@@ -18,7 +18,7 @@ import (
"golang.org/x/pkgsite/internal/frontend"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy/proxytest"
- "golang.org/x/pkgsite/internal/queue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/testing/sample"
"golang.org/x/pkgsite/internal/testing/testhelper"
@@ -50,7 +50,7 @@ func newTestServerWithFetch(t *testing.T, proxyModules []*proxytest.Module, cach
sourceClient := source.NewClient(http.DefaultClient)
ctx := context.Background()
- q := queue.NewInMemory(ctx, 1, nil,
+ q := inmemqueue.New(ctx, 1, nil,
func(ctx context.Context, mpath, version string) (int, error) {
return FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB)
})
diff --git a/internal/queue/gcpqueue/queue.go b/internal/queue/gcpqueue/queue.go
index e007bf0f..c4e82465 100644
--- a/internal/queue/gcpqueue/queue.go
+++ b/internal/queue/gcpqueue/queue.go
@@ -18,7 +18,6 @@ import (
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
- "golang.org/x/pkgsite/internal/config/serverconfig"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
@@ -27,27 +26,12 @@ import (
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/log"
- "golang.org/x/pkgsite/internal/middleware"
"golang.org/x/pkgsite/internal/queue"
)
// New creates a new Queue with name queueName based on the configuration
// in cfg. When running locally, Queue uses numWorkers concurrent workers.
-func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int, expGetter middleware.ExperimentGetter, processFunc queue.InMemoryProcessFunc) (queue.Queue, error) {
- if !serverconfig.OnGCP() {
- experiments, err := expGetter(ctx)
- if err != nil {
- return nil, err
- }
- var names []string
- for _, e := range experiments {
- if e.Rollout > 0 {
- names = append(names, e.Name)
- }
- }
- return queue.NewInMemory(ctx, numWorkers, names, processFunc), nil
- }
-
+func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int) (queue.Queue, error) {
client, err := cloudtasks.NewClient(ctx)
if err != nil {
return nil, err
diff --git a/internal/queue/inmemqueue/queue.go b/internal/queue/inmemqueue/queue.go
new file mode 100644
index 00000000..f930f5d5
--- /dev/null
+++ b/internal/queue/inmemqueue/queue.go
@@ -0,0 +1,90 @@
+// Copyright 2026 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package inmemqueue provides an in-memory queue implementation that can be
+// used for scheduling of fetch actions.
+package inmemqueue
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "golang.org/x/pkgsite/internal"
+ "golang.org/x/pkgsite/internal/experiment"
+ "golang.org/x/pkgsite/internal/log"
+ "golang.org/x/pkgsite/internal/queue"
+)
+
+// InMemory is a Queue implementation that schedules in-process fetch
+// operations. Unlike the GCP task queue, it will not automatically retry tasks
+// on failure.
+//
+// This should only be used for local development.
+type InMemory struct {
+ queue chan internal.Modver
+ done chan struct{}
+ experiments []string
+}
+
+type InMemoryProcessFunc func(context.Context, string, string) (int, error)
+
+// New creates a new InMemory that asynchronously fetches from proxyClient and
+// stores in db. It uses workerCount parallelism to execute these fetches.
+func New(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory {
+ q := &InMemory{
+ queue: make(chan internal.Modver, 1000),
+ experiments: experiments,
+ done: make(chan struct{}),
+ }
+ sem := make(chan struct{}, workerCount)
+ go func() {
+ for v := range q.queue {
+ select {
+ case <-ctx.Done():
+ return
+ case sem <- struct{}{}:
+ }
+
+ // If a worker is available, make a request to the fetch service inside a
+ // goroutine and wait for it to finish.
+ go func(v internal.Modver) {
+ defer func() { <-sem }()
+
+ log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem))
+
+ fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+ fetchCtx = experiment.NewContext(fetchCtx, experiments...)
+ defer cancel()
+
+ if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil {
+ log.Error(fetchCtx, err)
+ }
+ }(v)
+ }
+ for i := 0; i < cap(sem); i++ {
+ select {
+ case <-ctx.Done():
+ panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
+ case sem <- struct{}{}:
+ }
+ }
+ close(q.done)
+ }()
+ return q
+}
+
+// ScheduleFetch pushes a fetch task into the local queue to be processed
+// asynchronously.
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *queue.Options) (bool, error) {
+ q.queue <- internal.Modver{Path: modulePath, Version: version}
+ return true, nil
+}
+
+// WaitForTesting waits for all queued requests to finish. It should only be
+// used by test code.
+func (q *InMemory) WaitForTesting(ctx context.Context) {
+ close(q.queue)
+ <-q.done
+}
diff --git a/internal/queue/inmemqueue/queue_test.go b/internal/queue/inmemqueue/queue_test.go
new file mode 100644
index 00000000..79847140
--- /dev/null
+++ b/internal/queue/inmemqueue/queue_test.go
@@ -0,0 +1,118 @@
+// Copyright 2026 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package inmemqueue
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "testing"
+)
+
+func TestInMemory_ScheduleAndProcess(t *testing.T) {
+ ctx := context.Background()
+
+ var processed int
+ var mu sync.Mutex
+
+ processFunc := func(ctx context.Context, path, version string) (int, error) {
+ mu.Lock()
+ processed++
+ mu.Unlock()
+ return 200, nil
+ }
+
+ q := New(ctx, 2, nil, processFunc)
+
+ tests := []struct {
+ path string
+ version string
+ }{
+ {"example.com/mod1", "v1.0.0"},
+ {"example.com/mod2", "v1.1.0"},
+ {"example.com/mod3", "v2.0.0"},
+ }
+
+ for _, tc := range tests {
+ ok, err := q.ScheduleFetch(ctx, tc.path, tc.version, nil)
+ if err != nil {
+ t.Fatalf("ScheduleFetch(%q, %q) returned error: %v", tc.path, tc.version, err)
+ }
+ if !ok {
+ t.Errorf("ScheduleFetch(%q, %q) = false, want true", tc.path, tc.version)
+ }
+ }
+
+ q.WaitForTesting(ctx)
+
+ if got, want := processed, len(tests); got != want {
+ t.Errorf("processed tasks = %d, want %d", got, want)
+ }
+}
+
+func TestInMemory_ProcessFuncError(t *testing.T) {
+ ctx := context.Background()
+
+ processFunc := func(ctx context.Context, path, version string) (int, error) {
+ return 500, errors.New("simulated fetch error")
+ }
+
+ q := New(ctx, 1, nil, processFunc)
+
+ _, err := q.ScheduleFetch(ctx, "example.com/error-mod", "v1.0.0", nil)
+ if err != nil {
+ t.Fatalf("ScheduleFetch returned error: %v", err)
+ }
+
+ // This should complete without panicking or deadlocking.
+ q.WaitForTesting(ctx)
+}
+
+func TestInMemory_WorkerConcurrency(t *testing.T) {
+ ctx := context.Background()
+ workerCount := 2
+
+ var active, max int
+ var mu sync.Mutex
+ var wg sync.WaitGroup
+
+ blockCh := make(chan struct{})
+
+ processFunc := func(ctx context.Context, path, version string) (int, error) {
+ mu.Lock()
+ active++
+ if active > max {
+ max = active
+ }
+ mu.Unlock()
+
+ <-blockCh
+
+ mu.Lock()
+ active--
+ mu.Unlock()
+ wg.Done()
+ return 200, nil
+ }
+
+ q := New(ctx, workerCount, nil, processFunc)
+
+ taskCount := 5
+ wg.Add(taskCount)
+ for range taskCount {
+ _, err := q.ScheduleFetch(ctx, "example.com/concurrent", "v1.0.0", nil)
+ if err != nil {
+ t.Fatalf("ScheduleFetch returned error: %v", err)
+ }
+ }
+
+ close(blockCh)
+ wg.Wait()
+ q.WaitForTesting(ctx)
+
+ if max > workerCount {
+ t.Errorf("max concurrent workers = %d, want <= %d", max, workerCount)
+ }
+}
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 00b580c7..58ebdf8a 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -8,12 +8,6 @@ package queue
import (
"context"
- "fmt"
- "time"
-
- "golang.org/x/pkgsite/internal"
- "golang.org/x/pkgsite/internal/experiment"
- "golang.org/x/pkgsite/internal/log"
)
// A Queue provides an interface for asynchronous scheduling of fetch actions.
@@ -43,76 +37,3 @@ const (
SourceFrontendValue = "frontend"
SourceWorkerValue = "worker"
)
-
-// InMemory is a Queue implementation that schedules in-process fetch
-// operations. Unlike the GCP task queue, it will not automatically retry tasks
-// on failure.
-//
-// This should only be used for local development.
-type InMemory struct {
- queue chan internal.Modver
- done chan struct{}
- experiments []string
-}
-
-type InMemoryProcessFunc func(context.Context, string, string) (int, error)
-
-// NewInMemory creates a new InMemory that asynchronously fetches
-// from proxyClient and stores in db. It uses workerCount parallelism to
-// execute these fetches.
-func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory {
- q := &InMemory{
- queue: make(chan internal.Modver, 1000),
- experiments: experiments,
- done: make(chan struct{}),
- }
- sem := make(chan struct{}, workerCount)
- go func() {
- for v := range q.queue {
- select {
- case <-ctx.Done():
- return
- case sem <- struct{}{}:
- }
-
- // If a worker is available, make a request to the fetch service inside a
- // goroutine and wait for it to finish.
- go func(v internal.Modver) {
- defer func() { <-sem }()
-
- log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem))
-
- fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
- fetchCtx = experiment.NewContext(fetchCtx, experiments...)
- defer cancel()
-
- if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil {
- log.Error(fetchCtx, err)
- }
- }(v)
- }
- for i := 0; i < cap(sem); i++ {
- select {
- case <-ctx.Done():
- panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
- case sem <- struct{}{}:
- }
- }
- close(q.done)
- }()
- return q
-}
-
-// ScheduleFetch pushes a fetch task into the local queue to be processed
-// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) {
- q.queue <- internal.Modver{Path: modulePath, Version: version}
- return true, nil
-}
-
-// WaitForTesting waits for all queued requests to finish. It should only be
-// used by test code.
-func (q *InMemory) WaitForTesting(ctx context.Context) {
- close(q.queue)
- <-q.done
-}
diff --git a/internal/testing/integration/frontend_test.go b/internal/testing/integration/frontend_test.go
index e9cbe997..66a6aa6e 100644
--- a/internal/testing/integration/frontend_test.go
+++ b/internal/testing/integration/frontend_test.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/pkgsite/internal/proxy"
"golang.org/x/pkgsite/internal/proxy/proxytest"
"golang.org/x/pkgsite/internal/queue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/testing/htmlcheck"
)
@@ -89,7 +90,7 @@ func setupQueue(ctx context.Context, t *testing.T, proxyModules []*proxytest.Mod
cctx, cancel := context.WithCancel(ctx)
proxyClient, teardown := proxytest.SetupTestClient(t, proxyModules)
sourceClient := source.NewClient(http.DefaultClient)
- q := queue.NewInMemory(cctx, 1, experimentNames,
+ q := inmemqueue.New(cctx, 1, experimentNames,
func(ctx context.Context, mpath, version string) (_ int, err error) {
return fetchserver.FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB)
})
diff --git a/internal/testing/integration/worker_test.go b/internal/testing/integration/worker_test.go
index d6f75766..1edf9620 100644
--- a/internal/testing/integration/worker_test.go
+++ b/internal/testing/integration/worker_test.go
@@ -22,13 +22,13 @@ import (
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/index"
"golang.org/x/pkgsite/internal/proxy"
- "golang.org/x/pkgsite/internal/queue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/worker"
)
func setupWorker(ctx context.Context, t *testing.T, proxyClient *proxy.Client, indexClient *index.Client,
- redisCacheClient *redis.Client) (*httptest.Server, *worker.Fetcher, *queue.InMemory) {
+ redisCacheClient *redis.Client) (*httptest.Server, *worker.Fetcher, *inmemqueue.InMemory) {
t.Helper()
fetcher := &worker.Fetcher{
@@ -39,7 +39,7 @@ func setupWorker(ctx context.Context, t *testing.T, proxyClient *proxy.Client, i
}
// TODO: it would be better if InMemory made http requests
// back to worker, rather than calling fetch itself.
- queue := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
+ queue := inmemqueue.New(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
code, _, err := fetcher.FetchAndUpdateState(ctx, mpath, version, "test")
return code, err
})
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 55108907..427dce9e 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -24,7 +24,7 @@ import (
"golang.org/x/pkgsite/internal/index"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy/proxytest"
- "golang.org/x/pkgsite/internal/queue"
+ "golang.org/x/pkgsite/internal/queue/inmemqueue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/testing/sample"
)
@@ -178,7 +178,7 @@ func TestWorker(t *testing.T) {
f := &Fetcher{proxyClient, source.NewClient(http.DefaultClient), testDB, nil, nil, ""}
// Use 10 workers to have parallelism consistent with the worker binary.
- q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
+ q := inmemqueue.New(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "")
return code, err
})