aboutsummaryrefslogtreecommitdiff
path: root/src/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'src/runtime')
-rw-r--r--src/runtime/proc1.go141
-rw-r--r--src/runtime/proc_test.go51
-rw-r--r--src/runtime/runtime2.go15
3 files changed, 162 insertions, 45 deletions
diff --git a/src/runtime/proc1.go b/src/runtime/proc1.go
index d37c4f1a5a..166d7c84eb 100644
--- a/src/runtime/proc1.go
+++ b/src/runtime/proc1.go
@@ -145,7 +145,7 @@ func ready(gp *g, traceskip int) {
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
- runqput(_g_.m.p.ptr(), gp)
+ runqput(_g_.m.p.ptr(), gp, true)
if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 { // TODO: fast atomic
wakep()
}
@@ -185,12 +185,12 @@ func readyExecute(gp *g, traceskip int) {
// Preempt the current g
casgstatus(_g_, _Grunning, _Grunnable)
- runqput(_g_.m.p.ptr(), _g_)
+ runqput(_g_.m.p.ptr(), _g_, false)
dropg()
// Ready gp and switch to it
casgstatus(gp, _Gwaiting, _Grunnable)
- execute(gp)
+ execute(gp, false)
})
}
@@ -1233,15 +1233,19 @@ func gcstopm() {
}
// Schedules gp to run on the current M.
+// If inheritTime is true, gp inherits the remaining time in the
+// current time slice. Otherwise, it starts a new time slice.
// Never returns.
-func execute(gp *g) {
+func execute(gp *g, inheritTime bool) {
_g_ := getg()
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
- _g_.m.p.ptr().schedtick++
+ if !inheritTime {
+ _g_.m.p.ptr().schedtick++
+ }
_g_.m.curg = gp
gp.m = _g_.m
@@ -1260,7 +1264,7 @@ func execute(gp *g) {
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
-func findrunnable() *g {
+func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
@@ -1275,8 +1279,8 @@ top:
}
// local runq
- if gp := runqget(_g_.m.p.ptr()); gp != nil {
- return gp
+ if gp, inheritTime := runqget(_g_.m.p.ptr()); gp != nil {
+ return gp, inheritTime
}
// global runq
@@ -1285,7 +1289,7 @@ top:
gp := globrunqget(_g_.m.p.ptr(), 0)
unlock(&sched.lock)
if gp != nil {
- return gp
+ return gp, false
}
}
@@ -1303,7 +1307,7 @@ top:
if trace.enabled {
traceGoUnpark(gp, 0)
}
- return gp
+ return gp, false
}
}
@@ -1325,12 +1329,12 @@ top:
_p_ := allp[fastrand1()%uint32(gomaxprocs)]
var gp *g
if _p_ == _g_.m.p.ptr() {
- gp = runqget(_p_)
+ gp, _ = runqget(_p_)
} else {
gp = runqsteal(_g_.m.p.ptr(), _p_)
}
if gp != nil {
- return gp
+ return gp, false
}
}
stop:
@@ -1344,7 +1348,7 @@ stop:
if trace.enabled {
traceGoUnpark(gp, 0)
}
- return gp
+ return gp, false
}
// return P and block
@@ -1356,7 +1360,7 @@ stop:
if sched.runqsize != 0 {
gp := globrunqget(_g_.m.p.ptr(), 0)
unlock(&sched.lock)
- return gp
+ return gp, false
}
_p_ := releasep()
pidleput(_p_)
@@ -1402,7 +1406,7 @@ stop:
if trace.enabled {
traceGoUnpark(gp, 0)
}
- return gp
+ return gp, false
}
injectglist(gp)
}
@@ -1468,7 +1472,7 @@ func schedule() {
if _g_.m.lockedg != nil {
stoplockedm()
- execute(_g_.m.lockedg) // Never returns.
+ execute(_g_.m.lockedg, false) // Never returns.
}
top:
@@ -1478,6 +1482,7 @@ top:
}
var gp *g
+ var inheritTime bool
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
@@ -1506,13 +1511,13 @@ top:
}
}
if gp == nil {
- gp = runqget(_g_.m.p.ptr())
+ gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
- gp = findrunnable() // blocks until work is available
+ gp, inheritTime = findrunnable() // blocks until work is available
resetspinning()
}
@@ -1523,7 +1528,7 @@ top:
goto top
}
- execute(gp)
+ execute(gp, inheritTime)
}
// dropg removes the association between m and the current goroutine m->curg (gp for short).
@@ -1568,7 +1573,7 @@ func park_m(gp *g) {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
- execute(gp) // Schedule it back, never returns.
+ execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
@@ -2007,12 +2012,12 @@ func exitsyscall0(gp *g) {
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
- execute(gp) // Never returns.
+ execute(gp, false) // Never returns.
}
if _g_.m.lockedg != nil {
// Wait until another thread schedules gp and so m again.
stoplockedm()
- execute(gp) // Never returns.
+ execute(gp, false) // Never returns.
}
stopm()
schedule() // Never returns.
@@ -2168,7 +2173,7 @@ func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr
if trace.enabled {
traceGoCreate(newg, newg.startpc)
}
- runqput(_p_, newg)
+ runqput(_p_, newg, true)
if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 && unsafe.Pointer(fn.fn) != unsafe.Pointer(funcPC(main)) { // TODO: fast atomic
wakep()
@@ -2596,12 +2601,11 @@ func procresize(nprocs int32) *p {
p.runqtail--
gp := p.runq[p.runqtail%uint32(len(p.runq))]
// push onto head of global queue
- gp.schedlink = sched.runqhead
- sched.runqhead.set(gp)
- if sched.runqtail == 0 {
- sched.runqtail.set(gp)
- }
- sched.runqsize++
+ globrunqputhead(gp)
+ }
+ if p.runnext != 0 {
+ globrunqputhead(p.runnext.ptr())
+ p.runnext = 0
}
// if there's a background worker, make it runnable and put
// it on the global queue so it can clean itself up
@@ -3150,6 +3154,19 @@ func globrunqput(gp *g) {
sched.runqsize++
}
+// Put gp at the head of the global runnable queue.
+// Sched must be locked.
+// May run during STW, so write barriers are not allowed.
+//go:nowritebarrier
+func globrunqputhead(gp *g) {
+ gp.schedlink = sched.runqhead
+ sched.runqhead.set(gp)
+ if sched.runqtail == 0 {
+ sched.runqtail.set(gp)
+ }
+ sched.runqsize++
+}
+
// Put a batch of runnable goroutines on the global runnable queue.
// Sched must be locked.
func globrunqputbatch(ghead *g, gtail *g, n int32) {
@@ -3192,7 +3209,7 @@ func globrunqget(_p_ *p, max int32) *g {
for ; n > 0; n-- {
gp1 := sched.runqhead.ptr()
sched.runqhead = gp1.schedlink
- runqput(_p_, gp1)
+ runqput(_p_, gp1, false)
}
return gp
}
@@ -3223,13 +3240,28 @@ func pidleget() *p {
// runqempty returns true if _p_ has no Gs on its local run queue.
// Note that this test is generally racy.
func runqempty(_p_ *p) bool {
- return _p_.runqhead == _p_.runqtail
+ return _p_.runqhead == _p_.runqtail && _p_.runnext == 0
}
-// Try to put g on local runnable queue.
-// If it's full, put onto global queue.
+// runqput tries to put g on the local runnable queue.
+// If next if false, runqput adds g to the tail of the runnable queue.
+// If next is true, runqput puts g in the _p_.runnext slot.
+// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
-func runqput(_p_ *p, gp *g) {
+func runqput(_p_ *p, gp *g, next bool) {
+ if next {
+ retryNext:
+ oldnext := _p_.runnext
+ if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
+ goto retryNext
+ }
+ if oldnext == 0 {
+ return
+ }
+ // Kick the old runnext out to the regular run queue.
+ gp = oldnext.ptr()
+ }
+
retry:
h := atomicload(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
@@ -3277,17 +3309,30 @@ func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
}
// Get g from local runnable queue.
+// If inheritTime is true, gp should inherit the remaining time in the
+// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
-func runqget(_p_ *p) *g {
+func runqget(_p_ *p) (gp *g, inheritTime bool) {
+ // If there's a runnext, it's the next G to run.
+ for {
+ next := _p_.runnext
+ if next == 0 {
+ break
+ }
+ if _p_.runnext.cas(next, 0) {
+ return next.ptr(), true
+ }
+ }
+
for {
h := atomicload(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
- return nil
+ return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))]
if cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
- return gp
+ return gp, false
}
}
}
@@ -3302,6 +3347,14 @@ func runqgrab(_p_ *p, batch []*g) uint32 {
n := t - h
n = n - n/2
if n == 0 {
+ // Try to steal from _p_.runnext.
+ if next := _p_.runnext; next != 0 {
+ if !_p_.runnext.cas(next, 0) {
+ continue
+ }
+ batch[0] = next.ptr()
+ return 1
+ }
return 0
}
if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
@@ -3347,19 +3400,19 @@ func testSchedLocalQueue() {
_p_ := new(p)
gs := make([]g, len(_p_.runq))
for i := 0; i < len(_p_.runq); i++ {
- if runqget(_p_) != nil {
+ if g, _ := runqget(_p_); g != nil {
throw("runq is not empty initially")
}
for j := 0; j < i; j++ {
- runqput(_p_, &gs[i])
+ runqput(_p_, &gs[i], false)
}
for j := 0; j < i; j++ {
- if runqget(_p_) != &gs[i] {
+ if g, _ := runqget(_p_); g != &gs[i] {
print("bad element at iter ", i, "/", j, "\n")
throw("bad element")
}
}
- if runqget(_p_) != nil {
+ if g, _ := runqget(_p_); g != nil {
throw("runq is not empty afterwards")
}
}
@@ -3372,7 +3425,7 @@ func testSchedLocalQueueSteal() {
for i := 0; i < len(p1.runq); i++ {
for j := 0; j < i; j++ {
gs[j].sig = 0
- runqput(p1, &gs[j])
+ runqput(p1, &gs[j], false)
}
gp := runqsteal(p2, p1)
s := 0
@@ -3381,7 +3434,7 @@ func testSchedLocalQueueSteal() {
gp.sig++
}
for {
- gp = runqget(p2)
+ gp, _ = runqget(p2)
if gp == nil {
break
}
@@ -3389,7 +3442,7 @@ func testSchedLocalQueueSteal() {
gp.sig++
}
for {
- gp = runqget(p1)
+ gp, _ = runqget(p1)
if gp == nil {
break
}
diff --git a/src/runtime/proc_test.go b/src/runtime/proc_test.go
index af90215238..fccf397062 100644
--- a/src/runtime/proc_test.go
+++ b/src/runtime/proc_test.go
@@ -292,6 +292,57 @@ func main() {
}
`
+func TestPingPongHog(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping in -short mode")
+ }
+
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
+ done := make(chan bool)
+ hogChan, lightChan := make(chan bool), make(chan bool)
+ hogCount, lightCount := 0, 0
+
+ run := func(limit int, counter *int, wake chan bool) {
+ for {
+ select {
+ case <-done:
+ return
+
+ case <-wake:
+ for i := 0; i < limit; i++ {
+ *counter++
+ }
+ wake <- true
+ }
+ }
+ }
+
+ // Start two co-scheduled hog goroutines.
+ for i := 0; i < 2; i++ {
+ go run(1e6, &hogCount, hogChan)
+ }
+
+ // Start two co-scheduled light goroutines.
+ for i := 0; i < 2; i++ {
+ go run(1e3, &lightCount, lightChan)
+ }
+
+ // Start goroutine pairs and wait for a few preemption rounds.
+ hogChan <- true
+ lightChan <- true
+ time.Sleep(100 * time.Millisecond)
+ close(done)
+ <-hogChan
+ <-lightChan
+
+ // Check that hogCount and lightCount are within a factor of
+ // 2, which indicates that both pairs of goroutines handed off
+ // the P within a time-slice to their buddy.
+ if hogCount > lightCount*2 || lightCount > hogCount*2 {
+ t.Fatalf("want hogCount/lightCount in [0.5, 2]; got %d/%d = %g", hogCount, lightCount, float64(hogCount)/float64(lightCount))
+ }
+}
+
func BenchmarkPingPongHog(b *testing.B) {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go
index eacf5f094b..476108e36c 100644
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -129,6 +129,9 @@ type guintptr uintptr
func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) }
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }
+func (gp *guintptr) cas(old, new guintptr) bool {
+ return casuintptr((*uintptr)(unsafe.Pointer(gp)), uintptr(old), uintptr(new))
+}
type puintptr uintptr
@@ -350,10 +353,20 @@ type p struct {
goidcache uint64
goidcacheend uint64
- // Queue of runnable goroutines.
+ // Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
runq [256]*g
+ // runnext, if non-nil, is a runnable G that was ready'd by
+ // the current G and should be run next instead of what's in
+ // runq if there's time remaining in the running G's time
+ // slice. It will inherit the time left in the current time
+ // slice. If a set of goroutines is locked in a
+ // communicate-and-wait pattern, this schedules that set as a
+ // unit and eliminates the (potentially large) scheduling
+ // latency that otherwise arises from adding the ready'd
+ // goroutines to the end of the run queue.
+ runnext guintptr
// Available G's (status == Gdead)
gfree *g