aboutsummaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDamien Neil <dneil@google.com>2025-05-20 15:56:43 -0700
committerGopher Robot <gobot@golang.org>2025-05-29 10:26:00 -0700
commitb170c7e94c478e616d194af95caa7747d9fa4725 (patch)
treeb77064b4707cc99d60d3727ba62a8fbc1a16ee25 /src/sync
parent3b77085b40bf0d53528d6852d07c00c81021c855 (diff)
downloadgo-b170c7e94c478e616d194af95caa7747d9fa4725.tar.xz
runtime, internal/synctest, sync: associate WaitGroups with bubbles
Add support to internal/synctest for managing associations between arbitrary pointers and synctest bubbles. (Implemented internally to the runtime package by attaching a special to the pointer.) Associate WaitGroups with bubbles. Since WaitGroups don't have a constructor, perform the association when Add is called. All Add calls must be made from within the same bubble, or outside any bubble. When a bubbled goroutine calls WaitGroup.Wait, the wait is durably blocking iff the WaitGroup is associated with the current bubble. Change-Id: I77e2701e734ac2fa2b32b28d5b0c853b7b2825c9 Reviewed-on: https://go-review.googlesource.com/c/go/+/676656 Reviewed-by: Michael Knyszek <mknyszek@google.com> Reviewed-by: Michael Pratt <mpratt@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Auto-Submit: Damien Neil <dneil@google.com>
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/runtime.go2
-rw-r--r--src/sync/waitgroup.go53
2 files changed, 51 insertions, 4 deletions
diff --git a/src/sync/runtime.go b/src/sync/runtime.go
index 99e5bccbee..ae3368e58d 100644
--- a/src/sync/runtime.go
+++ b/src/sync/runtime.go
@@ -14,7 +14,7 @@ import "unsafe"
func runtime_Semacquire(s *uint32)
// SemacquireWaitGroup is like Semacquire, but for WaitGroup.Wait.
-func runtime_SemacquireWaitGroup(s *uint32)
+func runtime_SemacquireWaitGroup(s *uint32, synctestDurable bool)
// Semacquire(RW)Mutex(R) is like Semacquire, but for profiling contended
// Mutexes and RWMutexes.
diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go
index c850f58ed1..efc63be099 100644
--- a/src/sync/waitgroup.go
+++ b/src/sync/waitgroup.go
@@ -6,6 +6,7 @@ package sync
import (
"internal/race"
+ "internal/synctest"
"sync/atomic"
"unsafe"
)
@@ -47,10 +48,17 @@ import (
type WaitGroup struct {
noCopy noCopy
- state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
+ // Bits (high to low):
+ // bits[0:32] counter
+ // bits[32] flag: synctest bubble membership
+ // bits[33:64] wait count
+ state atomic.Uint64
sema uint32
}
+// waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble.
+const waitGroupBubbleFlag = 0x8000_0000
+
// Add adds delta, which may be negative, to the [WaitGroup] task counter.
// If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released.
// If the counter goes negative, Add panics.
@@ -75,9 +83,27 @@ func (wg *WaitGroup) Add(delta int) {
race.Disable()
defer race.Enable()
}
+ if synctest.IsInBubble() {
+ // If Add is called from within a bubble, then all Add calls must be made
+ // from the same bubble.
+ if !synctest.Associate(wg) {
+ // wg is already associated with a different bubble.
+ fatal("sync: WaitGroup.Add called from multiple synctest bubbles")
+ } else {
+ state := wg.state.Or(waitGroupBubbleFlag)
+ if state != 0 && state&waitGroupBubbleFlag == 0 {
+ // Add has been called from outside this bubble.
+ fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
+ }
+ }
+ }
state := wg.state.Add(uint64(delta) << 32)
+ if state&waitGroupBubbleFlag != 0 && !synctest.IsInBubble() {
+ // Add has been called from within a synctest bubble (and we aren't in one).
+ fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
+ }
v := int32(state >> 32)
- w := uint32(state)
+ w := uint32(state & 0x7fffffff)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
@@ -90,6 +116,13 @@ func (wg *WaitGroup) Add(delta int) {
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
+ if v == 0 && state&waitGroupBubbleFlag != 0 {
+ // Disassociate the WaitGroup from its bubble.
+ synctest.Disassociate(wg)
+ if w == 0 {
+ wg.state.Store(0)
+ }
+ }
if v > 0 || w == 0 {
return
}
@@ -147,7 +180,21 @@ func (wg *WaitGroup) Wait() {
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(&wg.sema))
}
- runtime_SemacquireWaitGroup(&wg.sema)
+ synctestDurable := false
+ if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() {
+ if race.Enabled {
+ race.Enable()
+ }
+ if synctest.IsAssociated(wg) {
+ // Add was called within the current bubble,
+ // so this Wait is durably blocking.
+ synctestDurable = true
+ }
+ if race.Enabled {
+ race.Disable()
+ }
+ }
+ runtime_SemacquireWaitGroup(&wg.sema, synctestDurable)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}