diff options
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 79 |
1 files changed, 0 insertions, 79 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 00b580c7..58ebdf8a 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -8,12 +8,6 @@ package queue import ( "context" - "fmt" - "time" - - "golang.org/x/pkgsite/internal" - "golang.org/x/pkgsite/internal/experiment" - "golang.org/x/pkgsite/internal/log" ) // A Queue provides an interface for asynchronous scheduling of fetch actions. @@ -43,76 +37,3 @@ const ( SourceFrontendValue = "frontend" SourceWorkerValue = "worker" ) - -// InMemory is a Queue implementation that schedules in-process fetch -// operations. Unlike the GCP task queue, it will not automatically retry tasks -// on failure. -// -// This should only be used for local development. -type InMemory struct { - queue chan internal.Modver - done chan struct{} - experiments []string -} - -type InMemoryProcessFunc func(context.Context, string, string) (int, error) - -// NewInMemory creates a new InMemory that asynchronously fetches -// from proxyClient and stores in db. It uses workerCount parallelism to -// execute these fetches. -func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory { - q := &InMemory{ - queue: make(chan internal.Modver, 1000), - experiments: experiments, - done: make(chan struct{}), - } - sem := make(chan struct{}, workerCount) - go func() { - for v := range q.queue { - select { - case <-ctx.Done(): - return - case sem <- struct{}{}: - } - - // If a worker is available, make a request to the fetch service inside a - // goroutine and wait for it to finish. - go func(v internal.Modver) { - defer func() { <-sem }() - - log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem)) - - fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - fetchCtx = experiment.NewContext(fetchCtx, experiments...) - defer cancel() - - if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil { - log.Error(fetchCtx, err) - } - }(v) - } - for i := 0; i < cap(sem); i++ { - select { - case <-ctx.Done(): - panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err())) - case sem <- struct{}{}: - } - } - close(q.done) - }() - return q -} - -// ScheduleFetch pushes a fetch task into the local queue to be processed -// asynchronously. -func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) { - q.queue <- internal.Modver{Path: modulePath, Version: version} - return true, nil -} - -// WaitForTesting waits for all queued requests to finish. It should only be -// used by test code. -func (q *InMemory) WaitForTesting(ctx context.Context) { - close(q.queue) - <-q.done -} |
