diff options
| author | Damien Neil <dneil@google.com> | 2025-05-20 15:56:43 -0700 |
|---|---|---|
| committer | Gopher Robot <gobot@golang.org> | 2025-05-29 10:26:00 -0700 |
| commit | b170c7e94c478e616d194af95caa7747d9fa4725 (patch) | |
| tree | b77064b4707cc99d60d3727ba62a8fbc1a16ee25 /src/sync | |
| parent | 3b77085b40bf0d53528d6852d07c00c81021c855 (diff) | |
| download | go-b170c7e94c478e616d194af95caa7747d9fa4725.tar.xz | |
runtime, internal/synctest, sync: associate WaitGroups with bubbles
Add support to internal/synctest for managing associations between
arbitrary pointers and synctest bubbles. (Implemented internally to
the runtime package by attaching a special to the pointer.)
Associate WaitGroups with bubbles.
Since WaitGroups don't have a constructor,
perform the association when Add is called.
All Add calls must be made from within the same bubble,
or outside any bubble.
When a bubbled goroutine calls WaitGroup.Wait,
the wait is durably blocking iff the WaitGroup is associated
with the current bubble.
Change-Id: I77e2701e734ac2fa2b32b28d5b0c853b7b2825c9
Reviewed-on: https://go-review.googlesource.com/c/go/+/676656
Reviewed-by: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Damien Neil <dneil@google.com>
Diffstat (limited to 'src/sync')
| -rw-r--r-- | src/sync/runtime.go | 2 | ||||
| -rw-r--r-- | src/sync/waitgroup.go | 53 |
2 files changed, 51 insertions, 4 deletions
diff --git a/src/sync/runtime.go b/src/sync/runtime.go index 99e5bccbee..ae3368e58d 100644 --- a/src/sync/runtime.go +++ b/src/sync/runtime.go @@ -14,7 +14,7 @@ import "unsafe" func runtime_Semacquire(s *uint32) // SemacquireWaitGroup is like Semacquire, but for WaitGroup.Wait. -func runtime_SemacquireWaitGroup(s *uint32) +func runtime_SemacquireWaitGroup(s *uint32, synctestDurable bool) // Semacquire(RW)Mutex(R) is like Semacquire, but for profiling contended // Mutexes and RWMutexes. diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go index c850f58ed1..efc63be099 100644 --- a/src/sync/waitgroup.go +++ b/src/sync/waitgroup.go @@ -6,6 +6,7 @@ package sync import ( "internal/race" + "internal/synctest" "sync/atomic" "unsafe" ) @@ -47,10 +48,17 @@ import ( type WaitGroup struct { noCopy noCopy - state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count. + // Bits (high to low): + // bits[0:32] counter + // bits[32] flag: synctest bubble membership + // bits[33:64] wait count + state atomic.Uint64 sema uint32 } +// waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble. +const waitGroupBubbleFlag = 0x8000_0000 + // Add adds delta, which may be negative, to the [WaitGroup] task counter. // If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released. // If the counter goes negative, Add panics. @@ -75,9 +83,27 @@ func (wg *WaitGroup) Add(delta int) { race.Disable() defer race.Enable() } + if synctest.IsInBubble() { + // If Add is called from within a bubble, then all Add calls must be made + // from the same bubble. + if !synctest.Associate(wg) { + // wg is already associated with a different bubble. + fatal("sync: WaitGroup.Add called from multiple synctest bubbles") + } else { + state := wg.state.Or(waitGroupBubbleFlag) + if state != 0 && state&waitGroupBubbleFlag == 0 { + // Add has been called from outside this bubble. + fatal("sync: WaitGroup.Add called from inside and outside synctest bubble") + } + } + } state := wg.state.Add(uint64(delta) << 32) + if state&waitGroupBubbleFlag != 0 && !synctest.IsInBubble() { + // Add has been called from within a synctest bubble (and we aren't in one). + fatal("sync: WaitGroup.Add called from inside and outside synctest bubble") + } v := int32(state >> 32) - w := uint32(state) + w := uint32(state & 0x7fffffff) if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be @@ -90,6 +116,13 @@ func (wg *WaitGroup) Add(delta int) { if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } + if v == 0 && state&waitGroupBubbleFlag != 0 { + // Disassociate the WaitGroup from its bubble. + synctest.Disassociate(wg) + if w == 0 { + wg.state.Store(0) + } + } if v > 0 || w == 0 { return } @@ -147,7 +180,21 @@ func (wg *WaitGroup) Wait() { // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(&wg.sema)) } - runtime_SemacquireWaitGroup(&wg.sema) + synctestDurable := false + if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() { + if race.Enabled { + race.Enable() + } + if synctest.IsAssociated(wg) { + // Add was called within the current bubble, + // so this Wait is durably blocking. + synctestDurable = true + } + if race.Enabled { + race.Disable() + } + } + runtime_SemacquireWaitGroup(&wg.sema, synctestDurable) if wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } |
