aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/runtime/proc.go166
-rw-r--r--src/runtime/runtime2.go14
-rw-r--r--src/runtime/time.go21
3 files changed, 186 insertions, 15 deletions
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index c419dee771..1a51b1d83b 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -2221,6 +2221,9 @@ top:
if _p_.runSafePointFn != 0 {
runSafePointFn()
}
+
+ now, pollUntil, _ := checkTimers(_p_, 0)
+
if fingwait && fingwake {
if gp := wakefing(); gp != nil {
ready(gp, 0, true)
@@ -2266,12 +2269,7 @@ top:
// Steal work from other P's.
procs := uint32(gomaxprocs)
- if atomic.Load(&sched.npidle) == procs-1 {
- // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
- // New work can appear from returning syscall/cgocall, network or timers.
- // Neither of that submits to local run queues, so no point in stealing.
- goto stop
- }
+ ranTimer := false
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
@@ -2288,11 +2286,48 @@ top:
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
- if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
+ p2 := allp[enum.position()]
+ if _p_ == p2 {
+ continue
+ }
+ if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false
}
+
+ // Consider stealing timers from p2.
+ // This call to checkTimers is the only place where
+ // we hold a lock on a different P's timers.
+ // Lock contention can be a problem here, so avoid
+ // grabbing the lock if p2 is running and not marked
+ // for preemption. If p2 is running and not being
+ // preempted we assume it will handle its own timers.
+ if i > 2 && shouldStealTimers(p2) {
+ tnow, w, ran := checkTimers(p2, now)
+ now = tnow
+ if w != 0 && (pollUntil == 0 || w < pollUntil) {
+ pollUntil = w
+ }
+ if ran {
+ // Running the timers may have
+ // made an arbitrary number of G's
+ // ready and added them to this P's
+ // local run queue. That invalidates
+ // the assumption of runqsteal
+ // that is always has room to add
+ // stolen G's. So check now if there
+ // is a local G to run.
+ if gp, inheritTime := runqget(_p_); gp != nil {
+ return gp, inheritTime
+ }
+ ranTimer = true
+ }
+ }
}
}
+ if ranTimer {
+ // Running a timer may have made some goroutine ready.
+ goto top
+ }
stop:
@@ -2309,6 +2344,12 @@ stop:
return gp, false
}
+ delta := int64(-1)
+ if pollUntil != 0 {
+ // checkTimers ensures that polluntil > now.
+ delta = pollUntil - now
+ }
+
// wasm only:
// If a callback returned and no other goroutine is awake,
// then pause execution until a callback was triggered.
@@ -2400,14 +2441,16 @@ stop:
}
// poll network
- if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
+ if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
+ atomic.Store64(&sched.pollUntil, uint64(pollUntil))
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
- list := netpoll(-1) // block until new work is available
+ list := netpoll(delta) // block until new work is available
+ atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
lock(&sched.lock)
_p_ = pidleget()
@@ -2431,6 +2474,11 @@ stop:
}
goto top
}
+ } else if pollUntil != 0 && netpollinited() {
+ pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
+ if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
+ netpollBreak()
+ }
}
stopm()
goto top
@@ -2457,6 +2505,22 @@ func pollWork() bool {
return false
}
+// wakeNetPoller wakes up the thread sleeping in the network poller,
+// if there is one, and if it isn't going to wake up anyhow before
+// the when argument.
+func wakeNetPoller(when int64) {
+ if atomic.Load64(&sched.lastpoll) == 0 {
+ // In findrunnable we ensure that when polling the pollUntil
+ // field is either zero or the time to which the current
+ // poll is expected to run. This can have a spurious wakeup
+ // but should never miss a wakeup.
+ pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
+ if pollerPollUntil == 0 || pollerPollUntil > when {
+ netpollBreak()
+ }
+ }
+}
+
func resetspinning() {
_g_ := getg()
if !_g_.m.spinning {
@@ -2525,10 +2589,20 @@ top:
gcstopm()
goto top
}
- if _g_.m.p.ptr().runSafePointFn != 0 {
+ pp := _g_.m.p.ptr()
+ if pp.runSafePointFn != 0 {
runSafePointFn()
}
+ // Sanity check: if we are spinning, the run queue should be empty.
+ // Check this before calling checkTimers, as that might call
+ // goready to put a ready goroutine on the local run queue.
+ if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
+ throw("schedule: spinning with local work")
+ }
+
+ checkTimers(pp, 0)
+
var gp *g
var inheritTime bool
@@ -2560,9 +2634,8 @@ top:
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
- if gp != nil && _g_.m.spinning {
- throw("schedule: spinning with local work")
- }
+ // We can see gp != nil here even if the M is spinning,
+ // if checkTimers added a local goroutine via goready.
}
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
@@ -2623,6 +2696,60 @@ func dropg() {
setGNoWB(&_g_.m.curg, nil)
}
+// checkTimers runs any timers for the P that are ready.
+// If now is not 0 it is the current time.
+// It returns the current time or 0 if it is not known,
+// and the time when the next timer should run or 0 if there is no next timer,
+// and reports whether it ran any timers.
+// If the time when the next timer should run is not 0,
+// it is always larger than the returned time.
+// We pass now in and out to avoid extra calls of nanotime.
+//go:yeswritebarrierrec
+func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
+ lock(&pp.timersLock)
+
+ adjusttimers(pp)
+
+ rnow = now
+ if len(pp.timers) > 0 {
+ if rnow == 0 {
+ rnow = nanotime()
+ }
+ for len(pp.timers) > 0 {
+ if tw := runtimer(pp, rnow); tw != 0 {
+ if tw > 0 {
+ pollUntil = tw
+ }
+ break
+ }
+ ran = true
+ }
+ }
+
+ unlock(&pp.timersLock)
+
+ return rnow, pollUntil, ran
+}
+
+// shouldStealTimers reports whether we should try stealing the timers from p2.
+// We don't steal timers from a running P that is not marked for preemption,
+// on the assumption that it will run its own timers. This reduces
+// contention on the timers lock.
+func shouldStealTimers(p2 *p) bool {
+ if p2.status != _Prunning {
+ return true
+ }
+ mp := p2.m.ptr()
+ if mp == nil || mp.locks > 0 {
+ return false
+ }
+ gp := mp.curg
+ if gp == nil || gp.atomicstatus != _Grunning || !gp.preempt {
+ return false
+ }
+ return true
+}
+
func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
unlock((*mutex)(lock))
return true
@@ -4305,6 +4432,13 @@ func checkdead() {
return
}
+ // There are no goroutines running, so we can look at the P's.
+ for _, _p_ := range allp {
+ if len(_p_.timers) > 0 {
+ return
+ }
+ }
+
getg().m.throwing = -1 // do not dump full stacks
throw("all goroutines are asleep - deadlock!")
}
@@ -4392,6 +4526,12 @@ func sysmon() {
incidlelocked(1)
}
}
+ if timeSleepUntil() < now {
+ // There are timers that should have already run,
+ // perhaps because there is an unpreemptible P.
+ // Try to start an M to run them.
+ startm(nil, false)
+ }
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go
index dd399e00a6..f44cd2fb14 100644
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -598,13 +598,23 @@ type p struct {
runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
+ // Lock for timers. We normally access the timers while running
+ // on this P, but the scheduler can also do it from a different P.
+ timersLock mutex
+
+ // Actions to take at some time. This is used to implement the
+ // standard library's time package.
+ // Must hold timersLock to access.
+ timers []*timer
+
pad cpu.CacheLinePad
}
type schedt struct {
// accessed atomically. keep at top to ensure alignment on 32-bit systems.
- goidgen uint64
- lastpoll uint64
+ goidgen uint64
+ lastpoll uint64 // time of last network poll, 0 if currently polling
+ pollUntil uint64 // time to which current poll is sleeping
lock mutex
diff --git a/src/runtime/time.go b/src/runtime/time.go
index 5521b8a807..1bbb5684cb 100644
--- a/src/runtime/time.go
+++ b/src/runtime/time.go
@@ -325,6 +325,27 @@ func timerproc(tb *timersBucket) {
}
}
+// adjusttimers looks through the timers in the current P's heap for
+// any timers that have been modified to run earlier, and puts them in
+// the correct place in the heap.
+// The caller must have locked the timers for pp.
+func adjusttimers(pp *p) {
+ if len(pp.timers) == 0 {
+ return
+ }
+ throw("adjusttimers: not yet implemented")
+}
+
+// runtimer examines the first timer in timers. If it is ready based on now,
+// it runs the timer and removes or updates it.
+// Returns 0 if it ran a timer, -1 if there are no more timers, or the time
+// when the first timer should run.
+// The caller must have locked the timers for pp.
+func runtimer(pp *p, now int64) int64 {
+ throw("runtimer: not yet implemented")
+ return -1
+}
+
func timejump() *g {
if faketime == 0 {
return nil