path: root/src/runtime/proc.go
Diffstat (limited to 'src/runtime/proc.go')
-rw-r--r--  src/runtime/proc.go | 776
1 file changed, 521 insertions(+), 255 deletions(-)
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index dbb430fd25..d9f8c65530 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -5,15 +5,14 @@
package runtime
import (
- "internal/bytealg"
+ "internal/abi"
"internal/cpu"
+ "internal/goexperiment"
"runtime/internal/atomic"
"runtime/internal/sys"
"unsafe"
)
-var buildVersion = sys.TheVersion
-
// set using cmd/go/internal/modload.ModInfoProg
var modinfo string
@@ -52,33 +51,64 @@ var modinfo string
// any work to do.
//
// The current approach:
-// We unpark an additional thread when we ready a goroutine if (1) there is an
-// idle P and there are no "spinning" worker threads. A worker thread is considered
-// spinning if it is out of local work and did not find work in global run queue/
-// netpoller; the spinning state is denoted in m.spinning and in sched.nmspinning.
-// Threads unparked this way are also considered spinning; we don't do goroutine
-// handoff so such threads are out of work initially. Spinning threads do some
-// spinning looking for work in per-P run queues before parking. If a spinning
+//
+// This approach applies to three primary sources of potential work: readying a
+// goroutine, new/modified-earlier timers, and idle-priority GC. See below for
+// additional details.
+//
+// We unpark an additional thread when we submit work if (this is wakep()):
+// 1. There is an idle P, and
+// 2. There are no "spinning" worker threads.
+//
+// A worker thread is considered spinning if it is out of local work and did
+// not find work in the global run queue or netpoller; the spinning state is
+// denoted in m.spinning and in sched.nmspinning. Threads unparked this way are
+// also considered spinning; we don't do goroutine handoff so such threads are
+// out of work initially. Spinning threads spin looking for work in per-P
+// run queues and timer heaps or from the GC before parking. If a spinning
// thread finds work it takes itself out of the spinning state and proceeds to
-// execution. If it does not find work it takes itself out of the spinning state
-// and then parks.
-// If there is at least one spinning thread (sched.nmspinning>1), we don't unpark
-// new threads when readying goroutines. To compensate for that, if the last spinning
-// thread finds work and stops spinning, it must unpark a new spinning thread.
-// This approach smooths out unjustified spikes of thread unparking,
-// but at the same time guarantees eventual maximal CPU parallelism utilization.
+// execution. If it does not find work it takes itself out of the spinning
+// state and then parks.
+//
+// If there is at least one spinning thread (sched.nmspinning>1), we don't
+// unpark new threads when submitting work. To compensate for that, if the last
+// spinning thread finds work and stops spinning, it must unpark a new spinning
+// thread. This approach smooths out unjustified spikes of thread unparking,
+// but at the same time guarantees eventual maximal CPU parallelism
+// utilization.
//
-// The main implementation complication is that we need to be very careful during
-// spinning->non-spinning thread transition. This transition can race with submission
-// of a new goroutine, and either one part or another needs to unpark another worker
-// thread. If they both fail to do that, we can end up with semi-persistent CPU
-// underutilization. The general pattern for goroutine readying is: submit a goroutine
-// to local work queue, #StoreLoad-style memory barrier, check sched.nmspinning.
-// The general pattern for spinning->non-spinning transition is: decrement nmspinning,
-// #StoreLoad-style memory barrier, check all per-P work queues for new work.
-// Note that all this complexity does not apply to global run queue as we are not
-// sloppy about thread unparking when submitting to global queue. Also see comments
-// for nmspinning manipulation.
+// The main implementation complication is that we need to be very careful
+// during spinning->non-spinning thread transition. This transition can race
+// with submission of new work, and either one part or another needs to unpark
+// another worker thread. If they both fail to do that, we can end up with
+// semi-persistent CPU underutilization.
+//
+// The general pattern for submission is:
+// 1. Submit work to the local run queue, timer heap, or GC state.
+// 2. #StoreLoad-style memory barrier.
+// 3. Check sched.nmspinning.
+//
+// The general pattern for spinning->non-spinning transition is:
+// 1. Decrement nmspinning.
+// 2. #StoreLoad-style memory barrier.
+// 3. Check all per-P work queues and GC for new work.
+//
+// Note that all this complexity does not apply to global run queue as we are
+// not sloppy about thread unparking when submitting to global queue. Also see
+// comments for nmspinning manipulation.
+//
+// How these different sources of work behave varies, though it doesn't affect
+// the synchronization approach:
+// * Ready goroutine: this is an obvious source of work; the goroutine is
+// immediately ready and must run on some thread eventually.
+// * New/modified-earlier timer: The current timer implementation (see time.go)
+// uses netpoll in a thread with no work available to wait for the soonest
+// timer. If there is no thread waiting, we want a new spinning thread to go
+// wait.
+// * Idle-priority GC: The GC wakes a stopped idle thread to contribute to
+// background GC work (note: currently disabled per golang.org/issue/19112).
+// Also see golang.org/issue/44313, as this should be extended to all GC
+// workers.
var (
m0 m
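
The protocol above compresses into a minimal, self-contained sketch. All names below (pending, nmspinning, submit, stopSpinning) are illustrative stand-ins rather than runtime identifiers, and the real wakep() additionally requires an idle P:

    package scheddoc

    import "sync/atomic"

    var (
        nmspinning int32 // spinning worker count (cf. sched.nmspinning)
        pending    int32 // published work; stands in for run queues, timers, GC
    )

    // submit models the submission pattern: publish work, then check for
    // spinners. The atomic store followed by the atomic load supplies the
    // #StoreLoad-style barrier.
    func submit(wakep func()) {
        atomic.AddInt32(&pending, 1)            // 1. submit work
        if atomic.LoadInt32(&nmspinning) == 0 { // 2+3. barrier, then check
            wakep() // no spinner will find the work; unpark one
        }
    }

    // stopSpinning models the spinning->non-spinning transition: drop the
    // count first, then recheck every work source before really parking.
    func stopSpinning(park func()) {
        atomic.AddInt32(&nmspinning, -1)    // 1. decrement nmspinning
        if atomic.LoadInt32(&pending) > 0 { // 2+3. barrier, then recheck
            atomic.AddInt32(&nmspinning, 1) // found work: spin again
            return
        }
        park() // both orders checked; submitted work cannot be lost
    }

If either side performed its two steps in the opposite order, both could pass their checks while work sits unserved: exactly the semi-persistent CPU underutilization the comment warns about.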
@@ -541,6 +571,30 @@ func atomicAllGIndex(ptr **g, i uintptr) *g {
return *(**g)(add(unsafe.Pointer(ptr), i*sys.PtrSize))
}
+// forEachG calls fn on every G from allgs.
+//
+// forEachG takes a lock to exclude concurrent addition of new Gs.
+func forEachG(fn func(gp *g)) {
+ lock(&allglock)
+ for _, gp := range allgs {
+ fn(gp)
+ }
+ unlock(&allglock)
+}
+
+// forEachGRace calls fn on every G from allgs.
+//
+// forEachGRace avoids locking, but does not exclude addition of new Gs during
+// execution, which may be missed.
+func forEachGRace(fn func(gp *g)) {
+ ptr, length := atomicAllG()
+ for i := uintptr(0); i < length; i++ {
+ gp := atomicAllGIndex(ptr, i)
+ fn(gp)
+ }
+ return
+}
+
const (
// Number of goroutine ids to grab from sched.goidgen to local per-P cache at once.
// 16 seems to provide enough amortization, but other than that it's mostly arbitrary number.
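
As a hedged illustration of the locked/lock-free split above (the runtime itself publishes a (ptr, len) pair via atomicAllG; this analog uses a copy-on-write slice instead):

    package scheddoc

    import (
        "sync"
        "sync/atomic"
    )

    type registry struct {
        mu  sync.Mutex
        all atomic.Value // holds []*int; stored only while mu is held
    }

    func (r *registry) add(x *int) {
        r.mu.Lock()
        old, _ := r.all.Load().([]*int)
        r.all.Store(append(append([]*int(nil), old...), x)) // copy-on-write
        r.mu.Unlock()
    }

    // forEach mirrors forEachG: the lock excludes concurrent additions.
    func (r *registry) forEach(fn func(*int)) {
        r.mu.Lock()
        snap, _ := r.all.Load().([]*int)
        for _, x := range snap {
            fn(x)
        }
        r.mu.Unlock()
    }

    // forEachRace mirrors forEachGRace: it iterates one atomic snapshot,
    // so elements added during iteration may be missed, but nothing tears.
    func (r *registry) forEachRace(fn func(*int)) {
        snap, _ := r.all.Load().([]*int)
        for _, x := range snap {
            fn(x)
        }
    }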
@@ -644,6 +698,11 @@ func schedinit() {
sigsave(&_g_.m.sigmask)
initSigmask = _g_.m.sigmask
+ if offset := unsafe.Offsetof(sched.timeToRun); offset%8 != 0 {
+ println(offset)
+ throw("sched.timeToRun not aligned to 8 bytes")
+ }
+
goargs()
goenvs()
parsedebugvars()
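
The guard exists because sched.timeToRun is a histogram whose counters are updated with 64-bit atomic operations, and those require 8-byte alignment on 32-bit platforms. A self-contained version of the same guard, with made-up stats/hist names:

    package scheddoc

    import "unsafe"

    type stats struct {
        count uint32
        _     uint32 // padding keeps hist 8-byte aligned on 32-bit targets
        hist  uint64 // updated with sync/atomic 64-bit operations
    }

    func init() {
        if off := unsafe.Offsetof(stats{}.hist); off%8 != 0 {
            println(off)
            panic("stats.hist not aligned to 8 bytes")
        }
    }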
@@ -920,6 +979,37 @@ func casgstatus(gp *g, oldval, newval uint32) {
nextYield = nanotime() + yieldDelay/2
}
}
+
+ // Handle tracking for scheduling latencies.
+ if oldval == _Grunning {
+ // Track every 8th time a goroutine transitions out of running.
+ if gp.trackingSeq%gTrackingPeriod == 0 {
+ gp.tracking = true
+ }
+ gp.trackingSeq++
+ }
+ if gp.tracking {
+ now := nanotime()
+ if oldval == _Grunnable {
+ // We transitioned out of runnable, so measure how much
+ // time we spent in this state and add it to
+ // runnableTime.
+ gp.runnableTime += now - gp.runnableStamp
+ gp.runnableStamp = 0
+ }
+ if newval == _Grunnable {
+ // We just transitioned into runnable, so record what
+ // time that happened.
+ gp.runnableStamp = now
+ } else if newval == _Grunning {
+ // We're transitioning into running, so turn off
+ // tracking and record how much time we spent in
+ // runnable.
+ gp.tracking = false
+ sched.timeToRun.record(gp.runnableTime)
+ gp.runnableTime = 0
+ }
+ }
}
// casgstatus(gp, oldstatus, Gcopystack), assuming oldstatus is Gwaiting or Grunnable.
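
The block above samples one in gTrackingPeriod (8) of each goroutine's exits from _Grunning and then measures how long it waits in _Grunnable before running again. A hedged, self-contained sketch of the same bookkeeping, with record standing in for sched.timeToRun.record:

    package scheddoc

    import "time"

    const trackingPeriod = 8 // cf. gTrackingPeriod

    type gTrack struct {
        seq           uint8 // seeded randomly per G to stagger sampling
        tracking      bool
        runnableStamp time.Time
        runnableTime  time.Duration
    }

    func record(d time.Duration) {} // stand-in for sched.timeToRun.record

    // transition mirrors the casgstatus hook, with states named by string.
    func (t *gTrack) transition(oldval, newval string) {
        if oldval == "running" {
            if t.seq%trackingPeriod == 0 {
                t.tracking = true // track every 8th exit from running
            }
            t.seq++
        }
        if !t.tracking {
            return
        }
        now := time.Now()
        if oldval == "runnable" {
            // Leaving runnable: accumulate the time spent waiting to run.
            t.runnableTime += now.Sub(t.runnableStamp)
        }
        if newval == "runnable" {
            t.runnableStamp = now // entering runnable: remember when
        } else if newval == "running" {
            // Scheduled at last: stop tracking and record the latency.
            t.tracking = false
            record(t.runnableTime)
            t.runnableTime = 0
        }
    }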
@@ -1213,7 +1303,7 @@ func usesLibcall() bool {
case "aix", "darwin", "illumos", "ios", "solaris", "windows":
return true
case "openbsd":
- return GOARCH == "amd64" || GOARCH == "arm64"
+ return GOARCH == "386" || GOARCH == "amd64" || GOARCH == "arm64"
}
return false
}
@@ -1226,7 +1316,7 @@ func mStackIsSystemAllocated() bool {
return true
case "openbsd":
switch GOARCH {
- case "amd64", "arm64":
+ case "386", "amd64", "arm64":
return true
}
}
@@ -1234,7 +1324,7 @@ func mStackIsSystemAllocated() bool {
}
// mstart is the entry-point for new Ms.
-// It is written in assembly, marked TOPFRAME, and calls mstart0.
+// It is written in assembly, uses ABI0, is marked TOPFRAME, and calls mstart0.
func mstart()
// mstart0 is the Go entry-point for new Ms.
@@ -1294,7 +1384,7 @@ func mstart1() {
throw("bad runtime·mstart")
}
- // Set up m.g0.sched as a label returning returning to just
+ // Set up m.g0.sched as a label returning to just
// after the mstart1 call in mstart0 above, for use by goexit0 and mcall.
// We're never coming back to mstart1 after we call schedule,
// so other calls can reuse the current frame.
@@ -1349,6 +1439,9 @@ func mPark() {
g := getg()
for {
notesleep(&g.m.park)
+ // Note, because of signal handling by this parked m,
+ // a preemptive mDoFixup() may actually occur via
+ // mDoFixupAndOSYield(). (See golang.org/issue/44193)
noteclear(&g.m.park)
if !mDoFixup() {
return
@@ -1582,6 +1675,22 @@ func syscall_runtime_doAllThreadsSyscall(fn func(bool) bool) {
for atomic.Load(&sched.sysmonStarting) != 0 {
osyield()
}
+
+ // We don't want this thread to handle signals for the
+ // duration of this critical section. The underlying issue
+ // being that this locked coordinating m is the one monitoring
+ // for fn() execution by all the other m's of the runtime,
+ // while no regular go code execution is permitted (the world
+ // is stopped). If this present m were to get distracted to
+ // run signal handling code, and find itself waiting for a
+ // second thread to execute go code before being able to
+ // return from that signal handling, a deadlock will result.
+ // (See golang.org/issue/44193.)
+ lockOSThread()
+ var sigmask sigset
+ sigsave(&sigmask)
+ sigblock(false)
+
stopTheWorldGC("doAllThreadsSyscall")
if atomic.Load(&newmHandoff.haveTemplateThread) != 0 {
// Ensure that there are no in-flight thread
@@ -1633,6 +1742,7 @@ func syscall_runtime_doAllThreadsSyscall(fn func(bool) bool) {
// the possibility of racing with mp.
lock(&mp.mFixup.lock)
mp.mFixup.fn = fn
+ atomic.Store(&mp.mFixup.used, 1)
if mp.doesPark {
// For non-service threads this will
// cause the wakeup to be short lived
@@ -1649,9 +1759,7 @@ func syscall_runtime_doAllThreadsSyscall(fn func(bool) bool) {
if mp.procid == tid {
continue
}
- lock(&mp.mFixup.lock)
- done = done && (mp.mFixup.fn == nil)
- unlock(&mp.mFixup.lock)
+ done = done && atomic.Load(&mp.mFixup.used) == 0
}
if done {
break
@@ -1678,6 +1786,8 @@ func syscall_runtime_doAllThreadsSyscall(fn func(bool) bool) {
unlock(&mFixupRace.lock)
}
startTheWorldGC()
+ msigrestore(sigmask)
+ unlockOSThread()
}
// runSafePointFn runs the safe point function, if any, for this P.
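
Two things changed in this function: the coordinating thread now blocks signals (and pins itself with lockOSThread) for the critical section, and completion is detected through the atomic mFixup.used flag rather than by taking each M's fixup lock inside the wait loop. A hedged, self-contained analog of the publish-and-poll half of the protocol; worker, broadcast, and runFixup are illustrative names:

    package scheddoc

    import (
        "runtime"
        "sync"
        "sync/atomic"
    )

    type worker struct {
        mu    sync.Mutex
        fixup func()
        used  uint32 // 1 while a fixup is outstanding (cf. mp.mFixup.used)
    }

    // broadcast publishes fn to every worker, then polls the used flags
    // lock-free until all of them have run it (cf. the wait loop above).
    func broadcast(workers []*worker, fn func()) {
        for _, w := range workers {
            w.mu.Lock()
            w.fixup = fn
            atomic.StoreUint32(&w.used, 1)
            w.mu.Unlock()
        }
        for {
            done := true
            for _, w := range workers {
                done = done && atomic.LoadUint32(&w.used) == 0
            }
            if done {
                return
            }
            runtime.Gosched() // stand-in for osyield()
        }
    }

    // runFixup is what each worker calls at a safe point (cf. mDoFixup):
    // an atomic fast path first, the lock only when work is outstanding.
    func (w *worker) runFixup() bool {
        if atomic.LoadUint32(&w.used) == 0 {
            return false // fast path: nothing outstanding
        }
        w.mu.Lock()
        fn := w.fixup
        if fn != nil {
            fn()
            w.fixup = nil
            atomic.StoreUint32(&w.used, 0) // clear only after fn has run
        }
        w.mu.Unlock()
        return fn != nil
    }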
@@ -1859,6 +1969,10 @@ func needm() {
// Store the original signal mask for use by minit.
mp.sigmask = sigmask
+ // Install TLS on some platforms (previously setg
+ // would do this if necessary).
+ osSetupTLS(mp)
+
// Install g (= m->g0) and set the stack bounds
// to match the current stack. We don't actually know
// how big the stack is, like we don't know how big any
@@ -1909,7 +2023,7 @@ func oneNewExtraM() {
// the goroutine stack ends.
mp := allocm(nil, nil, -1)
gp := malg(4096)
- gp.sched.pc = funcPC(goexit) + sys.PCQuantum
+ gp.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum
gp.sched.sp = gp.stack.hi
gp.sched.sp -= 4 * sys.PtrSize // extra space in case of reads slightly beyond frame
gp.sched.lr = 0
@@ -2168,9 +2282,21 @@ var mFixupRace struct {
// mDoFixup runs any outstanding fixup function for the running m.
// Returns true if a fixup was outstanding and actually executed.
//
+// Note: to avoid deadlocks, and the need for the fixup function
+// itself to be async safe, signals are blocked for the working m
+// while it holds the mFixup lock. (See golang.org/issue/44193)
+//
//go:nosplit
func mDoFixup() bool {
_g_ := getg()
+ if used := atomic.Load(&_g_.m.mFixup.used); used == 0 {
+ return false
+ }
+
+ // slow path - if fixup fn is used, block signals and lock.
+ var sigmask sigset
+ sigsave(&sigmask)
+ sigblock(false)
lock(&_g_.m.mFixup.lock)
fn := _g_.m.mFixup.fn
if fn != nil {
@@ -2187,7 +2313,6 @@ func mDoFixup() bool {
// is more obviously safe.
throw("GC must be disabled to protect validity of fn value")
}
- *(*uintptr)(unsafe.Pointer(&_g_.m.mFixup.fn)) = 0
if _g_.racectx != 0 || !raceenabled {
fn(false)
} else {
@@ -2202,11 +2327,24 @@ func mDoFixup() bool {
_g_.racectx = 0
unlock(&mFixupRace.lock)
}
+ *(*uintptr)(unsafe.Pointer(&_g_.m.mFixup.fn)) = 0
+ atomic.Store(&_g_.m.mFixup.used, 0)
}
unlock(&_g_.m.mFixup.lock)
+ msigrestore(sigmask)
return fn != nil
}
+// mDoFixupAndOSYield is called when an m is unable to send a signal
+// because the allThreadsSyscall mechanism is in progress. That is, an
+// mPark() has been interrupted with this signal handler so we need to
+// ensure the fixup is executed from this context.
+//go:nosplit
+func mDoFixupAndOSYield() {
+ mDoFixup()
+ osyield()
+}
+
// templateThread is a thread in a known-good state that exists solely
// to start new threads in known-good states when the calling thread
// may not be in a good state.
@@ -2624,85 +2762,40 @@ top:
}
}
- // Steal work from other P's.
+ // Spinning Ms: steal work from other Ps.
+ //
+ // Limit the number of spinning Ms to half the number of busy Ps.
+ // This is necessary to prevent excessive CPU consumption when
+ // GOMAXPROCS>>1 but the program parallelism is low.
procs := uint32(gomaxprocs)
- ranTimer := false
- // If number of spinning M's >= number of busy P's, block.
- // This is necessary to prevent excessive CPU consumption
- // when GOMAXPROCS>>1 but the program parallelism is low.
- if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
- goto stop
- }
- if !_g_.m.spinning {
- _g_.m.spinning = true
- atomic.Xadd(&sched.nmspinning, 1)
- }
- const stealTries = 4
- for i := 0; i < stealTries; i++ {
- stealTimersOrRunNextG := i == stealTries-1
-
- for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
- if sched.gcwaiting != 0 {
- goto top
- }
- p2 := allp[enum.position()]
- if _p_ == p2 {
- continue
- }
-
- // Steal timers from p2. This call to checkTimers is the only place
- // where we might hold a lock on a different P's timers. We do this
- // once on the last pass before checking runnext because stealing
- // from the other P's runnext should be the last resort, so if there
- // are timers to steal do that first.
- //
- // We only check timers on one of the stealing iterations because
- // the time stored in now doesn't change in this loop and checking
- // the timers for each P more than once with the same value of now
- // is probably a waste of time.
- //
- // timerpMask tells us whether the P may have timers at all. If it
- // can't, no need to check at all.
- if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
- tnow, w, ran := checkTimers(p2, now)
- now = tnow
- if w != 0 && (pollUntil == 0 || w < pollUntil) {
- pollUntil = w
- }
- if ran {
- // Running the timers may have
- // made an arbitrary number of G's
- // ready and added them to this P's
- // local run queue. That invalidates
- // the assumption of runqsteal
- // that is always has room to add
- // stolen G's. So check now if there
- // is a local G to run.
- if gp, inheritTime := runqget(_p_); gp != nil {
- return gp, inheritTime
- }
- ranTimer = true
- }
- }
+ if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
+ if !_g_.m.spinning {
+ _g_.m.spinning = true
+ atomic.Xadd(&sched.nmspinning, 1)
+ }
- // Don't bother to attempt to steal if p2 is idle.
- if !idlepMask.read(enum.position()) {
- if gp := runqsteal(_p_, p2, stealTimersOrRunNextG); gp != nil {
- return gp, false
- }
- }
+ gp, inheritTime, tnow, w, newWork := stealWork(now)
+ now = tnow
+ if gp != nil {
+ // Successfully stole.
+ return gp, inheritTime
+ }
+ if newWork {
+ // There may be new timer or GC work; restart to
+ // discover.
+ goto top
+ }
+ if w != 0 && (pollUntil == 0 || w < pollUntil) {
+ // Earlier timer to wait for.
+ pollUntil = w
}
}
- if ranTimer {
- // Running a timer may have made some goroutine ready.
- goto top
- }
-
-stop:
- // We have nothing to do. If we're in the GC mark phase, can
- // safely scan and blacken objects, and have work to do, run
- // idle-time marking rather than give up the P.
+ // We have nothing to do.
+ //
+ // If we're in the GC mark phase, can safely scan and blacken objects,
+ // and have work to do, run idle-time marking rather than give up the
+ // P.
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
if node != nil {
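
To make the new spinning cap concrete: with GOMAXPROCS=8 and 2 idle Ps, procs-npidle gives 6 busy Ps, so the guard 2*nmspinning < 6 admits at most 3 spinning Ms. A fourth M that runs out of local work fails the guard, skips the stealing loop entirely, and falls through to the idle path below.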
@@ -2716,17 +2809,11 @@ stop:
}
}
- delta := int64(-1)
- if pollUntil != 0 {
- // checkTimers ensures that polluntil > now.
- delta = pollUntil - now
- }
-
// wasm only:
// If a callback returned and no other goroutine is awake,
// then wake event handler goroutine which pauses execution
// until a callback was triggered.
- gp, otherReady := beforeIdle(delta)
+ gp, otherReady := beforeIdle(now, pollUntil)
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
@@ -2765,18 +2852,25 @@ stop:
pidleput(_p_)
unlock(&sched.lock)
- // Delicate dance: thread transitions from spinning to non-spinning state,
- // potentially concurrently with submission of new goroutines. We must
- // drop nmspinning first and then check all per-P queues again (with
- // #StoreLoad memory barrier in between). If we do it the other way around,
- // another thread can submit a goroutine after we've checked all run queues
- // but before we drop nmspinning; as a result nobody will unpark a thread
- // to run the goroutine.
+ // Delicate dance: thread transitions from spinning to non-spinning
+ // state, potentially concurrently with submission of new work. We must
+ // drop nmspinning first and then check all sources again (with
+ // #StoreLoad memory barrier in between). If we do it the other way
+ // around, another thread can submit work after we've checked all
+ // sources but before we drop nmspinning; as a result nobody will
+ // unpark a thread to run the work.
+ //
+ // This applies to the following sources of work:
+ //
+ // * Goroutines added to a per-P run queue.
+ // * New/modified-earlier timers on a per-P timer heap.
+ // * Idle-priority GC work (barring golang.org/issue/19112).
+ //
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
- // we also observe no idle Ps, it is OK to just park the current thread:
- // the system is fully loaded so no spinning threads are required.
+ // we also observe no idle Ps it is OK to skip unparking a new worker
+ // thread: the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
@@ -2784,97 +2878,48 @@ stop:
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
- }
- // check all runqueues once again
- for id, _p_ := range allpSnapshot {
- if !idlepMaskSnapshot.read(uint32(id)) && !runqempty(_p_) {
- lock(&sched.lock)
- _p_ = pidleget()
- unlock(&sched.lock)
- if _p_ != nil {
- acquirep(_p_)
- if wasSpinning {
- _g_.m.spinning = true
- atomic.Xadd(&sched.nmspinning, 1)
- }
- goto top
- }
- break
- }
- }
-
- // Similar to above, check for timer creation or expiry concurrently with
- // transitioning from spinning to non-spinning. Note that we cannot use
- // checkTimers here because it calls adjusttimers which may need to allocate
- // memory, and that isn't allowed when we don't have an active P.
- for id, _p_ := range allpSnapshot {
- if timerpMaskSnapshot.read(uint32(id)) {
- w := nobarrierWakeTime(_p_)
- if w != 0 && (pollUntil == 0 || w < pollUntil) {
- pollUntil = w
- }
- }
- }
- if pollUntil != 0 {
- if now == 0 {
- now = nanotime()
- }
- delta = pollUntil - now
- if delta < 0 {
- delta = 0
- }
- }
+ // Note that for correctness, only the last M transitioning from
+ // spinning to non-spinning must perform these rechecks to
+ // ensure no missed work. We are performing it on every M that
+ // transitions as a conservative change to monitor effects on
+ // latency. See golang.org/issue/43997.
- // Check for idle-priority GC work again.
- //
- // N.B. Since we have no P, gcBlackenEnabled may change at any time; we
- // must check again after acquiring a P.
- if atomic.Load(&gcBlackenEnabled) != 0 && gcMarkWorkAvailable(nil) {
- // Work is available; we can start an idle GC worker only if
- // there is an available P and available worker G.
- //
- // We can attempt to acquire these in either order. Workers are
- // almost always available (see comment in findRunnableGCWorker
- // for the one case there may be none). Since we're slightly
- // less likely to find a P, check for that first.
- lock(&sched.lock)
- var node *gcBgMarkWorkerNode
- _p_ = pidleget()
+ // Check all runqueues once again.
+ _p_ = checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
if _p_ != nil {
- // Now that we own a P, gcBlackenEnabled can't change
- // (as it requires STW).
- if gcBlackenEnabled != 0 {
- node = (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
- if node == nil {
- pidleput(_p_)
- _p_ = nil
- }
- } else {
- pidleput(_p_)
- _p_ = nil
- }
+ acquirep(_p_)
+ _g_.m.spinning = true
+ atomic.Xadd(&sched.nmspinning, 1)
+ goto top
}
- unlock(&sched.lock)
+
+ // Check for idle-priority GC work again.
+ _p_, gp = checkIdleGCNoP()
if _p_ != nil {
acquirep(_p_)
- if wasSpinning {
- _g_.m.spinning = true
- atomic.Xadd(&sched.nmspinning, 1)
- }
+ _g_.m.spinning = true
+ atomic.Xadd(&sched.nmspinning, 1)
// Run the idle worker.
_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
- gp := node.gp.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
+
+ // Finally, check for timer creation or expiry concurrently with
+ // transitioning from spinning to non-spinning.
+ //
+ // Note that we cannot use checkTimers here because it calls
+ // adjusttimers which may need to allocate memory, and that isn't
+ // allowed when we don't have an active P.
+ pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)
}
- // poll network
+ // Poll network until next timer.
if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
atomic.Store64(&sched.pollUntil, uint64(pollUntil))
if _g_.m.p != 0 {
@@ -2883,11 +2928,21 @@ stop:
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
+ delay := int64(-1)
+ if pollUntil != 0 {
+ if now == 0 {
+ now = nanotime()
+ }
+ delay = pollUntil - now
+ if delay < 0 {
+ delay = 0
+ }
+ }
if faketime != 0 {
// When using fake time, just poll.
- delta = 0
+ delay = 0
}
- list := netpoll(delta) // block until new work is available
+ list := netpoll(delay) // block until new work is available
atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if faketime != 0 && list.empty() {
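
Concretely: with no pending timer, pollUntil is 0 and delay stays -1, so netpoll blocks until a network event or an explicit wakeup arrives; a timer 5ms out yields delay = 5ms; a timer already due clamps delay to 0, a non-blocking poll. faketime likewise forces a non-blocking poll.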
@@ -2949,6 +3004,168 @@ func pollWork() bool {
return false
}
+// stealWork attempts to steal a runnable goroutine or timer from any P.
+//
+// If newWork is true, new work may have been readied.
+//
+// If now is not 0 it is the current time. stealWork returns the passed time or
+// the current time if now was passed as 0.
+func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
+ pp := getg().m.p.ptr()
+
+ ranTimer := false
+
+ const stealTries = 4
+ for i := 0; i < stealTries; i++ {
+ stealTimersOrRunNextG := i == stealTries-1
+
+ for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
+ if sched.gcwaiting != 0 {
+ // GC work may be available.
+ return nil, false, now, pollUntil, true
+ }
+ p2 := allp[enum.position()]
+ if pp == p2 {
+ continue
+ }
+
+ // Steal timers from p2. This call to checkTimers is the only place
+ // where we might hold a lock on a different P's timers. We do this
+ // once on the last pass before checking runnext because stealing
+ // from the other P's runnext should be the last resort, so if there
+ // are timers to steal do that first.
+ //
+ // We only check timers on one of the stealing iterations because
+ // the time stored in now doesn't change in this loop and checking
+ // the timers for each P more than once with the same value of now
+ // is probably a waste of time.
+ //
+ // timerpMask tells us whether the P may have timers at all. If it
+ // can't, no need to check at all.
+ if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
+ tnow, w, ran := checkTimers(p2, now)
+ now = tnow
+ if w != 0 && (pollUntil == 0 || w < pollUntil) {
+ pollUntil = w
+ }
+ if ran {
+ // Running the timers may have
+ // made an arbitrary number of G's
+ // ready and added them to this P's
+ // local run queue. That invalidates
+ // the assumption of runqsteal
+ // that it always has room to add
+ // stolen G's. So check now if there
+ // is a local G to run.
+ if gp, inheritTime := runqget(pp); gp != nil {
+ return gp, inheritTime, now, pollUntil, ranTimer
+ }
+ ranTimer = true
+ }
+ }
+
+ // Don't bother to attempt to steal if p2 is idle.
+ if !idlepMask.read(enum.position()) {
+ if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
+ return gp, false, now, pollUntil, ranTimer
+ }
+ }
+ }
+ }
+
+ // No goroutines found to steal. Regardless, running a timer may have
+ // made some goroutine ready that we missed. Indicate the next timer to
+ // wait for.
+ return nil, false, now, pollUntil, ranTimer
+}
+
+// Check all Ps for a runnable G to steal.
+//
+// On entry we have no P. If a G is available to steal and a P is also
+// available, the P is returned; the caller should acquire it and then
+// attempt to steal the work to it.
+func checkRunqsNoP(allpSnapshot []*p, idlepMaskSnapshot pMask) *p {
+ for id, p2 := range allpSnapshot {
+ if !idlepMaskSnapshot.read(uint32(id)) && !runqempty(p2) {
+ lock(&sched.lock)
+ pp := pidleget()
+ unlock(&sched.lock)
+ if pp != nil {
+ return pp
+ }
+
+ // Can't get a P, don't bother checking remaining Ps.
+ break
+ }
+ }
+
+ return nil
+}
+
+// Check all Ps for a timer expiring sooner than pollUntil.
+//
+// Returns updated pollUntil value.
+func checkTimersNoP(allpSnapshot []*p, timerpMaskSnapshot pMask, pollUntil int64) int64 {
+ for id, p2 := range allpSnapshot {
+ if timerpMaskSnapshot.read(uint32(id)) {
+ w := nobarrierWakeTime(p2)
+ if w != 0 && (pollUntil == 0 || w < pollUntil) {
+ pollUntil = w
+ }
+ }
+ }
+
+ return pollUntil
+}
+
+// Check for idle-priority GC, without a P on entry.
+//
+// If some GC work, a P, and a worker G are all available, the P and G will be
+// returned. The returned P has not been wired yet.
+func checkIdleGCNoP() (*p, *g) {
+ // N.B. Since we have no P, gcBlackenEnabled may change at any time; we
+ // must check again after acquiring a P.
+ if atomic.Load(&gcBlackenEnabled) == 0 {
+ return nil, nil
+ }
+ if !gcMarkWorkAvailable(nil) {
+ return nil, nil
+ }
+
+ // Work is available; we can start an idle GC worker only if
+ // there is an available P and available worker G.
+ //
+ // We can attempt to acquire these in either order. Workers are
+ // almost always available (see comment in findRunnableGCWorker
+ // for the one case there may be none). Since we're slightly
+ // less likely to find a P, check for that first.
+ lock(&sched.lock)
+ pp := pidleget()
+ unlock(&sched.lock)
+ if pp == nil {
+ return nil, nil
+ }
+
+ // Now that we own a P, gcBlackenEnabled can't change
+ // (as it requires STW).
+ if gcBlackenEnabled == 0 {
+ lock(&sched.lock)
+ pidleput(pp)
+ unlock(&sched.lock)
+ return nil, nil
+ }
+
+ node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
+ if node == nil {
+ lock(&sched.lock)
+ pidleput(pp)
+ unlock(&sched.lock)
+ return nil, nil
+ }
+
+ return pp, node.gp.ptr()
+}
+
// wakeNetPoller wakes up the thread sleeping in the network poller if it isn't
// going to wake up before the when argument; or it wakes an idle P to service
// timers and the network poller if there isn't one already.
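
checkIdleGCNoP is a check/acquire/recheck pattern: gcBlackenEnabled can flip at any moment while no P is held, but once a P is owned, disabling it requires a stop-the-world, so a single recheck after acquisition suffices. A hedged, self-contained analog (enabled, idle, and tryAcquire are illustrative; the analog's recheck merely narrows the race window rather than closing it the way owning a P does):

    package scheddoc

    import (
        "sync"
        "sync/atomic"
    )

    var (
        enabled uint32 // stand-in for gcBlackenEnabled
        mu      sync.Mutex
        idle    []*int // stand-in for the idle P list
    )

    // tryAcquire hands out an idle resource only if the condition holds
    // both before and after acquisition (cf. checkIdleGCNoP).
    func tryAcquire() *int {
        if atomic.LoadUint32(&enabled) == 0 {
            return nil // racy fast path: cheap, and safe to be wrong
        }
        mu.Lock()
        var res *int
        if n := len(idle); n > 0 {
            res, idle = idle[n-1], idle[:n-1]
        }
        mu.Unlock()
        if res == nil {
            return nil
        }
        if atomic.LoadUint32(&enabled) == 0 {
            // The condition flipped while we raced; undo the acquisition.
            mu.Lock()
            idle = append(idle, res)
            mu.Unlock()
            return nil
        }
        return res
    }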
@@ -3115,7 +3332,9 @@ top:
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
- tryWakeP = tryWakeP || gp != nil
+ if gp != nil {
+ tryWakeP = true
+ }
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
@@ -3191,7 +3410,7 @@ func dropg() {
// checkTimers runs any timers for the P that are ready.
// If now is not 0 it is the current time.
-// It returns the current time or 0 if it is not known,
+// It returns the passed time or the current time if now was passed as 0.
// and the time when the next timer should run or 0 if there is no next timer,
// and reports whether it ran any timers.
// If the time when the next timer should run is not 0,
@@ -3828,15 +4047,15 @@ func exitsyscallfast_pidle() bool {
// exitsyscall slow path on g0.
// Failed to acquire P, enqueue gp as runnable.
//
+// Called via mcall, so gp is the calling g from this M.
+//
//go:nowritebarrierrec
func exitsyscall0(gp *g) {
- _g_ := getg()
-
casgstatus(gp, _Gsyscall, _Grunnable)
dropg()
lock(&sched.lock)
var _p_ *p
- if schedEnabled(_g_) {
+ if schedEnabled(gp) {
_p_ = pidleget()
}
if _p_ == nil {
@@ -3850,8 +4069,11 @@ func exitsyscall0(gp *g) {
acquirep(_p_)
execute(gp, false) // Never returns.
}
- if _g_.m.lockedg != 0 {
+ if gp.lockedm != 0 {
// Wait until another thread schedules gp and so m again.
+ //
+ // N.B. lockedm must be this M, as this g was running on this M
+ // before entersyscall.
stoplockedm()
execute(gp, false) // Never returns.
}
@@ -4016,6 +4238,14 @@ func newproc(siz int32, fn *funcval) {
//
//go:systemstack
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
+ if goexperiment.RegabiDefer && narg != 0 {
+ // TODO: When we commit to GOEXPERIMENT=regabidefer,
+ // rewrite the comments for newproc and newproc1.
+ // newproc will no longer have a funny stack layout or
+ // need to be nosplit.
+ throw("go with non-empty frame")
+ }
+
_g_ := getg()
if fn == nil {
@@ -4081,7 +4311,7 @@ func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerp
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
- newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
+ newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
@@ -4093,6 +4323,11 @@ func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerp
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
+ // Track initial transition?
+ newg.trackingSeq = uint8(fastrand())
+ if newg.trackingSeq%gTrackingPeriod == 0 {
+ newg.tracking = true
+ }
casgstatus(newg, _Gdead, _Grunnable)
if _p_.goidcache == _p_.goidcacheend {
@@ -4170,17 +4405,25 @@ func gfput(_p_ *p, gp *g) {
_p_.gFree.push(gp)
_p_.gFree.n++
if _p_.gFree.n >= 64 {
- lock(&sched.gFree.lock)
+ var (
+ inc int32
+ stackQ gQueue
+ noStackQ gQueue
+ )
for _p_.gFree.n >= 32 {
- _p_.gFree.n--
gp = _p_.gFree.pop()
+ _p_.gFree.n--
if gp.stack.lo == 0 {
- sched.gFree.noStack.push(gp)
+ noStackQ.push(gp)
} else {
- sched.gFree.stack.push(gp)
+ stackQ.push(gp)
}
- sched.gFree.n++
+ inc++
}
+ lock(&sched.gFree.lock)
+ sched.gFree.noStack.pushAll(noStackQ)
+ sched.gFree.stack.pushAll(stackQ)
+ sched.gFree.n += inc
unlock(&sched.gFree.lock)
}
}
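
gfput (above) and gfpurge (below) now share one shape: partition the G's into local stackQ/noStackQ queues with no lock held, then take sched.gFree.lock once and splice with pushAll. A hedged, self-contained reduction of that shape:

    package scheddoc

    import "sync"

    var (
        globalMu sync.Mutex
        global   []int
    )

    // flushBatch moves items to the global list with one lock acquisition
    // rather than one per item: collect and partition locally first, then
    // splice under the lock (cf. stackQ/noStackQ and pushAll above).
    func flushBatch(local []int) {
        var evens, odds []int // stand-ins for the stack/noStack queues
        for _, it := range local {
            if it%2 == 0 {
                evens = append(evens, it)
            } else {
                odds = append(odds, it)
            }
        }
        globalMu.Lock()
        global = append(global, evens...)
        global = append(global, odds...)
        globalMu.Unlock()
    }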
@@ -4232,17 +4475,25 @@ retry:
// Purge all cached G's from gfree list to the global list.
func gfpurge(_p_ *p) {
- lock(&sched.gFree.lock)
+ var (
+ inc int32
+ stackQ gQueue
+ noStackQ gQueue
+ )
for !_p_.gFree.empty() {
gp := _p_.gFree.pop()
_p_.gFree.n--
if gp.stack.lo == 0 {
- sched.gFree.noStack.push(gp)
+ noStackQ.push(gp)
} else {
- sched.gFree.stack.push(gp)
+ stackQ.push(gp)
}
- sched.gFree.n++
+ inc++
}
+ lock(&sched.gFree.lock)
+ sched.gFree.noStack.pushAll(noStackQ)
+ sched.gFree.stack.pushAll(stackQ)
+ sched.gFree.n += inc
unlock(&sched.gFree.lock)
}
@@ -4953,11 +5204,9 @@ func checkdead() {
}
grunning := 0
- lock(&allglock)
- for i := 0; i < len(allgs); i++ {
- gp := allgs[i]
+ forEachG(func(gp *g) {
if isSystemGoroutine(gp, false) {
- continue
+ return
}
s := readgstatus(gp)
switch s &^ _Gscan {
@@ -4970,8 +5219,7 @@ func checkdead() {
print("runtime: checkdead: find g ", gp.goid, " in status ", s, "\n")
throw("checkdead: runnable g")
}
- }
- unlock(&allglock)
+ })
if grunning == 0 { // possible if main goroutine calls runtime·Goexit()
unlock(&sched.lock) // unlock so that GODEBUG=scheddetail=1 doesn't hang
throw("no goroutines (main called runtime.Goexit) - deadlock!")
@@ -5275,7 +5523,7 @@ func preemptall() bool {
// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
-// goroutine. It can send inform the wrong goroutine. Even if it informs the
+// goroutine. It can inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
@@ -5295,7 +5543,7 @@ func preemptone(_p_ *p) bool {
gp.preempt = true
- // Every call in a go routine checks for stack overflow by
+ // Every call in a goroutine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
@@ -5374,9 +5622,7 @@ func schedtrace(detailed bool) {
print(" M", mp.id, ": p=", id1, " curg=", id2, " mallocing=", mp.mallocing, " throwing=", mp.throwing, " preemptoff=", mp.preemptoff, ""+" locks=", mp.locks, " dying=", mp.dying, " spinning=", mp.spinning, " blocked=", mp.blocked, " lockedg=", id3, "\n")
}
- lock(&allglock)
- for gi := 0; gi < len(allgs); gi++ {
- gp := allgs[gi]
+ forEachG(func(gp *g) {
mp := gp.m
lockedm := gp.lockedm.ptr()
id1 := int64(-1)
@@ -5388,8 +5634,7 @@ func schedtrace(detailed bool) {
id2 = lockedm.id
}
print(" G", gp.goid, ": status=", readgstatus(gp), "(", gp.waitreason.String(), ") m=", id1, " lockedm=", id2, "\n")
- }
- unlock(&allglock)
+ })
unlock(&sched.lock)
}
@@ -5484,6 +5729,8 @@ func globrunqputhead(gp *g) {
// Put a batch of runnable goroutines on the global runnable queue.
// This clears *batch.
// sched.lock must be held.
+// May run during STW, so write barriers are not allowed.
+//go:nowritebarrierrec
func globrunqputbatch(batch *gQueue, n int32) {
assertLockHeld(&sched.lock)
@@ -5799,6 +6046,45 @@ func runqget(_p_ *p) (gp *g, inheritTime bool) {
}
}
+// runqdrain drains the local runnable queue of _p_ and returns all goroutines in it.
+// Executed only by the owner P.
+func runqdrain(_p_ *p) (drainQ gQueue, n uint32) {
+ oldNext := _p_.runnext
+ if oldNext != 0 && _p_.runnext.cas(oldNext, 0) {
+ drainQ.pushBack(oldNext.ptr())
+ n++
+ }
+
+retry:
+ h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
+ t := _p_.runqtail
+ qn := t - h
+ if qn == 0 {
+ return
+ }
+ if qn > uint32(len(_p_.runq)) { // read inconsistent h and t
+ goto retry
+ }
+
+ if !atomic.CasRel(&_p_.runqhead, h, h+qn) { // cas-release, commits consume
+ goto retry
+ }
+
+ // We've inverted the usual order: the head pointer is advanced before
+ // the G's are read out of the local P's runnable queue, because we don't
+ // want to mess up the statuses of G's while runqdrain() and runqsteal()
+ // are running in parallel. Advancing the head first means we update each
+ // gp.schedlink only after taking full ownership of that G; meanwhile,
+ // other P's can no longer access the claimed G's to steal them.
+ // See https://groups.google.com/g/golang-dev/c/0pTKxEKhHSc/m/6Q85QjdVBQAJ for more details.
+ for i := uint32(0); i < qn; i++ {
+ gp := _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
+ drainQ.pushBack(gp)
+ n++
+ }
+ return
+}
+
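
The claim-then-read ordering that the comment in runqdrain describes can be shown on a self-contained single-owner ring buffer; this is a hedged analog (qsize, drain are illustrative), not the runtime's queue:

    package scheddoc

    import "sync/atomic"

    const qsize = 256

    var (
        ring [qsize]atomic.Pointer[int]
        head atomic.Uint32 // advanced by the owner and by stealers
        tail atomic.Uint32 // written only by the owner
    )

    // drain is called only by the queue's single owner/producer. It claims
    // every queued slot with one CAS before reading them: stealers also CAS
    // head, so they can never claim the same slots, and reading after the
    // CAS is safe because only the owner ever writes slot contents.
    func drain() (out []*int) {
        for {
            h := head.Load()
            t := tail.Load()
            n := t - h
            if n == 0 {
                return nil
            }
            if n > qsize {
                continue // inconsistent h/t snapshot; retry
            }
            if !head.CompareAndSwap(h, h+n) {
                continue // lost a race with a stealer; retry
            }
            for i := uint32(0); i < n; i++ {
                out = append(out, ring[(h+i)%qsize].Load())
            }
            return out
        }
    }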
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
@@ -5992,26 +6278,6 @@ func setMaxThreads(in int) (out int) {
return
}
-func haveexperiment(name string) bool {
- x := sys.Goexperiment
- for x != "" {
- xname := ""
- i := bytealg.IndexByteString(x, ',')
- if i < 0 {
- xname, x = x, ""
- } else {
- xname, x = x[:i], x[i+1:]
- }
- if xname == name {
- return true
- }
- if len(xname) > 2 && xname[:2] == "no" && xname[2:] == name {
- return false
- }
- }
- return false
-}
-
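
This removal pairs with the new internal/goexperiment import at the top of the file: experiment flags are no longer parsed out of the comma-separated GOEXPERIMENT string at run time but generated as compile-time constants, so checks such as the goexperiment.RegabiDefer test in newproc1 fold away entirely. A sketch of what the generated package provides; the exact constant layout here is an assumption:

    // internal/goexperiment (generated; sketch, not verbatim source)
    package goexperiment

    // Produced when the toolchain is built with GOEXPERIMENT=regabidefer:
    const RegabiDefer = true
    const RegabiDeferInt = 1

    // Callers then write plain constant conditions, e.g.
    //     if goexperiment.RegabiDefer && narg != 0 { ... }
    // which the compiler eliminates when the constant is false.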
//go:nosplit
func procPin() int {
_g_ := getg()
@@ -6148,7 +6414,7 @@ var inittrace tracestat
type tracestat struct {
active bool // init tracing activation status
- id int64 // init go routine id
+ id int64 // init goroutine id
allocs uint64 // heap allocations
bytes uint64 // heap allocated bytes
}
@@ -6180,7 +6446,7 @@ func doInit(t *initTask) {
if inittrace.active {
start = nanotime()
- // Load stats non-atomically since tracinit is updated only by this init go routine.
+ // Load stats non-atomically since tracinit is updated only by this init goroutine.
before = inittrace
}
@@ -6193,7 +6459,7 @@ func doInit(t *initTask) {
if inittrace.active {
end := nanotime()
- // Load stats non-atomically since tracinit is updated only by this init go routine.
+ // Load stats non-atomically since tracinit is updated only by this init goroutine.
after := inittrace
pkg := funcpkgpath(findfunc(funcPC(firstFunc)))