aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/internal/synctest/synctest_test.go55
-rw-r--r--src/runtime/chan.go6
-rw-r--r--src/runtime/proc.go4
-rw-r--r--src/runtime/select.go2
-rw-r--r--src/runtime/synctest.go3
-rw-r--r--src/runtime/time.go97
6 files changed, 113 insertions, 54 deletions
diff --git a/src/internal/synctest/synctest_test.go b/src/internal/synctest/synctest_test.go
index 53c7c89716..c2f84be736 100644
--- a/src/internal/synctest/synctest_test.go
+++ b/src/internal/synctest/synctest_test.go
@@ -226,8 +226,8 @@ func TestTimerNondeterminism(t *testing.T) {
const iterations = 1000
var seen1, seen2 bool
for range iterations {
- tm1 := time.NewTimer(0)
- tm2 := time.NewTimer(0)
+ tm1 := time.NewTimer(1)
+ tm2 := time.NewTimer(1)
select {
case <-tm1.C:
seen1 = true
@@ -278,6 +278,57 @@ func TestSleepNondeterminism(t *testing.T) {
})
}
+// TestTimerRunsImmediately verifies that a 0-duration timer sends on its channel
+// without waiting for the bubble to block.
+func TestTimerRunsImmediately(t *testing.T) {
+ synctest.Run(func() {
+ start := time.Now()
+ tm := time.NewTimer(0)
+ select {
+ case got := <-tm.C:
+ if !got.Equal(start) {
+ t.Errorf("<-tm.C = %v, want %v", got, start)
+ }
+ default:
+ t.Errorf("0-duration timer channel is not readable; want it to be")
+ }
+ })
+}
+
+// TestTimerRunsLater verifies that reading from a timer's channel receives the
+// timer fired, even when that time is in reading from a timer's channel receives the
+// time the timer fired, even when that time is in the past.
+func TestTimerRanInPast(t *testing.T) {
+ synctest.Run(func() {
+ delay := 1 * time.Second
+ want := time.Now().Add(delay)
+ tm := time.NewTimer(delay)
+ time.Sleep(2 * delay)
+ select {
+ case got := <-tm.C:
+ if !got.Equal(want) {
+ t.Errorf("<-tm.C = %v, want %v", got, want)
+ }
+ default:
+ t.Errorf("0-duration timer channel is not readable; want it to be")
+ }
+ })
+}
+
+// TestAfterFuncRunsImmediately verifies that a 0-duration AfterFunc is scheduled
+// without waiting for the bubble to block.
+func TestAfterFuncRunsImmediately(t *testing.T) {
+ synctest.Run(func() {
+ var b atomic.Bool
+ time.AfterFunc(0, func() {
+ b.Store(true)
+ })
+ for !b.Load() {
+ runtime.Gosched()
+ }
+ })
+}
+
func TestChannelFromOutsideBubble(t *testing.T) {
choutside := make(chan struct{})
for _, test := range []struct {
diff --git a/src/runtime/chan.go b/src/runtime/chan.go
index df48267e97..bb554ebfdb 100644
--- a/src/runtime/chan.go
+++ b/src/runtime/chan.go
@@ -497,7 +497,7 @@ func empty(c *hchan) bool {
// c.timer is also immutable (it is set after make(chan) but before any channel operations).
// All timer channels have dataqsiz > 0.
if c.timer != nil {
- c.timer.maybeRunChan()
+ c.timer.maybeRunChan(c)
}
return atomic.Loaduint(&c.qcount) == 0
}
@@ -542,7 +542,7 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
}
if c.timer != nil {
- c.timer.maybeRunChan()
+ c.timer.maybeRunChan(c)
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
@@ -821,7 +821,7 @@ func chanlen(c *hchan) int {
}
async := debug.asynctimerchan.Load() != 0
if c.timer != nil && async {
- c.timer.maybeRunChan()
+ c.timer.maybeRunChan(c)
}
if c.timer != nil && !async {
// timer channels have a buffered implementation
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index 5b8db2bee4..37a7b7f684 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -3341,7 +3341,7 @@ top:
// which may steal timers. It's important that between now
// and then, nothing blocks, so these numbers remain mostly
// relevant.
- now, pollUntil, _ := pp.timers.check(0)
+ now, pollUntil, _ := pp.timers.check(0, nil)
// Try to schedule the trace reader.
if traceEnabled() || traceShuttingDown() {
@@ -3780,7 +3780,7 @@ func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWo
// timerpMask tells us whether the P may have timers at all. If it
// can't, no need to check at all.
if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
- tnow, w, ran := p2.timers.check(now)
+ tnow, w, ran := p2.timers.check(now, nil)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
diff --git a/src/runtime/select.go b/src/runtime/select.go
index 19256df6a6..ae7754b173 100644
--- a/src/runtime/select.go
+++ b/src/runtime/select.go
@@ -185,7 +185,7 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo
}
if cas.c.timer != nil {
- cas.c.timer.maybeRunChan()
+ cas.c.timer.maybeRunChan(cas.c)
}
j := cheaprandn(uint32(norder + 1))
diff --git a/src/runtime/synctest.go b/src/runtime/synctest.go
index f676afa20d..c837c792a5 100644
--- a/src/runtime/synctest.go
+++ b/src/runtime/synctest.go
@@ -185,7 +185,6 @@ func synctestRun(f func()) {
}
const synctestBaseTime = 946684800000000000 // midnight UTC 2000-01-01
bubble.now = synctestBaseTime
- bubble.timers.bubble = bubble
lockInit(&bubble.mu, lockRankSynctest)
lockInit(&bubble.timers.mu, lockRankTimers)
@@ -213,7 +212,7 @@ func synctestRun(f func()) {
// so timer goroutines inherit their child race context from g0.
curg := gp.m.curg
gp.m.curg = nil
- gp.bubble.timers.check(gp.bubble.now)
+ gp.bubble.timers.check(bubble.now, bubble)
gp.m.curg = curg
})
gopark(synctestidle_c, nil, waitReasonSynctestRun, traceBlockSynctest, 0)
diff --git a/src/runtime/time.go b/src/runtime/time.go
index a1f8351a1e..4880dce8cd 100644
--- a/src/runtime/time.go
+++ b/src/runtime/time.go
@@ -157,8 +157,6 @@ type timers struct {
// heap[i].when over timers with the timerModified bit set.
// If minWhenModified = 0, it means there are no timerModified timers in the heap.
minWhenModified atomic.Int64
-
- bubble *synctestBubble
}
type timerWhen struct {
@@ -403,7 +401,7 @@ func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg
throw("invalid timer channel: no capacity")
}
}
- if gr := getg().bubble; gr != nil {
+ if bubble := getg().bubble; bubble != nil {
t.isFake = true
}
t.modify(when, period, f, arg, 0)
@@ -485,7 +483,7 @@ func (t *timer) maybeRunAsync() {
// timer ourselves now is fine.)
if now := nanotime(); t.when <= now {
systemstack(func() {
- t.unlockAndRun(now) // resets t.when
+ t.unlockAndRun(now, nil) // resets t.when
})
t.lock()
}
@@ -621,6 +619,29 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
add := t.needsAdd()
+ if add && t.isFake {
+ // If this is a bubbled timer scheduled to fire immediately,
+ // run it now rather than waiting for the bubble's timer scheduler.
+ // This avoids deferring timer execution until after the bubble
+ // becomes durably blocked.
+ //
+ // Don't do this for non-bubbled timers: It isn't necessary,
+ // and there may be cases where the runtime executes timers with
+ // the expectation the timer func will not run in the current goroutine.
+ // Bubbled timers are always created by the time package, and are
+ // safe to run in the current goroutine.
+ bubble := getg().bubble
+ if bubble == nil {
+ throw("fake timer executing with no bubble")
+ }
+ if t.state&timerHeaped == 0 && when <= bubble.now {
+ systemstack(func() {
+ t.unlockAndRun(bubble.now, bubble)
+ })
+ return pending
+ }
+ }
+
if !async && t.isChan {
// Stop any future sends with stale values.
// See timer.unlockAndRun.
@@ -657,7 +678,7 @@ func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay in
// t must be locked.
func (t *timer) needsAdd() bool {
assertLockHeld(&t.mu)
- need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.isFake || t.blocked > 0)
+ need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0)
if need {
t.trace("needsAdd+")
} else {
@@ -982,7 +1003,7 @@ func (ts *timers) wakeTime() int64 {
// We pass now in and out to avoid extra calls of nanotime.
//
//go:yeswritebarrierrec
-func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
+func (ts *timers) check(now int64, bubble *synctestBubble) (rnow, pollUntil int64, ran bool) {
ts.trace("check")
// If it's not yet time for the first timer, or the first adjusted
// timer, then there is nothing to do.
@@ -1015,7 +1036,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
ts.adjust(now, false)
for len(ts.heap) > 0 {
// Note that runtimer may temporarily unlock ts.
- if tw := ts.run(now); tw != 0 {
+ if tw := ts.run(now, bubble); tw != 0 {
if tw > 0 {
pollUntil = tw
}
@@ -1047,7 +1068,7 @@ func (ts *timers) check(now int64) (rnow, pollUntil int64, ran bool) {
// If a timer is run, this will temporarily unlock ts.
//
//go:systemstack
-func (ts *timers) run(now int64) int64 {
+func (ts *timers) run(now int64, bubble *synctestBubble) int64 {
ts.trace("run")
assertLockHeld(&ts.mu)
Redo:
@@ -1081,7 +1102,7 @@ Redo:
return t.when
}
- t.unlockAndRun(now)
+ t.unlockAndRun(now, bubble)
assertLockHeld(&ts.mu) // t is unlocked now, but not ts
return 0
}
@@ -1092,7 +1113,7 @@ Redo:
// unlockAndRun returns with t unlocked and t.ts (re-)locked.
//
//go:systemstack
-func (t *timer) unlockAndRun(now int64) {
+func (t *timer) unlockAndRun(now int64, bubble *synctestBubble) {
t.trace("unlockAndRun")
assertLockHeld(&t.mu)
if t.ts != nil {
@@ -1104,10 +1125,10 @@ func (t *timer) unlockAndRun(now int64) {
// out from under us while this function executes.
gp := getg()
var tsLocal *timers
- if t.ts == nil || t.ts.bubble == nil {
+ if bubble == nil {
tsLocal = &gp.m.p.ptr().timers
} else {
- tsLocal = &t.ts.bubble.timers
+ tsLocal = &bubble.timers
}
if tsLocal.raceCtx == 0 {
tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((*timers).run) + sys.PCQuantum)
@@ -1160,10 +1181,10 @@ func (t *timer) unlockAndRun(now int64) {
if gp.racectx != 0 {
throw("unexpected racectx")
}
- if ts == nil || ts.bubble == nil {
+ if bubble == nil {
gp.racectx = gp.m.p.ptr().timers.raceCtx
} else {
- gp.racectx = ts.bubble.timers.raceCtx
+ gp.racectx = bubble.timers.raceCtx
}
}
@@ -1171,14 +1192,14 @@ func (t *timer) unlockAndRun(now int64) {
ts.unlock()
}
- if ts != nil && ts.bubble != nil {
+ if bubble != nil {
// Temporarily use the timer's synctest group for the G running this timer.
gp := getg()
if gp.bubble != nil {
throw("unexpected syncgroup set")
}
- gp.bubble = ts.bubble
- ts.bubble.changegstatus(gp, _Gdead, _Grunning)
+ gp.bubble = bubble
+ bubble.changegstatus(gp, _Gdead, _Grunning)
}
if !async && t.isChan {
@@ -1222,13 +1243,13 @@ func (t *timer) unlockAndRun(now int64) {
unlock(&t.sendLock)
}
- if ts != nil && ts.bubble != nil {
+ if bubble != nil {
gp := getg()
- ts.bubble.changegstatus(gp, _Grunning, _Gdead)
+ bubble.changegstatus(gp, _Grunning, _Gdead)
if raceenabled {
// Establish a happens-before between this timer event and
// the next synctest.Wait call.
- racereleasemergeg(gp, ts.bubble.raceaddr())
+ racereleasemergeg(gp, bubble.raceaddr())
}
gp.bubble = nil
}
@@ -1415,24 +1436,10 @@ func badTimer() {
// maybeRunChan checks whether the timer needs to run
// to send a value to its associated channel. If so, it does.
// The timer must not be locked.
-func (t *timer) maybeRunChan() {
- if t.isFake {
- t.lock()
- var timerBubble *synctestBubble
- if t.ts != nil {
- timerBubble = t.ts.bubble
- }
- t.unlock()
- bubble := getg().bubble
- if bubble == nil {
- panic(plainError("synctest timer accessed from outside bubble"))
- }
- if timerBubble != nil && bubble != timerBubble {
- panic(plainError("timer moved between synctest bubbles"))
- }
- // No need to do anything here.
- // synctest.Run will run the timer when it advances its fake clock.
- return
+func (t *timer) maybeRunChan(c *hchan) {
+ if t.isFake && getg().bubble != c.bubble {
+ // This should have been checked by the caller, but check just in case.
+ fatal("synctest timer accessed from outside bubble")
}
if t.astate.Load()&timerHeaped != 0 {
// If the timer is in the heap, the ordinary timer code
@@ -1442,6 +1449,9 @@ func (t *timer) maybeRunChan() {
t.lock()
now := nanotime()
+ if t.isFake {
+ now = getg().bubble.now
+ }
if t.state&timerHeaped != 0 || t.when == 0 || t.when > now {
t.trace("maybeRunChan-")
// Timer in the heap, or not running at all, or not triggered.
@@ -1450,7 +1460,7 @@ func (t *timer) maybeRunChan() {
}
t.trace("maybeRunChan+")
systemstack(func() {
- t.unlockAndRun(now)
+ t.unlockAndRun(now, c.bubble)
})
}
@@ -1460,9 +1470,11 @@ func (t *timer) maybeRunChan() {
// adding it if needed.
func blockTimerChan(c *hchan) {
t := c.timer
- if t.isFake {
- return
+ if t.isFake && c.bubble != getg().bubble {
+ // This should have been checked by the caller, but check just in case.
+ fatal("synctest timer accessed from outside bubble")
}
+
t.lock()
t.trace("blockTimerChan")
if !t.isChan {
@@ -1500,9 +1512,6 @@ func blockTimerChan(c *hchan) {
// blocked on it anymore.
func unblockTimerChan(c *hchan) {
t := c.timer
- if t.isFake {
- return
- }
t.lock()
t.trace("unblockTimerChan")
if !t.isChan || t.blocked == 0 {