diff options
| author | Michael Anthony Knyszek <mknyszek@google.com> | 2023-07-27 19:04:04 +0000 |
|---|---|---|
| committer | Gopher Robot <gobot@golang.org> | 2023-11-09 22:34:25 +0000 |
| commit | f119abb65dbe42f6cb40db698b54be3668357934 (patch) | |
| tree | 1cb07931c9e0f488724462616283a3d7ca3cc723 /src/runtime/proc.go | |
| parent | e3585c67576bc1b0b161448b617eb2725e9c9d69 (diff) | |
| download | go-f119abb65dbe42f6cb40db698b54be3668357934.tar.xz | |
runtime: refactor runtime->tracer API to appear more like a lock
Currently the execution tracer synchronizes with itself using very
heavyweight operations. As a result, it's totally fine for most of the
tracer code to look like:
if traceEnabled() {
traceXXX(...)
}
However, if we want to make that synchronization more lightweight (as
issue #60773 proposes), then this is insufficient. In particular, we
need to make sure the tracer can't observe an inconsistency between g
atomicstatus and the event that would be emitted for a particular
g transition. This means making the g status change appear to happen
atomically with the corresponding trace event being written out from the
perspective of the tracer.
This requires a change in API to something more like a lock. While we're
here, we might as well make sure that trace events can *only* be emitted
while this lock is held. This change introduces such an API:
traceAcquire, which returns a value that can emit events, and
traceRelease, which requires the value that was returned by
traceAcquire. In practice, this won't be a real lock, it'll be more like
a seqlock.
For the current tracer, this API is completely overkill and the value
returned by traceAcquire basically just checks trace.enabled. But it's
necessary for the tracer described in #60773 and we can implement that
more cleanly if we do this refactoring now instead of later.
For #60773.
Change-Id: Ibb9ff5958376339fafc2b5180aef65cf2ba18646
Reviewed-on: https://go-review.googlesource.com/c/go/+/515635
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
Diffstat (limited to 'src/runtime/proc.go')
| -rw-r--r-- | src/runtime/proc.go | 302 |
1 file changed, 202 insertions, 100 deletions
diff --git a/src/runtime/proc.go b/src/runtime/proc.go index 7ed3068063..ae2562a5b7 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -946,10 +946,6 @@ func fastrandinit() { // Mark gp ready to run. func ready(gp *g, traceskip int, next bool) { - if traceEnabled() { - traceGoUnpark(gp, traceskip) - } - status := readgstatus(gp) // Mark runnable. @@ -960,7 +956,12 @@ func ready(gp *g, traceskip int, next bool) { } // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq + trace := traceAcquire() casgstatus(gp, _Gwaiting, _Grunnable) + if trace.ok() { + trace.GoUnpark(gp, traceskip) + traceRelease(trace) + } runqput(mp.p.ptr(), gp, next) wakep() releasem(mp) @@ -1407,8 +1408,10 @@ var gcsema uint32 = 1 // Holding worldsema causes any other goroutines invoking // stopTheWorld to block. func stopTheWorldWithSema(reason stwReason) { - if traceEnabled() { - traceSTWStart(reason) + trace := traceAcquire() + if trace.ok() { + trace.STWStart(reason) + traceRelease(trace) } gp := getg() @@ -1426,17 +1429,22 @@ func stopTheWorldWithSema(reason stwReason) { gp.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic. sched.stopwait-- // try to retake all P's in Psyscall status + trace = traceAcquire() for _, pp := range allp { s := pp.status if s == _Psyscall && atomic.Cas(&pp.status, s, _Pgcstop) { - if traceEnabled() { - traceGoSysBlock(pp) - traceProcStop(pp) + if trace.ok() { + trace.GoSysBlock(pp) + trace.ProcStop(pp) } pp.syscalltick++ sched.stopwait-- } } + if trace.ok() { + traceRelease(trace) + } + // stop idle P's now := nanotime() for { @@ -1533,8 +1541,10 @@ func startTheWorldWithSema() int64 { // Capture start-the-world time before doing clean-up tasks. 
startTime := nanotime() - if traceEnabled() { - traceSTWDone() + trace := traceAcquire() + if trace.ok() { + trace.STWDone() + traceRelease(trace) } // Wakeup an additional proc in case we have excessive runnable goroutines @@ -1853,17 +1863,21 @@ func forEachP(fn func(*p)) { // Force Ps currently in _Psyscall into _Pidle and hand them // off to induce safe point function execution. + trace := traceAcquire() for _, p2 := range allp { s := p2.status if s == _Psyscall && p2.runSafePointFn == 1 && atomic.Cas(&p2.status, s, _Pidle) { - if traceEnabled() { - traceGoSysBlock(p2) - traceProcStop(p2) + if trace.ok() { + trace.GoSysBlock(p2) + trace.ProcStop(p2) } p2.syscalltick++ handoffp(p2) } } + if trace.ok() { + traceRelease(trace) + } // Wait for remaining Ps to run fn. if wait { @@ -2172,8 +2186,10 @@ func oneNewExtraM() { if raceenabled { gp.racectx = racegostart(abi.FuncPCABIInternal(newextram) + sys.PCQuantum) } - if traceEnabled() { - traceOneNewExtraM(gp) + trace := traceAcquire() + if trace.ok() { + trace.OneNewExtraM(gp) + traceRelease(trace) } // put on allg for garbage collector allgadd(gp) @@ -2921,13 +2937,15 @@ func execute(gp *g, inheritTime bool) { setThreadCPUProfiler(hz) } - if traceEnabled() { + trace := traceAcquire() + if trace.ok() { // GoSysExit has to happen when we have a P, but before GoStart. // So we emit it here. 
if gp.syscallsp != 0 { - traceGoSysExit() + trace.GoSysExit() } - traceGoStart() + trace.GoStart() + traceRelease(trace) } gogo(&gp.sched) @@ -2964,8 +2982,12 @@ top: if traceEnabled() || traceShuttingDown() { gp := traceReader() if gp != nil { + trace := traceAcquire() casgstatus(gp, _Gwaiting, _Grunnable) - traceGoUnpark(gp, 0) + if trace.ok() { + trace.GoUnpark(gp, 0) + traceRelease(trace) + } return gp, false, true } } @@ -3028,9 +3050,11 @@ top: gp := list.pop() injectglist(&list) netpollAdjustWaiters(delta) + trace := traceAcquire() casgstatus(gp, _Gwaiting, _Grunnable) - if traceEnabled() { - traceGoUnpark(gp, 0) + if trace.ok() { + trace.GoUnpark(gp, 0) + traceRelease(trace) } return gp, false, false } @@ -3073,9 +3097,12 @@ top: if node != nil { pp.gcMarkWorkerMode = gcMarkWorkerIdleMode gp := node.gp.ptr() + + trace := traceAcquire() casgstatus(gp, _Gwaiting, _Grunnable) - if traceEnabled() { - traceGoUnpark(gp, 0) + if trace.ok() { + trace.GoUnpark(gp, 0) + traceRelease(trace) } return gp, false, false } @@ -3088,9 +3115,11 @@ top: // until a callback was triggered. gp, otherReady := beforeIdle(now, pollUntil) if gp != nil { + trace := traceAcquire() casgstatus(gp, _Gwaiting, _Grunnable) - if traceEnabled() { - traceGoUnpark(gp, 0) + if trace.ok() { + trace.GoUnpark(gp, 0) + traceRelease(trace) } return gp, false, false } @@ -3216,9 +3245,11 @@ top: // Run the idle worker. 
pp.gcMarkWorkerMode = gcMarkWorkerIdleMode + trace := traceAcquire() casgstatus(gp, _Gwaiting, _Grunnable) - if traceEnabled() { - traceGoUnpark(gp, 0) + if trace.ok() { + trace.GoUnpark(gp, 0) + traceRelease(trace) } return gp, false, false } @@ -3278,9 +3309,11 @@ top: gp := list.pop() injectglist(&list) netpollAdjustWaiters(delta) + trace := traceAcquire() casgstatus(gp, _Gwaiting, _Grunnable) - if traceEnabled() { - traceGoUnpark(gp, 0) + if trace.ok() { + trace.GoUnpark(gp, 0) + traceRelease(trace) } return gp, false, false } @@ -3548,10 +3581,12 @@ func injectglist(glist *gList) { if glist.empty() { return } - if traceEnabled() { + trace := traceAcquire() + if trace.ok() { for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() { - traceGoUnpark(gp, 0) + trace.GoUnpark(gp, 0) } + traceRelease(trace) } // Mark all the goroutines as runnable before we put them @@ -3791,13 +3826,16 @@ func parkunlock_c(gp *g, lock unsafe.Pointer) bool { func park_m(gp *g) { mp := getg().m - if traceEnabled() { - traceGoPark(mp.waitTraceBlockReason, mp.waitTraceSkip) - } + trace := traceAcquire() // N.B. Not using casGToWaiting here because the waitreason is // set by park_m's caller. casgstatus(gp, _Grunning, _Gwaiting) + if trace.ok() { + trace.GoPark(mp.waitTraceBlockReason, mp.waitTraceSkip) + traceRelease(trace) + } + dropg() if fn := mp.waitunlockf; fn != nil { @@ -3805,23 +3843,35 @@ func park_m(gp *g) { mp.waitunlockf = nil mp.waitlock = nil if !ok { - if traceEnabled() { - traceGoUnpark(gp, 2) - } + trace := traceAcquire() casgstatus(gp, _Gwaiting, _Grunnable) + if trace.ok() { + trace.GoUnpark(gp, 2) + traceRelease(trace) + } execute(gp, true) // Schedule it back, never returns. 
} } schedule() } -func goschedImpl(gp *g) { +func goschedImpl(gp *g, preempted bool) { + trace := traceAcquire() status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) throw("bad g status") } casgstatus(gp, _Grunning, _Grunnable) + if trace.ok() { + if preempted { + trace.GoPreempt() + } else { + trace.GoSched() + } + traceRelease(trace) + } + dropg() lock(&sched.lock) globrunqput(gp) @@ -3836,39 +3886,25 @@ func goschedImpl(gp *g) { // Gosched continuation on g0. func gosched_m(gp *g) { - if traceEnabled() { - traceGoSched() - } - goschedImpl(gp) + goschedImpl(gp, false) } // goschedguarded is a forbidden-states-avoided version of gosched_m. func goschedguarded_m(gp *g) { - if !canPreemptM(gp.m) { gogo(&gp.sched) // never return } - - if traceEnabled() { - traceGoSched() - } - goschedImpl(gp) + goschedImpl(gp, false) } func gopreempt_m(gp *g) { - if traceEnabled() { - traceGoPreempt() - } - goschedImpl(gp) + goschedImpl(gp, true) } // preemptPark parks gp and puts it in _Gpreempted. // //go:systemstack func preemptPark(gp *g) { - if traceEnabled() { - traceGoPark(traceBlockPreempted, 0) - } status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) @@ -3897,7 +3933,30 @@ func preemptPark(gp *g) { // transitions until we can dropg. casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted) dropg() + + // Be careful about how we trace this next event. The ordering + // is subtle. + // + // The moment we CAS into _Gpreempted, suspendG could CAS to + // _Gwaiting, do its work, and ready the goroutine. All of + // this could happen before we even get the chance to emit + // an event. The end result is that the events could appear + // out of order, and the tracer generally assumes the scheduler + // takes care of the ordering between GoPark and GoUnpark. + // + // The answer here is simple: emit the event while we still hold + // the _Gscan bit on the goroutine. 
We still need to traceAcquire + // and traceRelease across the CAS because the tracer could be + // what's calling suspendG in the first place, and we want the + // CAS and event emission to appear atomic to the tracer. + trace := traceAcquire() + if trace.ok() { + trace.GoPark(traceBlockPreempted, 0) + } casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) + if trace.ok() { + traceRelease(trace) + } schedule() } @@ -3910,11 +3969,13 @@ func goyield() { } func goyield_m(gp *g) { - if traceEnabled() { - traceGoPreempt() - } + trace := traceAcquire() pp := gp.m.p.ptr() casgstatus(gp, _Grunning, _Grunnable) + if trace.ok() { + trace.GoPreempt() + traceRelease(trace) + } dropg() runqput(pp, gp, false) schedule() @@ -3925,8 +3986,10 @@ func goexit1() { if raceenabled { racegoend() } - if traceEnabled() { - traceGoEnd() + trace := traceAcquire() + if trace.ok() { + trace.GoEnd() + traceRelease(trace) } mcall(goexit0) } @@ -4065,6 +4128,7 @@ func save(pc, sp uintptr) { // //go:nosplit func reentersyscall(pc, sp uintptr) { + trace := traceAcquire() gp := getg() // Disable preemption because during this function g is in Gsyscall status, @@ -4095,8 +4159,11 @@ func reentersyscall(pc, sp uintptr) { }) } - if traceEnabled() { - systemstack(traceGoSysCall) + if trace.ok() { + systemstack(func() { + trace.GoSysCall() + traceRelease(trace) + }) // systemstack itself clobbers g.sched.{pc,sp} and we might // need them later when the G is genuinely blocked in a // syscall @@ -4153,9 +4220,11 @@ func entersyscall_gcwait() { lock(&sched.lock) if sched.stopwait > 0 && atomic.Cas(&pp.status, _Psyscall, _Pgcstop) { - if traceEnabled() { - traceGoSysBlock(pp) - traceProcStop(pp) + trace := traceAcquire() + if trace.ok() { + trace.GoSysBlock(pp) + trace.ProcStop(pp) + traceRelease(trace) } pp.syscalltick++ if sched.stopwait--; sched.stopwait == 0 { @@ -4209,9 +4278,11 @@ func entersyscallblock() { } func entersyscallblock_handoff() { - if traceEnabled() { - traceGoSysCall() - 
traceGoSysBlock(getg().m.p.ptr()) + trace := traceAcquire() + if trace.ok() { + trace.GoSysCall() + trace.GoSysBlock(getg().m.p.ptr()) + traceRelease(trace) } handoffp(releasep()) } @@ -4250,15 +4321,21 @@ func exitsyscall() { tryRecordGoroutineProfileWB(gp) }) } - if traceEnabled() { + trace := traceAcquire() + if trace.ok() { if oldp != gp.m.p.ptr() || gp.m.syscalltick != gp.m.p.ptr().syscalltick { - systemstack(traceGoStart) + systemstack(func() { + trace.GoStart() + }) } } // There's a cpu for us, so we can run. gp.m.p.ptr().syscalltick++ // We need to cas the status and scan before resuming... casgstatus(gp, _Gsyscall, _Grunning) + if trace.ok() { + traceRelease(trace) + } // Garbage collector isn't running (since we are), // so okay to clear syscallsp. @@ -4281,7 +4358,8 @@ func exitsyscall() { return } - if traceEnabled() { + trace := traceAcquire() + if trace.ok() { // Wait till traceGoSysBlock event is emitted. // This ensures consistency of the trace (the goroutine is started after it is blocked). for oldp != nil && oldp.syscalltick == gp.m.syscalltick { @@ -4292,6 +4370,7 @@ func exitsyscall() { // So instead we remember the syscall exit time and emit the event // in execute when we have a P. gp.trace.sysExitTime = traceClockNow() + traceRelease(trace) } gp.m.locks-- @@ -4332,15 +4411,19 @@ func exitsyscallfast(oldp *p) bool { var ok bool systemstack(func() { ok = exitsyscallfast_pidle() - if ok && traceEnabled() { - if oldp != nil { - // Wait till traceGoSysBlock event is emitted. - // This ensures consistency of the trace (the goroutine is started after it is blocked). - for oldp.syscalltick == gp.m.syscalltick { - osyield() + if ok { + trace := traceAcquire() + if trace.ok() { + if oldp != nil { + // Wait till traceGoSysBlock event is emitted. + // This ensures consistency of the trace (the goroutine is started after it is blocked). 
+ for oldp.syscalltick == gp.m.syscalltick { + osyield() + } } + trace.GoSysExit() + traceRelease(trace) } - traceGoSysExit() } }) if ok { @@ -4358,15 +4441,17 @@ func exitsyscallfast(oldp *p) bool { func exitsyscallfast_reacquired() { gp := getg() if gp.m.syscalltick != gp.m.p.ptr().syscalltick { - if traceEnabled() { + trace := traceAcquire() + if trace.ok() { // The p was retaken and then enter into syscall again (since gp.m.syscalltick has changed). // traceGoSysBlock for this syscall was already emitted, // but here we effectively retake the p from the new syscall running on the same p. systemstack(func() { // Denote blocking of the new syscall. - traceGoSysBlock(gp.m.p.ptr()) + trace.GoSysBlock(gp.m.p.ptr()) // Denote completion of the current syscall. - traceGoSysExit() + trace.GoSysExit() + traceRelease(trace) }) } gp.m.p.ptr().syscalltick++ @@ -4631,9 +4716,11 @@ func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g { if newg.trackingSeq%gTrackingPeriod == 0 { newg.tracking = true } - casgstatus(newg, _Gdead, _Grunnable) gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo)) + // Get a goid and switch to runnable. Make all this atomic to the tracer. + trace := traceAcquire() + casgstatus(newg, _Gdead, _Grunnable) if pp.goidcache == pp.goidcacheend { // Sched.goidgen is the last allocated id, // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch]. @@ -4644,6 +4731,12 @@ func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g { } newg.goid = pp.goidcache pp.goidcache++ + if trace.ok() { + trace.GoCreate(newg, newg.startpc) + traceRelease(trace) + } + + // Set up race context. 
if raceenabled { newg.racectx = racegostart(callerpc) newg.raceignore = 0 @@ -4653,9 +4746,6 @@ func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g { racereleasemergeg(newg, unsafe.Pointer(&labelSync)) } } - if traceEnabled() { - traceGoCreate(newg, newg.startpc) - } releasem(mp) return newg @@ -5264,8 +5354,10 @@ func procresize(nprocs int32) *p { if old < 0 || nprocs <= 0 { throw("procresize: invalid arg") } - if traceEnabled() { - traceGomaxprocs(nprocs) + trace := traceAcquire() + if trace.ok() { + trace.Gomaxprocs(nprocs) + traceRelease(trace) } // update statistics @@ -5330,12 +5422,14 @@ func procresize(nprocs int32) *p { // because p.destroy itself has write barriers, so we // need to do that from a valid P. if gp.m.p != 0 { - if traceEnabled() { + trace := traceAcquire() + if trace.ok() { // Pretend that we were descheduled // and then scheduled again to keep // the trace sane. - traceGoSched() - traceProcStop(gp.m.p.ptr()) + trace.GoSched() + trace.ProcStop(gp.m.p.ptr()) + traceRelease(trace) } gp.m.p.ptr().m = 0 } @@ -5344,8 +5438,10 @@ func procresize(nprocs int32) *p { pp.m = 0 pp.status = _Pidle acquirep(pp) - if traceEnabled() { - traceGoStart() + trace := traceAcquire() + if trace.ok() { + trace.GoStart() + traceRelease(trace) } } @@ -5409,8 +5505,10 @@ func acquirep(pp *p) { // from a potentially stale mcache. pp.mcache.prepareForSweep() - if traceEnabled() { - traceProcStart() + trace := traceAcquire() + if trace.ok() { + trace.ProcStart() + traceRelease(trace) } } @@ -5451,8 +5549,10 @@ func releasep() *p { print("releasep: m=", gp.m, " m->p=", gp.m.p.ptr(), " p->m=", hex(pp.m), " p->status=", pp.status, "\n") throw("releasep: invalid p state") } - if traceEnabled() { - traceProcStop(gp.m.p.ptr()) + trace := traceAcquire() + if trace.ok() { + trace.ProcStop(gp.m.p.ptr()) + traceRelease(trace) } gp.m.p = 0 pp.m = 0 @@ -5799,9 +5899,11 @@ func retake(now int64) uint32 { // increment nmidle and report deadlock. 
incidlelocked(-1) if atomic.Cas(&pp.status, s, _Pidle) { - if traceEnabled() { - traceGoSysBlock(pp) - traceProcStop(pp) + trace := traceAcquire() + if trace.ok() { + trace.GoSysBlock(pp) + trace.ProcStop(pp) + traceRelease(trace) } n++ pp.syscalltick++ |
