aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go79
1 files changed, 0 insertions, 79 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 00b580c7..58ebdf8a 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -8,12 +8,6 @@ package queue
import (
"context"
- "fmt"
- "time"
-
- "golang.org/x/pkgsite/internal"
- "golang.org/x/pkgsite/internal/experiment"
- "golang.org/x/pkgsite/internal/log"
)
// A Queue provides an interface for asynchronous scheduling of fetch actions.
@@ -43,76 +37,3 @@ const (
SourceFrontendValue = "frontend"
SourceWorkerValue = "worker"
)
-
-// InMemory is a Queue implementation that schedules in-process fetch
-// operations. Unlike the GCP task queue, it will not automatically retry tasks
-// on failure.
-//
-// This should only be used for local development.
-type InMemory struct {
- queue chan internal.Modver
- done chan struct{}
- experiments []string
-}
-
-type InMemoryProcessFunc func(context.Context, string, string) (int, error)
-
-// NewInMemory creates a new InMemory that asynchronously fetches
-// from proxyClient and stores in db. It uses workerCount parallelism to
-// execute these fetches.
-func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory {
- q := &InMemory{
- queue: make(chan internal.Modver, 1000),
- experiments: experiments,
- done: make(chan struct{}),
- }
- sem := make(chan struct{}, workerCount)
- go func() {
- for v := range q.queue {
- select {
- case <-ctx.Done():
- return
- case sem <- struct{}{}:
- }
-
- // If a worker is available, make a request to the fetch service inside a
- // goroutine and wait for it to finish.
- go func(v internal.Modver) {
- defer func() { <-sem }()
-
- log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem))
-
- fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
- fetchCtx = experiment.NewContext(fetchCtx, experiments...)
- defer cancel()
-
- if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil {
- log.Error(fetchCtx, err)
- }
- }(v)
- }
- for i := 0; i < cap(sem); i++ {
- select {
- case <-ctx.Done():
- panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
- case sem <- struct{}{}:
- }
- }
- close(q.done)
- }()
- return q
-}
-
-// ScheduleFetch pushes a fetch task into the local queue to be processed
-// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) {
- q.queue <- internal.Modver{Path: modulePath, Version: version}
- return true, nil
-}
-
-// WaitForTesting waits for all queued requests to finish. It should only be
-// used by test code.
-func (q *InMemory) WaitForTesting(ctx context.Context) {
- close(q.queue)
- <-q.done
-}