diff options
Diffstat (limited to 'internal/queue/queue.go')
| -rw-r--r-- | internal/queue/queue.go | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 30d92742..b408f3d0 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -221,7 +221,7 @@ type moduleVersion struct { // This should only be used for local development. type InMemory struct { queue chan moduleVersion - sem chan struct{} + done chan struct{} experiments []string } @@ -233,23 +233,24 @@ type inMemoryProcessFunc func(context.Context, string, string) (int, error) func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc inMemoryProcessFunc) *InMemory { q := &InMemory{ queue: make(chan moduleVersion, 1000), - sem: make(chan struct{}, workerCount), experiments: experiments, + done: make(chan struct{}), } + sem := make(chan struct{}, workerCount) go func() { for v := range q.queue { select { case <-ctx.Done(): return - case q.sem <- struct{}{}: + case sem <- struct{}{}: } // If a worker is available, make a request to the fetch service inside a // goroutine and wait for it to finish. go func(v moduleVersion) { - defer func() { <-q.sem }() + defer func() { <-sem }() - log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem)) + log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(sem)) fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) fetchCtx = experiment.NewContext(fetchCtx, experiments...) @@ -260,6 +261,14 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro } }(v) } + for i := 0; i < cap(sem); i++ { + select { + case <-ctx.Done(): + panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err())) + case sem <- struct{}{}: + } + } + close(q.done) }() return q } @@ -273,13 +282,7 @@ func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ str // WaitForTesting waits for all queued requests to finish. It should only be // used by test code. -func (q InMemory) WaitForTesting(ctx context.Context) { - for i := 0; i < cap(q.sem); i++ { - select { - case <-ctx.Done(): - return - case q.sem <- struct{}{}: - } - } +func (q *InMemory) WaitForTesting(ctx context.Context) { close(q.queue) + <-q.done } |
