aboutsummaryrefslogtreecommitdiff
path: root/src/runtime
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2015-12-08 15:11:27 +0100
committerDmitry Vyukov <dvyukov@google.com>2015-12-11 11:31:12 +0000
commitfb6f8a96f24f6b30e99cc77d78bc0194ffec7a41 (patch)
tree4f407ee25a38172dacb825259c48a435549c8c23 /src/runtime
parent8545ea9cee087fd0fbac41bba7616d2fc4f2bc19 (diff)
downloadgo-fb6f8a96f24f6b30e99cc77d78bc0194ffec7a41.tar.xz
runtime: remove unnecessary wakeups of worker threads
Currently we wake up new worker threads whenever we pass through the scheduler with nmspinning==0. This leads to lots of unnecessary thread wake ups. Instead let only spinning threads wake up new spinning threads. For the following program: package main import "runtime" func main() { for i := 0; i < 1e7; i++ { runtime.Gosched() } } Before: $ time ./test real 0m4.278s user 0m7.634s sys 0m1.423s $ strace -c ./test % time seconds usecs/call calls errors syscall 99.93 9.314936 3 2685009 17536 futex After: $ time ./test real 0m1.200s user 0m1.181s sys 0m0.024s $ strace -c ./test % time seconds usecs/call calls errors syscall 3.11 0.000049 25 2 futex Fixes #13527 Change-Id: Ia1f5bf8a896dcc25d8b04beb1f4317aa9ff16f74 Reviewed-on: https://go-review.googlesource.com/17540 Reviewed-by: Austin Clements <austin@google.com> Run-TryBot: Dmitry Vyukov <dvyukov@google.com> TryBot-Result: Gobot Gobot <gobot@golang.org>
Diffstat (limited to 'src/runtime')
-rw-r--r--src/runtime/proc.go142
-rw-r--r--src/runtime/proc_test.go74
-rw-r--r--src/runtime/runtime2.go2
3 files changed, 181 insertions, 37 deletions
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index 9ef7bfb954..c0df6f1d05 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -22,6 +22,57 @@ import (
//
// Design doc at https://golang.org/s/go11sched.
+// Worker thread parking/unparking.
+// We need to balance between keeping enough running worker threads to utilize
+// available hardware parallelism and parking excessive running worker threads
+// to conserve CPU resources and power. This is not simple for two reasons:
+// (1) scheduler state is intentionally distributed (in particular, per-P work
+// queues), so it is not possible to compute global predicates on fast paths;
+// (2) for optimal thread management we would need to know the future (don't park
+// a worker thread when a new goroutine will be readied in near future).
+//
+// Three rejected approaches that would work badly:
+// 1. Centralize all scheduler state (would inhibit scalability).
+// 2. Direct goroutine handoff. That is, when we ready a new goroutine and there
+// is a spare P, unpark a thread and handoff it the thread and the goroutine.
+// This would lead to thread state thrashing, as the thread that readied the
+// goroutine can be out of work the very next moment, we will need to park it.
+// Also, it would destroy locality of computation as we want to preserve
+// dependent goroutines on the same thread; and introduce additional latency.
+// 3. Unpark an additional thread whenever we ready a goroutine and there is an
+// idle P, but don't do handoff. This would lead to excessive thread parking/
+// unparking as the additional threads will instantly park without discovering
+// any work to do.
+//
+// The current approach:
+// We unpark an additional thread when we ready a goroutine if (1) there is an
+// idle P and there are no "spinning" worker threads. A worker thread is considered
+// spinning if it is out of local work and did not find work in global run queue/
+// netpoller; the spinning state is denoted in m.spinning and in sched.nmspinning.
+// Threads unparked this way are also considered spinning; we don't do goroutine
+// handoff so such threads are out of work initially. Spinning threads do some
+// spinning looking for work in per-P run queues before parking. If a spinning
+// thread finds work it takes itself out of the spinning state and proceeds to
+// execution. If it does not find work it takes itself out of the spinning state
+// and then parks.
+// If there is at least one spinning thread (sched.nmspinning>1), we don't unpark
+// new threads when readying goroutines. To compensate for that, if the last spinning
+// thread finds work and stops spinning, it must unpark a new spinning thread.
+// This approach smooths out unjustified spikes of thread unparking,
+// but at the same time guarantees eventual maximal CPU parallelism utilization.
+//
+// The main implementation complication is that we need to be very careful during
+// spinning->non-spinning thread transition. This transition can race with submission
+// of a new goroutine, and either one part or another needs to unpark another worker
+// thread. If they both fail to do that, we can end up with semi-persistent CPU
+// underutilization. The general pattern for goroutine readying is: submit a goroutine
+// to local work queue, #StoreLoad-style memory barrier, check sched.nmspinning.
+// The general pattern for spinning->non-spinning transition is: decrement nmspinning,
+// #StoreLoad-style memory barrier, check all per-P work queues for new work.
+// Note that all this complexity does not apply to global run queue as we are not
+// sloppy about thread unparking when submitting to global queue. Also see comments
+// for nmspinning manipulation.
+
var (
m0 m
g0 g
@@ -1454,8 +1505,7 @@ func stopm() {
throw("stopm holding p")
}
if _g_.m.spinning {
- _g_.m.spinning = false
- atomic.Xadd(&sched.nmspinning, -1)
+ throw("stopm spinning")
}
retry:
@@ -1476,22 +1526,15 @@ retry:
}
func mspinning() {
- gp := getg()
- if !runqempty(gp.m.nextp.ptr()) {
- // Something (presumably the GC) was readied while the
- // runtime was starting up this M, so the M is no
- // longer spinning.
- if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
- throw("mspinning: nmspinning underflowed")
- }
- } else {
- gp.m.spinning = true
- }
+ // startm's caller incremented nmspinning. Set the new M's spinning.
+ getg().m.spinning = true
}
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
+// If spinning is set, the caller has incremented nmspinning and startm will
+// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrier
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
@@ -1500,7 +1543,11 @@ func startm(_p_ *p, spinning bool) {
if _p_ == nil {
unlock(&sched.lock)
if spinning {
- atomic.Xadd(&sched.nmspinning, -1)
+ // The caller incremented nmspinning, but there are no idle Ps,
+ // so it's okay to just undo the increment and give up.
+ if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+ throw("startm: negative nmspinning")
+ }
}
return
}
@@ -1510,6 +1557,7 @@ func startm(_p_ *p, spinning bool) {
if mp == nil {
var fn func()
if spinning {
+ // The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_)
@@ -1524,6 +1572,7 @@ func startm(_p_ *p, spinning bool) {
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
+ // The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning
mp.nextp.set(_p_)
notewakeup(&mp.park)
@@ -1645,7 +1694,11 @@ func gcstopm() {
}
if _g_.m.spinning {
_g_.m.spinning = false
- atomic.Xadd(&sched.nmspinning, -1)
+ // OK to just drop nmspinning here,
+ // startTheWorld will unpark threads as necessary.
+ if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+ throw("gcstopm: negative nmspinning")
+ }
}
_p_ := releasep()
lock(&sched.lock)
@@ -1818,9 +1871,26 @@ stop:
_p_ := releasep()
pidleput(_p_)
unlock(&sched.lock)
+
+ // Delicate dance: thread transitions from spinning to non-spinning state,
+ // potentially concurrently with submission of new goroutines. We must
+ // drop nmspinning first and then check all per-P queues again (with
+ // #StoreLoad memory barrier in between). If we do it the other way around,
+ // another thread can submit a goroutine after we've checked all run queues
+ // but before we drop nmspinning; as the result nobody will unpark a thread
+ // to run the goroutine.
+ // If we discover new work below, we need to restore m.spinning as a signal
+ // for resetspinning to unpark a new worker thread (because there can be more
+ // than one starving goroutine). However, if after discovering new work
+ // we also observe no idle Ps, it is OK to just park the current thread:
+ // the system is fully loaded so no spinning threads are required.
+ // Also see "Worker thread parking/unparking" comment at the top of the file.
+ wasSpinning := _g_.m.spinning
if _g_.m.spinning {
_g_.m.spinning = false
- atomic.Xadd(&sched.nmspinning, -1)
+ if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+ throw("findrunnable: negative nmspinning")
+ }
}
// check all runqueues once again
@@ -1832,6 +1902,10 @@ stop:
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
+ if wasSpinning {
+ _g_.m.spinning = true
+ atomic.Xadd(&sched.nmspinning, 1)
+ }
goto top
}
break
@@ -1870,20 +1944,17 @@ stop:
func resetspinning() {
_g_ := getg()
-
- var nmspinning uint32
- if _g_.m.spinning {
- _g_.m.spinning = false
- nmspinning = atomic.Xadd(&sched.nmspinning, -1)
- if int32(nmspinning) < 0 {
- throw("findrunnable: negative nmspinning")
- }
- } else {
- nmspinning = atomic.Load(&sched.nmspinning)
+ if !_g_.m.spinning {
+ throw("resetspinning: not a spinning m")
}
-
- // M wakeup policy is deliberately somewhat conservative (see nmspinning handling),
- // so see if we need to wakeup another P here.
+ _g_.m.spinning = false
+ nmspinning := atomic.Xadd(&sched.nmspinning, -1)
+ if int32(nmspinning) < 0 {
+ throw("findrunnable: negative nmspinning")
+ }
+ // M wakeup policy is deliberately somewhat conservative, so check if we
+ // need to wakeup another P here. See "Worker thread parking/unparking"
+ // comment at the top of the file for details.
if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
wakep()
}
@@ -1944,14 +2015,10 @@ top:
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
- resetspinning()
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
- if gp != nil {
- resetspinning()
- }
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
@@ -1961,9 +2028,6 @@ top:
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
- if gp != nil {
- resetspinning()
- }
}
}
if gp == nil {
@@ -1974,6 +2038,12 @@ top:
}
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
+ }
+
+ // This thread is going to run a goroutine and is not spinning anymore,
+ // so if it was marked as spinning we need to reset it now and potentially
+ // start a new spinning M.
+ if _g_.m.spinning {
resetspinning()
}
diff --git a/src/runtime/proc_test.go b/src/runtime/proc_test.go
index 2be103e3a6..c0213086b3 100644
--- a/src/runtime/proc_test.go
+++ b/src/runtime/proc_test.go
@@ -6,6 +6,7 @@ package runtime_test
import (
"math"
+ "net"
"runtime"
"runtime/debug"
"sync"
@@ -132,6 +133,79 @@ func TestGoroutineParallelism(t *testing.T) {
}
}
+// Test that all runnable goroutines are scheduled at the same time.
+func TestGoroutineParallelism2(t *testing.T) {
+ //testGoroutineParallelism2(t, false, false)
+ testGoroutineParallelism2(t, true, false)
+ testGoroutineParallelism2(t, false, true)
+ testGoroutineParallelism2(t, true, true)
+}
+
+func testGoroutineParallelism2(t *testing.T, load, netpoll bool) {
+ if runtime.NumCPU() == 1 {
+ // Takes too long, too easy to deadlock, etc.
+ t.Skip("skipping on uniprocessor")
+ }
+ P := 4
+ N := 10
+ if testing.Short() {
+ N = 3
+ }
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
+ // If runtime triggers a forced GC during this test then it will deadlock,
+ // since the goroutines can't be stopped/preempted.
+ // Disable GC for this test (see issue #10958).
+ defer debug.SetGCPercent(debug.SetGCPercent(-1))
+ for try := 0; try < N; try++ {
+ if load {
+ // Create P goroutines and wait until they all run.
+ // When we run the actual test below, worker threads
+ // running the goroutines will start parking.
+ done := make(chan bool)
+ x := uint32(0)
+ for p := 0; p < P; p++ {
+ go func() {
+ if atomic.AddUint32(&x, 1) == uint32(P) {
+ done <- true
+ return
+ }
+ for atomic.LoadUint32(&x) != uint32(P) {
+ }
+ }()
+ }
+ <-done
+ }
+ if netpoll {
+ // Enable netpoller, affects schedler behavior.
+ ln, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ defer ln.Close() // yup, defer in a loop
+ }
+ }
+ done := make(chan bool)
+ x := uint32(0)
+ // Spawn P goroutines in a nested fashion just to differ from TestGoroutineParallelism.
+ for p := 0; p < P/2; p++ {
+ go func(p int) {
+ for p2 := 0; p2 < 2; p2++ {
+ go func(p2 int) {
+ for i := 0; i < 3; i++ {
+ expected := uint32(P*i + p*2 + p2)
+ for atomic.LoadUint32(&x) != expected {
+ }
+ atomic.StoreUint32(&x, expected+1)
+ }
+ done <- true
+ }(p2)
+ }
+ }(p)
+ }
+ for p := 0; p < P; p++ {
+ <-done
+ }
+ }
+}
+
func TestBlockLocked(t *testing.T) {
const N = 10
c := make(chan bool)
diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go
index cfe4589448..86ed846064 100644
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -419,7 +419,7 @@ type schedt struct {
pidle puintptr // idle p's
npidle uint32
- nmspinning uint32 // limited to [0, 2^31-1]
+ nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
// Global runnable queue.
runqhead guintptr