aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
authorJean Barkhuysen <jean.barkhuysen@gmail.com>2026-02-23 06:13:17 -0700
committerGopher Robot <gobot@golang.org>2026-02-25 06:09:15 -0800
commitd29b966ca794634ccd1758dcd59cd2c8baf23422 (patch)
treedbb5e7699d7ba11a6b8c657b50edcb373a157909 /internal/queue/queue.go
parentdfa517ef2f803e000bd058998099e76648f9e385 (diff)
downloadgo-x-pkgsite-d29b966ca794634ccd1758dcd59cd2c8baf23422.tar.xz
internal/queue: move InMemory queue to its own package
Currently, InMemory queue sits in queue, and is instantiated in gcpqueue for convenience. In preparation for a third queue type (Postgres), this CL separates the two more cleanly, making it more ergonomic for the new queue type to slot in next to the existing two. This CL doesn't change any logic: it just exists to make the next CL smaller and easier to review. I also took the liberty of adding some tests specific to the InMemory queue, since I didn't find any. Let me know if it's tested in another place, though, and if you'd prefer me to remove it. Updates golang/go#74027. Change-Id: I44bd92129f33bc7975fcd138c905e0b7ab49d257 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/747881 kokoro-CI: kokoro <noreply+kokoro@google.com> Auto-Submit: Jonathan Amsterdam <jba@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Ethan Lee <ethanalee@google.com> Reviewed-by: Jonathan Amsterdam <jba@google.com>
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go79
1 files changed, 0 insertions, 79 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 00b580c7..58ebdf8a 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -8,12 +8,6 @@ package queue
import (
"context"
- "fmt"
- "time"
-
- "golang.org/x/pkgsite/internal"
- "golang.org/x/pkgsite/internal/experiment"
- "golang.org/x/pkgsite/internal/log"
)
// A Queue provides an interface for asynchronous scheduling of fetch actions.
@@ -43,76 +37,3 @@ const (
SourceFrontendValue = "frontend"
SourceWorkerValue = "worker"
)
-
-// InMemory is a Queue implementation that schedules in-process fetch
-// operations. Unlike the GCP task queue, it will not automatically retry tasks
-// on failure.
-//
-// This should only be used for local development.
-type InMemory struct {
- queue chan internal.Modver
- done chan struct{}
- experiments []string
-}
-
-type InMemoryProcessFunc func(context.Context, string, string) (int, error)
-
-// NewInMemory creates a new InMemory that asynchronously fetches
-// from proxyClient and stores in db. It uses workerCount parallelism to
-// execute these fetches.
-func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory {
- q := &InMemory{
- queue: make(chan internal.Modver, 1000),
- experiments: experiments,
- done: make(chan struct{}),
- }
- sem := make(chan struct{}, workerCount)
- go func() {
- for v := range q.queue {
- select {
- case <-ctx.Done():
- return
- case sem <- struct{}{}:
- }
-
- // If a worker is available, make a request to the fetch service inside a
- // goroutine and wait for it to finish.
- go func(v internal.Modver) {
- defer func() { <-sem }()
-
- log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem))
-
- fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
- fetchCtx = experiment.NewContext(fetchCtx, experiments...)
- defer cancel()
-
- if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil {
- log.Error(fetchCtx, err)
- }
- }(v)
- }
- for i := 0; i < cap(sem); i++ {
- select {
- case <-ctx.Done():
- panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
- case sem <- struct{}{}:
- }
- }
- close(q.done)
- }()
- return q
-}
-
-// ScheduleFetch pushes a fetch task into the local queue to be processed
-// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) {
- q.queue <- internal.Modver{Path: modulePath, Version: version}
- return true, nil
-}
-
-// WaitForTesting waits for all queued requests to finish. It should only be
-// used by test code.
-func (q *InMemory) WaitForTesting(ctx context.Context) {
- close(q.queue)
- <-q.done
-}