aboutsummaryrefslogtreecommitdiff
path: root/internal/queue/queue.go
diff options
context:
space:
mode:
authorJonathan Amsterdam <jba@google.com>2021-06-11 10:15:57 -0400
committerJonathan Amsterdam <jba@google.com>2021-06-11 18:04:46 +0000
commit1c1eedc50b671114ea445f5b95edb250064bdbbb (patch)
tree834ef1d9b82aab7183666bc3f93aae3dfc308417 /internal/queue/queue.go
parenta973529477fa73b703014db727a0ab425b917d40 (diff)
downloadgo-x-pkgsite-1c1eedc50b671114ea445f5b95edb250064bdbbb.tar.xz
internal/queue: fix race condition
InMemory.WaitForTesting was waiting for the set of workers to be empty, but that's not quite the same as the queue being empty: all the workers can finish momentarily before a new one starts up. Wait for the queue itself to be empty. Change-Id: Icf0e970fdb2f45aa30c358c9d4af1083442a0293 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/327109 Trust: Jonathan Amsterdam <jba@google.com> Run-TryBot: Jonathan Amsterdam <jba@google.com> TryBot-Result: kokoro <noreply+kokoro@google.com> Reviewed-by: Julie Qiu <julie@golang.org>
Diffstat (limited to 'internal/queue/queue.go')
-rw-r--r--internal/queue/queue.go29
1 files changed, 16 insertions, 13 deletions
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 30d92742..b408f3d0 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -221,7 +221,7 @@ type moduleVersion struct {
// This should only be used for local development.
type InMemory struct {
queue chan moduleVersion
- sem chan struct{}
+ done chan struct{}
experiments []string
}
@@ -233,23 +233,24 @@ type inMemoryProcessFunc func(context.Context, string, string) (int, error)
func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc inMemoryProcessFunc) *InMemory {
q := &InMemory{
queue: make(chan moduleVersion, 1000),
- sem: make(chan struct{}, workerCount),
experiments: experiments,
+ done: make(chan struct{}),
}
+ sem := make(chan struct{}, workerCount)
go func() {
for v := range q.queue {
select {
case <-ctx.Done():
return
- case q.sem <- struct{}{}:
+ case sem <- struct{}{}:
}
// If a worker is available, make a request to the fetch service inside a
// goroutine and wait for it to finish.
go func(v moduleVersion) {
- defer func() { <-q.sem }()
+ defer func() { <-sem }()
- log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(q.sem))
+ log.Infof(ctx, "Fetch requested: %q %q (workerCount = %d)", v.modulePath, v.version, cap(sem))
fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
fetchCtx = experiment.NewContext(fetchCtx, experiments...)
@@ -260,6 +261,14 @@ func NewInMemory(ctx context.Context, workerCount int, experiments []string, pro
}
}(v)
}
+ for i := 0; i < cap(sem); i++ {
+ select {
+ case <-ctx.Done():
+ panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
+ case sem <- struct{}{}:
+ }
+ }
+ close(q.done)
}()
return q
}
@@ -273,13 +282,7 @@ func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ str
// WaitForTesting waits for all queued requests to finish. It should only be
// used by test code.
-func (q InMemory) WaitForTesting(ctx context.Context) {
- for i := 0; i < cap(q.sem); i++ {
- select {
- case <-ctx.Done():
- return
- case q.sem <- struct{}{}:
- }
- }
+func (q *InMemory) WaitForTesting(ctx context.Context) {
close(q.queue)
+ <-q.done
}