diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/runtime/proc.go | 166 | ||||
| -rw-r--r-- | src/runtime/runtime2.go | 14 | ||||
| -rw-r--r-- | src/runtime/time.go | 21 |
3 files changed, 186 insertions, 15 deletions
diff --git a/src/runtime/proc.go b/src/runtime/proc.go index c419dee771..1a51b1d83b 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -2221,6 +2221,9 @@ top: if _p_.runSafePointFn != 0 { runSafePointFn() } + + now, pollUntil, _ := checkTimers(_p_, 0) + if fingwait && fingwake { if gp := wakefing(); gp != nil { ready(gp, 0, true) @@ -2266,12 +2269,7 @@ top: // Steal work from other P's. procs := uint32(gomaxprocs) - if atomic.Load(&sched.npidle) == procs-1 { - // Either GOMAXPROCS=1 or everybody, except for us, is idle already. - // New work can appear from returning syscall/cgocall, network or timers. - // Neither of that submits to local run queues, so no point in stealing. - goto stop - } + ranTimer := false // If number of spinning M's >= number of busy P's, block. // This is necessary to prevent excessive CPU consumption // when GOMAXPROCS>>1 but the program parallelism is low. @@ -2288,11 +2286,48 @@ top: goto top } stealRunNextG := i > 2 // first look for ready queues with more than 1 g - if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { + p2 := allp[enum.position()] + if _p_ == p2 { + continue + } + if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil { return gp, false } + + // Consider stealing timers from p2. + // This call to checkTimers is the only place where + // we hold a lock on a different P's timers. + // Lock contention can be a problem here, so avoid + // grabbing the lock if p2 is running and not marked + // for preemption. If p2 is running and not being + // preempted we assume it will handle its own timers. + if i > 2 && shouldStealTimers(p2) { + tnow, w, ran := checkTimers(p2, now) + now = tnow + if w != 0 && (pollUntil == 0 || w < pollUntil) { + pollUntil = w + } + if ran { + // Running the timers may have + // made an arbitrary number of G's + // ready and added them to this P's + // local run queue. That invalidates + // the assumption of runqsteal + // that is always has room to add + // stolen G's. So check now if there + // is a local G to run. + if gp, inheritTime := runqget(_p_); gp != nil { + return gp, inheritTime + } + ranTimer = true + } + } } } + if ranTimer { + // Running a timer may have made some goroutine ready. + goto top + } stop: @@ -2309,6 +2344,12 @@ stop: return gp, false } + delta := int64(-1) + if pollUntil != 0 { + // checkTimers ensures that polluntil > now. + delta = pollUntil - now + } + // wasm only: // If a callback returned and no other goroutine is awake, // then pause execution until a callback was triggered. @@ -2400,14 +2441,16 @@ stop: } // poll network - if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 { + if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { + atomic.Store64(&sched.pollUntil, uint64(pollUntil)) if _g_.m.p != 0 { throw("findrunnable: netpoll with p") } if _g_.m.spinning { throw("findrunnable: netpoll with spinning") } - list := netpoll(-1) // block until new work is available + list := netpoll(delta) // block until new work is available + atomic.Store64(&sched.pollUntil, 0) atomic.Store64(&sched.lastpoll, uint64(nanotime())) lock(&sched.lock) _p_ = pidleget() @@ -2431,6 +2474,11 @@ stop: } goto top } + } else if pollUntil != 0 && netpollinited() { + pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) + if pollerPollUntil == 0 || pollerPollUntil > pollUntil { + netpollBreak() + } } stopm() goto top @@ -2457,6 +2505,22 @@ func pollWork() bool { return false } +// wakeNetPoller wakes up the thread sleeping in the network poller, +// if there is one, and if it isn't going to wake up anyhow before +// the when argument. +func wakeNetPoller(when int64) { + if atomic.Load64(&sched.lastpoll) == 0 { + // In findrunnable we ensure that when polling the pollUntil + // field is either zero or the time to which the current + // poll is expected to run. This can have a spurious wakeup + // but should never miss a wakeup. + pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) + if pollerPollUntil == 0 || pollerPollUntil > when { + netpollBreak() + } + } +} + func resetspinning() { _g_ := getg() if !_g_.m.spinning { @@ -2525,10 +2589,20 @@ top: gcstopm() goto top } - if _g_.m.p.ptr().runSafePointFn != 0 { + pp := _g_.m.p.ptr() + if pp.runSafePointFn != 0 { runSafePointFn() } + // Sanity check: if we are spinning, the run queue should be empty. + // Check this before calling checkTimers, as that might call + // goready to put a ready goroutine on the local run queue. + if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) { + throw("schedule: spinning with local work") + } + + checkTimers(pp, 0) + var gp *g var inheritTime bool @@ -2560,9 +2634,8 @@ top: } if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) - if gp != nil && _g_.m.spinning { - throw("schedule: spinning with local work") - } + // We can see gp != nil here even if the M is spinning, + // if checkTimers added a local goroutine via goready. } if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available @@ -2623,6 +2696,60 @@ func dropg() { setGNoWB(&_g_.m.curg, nil) } +// checkTimers runs any timers for the P that are ready. +// If now is not 0 it is the current time. +// It returns the current time or 0 if it is not known, +// and the time when the next timer should run or 0 if there is no next timer, +// and reports whether it ran any timers. +// If the time when the next timer should run is not 0, +// it is always larger than the returned time. +// We pass now in and out to avoid extra calls of nanotime. +//go:yeswritebarrierrec +func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { + lock(&pp.timersLock) + + adjusttimers(pp) + + rnow = now + if len(pp.timers) > 0 { + if rnow == 0 { + rnow = nanotime() + } + for len(pp.timers) > 0 { + if tw := runtimer(pp, rnow); tw != 0 { + if tw > 0 { + pollUntil = tw + } + break + } + ran = true + } + } + + unlock(&pp.timersLock) + + return rnow, pollUntil, ran +} + +// shouldStealTimers reports whether we should try stealing the timers from p2. +// We don't steal timers from a running P that is not marked for preemption, +// on the assumption that it will run its own timers. This reduces +// contention on the timers lock. +func shouldStealTimers(p2 *p) bool { + if p2.status != _Prunning { + return true + } + mp := p2.m.ptr() + if mp == nil || mp.locks > 0 { + return false + } + gp := mp.curg + if gp == nil || gp.atomicstatus != _Grunning || !gp.preempt { + return false + } + return true +} + func parkunlock_c(gp *g, lock unsafe.Pointer) bool { unlock((*mutex)(lock)) return true @@ -4305,6 +4432,13 @@ func checkdead() { return } + // There are no goroutines running, so we can look at the P's. + for _, _p_ := range allp { + if len(_p_.timers) > 0 { + return + } + } + getg().m.throwing = -1 // do not dump full stacks throw("all goroutines are asleep - deadlock!") } @@ -4392,6 +4526,12 @@ func sysmon() { incidlelocked(1) } } + if timeSleepUntil() < now { + // There are timers that should have already run, + // perhaps because there is an unpreemptible P. + // Try to start an M to run them. + startm(nil, false) + } // retake P's blocked in syscalls // and preempt long running G's if retake(now) != 0 { diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go index dd399e00a6..f44cd2fb14 100644 --- a/src/runtime/runtime2.go +++ b/src/runtime/runtime2.go @@ -598,13 +598,23 @@ type p struct { runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point + // Lock for timers. We normally access the timers while running + // on this P, but the scheduler can also do it from a different P. + timersLock mutex + + // Actions to take at some time. This is used to implement the + // standard library's time package. + // Must hold timersLock to access. + timers []*timer + pad cpu.CacheLinePad } type schedt struct { // accessed atomically. keep at top to ensure alignment on 32-bit systems. - goidgen uint64 - lastpoll uint64 + goidgen uint64 + lastpoll uint64 // time of last network poll, 0 if currently polling + pollUntil uint64 // time to which current poll is sleeping lock mutex diff --git a/src/runtime/time.go b/src/runtime/time.go index 5521b8a807..1bbb5684cb 100644 --- a/src/runtime/time.go +++ b/src/runtime/time.go @@ -325,6 +325,27 @@ func timerproc(tb *timersBucket) { } } +// adjusttimers looks through the timers in the current P's heap for +// any timers that have been modified to run earlier, and puts them in +// the correct place in the heap. +// The caller must have locked the timers for pp. +func adjusttimers(pp *p) { + if len(pp.timers) == 0 { + return + } + throw("adjusttimers: not yet implemented") +} + +// runtimer examines the first timer in timers. If it is ready based on now, +// it runs the timer and removes or updates it. +// Returns 0 if it ran a timer, -1 if there are no more timers, or the time +// when the first timer should run. +// The caller must have locked the timers for pp. +func runtimer(pp *p, now int64) int64 { + throw("runtimer: not yet implemented") + return -1 +} + func timejump() *g { if faketime == 0 { return nil |
