diff options
| -rw-r--r-- | cmd/frontend/main.go | 23 | ||||
| -rw-r--r-- | cmd/worker/main.go | 34 | ||||
| -rw-r--r-- | internal/frontend/fetchserver/fetch_test.go | 4 | ||||
| -rw-r--r-- | internal/queue/gcpqueue/queue.go | 18 | ||||
| -rw-r--r-- | internal/queue/inmemqueue/queue.go | 90 | ||||
| -rw-r--r-- | internal/queue/inmemqueue/queue_test.go | 118 | ||||
| -rw-r--r-- | internal/queue/queue.go | 79 | ||||
| -rw-r--r-- | internal/testing/integration/frontend_test.go | 3 | ||||
| -rw-r--r-- | internal/testing/integration/worker_test.go | 6 | ||||
| -rw-r--r-- | internal/worker/server_test.go | 4 |
10 files changed, 263 insertions, 116 deletions
diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go index b438c8be..303b9616 100644 --- a/cmd/frontend/main.go +++ b/cmd/frontend/main.go @@ -33,6 +33,7 @@ import ( "golang.org/x/pkgsite/internal/proxy" "golang.org/x/pkgsite/internal/queue" "golang.org/x/pkgsite/internal/queue/gcpqueue" + "golang.org/x/pkgsite/internal/queue/inmemqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/static" "golang.org/x/pkgsite/internal/trace" @@ -119,12 +120,26 @@ func main() { // The closure passed to queue.New is only used for testing and local // execution, not in production. So it's okay that it doesn't use a // per-request connection. - fetchQueue, err = gcpqueue.New(ctx, cfg, queueName, *workers, expg, - func(ctx context.Context, modulePath, version string) (int, error) { + if serverconfig.OnGCP() { + q, err := gcpqueue.New(ctx, cfg, queueName, *workers) + if err != nil { + log.Fatalf(ctx, "error creating GCP queue: %v", err) + } + fetchQueue = q + } else { + experiments, err := expg(ctx) + if err != nil { + log.Fatalf(ctx, "error getting experiment: %v", err) + } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) + } + } + fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) { return fetchserver.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db) }) - if err != nil { - log.Fatalf(ctx, "gcpqueue.New: %v", err) } } diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 06990b2b..712301ec 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -28,7 +28,9 @@ import ( "golang.org/x/pkgsite/internal/middleware" mtimeout "golang.org/x/pkgsite/internal/middleware/timeout" "golang.org/x/pkgsite/internal/proxy" + "golang.org/x/pkgsite/internal/queue" "golang.org/x/pkgsite/internal/queue/gcpqueue" + "golang.org/x/pkgsite/internal/queue/inmemqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/trace" "golang.org/x/pkgsite/internal/worker" @@ -96,18 +98,34 @@ func main() { Timeout: config.SourceTimeout, }) expg := cmdconfig.ExperimentGetter(ctx, cfg) - fetchQueue, err := gcpqueue.New(ctx, cfg, queueName, *workers, expg, - func(ctx context.Context, modulePath, version string) (int, error) { - f := &worker.Fetcher{ - ProxyClient: proxyClient, - SourceClient: sourceClient, - DB: db, + + var fetchQueue queue.Queue + if serverconfig.OnGCP() { + q, err := gcpqueue.New(ctx, cfg, queueName, *workers) + if err != nil { + log.Fatalf(ctx, "error creating GCP queue: %v", err) + } + fetchQueue = q + } else { + experiments, err := expg(ctx) + if err != nil { + log.Fatalf(ctx, "error getting experiment: %v", err) + } + var names []string + for _, e := range experiments { + if e.Rollout > 0 { + names = append(names, e.Name) } + } + f := &worker.Fetcher{ + ProxyClient: proxyClient, + SourceClient: sourceClient, + DB: db, + } + fetchQueue = inmemqueue.New(ctx, *workers, names, func(ctx context.Context, modulePath, version string) (int, error) { code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel()) return code, err }) - if err != nil { - log.Fatalf(ctx, "gcpqueue.New: %v", err) } reporter := cmdconfig.Reporter(ctx, cfg) diff --git a/internal/frontend/fetchserver/fetch_test.go b/internal/frontend/fetchserver/fetch_test.go index 7f05fd56..91430a2d 100644 --- a/internal/frontend/fetchserver/fetch_test.go +++ b/internal/frontend/fetchserver/fetch_test.go @@ -18,7 +18,7 @@ import ( "golang.org/x/pkgsite/internal/frontend" "golang.org/x/pkgsite/internal/postgres" "golang.org/x/pkgsite/internal/proxy/proxytest" - "golang.org/x/pkgsite/internal/queue" + "golang.org/x/pkgsite/internal/queue/inmemqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/testing/sample" "golang.org/x/pkgsite/internal/testing/testhelper" @@ -50,7 +50,7 @@ func newTestServerWithFetch(t *testing.T, proxyModules []*proxytest.Module, cach sourceClient := source.NewClient(http.DefaultClient) ctx := context.Background() - q := queue.NewInMemory(ctx, 1, nil, + q := inmemqueue.New(ctx, 1, nil, func(ctx context.Context, mpath, version string) (int, error) { return FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB) }) diff --git a/internal/queue/gcpqueue/queue.go b/internal/queue/gcpqueue/queue.go index e007bf0f..c4e82465 100644 --- a/internal/queue/gcpqueue/queue.go +++ b/internal/queue/gcpqueue/queue.go @@ -18,7 +18,6 @@ import ( cloudtasks "cloud.google.com/go/cloudtasks/apiv2" taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb" - "golang.org/x/pkgsite/internal/config/serverconfig" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" @@ -27,27 +26,12 @@ import ( "golang.org/x/pkgsite/internal/config" "golang.org/x/pkgsite/internal/derrors" "golang.org/x/pkgsite/internal/log" - "golang.org/x/pkgsite/internal/middleware" "golang.org/x/pkgsite/internal/queue" ) // New creates a new Queue with name queueName based on the configuration // in cfg. When running locally, Queue uses numWorkers concurrent workers. -func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int, expGetter middleware.ExperimentGetter, processFunc queue.InMemoryProcessFunc) (queue.Queue, error) { - if !serverconfig.OnGCP() { - experiments, err := expGetter(ctx) - if err != nil { - return nil, err - } - var names []string - for _, e := range experiments { - if e.Rollout > 0 { - names = append(names, e.Name) - } - } - return queue.NewInMemory(ctx, numWorkers, names, processFunc), nil - } - +func New(ctx context.Context, cfg *config.Config, queueName string, numWorkers int) (queue.Queue, error) { client, err := cloudtasks.NewClient(ctx) if err != nil { return nil, err diff --git a/internal/queue/inmemqueue/queue.go b/internal/queue/inmemqueue/queue.go new file mode 100644 index 00000000..f930f5d5 --- /dev/null +++ b/internal/queue/inmemqueue/queue.go @@ -0,0 +1,90 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package inmemqueue provides an in-memory queue implementation that can be +// used for scheduling of fetch actions. +package inmemqueue + +import ( + "context" + "fmt" + "time" + + "golang.org/x/pkgsite/internal" + "golang.org/x/pkgsite/internal/experiment" + "golang.org/x/pkgsite/internal/log" + "golang.org/x/pkgsite/internal/queue" +) + +// InMemory is a Queue implementation that schedules in-process fetch +// operations. Unlike the GCP task queue, it will not automatically retry tasks +// on failure. +// +// This should only be used for local development. +type InMemory struct { + queue chan internal.Modver + done chan struct{} + experiments []string +} + +type InMemoryProcessFunc func(context.Context, string, string) (int, error) + +// New creates a new InMemory that asynchronously fetches from proxyClient and +// stores in db. It uses workerCount parallelism to execute these fetches. +func New(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory { + q := &InMemory{ + queue: make(chan internal.Modver, 1000), + experiments: experiments, + done: make(chan struct{}), + } + sem := make(chan struct{}, workerCount) + go func() { + for v := range q.queue { + select { + case <-ctx.Done(): + return + case sem <- struct{}{}: + } + + // If a worker is available, make a request to the fetch service inside a + // goroutine and wait for it to finish. + go func(v internal.Modver) { + defer func() { <-sem }() + + log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem)) + + fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + fetchCtx = experiment.NewContext(fetchCtx, experiments...) + defer cancel() + + if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil { + log.Error(fetchCtx, err) + } + }(v) + } + for i := 0; i < cap(sem); i++ { + select { + case <-ctx.Done(): + panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err())) + case sem <- struct{}{}: + } + } + close(q.done) + }() + return q +} + +// ScheduleFetch pushes a fetch task into the local queue to be processed +// asynchronously. +func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *queue.Options) (bool, error) { + q.queue <- internal.Modver{Path: modulePath, Version: version} + return true, nil +} + +// WaitForTesting waits for all queued requests to finish. It should only be +// used by test code. +func (q *InMemory) WaitForTesting(ctx context.Context) { + close(q.queue) + <-q.done +} diff --git a/internal/queue/inmemqueue/queue_test.go b/internal/queue/inmemqueue/queue_test.go new file mode 100644 index 00000000..79847140 --- /dev/null +++ b/internal/queue/inmemqueue/queue_test.go @@ -0,0 +1,118 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package inmemqueue + +import ( + "context" + "errors" + "sync" + "testing" +) + +func TestInMemory_ScheduleAndProcess(t *testing.T) { + ctx := context.Background() + + var processed int + var mu sync.Mutex + + processFunc := func(ctx context.Context, path, version string) (int, error) { + mu.Lock() + processed++ + mu.Unlock() + return 200, nil + } + + q := New(ctx, 2, nil, processFunc) + + tests := []struct { + path string + version string + }{ + {"example.com/mod1", "v1.0.0"}, + {"example.com/mod2", "v1.1.0"}, + {"example.com/mod3", "v2.0.0"}, + } + + for _, tc := range tests { + ok, err := q.ScheduleFetch(ctx, tc.path, tc.version, nil) + if err != nil { + t.Fatalf("ScheduleFetch(%q, %q) returned error: %v", tc.path, tc.version, err) + } + if !ok { + t.Errorf("ScheduleFetch(%q, %q) = false, want true", tc.path, tc.version) + } + } + + q.WaitForTesting(ctx) + + if got, want := processed, len(tests); got != want { + t.Errorf("processed tasks = %d, want %d", got, want) + } +} + +func TestInMemory_ProcessFuncError(t *testing.T) { + ctx := context.Background() + + processFunc := func(ctx context.Context, path, version string) (int, error) { + return 500, errors.New("simulated fetch error") + } + + q := New(ctx, 1, nil, processFunc) + + _, err := q.ScheduleFetch(ctx, "example.com/error-mod", "v1.0.0", nil) + if err != nil { + t.Fatalf("ScheduleFetch returned error: %v", err) + } + + // This should complete without panicking or deadlocking. + q.WaitForTesting(ctx) +} + +func TestInMemory_WorkerConcurrency(t *testing.T) { + ctx := context.Background() + workerCount := 2 + + var active, max int + var mu sync.Mutex + var wg sync.WaitGroup + + blockCh := make(chan struct{}) + + processFunc := func(ctx context.Context, path, version string) (int, error) { + mu.Lock() + active++ + if active > max { + max = active + } + mu.Unlock() + + <-blockCh + + mu.Lock() + active-- + mu.Unlock() + wg.Done() + return 200, nil + } + + q := New(ctx, workerCount, nil, processFunc) + + taskCount := 5 + wg.Add(taskCount) + for range taskCount { + _, err := q.ScheduleFetch(ctx, "example.com/concurrent", "v1.0.0", nil) + if err != nil { + t.Fatalf("ScheduleFetch returned error: %v", err) + } + } + + close(blockCh) + wg.Wait() + q.WaitForTesting(ctx) + + if max > workerCount { + t.Errorf("max concurrent workers = %d, want <= %d", max, workerCount) + } +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 00b580c7..58ebdf8a 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -8,12 +8,6 @@ package queue import ( "context" - "fmt" - "time" - - "golang.org/x/pkgsite/internal" - "golang.org/x/pkgsite/internal/experiment" - "golang.org/x/pkgsite/internal/log" ) // A Queue provides an interface for asynchronous scheduling of fetch actions. @@ -43,76 +37,3 @@ const ( SourceFrontendValue = "frontend" SourceWorkerValue = "worker" ) - -// InMemory is a Queue implementation that schedules in-process fetch -// operations. Unlike the GCP task queue, it will not automatically retry tasks -// on failure. -// -// This should only be used for local development. -type InMemory struct { - queue chan internal.Modver - done chan struct{} - experiments []string -} - -type InMemoryProcessFunc func(context.Context, string, string) (int, error) - -// NewInMemory creates a new InMemory that asynchronously fetches -// from proxyClient and stores in db. It uses workerCount parallelism to -// execute these fetches. -func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory { - q := &InMemory{ - queue: make(chan internal.Modver, 1000), - experiments: experiments, - done: make(chan struct{}), - } - sem := make(chan struct{}, workerCount) - go func() { - for v := range q.queue { - select { - case <-ctx.Done(): - return - case sem <- struct{}{}: - } - - // If a worker is available, make a request to the fetch service inside a - // goroutine and wait for it to finish. - go func(v internal.Modver) { - defer func() { <-sem }() - - log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem)) - - fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - fetchCtx = experiment.NewContext(fetchCtx, experiments...) - defer cancel() - - if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil { - log.Error(fetchCtx, err) - } - }(v) - } - for i := 0; i < cap(sem); i++ { - select { - case <-ctx.Done(): - panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err())) - case sem <- struct{}{}: - } - } - close(q.done) - }() - return q -} - -// ScheduleFetch pushes a fetch task into the local queue to be processed -// asynchronously. -func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) { - q.queue <- internal.Modver{Path: modulePath, Version: version} - return true, nil -} - -// WaitForTesting waits for all queued requests to finish. It should only be -// used by test code. -func (q *InMemory) WaitForTesting(ctx context.Context) { - close(q.queue) - <-q.done -} diff --git a/internal/testing/integration/frontend_test.go b/internal/testing/integration/frontend_test.go index e9cbe997..66a6aa6e 100644 --- a/internal/testing/integration/frontend_test.go +++ b/internal/testing/integration/frontend_test.go @@ -25,6 +25,7 @@ import ( "golang.org/x/pkgsite/internal/proxy" "golang.org/x/pkgsite/internal/proxy/proxytest" "golang.org/x/pkgsite/internal/queue" + "golang.org/x/pkgsite/internal/queue/inmemqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/testing/htmlcheck" ) @@ -89,7 +90,7 @@ func setupQueue(ctx context.Context, t *testing.T, proxyModules []*proxytest.Mod cctx, cancel := context.WithCancel(ctx) proxyClient, teardown := proxytest.SetupTestClient(t, proxyModules) sourceClient := source.NewClient(http.DefaultClient) - q := queue.NewInMemory(cctx, 1, experimentNames, + q := inmemqueue.New(cctx, 1, experimentNames, func(ctx context.Context, mpath, version string) (_ int, err error) { return fetchserver.FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB) }) diff --git a/internal/testing/integration/worker_test.go b/internal/testing/integration/worker_test.go index d6f75766..1edf9620 100644 --- a/internal/testing/integration/worker_test.go +++ b/internal/testing/integration/worker_test.go @@ -22,13 +22,13 @@ import ( "golang.org/x/pkgsite/internal/config" "golang.org/x/pkgsite/internal/index" "golang.org/x/pkgsite/internal/proxy" - "golang.org/x/pkgsite/internal/queue" + "golang.org/x/pkgsite/internal/queue/inmemqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/worker" ) func setupWorker(ctx context.Context, t *testing.T, proxyClient *proxy.Client, indexClient *index.Client, - redisCacheClient *redis.Client) (*httptest.Server, *worker.Fetcher, *queue.InMemory) { + redisCacheClient *redis.Client) (*httptest.Server, *worker.Fetcher, *inmemqueue.InMemory) { t.Helper() fetcher := &worker.Fetcher{ @@ -39,7 +39,7 @@ func setupWorker(ctx context.Context, t *testing.T, proxyClient *proxy.Client, i } // TODO: it would be better if InMemory made http requests // back to worker, rather than calling fetch itself. - queue := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) { + queue := inmemqueue.New(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) { code, _, err := fetcher.FetchAndUpdateState(ctx, mpath, version, "test") return code, err }) diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 55108907..427dce9e 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -24,7 +24,7 @@ import ( "golang.org/x/pkgsite/internal/index" "golang.org/x/pkgsite/internal/postgres" "golang.org/x/pkgsite/internal/proxy/proxytest" - "golang.org/x/pkgsite/internal/queue" + "golang.org/x/pkgsite/internal/queue/inmemqueue" "golang.org/x/pkgsite/internal/source" "golang.org/x/pkgsite/internal/testing/sample" ) @@ -178,7 +178,7 @@ func TestWorker(t *testing.T) { f := &Fetcher{proxyClient, source.NewClient(http.DefaultClient), testDB, nil, nil, ""} // Use 10 workers to have parallelism consistent with the worker binary. - q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) { + q := inmemqueue.New(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) { code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "") return code, err }) |
