aboutsummaryrefslogtreecommitdiff
path: root/src/runtime/proc1.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/runtime/proc1.go')
-rw-r--r--src/runtime/proc1.go141
1 files changed, 97 insertions, 44 deletions
diff --git a/src/runtime/proc1.go b/src/runtime/proc1.go
index d37c4f1a5a..166d7c84eb 100644
--- a/src/runtime/proc1.go
+++ b/src/runtime/proc1.go
@@ -145,7 +145,7 @@ func ready(gp *g, traceskip int) {
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
- runqput(_g_.m.p.ptr(), gp)
+ runqput(_g_.m.p.ptr(), gp, true)
if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 { // TODO: fast atomic
wakep()
}
@@ -185,12 +185,12 @@ func readyExecute(gp *g, traceskip int) {
// Preempt the current g
casgstatus(_g_, _Grunning, _Grunnable)
- runqput(_g_.m.p.ptr(), _g_)
+ runqput(_g_.m.p.ptr(), _g_, false)
dropg()
// Ready gp and switch to it
casgstatus(gp, _Gwaiting, _Grunnable)
- execute(gp)
+ execute(gp, false)
})
}
@@ -1233,15 +1233,19 @@ func gcstopm() {
}
// Schedules gp to run on the current M.
+// If inheritTime is true, gp inherits the remaining time in the
+// current time slice. Otherwise, it starts a new time slice.
// Never returns.
-func execute(gp *g) {
+func execute(gp *g, inheritTime bool) {
_g_ := getg()
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
- _g_.m.p.ptr().schedtick++
+ if !inheritTime {
+ _g_.m.p.ptr().schedtick++
+ }
_g_.m.curg = gp
gp.m = _g_.m
@@ -1260,7 +1264,7 @@ func execute(gp *g) {
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
-func findrunnable() *g {
+func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
@@ -1275,8 +1279,8 @@ top:
}
// local runq
- if gp := runqget(_g_.m.p.ptr()); gp != nil {
- return gp
+ if gp, inheritTime := runqget(_g_.m.p.ptr()); gp != nil {
+ return gp, inheritTime
}
// global runq
@@ -1285,7 +1289,7 @@ top:
gp := globrunqget(_g_.m.p.ptr(), 0)
unlock(&sched.lock)
if gp != nil {
- return gp
+ return gp, false
}
}
@@ -1303,7 +1307,7 @@ top:
if trace.enabled {
traceGoUnpark(gp, 0)
}
- return gp
+ return gp, false
}
}
@@ -1325,12 +1329,12 @@ top:
_p_ := allp[fastrand1()%uint32(gomaxprocs)]
var gp *g
if _p_ == _g_.m.p.ptr() {
- gp = runqget(_p_)
+ gp, _ = runqget(_p_)
} else {
gp = runqsteal(_g_.m.p.ptr(), _p_)
}
if gp != nil {
- return gp
+ return gp, false
}
}
stop:
@@ -1344,7 +1348,7 @@ stop:
if trace.enabled {
traceGoUnpark(gp, 0)
}
- return gp
+ return gp, false
}
// return P and block
@@ -1356,7 +1360,7 @@ stop:
if sched.runqsize != 0 {
gp := globrunqget(_g_.m.p.ptr(), 0)
unlock(&sched.lock)
- return gp
+ return gp, false
}
_p_ := releasep()
pidleput(_p_)
@@ -1402,7 +1406,7 @@ stop:
if trace.enabled {
traceGoUnpark(gp, 0)
}
- return gp
+ return gp, false
}
injectglist(gp)
}
@@ -1468,7 +1472,7 @@ func schedule() {
if _g_.m.lockedg != nil {
stoplockedm()
- execute(_g_.m.lockedg) // Never returns.
+ execute(_g_.m.lockedg, false) // Never returns.
}
top:
@@ -1478,6 +1482,7 @@ top:
}
var gp *g
+ var inheritTime bool
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
@@ -1506,13 +1511,13 @@ top:
}
}
if gp == nil {
- gp = runqget(_g_.m.p.ptr())
+ gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
- gp = findrunnable() // blocks until work is available
+ gp, inheritTime = findrunnable() // blocks until work is available
resetspinning()
}
@@ -1523,7 +1528,7 @@ top:
goto top
}
- execute(gp)
+ execute(gp, inheritTime)
}
// dropg removes the association between m and the current goroutine m->curg (gp for short).
@@ -1568,7 +1573,7 @@ func park_m(gp *g) {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
- execute(gp) // Schedule it back, never returns.
+ execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
@@ -2007,12 +2012,12 @@ func exitsyscall0(gp *g) {
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
- execute(gp) // Never returns.
+ execute(gp, false) // Never returns.
}
if _g_.m.lockedg != nil {
// Wait until another thread schedules gp and so m again.
stoplockedm()
- execute(gp) // Never returns.
+ execute(gp, false) // Never returns.
}
stopm()
schedule() // Never returns.
@@ -2168,7 +2173,7 @@ func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr
if trace.enabled {
traceGoCreate(newg, newg.startpc)
}
- runqput(_p_, newg)
+ runqput(_p_, newg, true)
if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 && unsafe.Pointer(fn.fn) != unsafe.Pointer(funcPC(main)) { // TODO: fast atomic
wakep()
@@ -2596,12 +2601,11 @@ func procresize(nprocs int32) *p {
p.runqtail--
gp := p.runq[p.runqtail%uint32(len(p.runq))]
// push onto head of global queue
- gp.schedlink = sched.runqhead
- sched.runqhead.set(gp)
- if sched.runqtail == 0 {
- sched.runqtail.set(gp)
- }
- sched.runqsize++
+ globrunqputhead(gp)
+ }
+ if p.runnext != 0 {
+ globrunqputhead(p.runnext.ptr())
+ p.runnext = 0
}
// if there's a background worker, make it runnable and put
// it on the global queue so it can clean itself up
@@ -3150,6 +3154,19 @@ func globrunqput(gp *g) {
sched.runqsize++
}
+// Put gp at the head of the global runnable queue.
+// Sched must be locked.
+// May run during STW, so write barriers are not allowed.
+//go:nowritebarrier
+func globrunqputhead(gp *g) {
+ gp.schedlink = sched.runqhead
+ sched.runqhead.set(gp)
+ if sched.runqtail == 0 {
+ sched.runqtail.set(gp)
+ }
+ sched.runqsize++
+}
+
// Put a batch of runnable goroutines on the global runnable queue.
// Sched must be locked.
func globrunqputbatch(ghead *g, gtail *g, n int32) {
@@ -3192,7 +3209,7 @@ func globrunqget(_p_ *p, max int32) *g {
for ; n > 0; n-- {
gp1 := sched.runqhead.ptr()
sched.runqhead = gp1.schedlink
- runqput(_p_, gp1)
+ runqput(_p_, gp1, false)
}
return gp
}
@@ -3223,13 +3240,28 @@ func pidleget() *p {
// runqempty returns true if _p_ has no Gs on its local run queue.
// Note that this test is generally racy.
func runqempty(_p_ *p) bool {
- return _p_.runqhead == _p_.runqtail
+ return _p_.runqhead == _p_.runqtail && _p_.runnext == 0
}
-// Try to put g on local runnable queue.
-// If it's full, put onto global queue.
+// runqput tries to put g on the local runnable queue.
+// If next is false, runqput adds g to the tail of the runnable queue.
+// If next is true, runqput puts g in the _p_.runnext slot.
+// If the run queue is full, runqput puts g on the global queue.
// Executed only by the owner P.
-func runqput(_p_ *p, gp *g) {
+func runqput(_p_ *p, gp *g, next bool) {
+ if next {
+ retryNext:
+ oldnext := _p_.runnext
+ if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
+ goto retryNext
+ }
+ if oldnext == 0 {
+ return
+ }
+ // Kick the old runnext out to the regular run queue.
+ gp = oldnext.ptr()
+ }
+
retry:
h := atomicload(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
@@ -3277,17 +3309,30 @@ func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
}
// Get g from local runnable queue.
+// If inheritTime is true, gp should inherit the remaining time in the
+// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
-func runqget(_p_ *p) *g {
+func runqget(_p_ *p) (gp *g, inheritTime bool) {
+ // If there's a runnext, it's the next G to run.
+ for {
+ next := _p_.runnext
+ if next == 0 {
+ break
+ }
+ if _p_.runnext.cas(next, 0) {
+ return next.ptr(), true
+ }
+ }
+
for {
h := atomicload(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
- return nil
+ return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))]
if cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
- return gp
+ return gp, false
}
}
}
@@ -3302,6 +3347,14 @@ func runqgrab(_p_ *p, batch []*g) uint32 {
n := t - h
n = n - n/2
if n == 0 {
+ // Try to steal from _p_.runnext.
+ if next := _p_.runnext; next != 0 {
+ if !_p_.runnext.cas(next, 0) {
+ continue
+ }
+ batch[0] = next.ptr()
+ return 1
+ }
return 0
}
if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
@@ -3347,19 +3400,19 @@ func testSchedLocalQueue() {
_p_ := new(p)
gs := make([]g, len(_p_.runq))
for i := 0; i < len(_p_.runq); i++ {
- if runqget(_p_) != nil {
+ if g, _ := runqget(_p_); g != nil {
throw("runq is not empty initially")
}
for j := 0; j < i; j++ {
- runqput(_p_, &gs[i])
+ runqput(_p_, &gs[i], false)
}
for j := 0; j < i; j++ {
- if runqget(_p_) != &gs[i] {
+ if g, _ := runqget(_p_); g != &gs[i] {
print("bad element at iter ", i, "/", j, "\n")
throw("bad element")
}
}
- if runqget(_p_) != nil {
+ if g, _ := runqget(_p_); g != nil {
throw("runq is not empty afterwards")
}
}
@@ -3372,7 +3425,7 @@ func testSchedLocalQueueSteal() {
for i := 0; i < len(p1.runq); i++ {
for j := 0; j < i; j++ {
gs[j].sig = 0
- runqput(p1, &gs[j])
+ runqput(p1, &gs[j], false)
}
gp := runqsteal(p2, p1)
s := 0
@@ -3381,7 +3434,7 @@ func testSchedLocalQueueSteal() {
gp.sig++
}
for {
- gp = runqget(p2)
+ gp, _ = runqget(p2)
if gp == nil {
break
}
@@ -3389,7 +3442,7 @@ func testSchedLocalQueueSteal() {
gp.sig++
}
for {
- gp = runqget(p1)
+ gp, _ = runqget(p1)
if gp == nil {
break
}