summaryrefslogtreecommitdiffstats
path: root/src/sync/poolqueue.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/sync/poolqueue.go')
-rw-r--r--src/sync/poolqueue.go309
1 files changed, 309 insertions, 0 deletions
diff --git a/src/sync/poolqueue.go b/src/sync/poolqueue.go
new file mode 100644
index 0000000..9be83e9
--- /dev/null
+++ b/src/sync/poolqueue.go
@@ -0,0 +1,309 @@
+// Copyright 2019 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sync
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+// poolDequeue is a lock-free fixed-size single-producer,
+// multi-consumer queue. The single producer can both push and pop
+// from the head, and consumers can pop from the tail.
+//
+// It has the added feature that it nils out unused slots to avoid
+// unnecessary retention of objects. This is important for sync.Pool,
+// but not typically a property considered in the literature.
+type poolDequeue struct {
+ // headTail packs together a 32-bit head index and a 32-bit
+ // tail index. Both are indexes into vals modulo len(vals)-1.
+ //
+ // tail = index of oldest data in queue
+ // head = index of next slot to fill
+ //
+ // Slots in the range [tail, head) are owned by consumers.
+ // A consumer continues to own a slot outside this range until
+ // it nils the slot, at which point ownership passes to the
+ // producer.
+ //
+ // The head index is stored in the most-significant bits so
+ // that we can atomically add to it and the overflow is
+ // harmless.
+ headTail uint64
+
+ // vals is a ring buffer of interface{} values stored in this
+ // dequeue. The size of this must be a power of 2.
+ //
+ // vals[i].typ is nil if the slot is empty and non-nil
+ // otherwise. A slot is still in use until *both* the tail
+ // index has moved beyond it and typ has been set to nil. This
+ // is set to nil atomically by the consumer and read
+ // atomically by the producer.
+ vals []eface
+}
+
+type eface struct {
+ typ, val unsafe.Pointer
+}
+
+const dequeueBits = 32
+
+// dequeueLimit is the maximum size of a poolDequeue.
+//
+// This must be at most (1<<dequeueBits)/2 because detecting fullness
+// depends on wrapping around the ring buffer without wrapping around
+// the index. We divide by 4 so this fits in an int on 32-bit.
+const dequeueLimit = (1 << dequeueBits) / 4
+
+// dequeueNil is used in poolDequeue to represent interface{}(nil).
+// Since we use nil to represent empty slots, we need a sentinel value
+// to represent nil.
+type dequeueNil *struct{}
+
+func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
+ const mask = 1<<dequeueBits - 1
+ head = uint32((ptrs >> dequeueBits) & mask)
+ tail = uint32(ptrs & mask)
+ return
+}
+
+func (d *poolDequeue) pack(head, tail uint32) uint64 {
+ const mask = 1<<dequeueBits - 1
+ return (uint64(head) << dequeueBits) |
+ uint64(tail&mask)
+}
+
+// pushHead adds val at the head of the queue. It returns false if the
+// queue is full. It must only be called by a single producer.
+func (d *poolDequeue) pushHead(val interface{}) bool {
+ ptrs := atomic.LoadUint64(&d.headTail)
+ head, tail := d.unpack(ptrs)
+ if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
+ // Queue is full.
+ return false
+ }
+ slot := &d.vals[head&uint32(len(d.vals)-1)]
+
+ // Check if the head slot has been released by popTail.
+ typ := atomic.LoadPointer(&slot.typ)
+ if typ != nil {
+ // Another goroutine is still cleaning up the tail, so
+ // the queue is actually still full.
+ return false
+ }
+
+ // The head slot is free, so we own it.
+ if val == nil {
+ val = dequeueNil(nil)
+ }
+ *(*interface{})(unsafe.Pointer(slot)) = val
+
+ // Increment head. This passes ownership of slot to popTail
+ // and acts as a store barrier for writing the slot.
+ atomic.AddUint64(&d.headTail, 1<<dequeueBits)
+ return true
+}
+
+// popHead removes and returns the element at the head of the queue.
+// It returns false if the queue is empty. It must only be called by a
+// single producer.
+func (d *poolDequeue) popHead() (interface{}, bool) {
+ var slot *eface
+ for {
+ ptrs := atomic.LoadUint64(&d.headTail)
+ head, tail := d.unpack(ptrs)
+ if tail == head {
+ // Queue is empty.
+ return nil, false
+ }
+
+ // Confirm tail and decrement head. We do this before
+ // reading the value to take back ownership of this
+ // slot.
+ head--
+ ptrs2 := d.pack(head, tail)
+ if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
+ // We successfully took back slot.
+ slot = &d.vals[head&uint32(len(d.vals)-1)]
+ break
+ }
+ }
+
+ val := *(*interface{})(unsafe.Pointer(slot))
+ if val == dequeueNil(nil) {
+ val = nil
+ }
+ // Zero the slot. Unlike popTail, this isn't racing with
+ // pushHead, so we don't need to be careful here.
+ *slot = eface{}
+ return val, true
+}
+
+// popTail removes and returns the element at the tail of the queue.
+// It returns false if the queue is empty. It may be called by any
+// number of consumers.
+func (d *poolDequeue) popTail() (interface{}, bool) {
+ var slot *eface
+ for {
+ ptrs := atomic.LoadUint64(&d.headTail)
+ head, tail := d.unpack(ptrs)
+ if tail == head {
+ // Queue is empty.
+ return nil, false
+ }
+
+ // Confirm head and tail (for our speculative check
+ // above) and increment tail. If this succeeds, then
+ // we own the slot at tail.
+ ptrs2 := d.pack(head, tail+1)
+ if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
+ // Success.
+ slot = &d.vals[tail&uint32(len(d.vals)-1)]
+ break
+ }
+ }
+
+ // We now own slot.
+ val := *(*interface{})(unsafe.Pointer(slot))
+ if val == dequeueNil(nil) {
+ val = nil
+ }
+
+ // Tell pushHead that we're done with this slot. Zeroing the
+ // slot is also important so we don't leave behind references
+ // that could keep this object live longer than necessary.
+ //
+ // We write to val first and then publish that we're done with
+ // this slot by atomically writing to typ.
+ slot.val = nil
+ atomic.StorePointer(&slot.typ, nil)
+ // At this point pushHead owns the slot.
+
+ return val, true
+}
+
+// poolChain is a dynamically-sized version of poolDequeue.
+//
+// This is implemented as a doubly-linked list queue of poolDequeues
+// where each dequeue is double the size of the previous one. Once a
+// dequeue fills up, this allocates a new one and only ever pushes to
+// the latest dequeue. Pops happen from the other end of the list and
+// once a dequeue is exhausted, it gets removed from the list.
+type poolChain struct {
+ // head is the poolDequeue to push to. This is only accessed
+ // by the producer, so doesn't need to be synchronized.
+ head *poolChainElt
+
+ // tail is the poolDequeue to popTail from. This is accessed
+ // by consumers, so reads and writes must be atomic.
+ tail *poolChainElt
+}
+
+type poolChainElt struct {
+ poolDequeue
+
+ // next and prev link to the adjacent poolChainElts in this
+ // poolChain.
+ //
+ // next is written atomically by the producer and read
+ // atomically by the consumer. It only transitions from nil to
+ // non-nil.
+ //
+ // prev is written atomically by the consumer and read
+ // atomically by the producer. It only transitions from
+ // non-nil to nil.
+ next, prev *poolChainElt
+}
+
+func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
+ atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
+}
+
+func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
+ return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
+}
+
+func (c *poolChain) pushHead(val interface{}) {
+ d := c.head
+ if d == nil {
+ // Initialize the chain.
+ const initSize = 8 // Must be a power of 2
+ d = new(poolChainElt)
+ d.vals = make([]eface, initSize)
+ c.head = d
+ storePoolChainElt(&c.tail, d)
+ }
+
+ if d.pushHead(val) {
+ return
+ }
+
+ // The current dequeue is full. Allocate a new one of twice
+ // the size.
+ newSize := len(d.vals) * 2
+ if newSize >= dequeueLimit {
+ // Can't make it any bigger.
+ newSize = dequeueLimit
+ }
+
+ d2 := &poolChainElt{prev: d}
+ d2.vals = make([]eface, newSize)
+ c.head = d2
+ storePoolChainElt(&d.next, d2)
+ d2.pushHead(val)
+}
+
+func (c *poolChain) popHead() (interface{}, bool) {
+ d := c.head
+ for d != nil {
+ if val, ok := d.popHead(); ok {
+ return val, ok
+ }
+ // There may still be unconsumed elements in the
+ // previous dequeue, so try backing up.
+ d = loadPoolChainElt(&d.prev)
+ }
+ return nil, false
+}
+
+func (c *poolChain) popTail() (interface{}, bool) {
+ d := loadPoolChainElt(&c.tail)
+ if d == nil {
+ return nil, false
+ }
+
+ for {
+ // It's important that we load the next pointer
+ // *before* popping the tail. In general, d may be
+ // transiently empty, but if next is non-nil before
+ // the pop and the pop fails, then d is permanently
+ // empty, which is the only condition under which it's
+ // safe to drop d from the chain.
+ d2 := loadPoolChainElt(&d.next)
+
+ if val, ok := d.popTail(); ok {
+ return val, ok
+ }
+
+ if d2 == nil {
+ // This is the only dequeue. It's empty right
+ // now, but could be pushed to in the future.
+ return nil, false
+ }
+
+ // The tail of the chain has been drained, so move on
+ // to the next dequeue. Try to drop it from the chain
+ // so the next pop doesn't have to look at the empty
+ // dequeue again.
+ if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
+ // We won the race. Clear the prev pointer so
+ // the garbage collector can collect the empty
+ // dequeue and so popHead doesn't back up
+ // further than necessary.
+ storePoolChainElt(&d2.prev, nil)
+ }
+ d = d2
+ }
+}