diff options
| author | Austin Clements <austin@google.com> | 2019-03-01 13:16:37 -0500 |
|---|---|---|
| committer | Austin Clements <austin@google.com> | 2019-04-05 18:49:03 +0000 |
| commit | 2b605670020fc637e20a609b1dc86c1f0e7afdd1 (patch) | |
| tree | fa5ecc1c95a04fe0c1009fd6d1702aec631df295 /src/sync/poolqueue.go | |
| parent | e47ced78578c471cbcd34a7d6b223a71e84a46c8 (diff) | |
| download | go-2b605670020fc637e20a609b1dc86c1f0e7afdd1.tar.xz | |
sync: internal fixed size lock-free queue for sync.Pool
This is the first step toward fixing multiple issues with sync.Pool.
This adds a fixed size, lock-free, single-producer, multi-consumer
queue that will be used in the new Pool stealing implementation.
For #22950, #22331.
Change-Id: I50e85e3cb83a2ee71f611ada88e7f55996504bb5
Reviewed-on: https://go-review.googlesource.com/c/go/+/166957
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Chase <drchase@google.com>
Diffstat (limited to 'src/sync/poolqueue.go')
| -rw-r--r-- | src/sync/poolqueue.go | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go new file mode 100644 index 0000000000..bc2ab647ff --- /dev/null +++ b/src/sync/poolqueue.go @@ -0,0 +1,185 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// poolDequeue is a lock-free fixed-size single-producer, +// multi-consumer queue. The single producer can both push and pop +// from the head, and consumers can pop from the tail. +// +// It has the added feature that it nils out unused slots to avoid +// unnecessary retention of objects. This is important for sync.Pool, +// but not typically a property considered in the literature. +type poolDequeue struct { + // headTail packs together a 32-bit head index and a 32-bit + // tail index. Both are indexes into vals modulo len(vals)-1. + // + // tail = index of oldest data in queue + // head = index of next slot to fill + // + // Slots in the range [tail, head) are owned by consumers. + // A consumer continues to own a slot outside this range until + // it nils the slot, at which point ownership passes to the + // producer. + // + // The head index is stored in the most-significant bits so + // that we can atomically add to it and the overflow is + // harmless. + headTail uint64 + + // vals is a ring buffer of interface{} values stored in this + // dequeue. The size of this must be a power of 2. + // + // vals[i].typ is nil if the slot is empty and non-nil + // otherwise. A slot is still in use until *both* the tail + // index has moved beyond it and typ has been set to nil. This + // is set to nil atomically by the consumer and read + // atomically by the producer. + vals []eface +} + +type eface struct { + typ, val unsafe.Pointer +} + +const dequeueBits = 32 + +// dequeueLimit is the maximum size of a poolDequeue. +// +// This is half of 1<<dequeueBits because detecting fullness depends +// on wrapping around the ring buffer without wrapping around the +// index. +const dequeueLimit = (1 << dequeueBits) / 2 + +// dequeueNil is used in poolDeqeue to represent interface{}(nil). +// Since we use nil to represent empty slots, we need a sentinel value +// to represent nil. +type dequeueNil *struct{} + +func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { + const mask = 1<<dequeueBits - 1 + head = uint32((ptrs >> dequeueBits) & mask) + tail = uint32(ptrs & mask) + return +} + +func (d *poolDequeue) pack(head, tail uint32) uint64 { + const mask = 1<<dequeueBits - 1 + return (uint64(head) << dequeueBits) | + uint64(tail&mask) +} + +// pushHead adds val at the head of the queue. It returns false if the +// queue is full. It must only be called by a single producer. +func (d *poolDequeue) pushHead(val interface{}) bool { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { + // Queue is full. + return false + } + slot := &d.vals[head&uint32(len(d.vals)-1)] + + // Check if the head slot has been released by popTail. + typ := atomic.LoadPointer(&slot.typ) + if typ != nil { + // Another goroutine is still cleaning up the tail, so + // the queue is actually still full. + return false + } + + // The head slot is free, so we own it. + if val == nil { + val = dequeueNil(nil) + } + *(*interface{})(unsafe.Pointer(slot)) = val + + // Increment head. This passes ownership of slot to popTail + // and acts as a store barrier for writing the slot. + atomic.AddUint64(&d.headTail, 1<<dequeueBits) + return true +} + +// popHead removes and returns the element at the head of the queue. +// It returns false if the queue is empty. It must only be called by a +// single producer. +func (d *poolDequeue) popHead() (interface{}, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm tail and decrement head. We do this before + // reading the value to take back ownership of this + // slot. + head-- + ptrs2 := d.pack(head, tail) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // We successfully took back slot. + slot = &d.vals[head&uint32(len(d.vals)-1)] + break + } + } + + val := *(*interface{})(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + // Zero the slot. Unlike popTail, this isn't racing with + // pushHead, so we don't need to be careful here. + *slot = eface{} + return val, true +} + +// popTail removes and returns the element at the tail of the queue. +// It returns false if the queue is empty. It may be called by any +// number of consumers. +func (d *poolDequeue) popTail() (interface{}, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm head and tail (for our speculative check + // above) and increment tail. If this succeeds, then + // we own the slot at tail. + ptrs2 := d.pack(head, tail+1) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // Success. + slot = &d.vals[tail&uint32(len(d.vals)-1)] + break + } + } + + // We now own slot. + val := *(*interface{})(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + + // Tell pushHead that we're done with this slot. Zeroing the + // slot is also important so we don't leave behind references + // that could keep this object live longer than necessary. + // + // We write to val first and then publish that we're done with + // this slot by atomically writing to typ. + slot.val = nil + atomic.StorePointer(&slot.typ, nil) + // At this point pushHead owns the slot. + + return val, true +} |
