diff options
| author | Damien Neil <dneil@google.com> | 2025-06-02 09:26:27 -0700 |
|---|---|---|
| committer | Gopher Robot <gobot@golang.org> | 2025-06-04 09:20:21 -0700 |
| commit | 3432c68467d50ffc622fed230a37cd401d82d4bf (patch) | |
| tree | 68e8152e4ba53aef6cb685d95ccc891b14d8c272 /src/runtime | |
| parent | 1aa336209363d9715e145244c7b22620ac0f0584 (diff) | |
| download | go-3432c68467d50ffc622fed230a37cd401d82d4bf.tar.xz | |
runtime: make bubbled timers more consistent with unbubbled
This CL makes two changes to reduce the predictability
with which bubbled timers fire.
When asynctimerchan=0 (the default), regular timers with an associated
channel are only added to a timer heap when some channel operation
is blocked on that channel. This allows us to garbage collect
unreferenced, unstopped timers. Timers in a synctest bubble, in
contrast, are always added to the bubble's timer heap.
This CL changes bubbled timers with a channel to be handled the
same as unbubbled ones, adding them to the bubble's timer heap only
when some channel operation is blocked on the timer's channel.
This permits unstopped bubbled timers to be garbage collected,
but more importantly it makes all timers past their deadline
behave identically, regardless of whether they are in a bubble.
This CL also changes timer scheduling to execute bubbled timers
immediately when possible rather than adding them to a heap.
Timers in a bubble's heap are executed when the bubble is idle.
Executing timers immediately avoids creating a predictable
order of execution.
For #73850
Fixes #73934
Change-Id: If82e441546408f780f6af6fb7f6e416d3160295d
Reviewed-on: https://go-review.googlesource.com/c/go/+/678075
Auto-Submit: Damien Neil <dneil@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Diffstat (limited to 'src/runtime')
| -rw-r--r-- | src/runtime/chan.go | 6 | ||||
| -rw-r--r-- | src/runtime/proc.go | 4 | ||||
| -rw-r--r-- | src/runtime/select.go | 2 | ||||
| -rw-r--r-- | src/runtime/synctest.go | 3 | ||||
| -rw-r--r-- | src/runtime/time.go | 97 |
5 files changed, 60 insertions, 52 deletions
diff --git a/src/runtime/chan.go b/src/runtime/chan.go index df48267e97..bb554ebfdb 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -497,7 +497,7 @@ func empty(c *hchan) bool { // c.timer is also immutable (it is set after make(chan) but before any channel operations). // All timer channels have dataqsiz > 0. if c.timer != nil { - c.timer.maybeRunChan() + c.timer.maybeRunChan(c) } return atomic.Loaduint(&c.qcount) == 0 } @@ -542,7 +542,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) } if c.timer != nil { - c.timer.maybeRunChan() + c.timer.maybeRunChan(c) } // Fast path: check for failed non-blocking operation without acquiring the lock. @@ -821,7 +821,7 @@ func chanlen(c *hchan) int { } async := debug.asynctimerchan.Load() != 0 if c.timer != nil && async { - c.timer.maybeRunChan() + c.timer.maybeRunChan(c) } if c.timer != nil && !async { // timer channels have a buffered implementation diff --git a/src/runtime/proc.go b/src/runtime/proc.go index 5b8db2bee4..37a7b7f684 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -3341,7 +3341,7 @@ top: // which may steal timers. It's important that between now // and then, nothing blocks, so these numbers remain mostly // relevant. - now, pollUntil, _ := pp.timers.check(0) + now, pollUntil, _ := pp.timers.check(0, nil) // Try to schedule the trace reader. if traceEnabled() || traceShuttingDown() { @@ -3780,7 +3780,7 @@ func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWo // timerpMask tells us whether the P may have timers at all. If it // can't, no need to check at all. if stealTimersOrRunNextG && timerpMask.read(enum.position()) { - tnow, w, ran := p2.timers.check(now) + tnow, w, ran := p2.timers.check(now, nil) now = tnow if w != 0 && (pollUntil == 0 || w < pollUntil) { pollUntil = w diff --git a/src/runtime/select.go b/src/runtime/select.go index 19256df6a6..ae7754b173 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -185,7 +185,7 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo } if cas.c.timer != nil { - cas.c.timer.maybeRunChan() + cas.c.timer.maybeRunChan(cas.c) } j := cheaprandn(uint32(norder + 1)) diff --git a/src/runtime/synctest.go b/src/runtime/synctest.go index f676afa20d..c837c792a5 100644 --- a/src/runtime/synctest.go +++ b/src/runtime/synctest.go @@ -185,7 +185,6 @@ func synctestRun(f func()) { } const synctestBaseTime = 946684800000000000 // midnight UTC 2000-01-01 bubble.now = synctestBaseTime - bubble.timers.bubble = bubble lockInit(&bubble.mu, lockRankSynctest) lockInit(&bubble.timers.mu, lockRankTimers) @@ -213,7 +212,7 @@ func synctestRun(f func()) { // so timer goroutines inherit their child race context from g0. curg := gp.m.curg gp.m.curg = nil - gp.bubble.timers.check(gp.bubble.now) + gp.bubble.timers.check(bubble.now, bubble) gp.m.curg = curg }) gopark(synctestidle_c, nil, waitReasonSynctestRun, traceBlockSynctest, 0) diff --git a/src/runtime/time.go b/src/runtime/time.go index a1f8351a1e..4880dce8cd 100644 --- a/src/runtime/time.go +++ b/src/runtime/time.go @@ -157,8 +157,6 @@ type timers struct { // heap[i].when over timers with the timerModified bit set. // If minWhenModified = 0, it means there are no timerModified timers in the heap. minWhenModified atomic.Int64 - - bubble *synctestBubble } type timerWhen struct { @@ -403,7 +401,7 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg throw("invalid timer channel: no capacity") } } - if gr := getg().bubble; gr != nil { + if bubble := getg().bubble; bubble != nil { t.isFake = true } t.modify(when, period, f, arg, 0) @@ -485,7 +483,7 @@ func (t *timer) maybeRunAsync() { // timer ourselves now is fine.) if now := nanotime(); t.when <= now { systemstack(func() { - t.unlockAndRun(now) // resets t.when + t.unlockAndRun(now, nil) // resets t.when }) t.lock() } @@ -621,6 +619,29 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in add := t.needsAdd() + if add && t.isFake { + // If this is a bubbled timer scheduled to fire immediately, + // run it now rather than waiting for the bubble's timer scheduler. + // This avoids deferring timer execution until after the bubble + // becomes durably blocked. + // + // Don't do this for non-bubbled timers: It isn't necessary, + // and there may be cases where the runtime executes timers with + // the expectation the timer func will not run in the current goroutine. + // Bubbled timers are always created by the time package, and are + // safe to run in the current goroutine. + bubble := getg().bubble + if bubble == nil { + throw("fake timer executing with no bubble") + } + if t.state&timerHeaped == 0 && when <= bubble.now { + systemstack(func() { + t.unlockAndRun(bubble.now, bubble) + }) + return pending + } + } + if !async && t.isChan { // Stop any future sends with stale values. // See timer.unlockAndRun. @@ -657,7 +678,7 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in // t must be locked. func (t *timer) needsAdd() bool { assertLockHeld(&t.mu) - need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.isFake || t.blocked > 0) + need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0) if need { t.trace("needsAdd+") } else { @@ -982,7 +1003,7 @@ func (ts *timers) wakeTime() int64 { // We pass now in and out to avoid extra calls of nanotime. // //go:yeswritebarrierrec -func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { +func (ts *timers) check(now int64, bubble *synctestBubble) (rnow, pollUntil int64, ran bool) { ts.trace("check") // If it's not yet time for the first timer, or the first adjusted // timer, then there is nothing to do. @@ -1015,7 +1036,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { ts.adjust(now, false) for len(ts.heap) > 0 { // Note that runtimer may temporarily unlock ts. - if tw := ts.run(now); tw != 0 { + if tw := ts.run(now, bubble); tw != 0 { if tw > 0 { pollUntil = tw } @@ -1047,7 +1068,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) { // If a timer is run, this will temporarily unlock ts. // //go:systemstack -func (ts *timers) run(now int64) int64 { +func (ts *timers) run(now int64, bubble *synctestBubble) int64 { ts.trace("run") assertLockHeld(&ts.mu) Redo: @@ -1081,7 +1102,7 @@ Redo: return t.when } - t.unlockAndRun(now) + t.unlockAndRun(now, bubble) assertLockHeld(&ts.mu) // t is unlocked now, but not ts return 0 } @@ -1092,7 +1113,7 @@ Redo: // unlockAndRun returns with t unlocked and t.ts (re-)locked. // //go:systemstack -func (t *timer) unlockAndRun(now int64) { +func (t *timer) unlockAndRun(now int64, bubble *synctestBubble) { t.trace("unlockAndRun") assertLockHeld(&t.mu) if t.ts != nil { @@ -1104,10 +1125,10 @@ func (t *timer) unlockAndRun(now int64) { // out from under us while this function executes. gp := getg() var tsLocal *timers - if t.ts == nil || t.ts.bubble == nil { + if bubble == nil { tsLocal = &gp.m.p.ptr().timers } else { - tsLocal = &t.ts.bubble.timers + tsLocal = &bubble.timers } if tsLocal.raceCtx == 0 { tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((*timers).run) + sys.PCQuantum) @@ -1160,10 +1181,10 @@ func (t *timer) unlockAndRun(now int64) { if gp.racectx != 0 { throw("unexpected racectx") } - if ts == nil || ts.bubble == nil { + if bubble == nil { gp.racectx = gp.m.p.ptr().timers.raceCtx } else { - gp.racectx = ts.bubble.timers.raceCtx + gp.racectx = bubble.timers.raceCtx } } @@ -1171,14 +1192,14 @@ func (t *timer) unlockAndRun(now int64) { ts.unlock() } - if ts != nil && ts.bubble != nil { + if bubble != nil { // Temporarily use the timer's synctest group for the G running this timer. gp := getg() if gp.bubble != nil { throw("unexpected syncgroup set") } - gp.bubble = ts.bubble - ts.bubble.changegstatus(gp, _Gdead, _Grunning) + gp.bubble = bubble + bubble.changegstatus(gp, _Gdead, _Grunning) } if !async && t.isChan { @@ -1222,13 +1243,13 @@ func (t *timer) unlockAndRun(now int64) { unlock(&t.sendLock) } - if ts != nil && ts.bubble != nil { + if bubble != nil { gp := getg() - ts.bubble.changegstatus(gp, _Grunning, _Gdead) + bubble.changegstatus(gp, _Grunning, _Gdead) if raceenabled { // Establish a happens-before between this timer event and // the next synctest.Wait call. - racereleasemergeg(gp, ts.bubble.raceaddr()) + racereleasemergeg(gp, bubble.raceaddr()) } gp.bubble = nil } @@ -1415,24 +1436,10 @@ func badTimer() { // maybeRunChan checks whether the timer needs to run // to send a value to its associated channel. If so, it does. // The timer must not be locked. -func (t *timer) maybeRunChan() { - if t.isFake { - t.lock() - var timerBubble *synctestBubble - if t.ts != nil { - timerBubble = t.ts.bubble - } - t.unlock() - bubble := getg().bubble - if bubble == nil { - panic(plainError("synctest timer accessed from outside bubble")) - } - if timerBubble != nil && bubble != timerBubble { - panic(plainError("timer moved between synctest bubbles")) - } - // No need to do anything here. - // synctest.Run will run the timer when it advances its fake clock. - return +func (t *timer) maybeRunChan(c *hchan) { + if t.isFake && getg().bubble != c.bubble { + // This should have been checked by the caller, but check just in case. + fatal("synctest timer accessed from outside bubble") } if t.astate.Load()&timerHeaped != 0 { // If the timer is in the heap, the ordinary timer code @@ -1442,6 +1449,9 @@ func (t *timer) maybeRunChan() { t.lock() now := nanotime() + if t.isFake { + now = getg().bubble.now + } if t.state&timerHeaped != 0 || t.when == 0 || t.when > now { t.trace("maybeRunChan-") // Timer in the heap, or not running at all, or not triggered. @@ -1450,7 +1460,7 @@ func (t *timer) maybeRunChan() { } t.trace("maybeRunChan+") systemstack(func() { - t.unlockAndRun(now) + t.unlockAndRun(now, c.bubble) }) } @@ -1460,9 +1470,11 @@ func (t *timer) maybeRunChan() { // adding it if needed. func blockTimerChan(c *hchan) { t := c.timer - if t.isFake { - return + if t.isFake && c.bubble != getg().bubble { + // This should have been checked by the caller, but check just in case. + fatal("synctest timer accessed from outside bubble") } + t.lock() t.trace("blockTimerChan") if !t.isChan { @@ -1500,9 +1512,6 @@ func blockTimerChan(c *hchan) { // blocked on it anymore. func unblockTimerChan(c *hchan) { t := c.timer - if t.isFake { - return - } t.lock() t.trace("unblockTimerChan") if !t.isChan || t.blocked == 0 { |
