diff options
| author | Jonathan Amsterdam <jba@google.com> | 2021-06-11 10:15:57 -0400 |
|---|---|---|
| committer | Jonathan Amsterdam <jba@google.com> | 2021-06-11 18:04:46 +0000 |
| commit | 1c1eedc50b671114ea445f5b95edb250064bdbbb (patch) | |
| tree | 834ef1d9b82aab7183666bc3f93aae3dfc308417 /internal/queue | |
| parent | a973529477fa73b703014db727a0ab425b917d40 (diff) | |
| download | go-x-pkgsite-1c1eedc50b671114ea445f5b95edb250064bdbbb.tar.xz | |
internal/queue: fix race condition
InMemory.WaitForTesting was waiting for the set of workers to be
empty, but that's not quite the same as the queue being empty: all the
workers can finish momentarily before a new one starts up.
Wait for the queue itself to be empty.
Change-Id: Icf0e970fdb2f45aa30c358c9d4af1083442a0293
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/327109
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
Diffstat (limited to 'internal/queue')
| -rw-r--r-- | internal/queue/queue.go | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 30d92742..b408f3d0 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -221,7 +221,7 @@ type moduleVersion struct { // This should only be used for local development. type InMemory struct { queue chan moduleVersion - sem chan struct{} + done chan struct{} experiments []string } @@ -233,23 +233,24 @@ type inMemoryProcessFunc func(context.Context, string, string) (int, error) func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc inMemoryProcessFunc) *InMemory { q := &InMemory{ queue: make(chan moduleVersion, 1000), - sem: make(chan struct{}, workerCount), experiments: experiments, + done: make(chan struct{}), } + sem := make(chan struct{}, workerCount) go func() { for v := range q.queue { select { case <-ctx.Done(): return - case q.sem <- struct{}{}: + case sem <- struct{}{}: } // If a worker is available, make a request to the fetch service inside a // goroutine and wait for it to finish. go func(v moduleVersion) { - defer func() { <-q.sem }() + defer func() { <-sem }() - log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem)) + log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(sem)) fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) fetchCtx = experiment.NewContext(fetchCtx, experiments...) @@ -260,6 +261,14 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro } }(v) } + for i := 0; i < cap(sem); i++ { + select { + case <-ctx.Done(): + panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err())) + case sem <- struct{}{}: + } + } + close(q.done) }() return q } @@ -273,13 +282,7 @@ func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ str // WaitForTesting waits for all queued requests to finish. It should only be // used by test code. -func (q InMemory) WaitForTesting(ctx context.Context) { - for i := 0; i < cap(q.sem); i++ { - select { - case <-ctx.Done(): - return - case q.sem <- struct{}{}: - } - } +func (q *InMemory) WaitForTesting(ctx context.Context) { close(q.queue) + <-q.done } |
