Diffstat (limited to 'src/runtime/mcleanup.go')
| -rw-r--r-- | src/runtime/mcleanup.go | 434 |
1 file changed, 432 insertions, 2 deletions
diff --git a/src/runtime/mcleanup.go b/src/runtime/mcleanup.go
index d41a4971b5..f27758d9f2 100644
--- a/src/runtime/mcleanup.go
+++ b/src/runtime/mcleanup.go
@@ -6,6 +6,10 @@ package runtime
 
 import (
 	"internal/abi"
+	"internal/cpu"
+	"internal/goarch"
+	"internal/runtime/atomic"
+	"internal/runtime/sys"
 	"unsafe"
 )
 
@@ -110,8 +114,10 @@ func AddCleanup[T, S any](ptr *T, cleanup func(S), arg S) Cleanup {
 		panic("runtime.AddCleanup: ptr not in allocated block")
 	}
 
-	// Ensure we have a finalizer processing goroutine running.
-	createfing()
+	// Create another G if necessary.
+	if gcCleanups.needG() {
+		gcCleanups.createGs()
+	}
 
 	id := addCleanup(unsafe.Pointer(ptr), fv)
 	return Cleanup{
@@ -191,3 +197,427 @@ func (c Cleanup) Stop() {
 	mheap_.specialCleanupAlloc.free(unsafe.Pointer(found))
 	unlock(&mheap_.speciallock)
 }
+
+const cleanupBlockSize = 512
+
+// cleanupBlock is a block of cleanups to be executed.
+//
+// cleanupBlock is allocated from non-GC'd memory, so any heap pointers
+// must be specially handled. The GC and cleanup queue currently assume
+// that the cleanup queue does not grow during marking (but it can shrink).
+type cleanupBlock struct {
+	cleanupBlockHeader
+	cleanups [(cleanupBlockSize - unsafe.Sizeof(cleanupBlockHeader{})) / goarch.PtrSize]*funcval
+}
+
+var cleanupBlockPtrMask [cleanupBlockSize / goarch.PtrSize / 8]byte
+
+type cleanupBlockHeader struct {
+	_ sys.NotInHeap
+	lfnode
+	alllink *cleanupBlock
+
+	// n is sometimes accessed atomically.
+	//
+	// The invariant depends on what phase the garbage collector is in.
+	// During the sweep phase (gcphase == _GCoff), each block has exactly
+	// one owner, so it's always safe to update this without atomics.
+	// But if this *could* be updated during the mark phase, it must be
+	// updated atomically to synchronize with the garbage collector
+	// scanning the block as a root.
+	n uint32
+}
+
+// enqueue pushes a single cleanup function into the block.
+//
+// Returns true if this enqueue call filled the block. This is odd,
+// but we want to flush full blocks eagerly to get cleanups
+// running as soon as possible.
+//
+// Must only be called if the GC is in the sweep phase (gcphase == _GCoff),
+// because it does not synchronize with the garbage collector.
+func (b *cleanupBlock) enqueue(fn *funcval) bool {
+	b.cleanups[b.n] = fn
+	b.n++
+	return b.full()
+}
+
+// full returns true if the cleanup block is full.
+func (b *cleanupBlock) full() bool {
+	return b.n == uint32(len(b.cleanups))
+}
+
+// empty returns true if the cleanup block is empty.
+func (b *cleanupBlock) empty() bool {
+	return b.n == 0
+}
+
+// take moves as many cleanups as possible from b into a.
+func (a *cleanupBlock) take(b *cleanupBlock) {
+	dst := a.cleanups[a.n:]
+	if uint32(len(dst)) >= b.n {
+		// Take all.
+		copy(dst, b.cleanups[:])
+		a.n += b.n
+		b.n = 0
+	} else {
+		// Partial take. Copy from the tail to avoid having
+		// to move more memory around.
+		copy(dst, b.cleanups[b.n-uint32(len(dst)):b.n])
+		a.n = uint32(len(a.cleanups))
+		b.n -= uint32(len(dst))
+	}
+}
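The full-versus-partial coalescing in take is easier to see outside the runtime. Below is a minimal, standalone sketch (not runtime code): the block type, its tiny capacity, and the int payload are illustrative stand-ins for cleanupBlock and its *funcval slots, which in the real code are carved out of a 512-byte non-GC'd allocation.

package main

import "fmt"

const capacity = 4 // toy capacity; the real slot count is derived from cleanupBlockSize

// block is a hypothetical stand-in for cleanupBlock: a fixed array plus a count.
type block struct {
	n     uint32
	items [capacity]int
}

// take mirrors cleanupBlock.take: move as many items as possible from b into a.
func (a *block) take(b *block) {
	dst := a.items[a.n:]
	if uint32(len(dst)) >= b.n {
		// Take all of b.
		copy(dst, b.items[:b.n])
		a.n += b.n
		b.n = 0
	} else {
		// Partial take: copy from b's tail so b's remaining items stay in place.
		copy(dst, b.items[b.n-uint32(len(dst)):b.n])
		a.n = capacity
		b.n -= uint32(len(dst))
	}
}

func main() {
	a := &block{n: 3, items: [capacity]int{1, 2, 3}}
	b := &block{n: 2, items: [capacity]int{4, 5}}
	a.take(b)
	// a is now full (it absorbed b's last item, 5) and b still holds 4.
	fmt.Println(a.n, a.items, b.n, b.items[:b.n]) // 4 [1 2 3 5] 1 [4]
}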
+
+// cleanupQueue is a queue of ready-to-run cleanup functions.
+type cleanupQueue struct {
+	// Stack of full cleanup blocks.
+	full lfstack
+	_    [cpu.CacheLinePadSize - unsafe.Sizeof(lfstack(0))]byte
+
+	// Stack of free cleanup blocks.
+	free lfstack
+
+	// flushed indicates whether all local cleanupBlocks have been
+	// flushed, and we're in a period of time where this condition is
+	// stable (after the last sweeper, before the next sweep phase
+	// begins).
+	flushed atomic.Bool // Next to free because frequently accessed together.
+
+	_ [cpu.CacheLinePadSize - unsafe.Sizeof(lfstack(0)) - 1]byte
+
+	// Linked list of all cleanup blocks.
+	all atomic.UnsafePointer // *cleanupBlock
+	_   [cpu.CacheLinePadSize - unsafe.Sizeof(atomic.UnsafePointer{})]byte
+
+	state cleanupSleep
+	_     [cpu.CacheLinePadSize - unsafe.Sizeof(cleanupSleep{})]byte
+
+	// Goroutine block state.
+	//
+	// lock protects sleeping and writes to ng. It is also the lock
+	// used by cleanup goroutines to park atomically with updates to
+	// sleeping and ng.
+	lock     mutex
+	sleeping gList
+	running  atomic.Uint32
+	ng       atomic.Uint32
+	needg    atomic.Uint32
+}
+
+// cleanupSleep is an atomically-updatable cleanupSleepState.
+type cleanupSleep struct {
+	u atomic.Uint64 // cleanupSleepState
+}
+
+func (s *cleanupSleep) load() cleanupSleepState {
+	return cleanupSleepState(s.u.Load())
+}
+
+// awaken indicates that N cleanup goroutines should be awoken.
+func (s *cleanupSleep) awaken(n int) {
+	s.u.Add(int64(n))
+}
+
+// sleep indicates that a cleanup goroutine is about to go to sleep.
+func (s *cleanupSleep) sleep() {
+	s.u.Add(1 << 32)
+}
+
+// take returns the number of goroutines to wake to handle
+// the cleanup load, and also how many extra wake signals
+// there were. The caller takes responsibility for waking
+// up "wake" cleanup goroutines.
+//
+// The number of goroutines to wake is guaranteed to be
+// bounded by the current sleeping goroutines, provided
+// they call sleep before going to sleep, and all wakeups
+// are preceded by a call to take.
+func (s *cleanupSleep) take() (wake, extra uint32) {
+	for {
+		old := s.load()
+		if old == 0 {
+			return 0, 0
+		}
+		if old.wakes() > old.asleep() {
+			wake = old.asleep()
+			extra = old.wakes() - old.asleep()
+		} else {
+			wake = old.wakes()
+			extra = 0
+		}
+		new := cleanupSleepState(old.asleep()-wake) << 32
+		if s.u.CompareAndSwap(uint64(old), uint64(new)) {
+			return
+		}
+	}
+}
+
+// cleanupSleepState consists of two fields: the number of
+// goroutines currently asleep (equivalent to the length of q.sleeping), and
+// the number of times a wakeup signal has been sent.
+// These two fields are packed together in a uint64, such
+// that they may be updated atomically as part of cleanupSleep.
+// The top 32 bits hold the number of sleeping goroutines,
+// and the bottom 32 bits hold the number of wakeup signals.
+type cleanupSleepState uint64
+
+func (s cleanupSleepState) asleep() uint32 {
+	return uint32(s >> 32)
+}
+
+func (s cleanupSleepState) wakes() uint32 {
+	return uint32(s)
+}
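The packed counter is the core synchronization idea here: the sleeper count and the pending wake signals live in one word, so a wake decision is a single compare-and-swap and can never claim more sleepers than exist. The following standalone sketch reproduces the same packing with sync/atomic; the names (state, asleep, wakes) are illustrative, not runtime API.

package main

import (
	"fmt"
	"sync/atomic"
)

// state packs two counters into one uint64:
// top 32 bits = goroutines asleep, bottom 32 bits = pending wake signals.
type state struct{ u atomic.Uint64 }

func asleep(v uint64) uint32 { return uint32(v >> 32) }
func wakes(v uint64) uint32  { return uint32(v) }

func (s *state) sleep()       { s.u.Add(1 << 32) }    // one more sleeper
func (s *state) awaken(n int) { s.u.Add(uint64(n)) }  // n more wake signals

// take claims min(wakes, asleep) wakeups and reports any excess demand,
// mirroring cleanupSleep.take.
func (s *state) take() (wake, extra uint32) {
	for {
		old := s.u.Load()
		if old == 0 {
			return 0, 0
		}
		if wakes(old) > asleep(old) {
			wake, extra = asleep(old), wakes(old)-asleep(old)
		} else {
			wake, extra = wakes(old), 0
		}
		new := uint64(asleep(old)-wake) << 32 // remaining sleepers; wake signals cleared
		if s.u.CompareAndSwap(old, new) {
			return
		}
	}
}

func main() {
	var s state
	s.sleep()
	s.sleep()   // two goroutines parked
	s.awaken(3) // three blocks became ready
	w, e := s.take()
	fmt.Println(w, e) // 2 1: wake both sleepers, record demand for one more worker
}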
+
+// enqueue queues a single cleanup for execution.
+//
+// Called by the sweeper, and only the sweeper.
+func (q *cleanupQueue) enqueue(fn *funcval) {
+	mp := acquirem()
+	pp := mp.p.ptr()
+	b := pp.cleanups
+	if b == nil {
+		if q.flushed.Load() {
+			q.flushed.Store(false)
+		}
+		b = (*cleanupBlock)(q.free.pop())
+		if b == nil {
+			b = (*cleanupBlock)(persistentalloc(cleanupBlockSize, tagAlign, &memstats.gcMiscSys))
+			for {
+				next := (*cleanupBlock)(q.all.Load())
+				b.alllink = next
+				if q.all.CompareAndSwap(unsafe.Pointer(next), unsafe.Pointer(b)) {
+					break
+				}
+			}
+		}
+		pp.cleanups = b
+	}
+	if full := b.enqueue(fn); full {
+		q.full.push(&b.lfnode)
+		pp.cleanups = nil
+		q.state.awaken(1)
+	}
+	releasem(mp)
+}
+
+// dequeue pops a block of cleanups from the queue. Blocks until one is available
+// and never returns nil.
+func (q *cleanupQueue) dequeue() *cleanupBlock {
+	for {
+		b := (*cleanupBlock)(q.full.pop())
+		if b != nil {
+			return b
+		}
+		lock(&q.lock)
+		q.sleeping.push(getg())
+		q.state.sleep()
+		goparkunlock(&q.lock, waitReasonCleanupWait, traceBlockSystemGoroutine, 1)
+	}
+}
+
+// tryDequeue is a non-blocking attempt to dequeue a block of cleanups.
+// May return nil if there are no blocks to run.
+func (q *cleanupQueue) tryDequeue() *cleanupBlock {
+	return (*cleanupBlock)(q.full.pop())
+}
+
+// flush pushes all active cleanup blocks to the full list and wakes up cleanup
+// goroutines to handle them.
+//
+// Must only be called at a point when we can guarantee that no more cleanups
+// are being queued, such as after the final sweeper for the cycle is done
+// but before the next mark phase.
+func (q *cleanupQueue) flush() {
+	mp := acquirem()
+	flushed := 0
+	emptied := 0
+	missing := 0
+
+	// Coalesce the partially-filled blocks to present a more accurate picture of demand.
+	// We use the number of coalesced blocks to process as a signal for demand to create
+	// new cleanup goroutines.
+	var cb *cleanupBlock
+	for _, pp := range allp {
+		b := pp.cleanups
+		if b == nil {
+			missing++
+			continue
+		}
+		pp.cleanups = nil
+		if cb == nil {
+			cb = b
+			continue
+		}
+		// N.B. After take, either cb is full, b is empty, or both.
+		cb.take(b)
+		if cb.full() {
+			q.full.push(&cb.lfnode)
+			flushed++
+			cb = b
+			b = nil
+		}
+		if b != nil && b.empty() {
+			q.free.push(&b.lfnode)
+			emptied++
+		}
+	}
+	if cb != nil {
+		q.full.push(&cb.lfnode)
+		flushed++
+	}
+	if flushed != 0 {
+		q.state.awaken(flushed)
+	}
+	if flushed+emptied+missing != len(allp) {
+		throw("failed to correctly flush all P-owned cleanup blocks")
+	}
+	q.flushed.Store(true)
+	releasem(mp)
+}
+
+// needsWake returns true if cleanup goroutines need to be awoken or created to handle cleanup load.
+func (q *cleanupQueue) needsWake() bool {
+	s := q.state.load()
+	return s.wakes() > 0 && (s.asleep() > 0 || q.ng.Load() < maxCleanupGs())
+}
+
+// wake wakes up one or more goroutines to process the cleanup queue. If there aren't
+// enough sleeping goroutines to handle the demand, wake will arrange for new goroutines
+// to be created.
+func (q *cleanupQueue) wake() {
+	wake, extra := q.state.take()
+	if extra != 0 {
+		newg := min(extra, maxCleanupGs()-q.ng.Load())
+		if newg > 0 {
+			q.needg.Add(int32(newg))
+		}
+	}
+	if wake == 0 {
+		return
+	}
+
+	// By calling 'take', we've taken ownership of waking 'wake' goroutines.
+	// Nobody else will wake up these goroutines, so they're guaranteed
+	// to be sitting on q.sleeping, waiting for us to wake them.
+	//
+	// Collect them and schedule them.
+	var list gList
+	lock(&q.lock)
+	for range wake {
+		list.push(q.sleeping.pop())
+	}
+	unlock(&q.lock)
+
+	injectglist(&list)
+	return
+}
+
+func (q *cleanupQueue) needG() bool {
+	have := q.ng.Load()
+	if have >= maxCleanupGs() {
+		return false
+	}
+	if have == 0 {
+		// Make sure we have at least one.
+		return true
+	}
+	return q.needg.Load() > 0
+}
+
+func (q *cleanupQueue) createGs() {
+	lock(&q.lock)
+	have := q.ng.Load()
+	need := min(q.needg.Swap(0), maxCleanupGs()-have)
+	if have == 0 && need == 0 {
+		// Make sure we have at least one.
+		need = 1
+	}
+	if need > 0 {
+		q.ng.Add(int32(need))
+	}
+	unlock(&q.lock)
+
+	for range need {
+		go runCleanups()
+	}
+}
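The accounting in flush is subtle: every P either had no active block (missing), had its block drained into another one and returned to the free list (emptied), or ends up contributing a block to the full list (flushed), and the three counts must add back up to len(allp). The standalone sketch below mirrors just that coalescing pass with a hypothetical count-only block type, under the same "after take, either cb is full, b is empty, or both" invariant.

package main

import "fmt"

const blockCap = 8 // toy capacity

// blk is a hypothetical stand-in for cleanupBlock; only the count matters here.
type blk struct{ n int }

func (a *blk) full() bool  { return a.n == blockCap }
func (a *blk) empty() bool { return a.n == 0 }

// take moves as many "cleanups" as fit from b into a.
func (a *blk) take(b *blk) {
	m := min(blockCap-a.n, b.n)
	a.n += m
	b.n -= m
}

// flush mirrors cleanupQueue.flush's coalescing pass over per-P blocks.
// perP[i] == nil means P i had no active block.
func flush(perP []*blk) (fullList, freeList []*blk) {
	flushed, emptied, missing := 0, 0, 0
	var cb *blk
	for _, b := range perP {
		if b == nil {
			missing++
			continue
		}
		if cb == nil {
			cb = b
			continue
		}
		// After take, either cb is full, b is empty, or both.
		cb.take(b)
		if cb.full() {
			fullList = append(fullList, cb)
			flushed++
			cb = b
			b = nil
		}
		if b != nil && b.empty() {
			freeList = append(freeList, b)
			emptied++
		}
	}
	if cb != nil {
		fullList = append(fullList, cb)
		flushed++
	}
	if flushed+emptied+missing != len(perP) {
		panic("failed to account for every P-owned block")
	}
	return fullList, freeList
}

func main() {
	perP := []*blk{{n: 5}, nil, {n: 6}, {n: 2}}
	fullList, freeList := flush(perP)
	fmt.Println(len(fullList), len(freeList)) // 2 1: one full block plus a partial remainder, one block freed
}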
+
+func (q *cleanupQueue) beginRunningCleanups() {
+	// Update runningCleanups and running atomically with respect
+	// to goroutine profiles by disabling preemption.
+	mp := acquirem()
+	getg().runningCleanups.Store(true)
+	q.running.Add(1)
+	releasem(mp)
+}
+
+func (q *cleanupQueue) endRunningCleanups() {
+	// Update runningCleanups and running atomically with respect
+	// to goroutine profiles by disabling preemption.
+	mp := acquirem()
+	getg().runningCleanups.Store(false)
+	q.running.Add(-1)
+	releasem(mp)
+}
+
+func maxCleanupGs() uint32 {
+	// N.B. Left as a function to make changing the policy easier.
+	return uint32(max(gomaxprocs/4, 1))
+}
+
+// gcCleanups is the global cleanup queue.
+var gcCleanups cleanupQueue
+
+// runCleanups is the entrypoint for all cleanup-running goroutines.
+func runCleanups() {
+	for {
+		b := gcCleanups.dequeue()
+		if raceenabled {
+			racefingo()
+		}
+
+		gcCleanups.beginRunningCleanups()
+		for i := 0; i < int(b.n); i++ {
+			fn := b.cleanups[i]
+			cleanup := *(*func())(unsafe.Pointer(&fn))
+			cleanup()
+			b.cleanups[i] = nil
+		}
+		gcCleanups.endRunningCleanups()
+
+		atomic.Store(&b.n, 0) // Synchronize with markroot. See comment in cleanupBlockHeader.
+		gcCleanups.free.push(&b.lfnode)
+	}
+}
+
+// blockUntilEmpty blocks until either the cleanup queue is emptied
+// and the cleanups have been executed, or the timeout is reached.
+// Returns true if the cleanup queue was emptied.
+// This is used by the sync and unique tests.
+func (q *cleanupQueue) blockUntilEmpty(timeout int64) bool {
+	start := nanotime()
+	for nanotime()-start < timeout {
+		lock(&q.lock)
+		// The queue is empty when there's no work left to do *and* all the cleanup goroutines
+		// are asleep. If they're not asleep, they may be actively working on a block.
+		if q.flushed.Load() && q.full.empty() && uint32(q.sleeping.size) == q.ng.Load() {
+			unlock(&q.lock)
+			return true
+		}
+		unlock(&q.lock)
+		Gosched()
+	}
+	return false
+}
+
+//go:linkname unique_runtime_blockUntilEmptyCleanupQueue unique.runtime_blockUntilEmptyCleanupQueue
+func unique_runtime_blockUntilEmptyCleanupQueue(timeout int64) bool {
+	return gcCleanups.blockUntilEmpty(timeout)
+}
+
+//go:linkname sync_test_runtime_blockUntilEmptyCleanupQueue sync_test.runtime_blockUntilEmptyCleanupQueue
+func sync_test_runtime_blockUntilEmptyCleanupQueue(timeout int64) bool {
+	return gcCleanups.blockUntilEmpty(timeout)
+}
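All of this machinery sits behind the public runtime.AddCleanup API (Go 1.24+) that the first hunk touches. A small usage sketch follows; cleanups run at some unspecified point after the object becomes unreachable, so the example only nudges the collector and waits with a timeout rather than asserting exact timing.

package main

import (
	"fmt"
	"runtime"
	"time"
)

type resource struct{ name string }

func main() {
	done := make(chan string, 1)

	r := &resource{name: "conn-1"}
	// Register a cleanup for r. The argument is a copy of r.name, not r itself,
	// so the registration does not keep r reachable.
	c := runtime.AddCleanup(r, func(name string) { done <- name }, r.name)
	_ = c // c.Stop() would cancel the cleanup if it has not started yet.

	r = nil // drop the only reference
	runtime.GC()

	select {
	case name := <-done:
		fmt.Println("cleanup ran for", name)
	case <-time.After(time.Second):
		fmt.Println("cleanup not observed yet (timing is not guaranteed)")
	}
}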
