diff options
Diffstat (limited to 'internal/queue/inmemqueue/queue.go')
| -rw-r--r-- | internal/queue/inmemqueue/queue.go | 90 |
1 files changed, 90 insertions, 0 deletions
diff --git a/internal/queue/inmemqueue/queue.go b/internal/queue/inmemqueue/queue.go new file mode 100644 index 00000000..f930f5d5 --- /dev/null +++ b/internal/queue/inmemqueue/queue.go @@ -0,0 +1,90 @@ +// Copyright 2026 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package inmemqueue provides an in-memory queue implementation that can be +// used for scheduling of fetch actions. +package inmemqueue + +import ( + "context" + "fmt" + "time" + + "golang.org/x/pkgsite/internal" + "golang.org/x/pkgsite/internal/experiment" + "golang.org/x/pkgsite/internal/log" + "golang.org/x/pkgsite/internal/queue" +) + +// InMemory is a Queue implementation that schedules in-process fetch +// operations. Unlike the GCP task queue, it will not automatically retry tasks +// on failure. +// +// This should only be used for local development. +type InMemory struct { + queue chan internal.Modver + done chan struct{} + experiments []string +} + +type InMemoryProcessFunc func(context.Context, string, string) (int, error) + +// New creates a new InMemory that asynchronously fetches from proxyClient and +// stores in db. It uses workerCount parallelism to execute these fetches. +func New(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory { + q := &InMemory{ + queue: make(chan internal.Modver, 1000), + experiments: experiments, + done: make(chan struct{}), + } + sem := make(chan struct{}, workerCount) + go func() { + for v := range q.queue { + select { + case <-ctx.Done(): + return + case sem <- struct{}{}: + } + + // If a worker is available, make a request to the fetch service inside a + // goroutine and wait for it to finish. + go func(v internal.Modver) { + defer func() { <-sem }() + + log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem)) + + fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + fetchCtx = experiment.NewContext(fetchCtx, experiments...) + defer cancel() + + if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil { + log.Error(fetchCtx, err) + } + }(v) + } + for i := 0; i < cap(sem); i++ { + select { + case <-ctx.Done(): + panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err())) + case sem <- struct{}{}: + } + } + close(q.done) + }() + return q +} + +// ScheduleFetch pushes a fetch task into the local queue to be processed +// asynchronously. +func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *queue.Options) (bool, error) { + q.queue <- internal.Modver{Path: modulePath, Version: version} + return true, nil +} + +// WaitForTesting waits for all queued requests to finish. It should only be +// used by test code. +func (q *InMemory) WaitForTesting(ctx context.Context) { + close(q.queue) + <-q.done +} |
