aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/time.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/runtime/time.go')
-rw-r--r--src/runtime/time.go97
1 files changed, 53 insertions, 44 deletions
diff --git a/src/runtime/time.go b/src/runtime/time.go
index a1f8351a1e..4880dce8cd 100644
--- a/src/runtime/time.go
+++ b/src/runtime/time.go
@@ -157,8 +157,6 @@ type timers struct {
// heap[i].when over timers with the timerModified bit set.
// If minWhenModified = 0, it means there are no timerModified timers in the heap.
minWhenModified atomic.Int64
-
- bubble *synctestBubble
}
type timerWhen struct {
@@ -403,7 +401,7 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg
throw("invalid timer channel: no capacity")
}
}
- if gr := getg().bubble; gr != nil {
+ if bubble := getg().bubble; bubble != nil {
t.isFake = true
}
t.modify(when, period, f, arg, 0)
@@ -485,7 +483,7 @@ func (t *timer) maybeRunAsync() {
// timer ourselves now is fine.)
if now := nanotime(); t.when <= now {
systemstack(func() {
- t.unlockAndRun(now) // resets t.when
+ t.unlockAndRun(now, nil) // resets t.when
})
t.lock()
}
@@ -621,6 +619,29 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
add := t.needsAdd()
+ if add && t.isFake {
+ // If this is a bubbled timer scheduled to fire immediately,
+ // run it now rather than waiting for the bubble's timer scheduler.
+ // This avoids deferring timer execution until after the bubble
+ // becomes durably blocked.
+ //
+ // Don't do this for non-bubbled timers: It isn't necessary,
+ // and there may be cases where the runtime executes timers with
+ // the expectation the timer func will not run in the current goroutine.
+ // Bubbled timers are always created by the time package, and are
+ // safe to run in the current goroutine.
+ bubble := getg().bubble
+ if bubble == nil {
+ throw("fake timer executing with no bubble")
+ }
+ if t.state&timerHeaped == 0 && when <= bubble.now {
+ systemstack(func() {
+ t.unlockAndRun(bubble.now, bubble)
+ })
+ return pending
+ }
+ }
+
if !async && t.isChan {
// Stop any future sends with stale values.
// See timer.unlockAndRun.
@@ -657,7 +678,7 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
// t must be locked.
func (t *timer) needsAdd() bool {
assertLockHeld(&t.mu)
- need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.isFake || t.blocked > 0)
+ need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0)
if need {
t.trace("needsAdd+")
} else {
@@ -982,7 +1003,7 @@ func (ts *timers) wakeTime() int64 {
// We pass now in and out to avoid extra calls of nanotime.
//
//go:yeswritebarrierrec
-func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
+func (ts *timers) check(now int64, bubble *synctestBubble) (rnow, pollUntil int64, ran bool) {
ts.trace("check")
// If it's not yet time for the first timer, or the first adjusted
// timer, then there is nothing to do.
@@ -1015,7 +1036,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
ts.adjust(now, false)
for len(ts.heap) > 0 {
// Note that runtimer may temporarily unlock ts.
- if tw := ts.run(now); tw != 0 {
+ if tw := ts.run(now, bubble); tw != 0 {
if tw > 0 {
pollUntil = tw
}
@@ -1047,7 +1068,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
// If a timer is run, this will temporarily unlock ts.
//
//go:systemstack
-func (ts *timers) run(now int64) int64 {
+func (ts *timers) run(now int64, bubble *synctestBubble) int64 {
ts.trace("run")
assertLockHeld(&ts.mu)
Redo:
@@ -1081,7 +1102,7 @@ Redo:
return t.when
}
- t.unlockAndRun(now)
+ t.unlockAndRun(now, bubble)
assertLockHeld(&ts.mu) // t is unlocked now, but not ts
return 0
}
@@ -1092,7 +1113,7 @@ Redo:
// unlockAndRun returns with t unlocked and t.ts (re-)locked.
//
//go:systemstack
-func (t *timer) unlockAndRun(now int64) {
+func (t *timer) unlockAndRun(now int64, bubble *synctestBubble) {
t.trace("unlockAndRun")
assertLockHeld(&t.mu)
if t.ts != nil {
@@ -1104,10 +1125,10 @@ func (t *timer) unlockAndRun(now int64) {
// out from under us while this function executes.
gp := getg()
var tsLocal *timers
- if t.ts == nil || t.ts.bubble == nil {
+ if bubble == nil {
tsLocal = &gp.m.p.ptr().timers
} else {
- tsLocal = &t.ts.bubble.timers
+ tsLocal = &bubble.timers
}
if tsLocal.raceCtx == 0 {
tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((*timers).run) + sys.PCQuantum)
@@ -1160,10 +1181,10 @@ func (t *timer) unlockAndRun(now int64) {
if gp.racectx != 0 {
throw("unexpected racectx")
}
- if ts == nil || ts.bubble == nil {
+ if bubble == nil {
gp.racectx = gp.m.p.ptr().timers.raceCtx
} else {
- gp.racectx = ts.bubble.timers.raceCtx
+ gp.racectx = bubble.timers.raceCtx
}
}
@@ -1171,14 +1192,14 @@ func (t *timer) unlockAndRun(now int64) {
ts.unlock()
}
- if ts != nil && ts.bubble != nil {
+ if bubble != nil {
// Temporarily use the timer's synctest group for the G running this timer.
gp := getg()
if gp.bubble != nil {
throw("unexpected syncgroup set")
}
- gp.bubble = ts.bubble
- ts.bubble.changegstatus(gp, _Gdead, _Grunning)
+ gp.bubble = bubble
+ bubble.changegstatus(gp, _Gdead, _Grunning)
}
if !async && t.isChan {
@@ -1222,13 +1243,13 @@ func (t *timer) unlockAndRun(now int64) {
unlock(&t.sendLock)
}
- if ts != nil && ts.bubble != nil {
+ if bubble != nil {
gp := getg()
- ts.bubble.changegstatus(gp, _Grunning, _Gdead)
+ bubble.changegstatus(gp, _Grunning, _Gdead)
if raceenabled {
// Establish a happens-before between this timer event and
// the next synctest.Wait call.
- racereleasemergeg(gp, ts.bubble.raceaddr())
+ racereleasemergeg(gp, bubble.raceaddr())
}
gp.bubble = nil
}
@@ -1415,24 +1436,10 @@ func badTimer() {
// maybeRunChan checks whether the timer needs to run
// to send a value to its associated channel. If so, it does.
// The timer must not be locked.
-func (t *timer) maybeRunChan() {
- if t.isFake {
- t.lock()
- var timerBubble *synctestBubble
- if t.ts != nil {
- timerBubble = t.ts.bubble
- }
- t.unlock()
- bubble := getg().bubble
- if bubble == nil {
- panic(plainError("synctest timer accessed from outside bubble"))
- }
- if timerBubble != nil && bubble != timerBubble {
- panic(plainError("timer moved between synctest bubbles"))
- }
- // No need to do anything here.
- // synctest.Run will run the timer when it advances its fake clock.
- return
+func (t *timer) maybeRunChan(c *hchan) {
+ if t.isFake && getg().bubble != c.bubble {
+ // This should have been checked by the caller, but check just in case.
+ fatal("synctest timer accessed from outside bubble")
}
if t.astate.Load()&timerHeaped != 0 {
// If the timer is in the heap, the ordinary timer code
@@ -1442,6 +1449,9 @@ func (t *timer) maybeRunChan() {
t.lock()
now := nanotime()
+ if t.isFake {
+ now = getg().bubble.now
+ }
if t.state&timerHeaped != 0 || t.when == 0 || t.when > now {
t.trace("maybeRunChan-")
// Timer in the heap, or not running at all, or not triggered.
@@ -1450,7 +1460,7 @@ func (t *timer) maybeRunChan() {
}
t.trace("maybeRunChan+")
systemstack(func() {
- t.unlockAndRun(now)
+ t.unlockAndRun(now, c.bubble)
})
}
@@ -1460,9 +1470,11 @@ func (t *timer) maybeRunChan() {
// adding it if needed.
func blockTimerChan(c *hchan) {
t := c.timer
- if t.isFake {
- return
+ if t.isFake && c.bubble != getg().bubble {
+ // This should have been checked by the caller, but check just in case.
+ fatal("synctest timer accessed from outside bubble")
}
+
t.lock()
t.trace("blockTimerChan")
if !t.isChan {
@@ -1500,9 +1512,6 @@ func blockTimerChan(c *hchan) {
// blocked on it anymore.
func unblockTimerChan(c *hchan) {
t := c.timer
- if t.isFake {
- return
- }
t.lock()
t.trace("unblockTimerChan")
if !t.isChan || t.blocked == 0 {