Diffstat (limited to 'vendor/crossbeam-deque/src/deque.rs')
-rw-r--r--  vendor/crossbeam-deque/src/deque.rs | 76
1 file changed, 37 insertions(+), 39 deletions(-)
diff --git a/vendor/crossbeam-deque/src/deque.rs b/vendor/crossbeam-deque/src/deque.rs
index 802a2fef5..bda3bf820 100644
--- a/vendor/crossbeam-deque/src/deque.rs
+++ b/vendor/crossbeam-deque/src/deque.rs
@@ -3,7 +3,7 @@ use std::cmp;
use std::fmt;
use std::iter::FromIterator;
use std::marker::PhantomData;
-use std::mem::{self, MaybeUninit};
+use std::mem::{self, ManuallyDrop, MaybeUninit};
use std::ptr;
use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
@@ -38,9 +38,8 @@ impl<T> Buffer<T> {
fn alloc(cap: usize) -> Buffer<T> {
debug_assert_eq!(cap, cap.next_power_of_two());
- let mut v = Vec::with_capacity(cap);
+ let mut v = ManuallyDrop::new(Vec::with_capacity(cap));
let ptr = v.as_mut_ptr();
- mem::forget(v);
Buffer { ptr, cap }
}
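
Why `ManuallyDrop` rather than `mem::forget` here: the pointer is derived from `v` before the old `mem::forget(v)` ran, and Miri's Stacked Borrows model treats the move into `forget` as invalidating pointers previously derived from the `Vec`. A minimal standalone sketch (hypothetical helper, not the crate's API):

    use std::mem::ManuallyDrop;

    // The Vec is wrapped before any pointer is taken and is never moved
    // again, so the returned pointer stays valid. The allocation is
    // deliberately leaked; the caller is responsible for freeing it.
    fn leak_buffer<T>(cap: usize) -> *mut T {
        let mut v = ManuallyDrop::new(Vec::<T>::with_capacity(cap));
        v.as_mut_ptr()
    }
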
@@ -53,6 +52,8 @@ impl<T> Buffer<T> {
/// Returns a pointer to the task at the specified `index`.
unsafe fn at(&self, index: isize) -> *mut T {
// `self.cap` is always a power of two.
+ // We do all the loads as `MaybeUninit` because we might realize, after loading, that we
+ // don't actually have the right to access this memory.
self.ptr.offset(index & (self.cap - 1) as isize)
}
@@ -62,8 +63,8 @@ impl<T> Buffer<T> {
/// technically speaking a data race and therefore UB. We should use an atomic store here, but
/// that would be more expensive and difficult to implement generically for all types `T`.
/// Hence, as a hack, we use a volatile write instead.
- unsafe fn write(&self, index: isize, task: T) {
- ptr::write_volatile(self.at(index), task)
+ unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
+ ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
}
/// Reads a task from the specified `index`.
@@ -71,9 +72,9 @@ impl<T> Buffer<T> {
/// This method might be concurrently called with another `write` at the same index, which is
/// technically speaking a data race and therefore UB. We should use an atomic load here, but
/// that would be more expensive and difficult to implement generically for all types `T`.
- /// Hence, as a hack, we use a volatile write instead.
- unsafe fn read(&self, index: isize) -> T {
- ptr::read_volatile(self.at(index))
+ /// Hence, as a hack, we use a volatile load instead.
+ unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
+ ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
}
}
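
The pair of changes above moves the racy volatile accesses to `MaybeUninit<T>`, so no value of type `T` is asserted to be valid until a later check proves the slot was really ours. A standalone sketch of the pattern (assumed names, not the crate's API):

    use std::mem::MaybeUninit;
    use std::ptr;

    // Both sides of the racy access move data as `MaybeUninit<T>`; the
    // caller only calls `assume_init` after winning the CAS.
    unsafe fn racy_write<T>(slot: *mut T, task: MaybeUninit<T>) {
        ptr::write_volatile(slot.cast::<MaybeUninit<T>>(), task)
    }

    unsafe fn racy_read<T>(slot: *mut T) -> MaybeUninit<T> {
        ptr::read_volatile(slot.cast::<MaybeUninit<T>>())
    }
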
@@ -115,8 +116,8 @@ struct Inner<T> {
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
// Load the back index, front index, and buffer.
- let b = self.back.load(Ordering::Relaxed);
- let f = self.front.load(Ordering::Relaxed);
+ let b = *self.back.get_mut();
+ let f = *self.front.get_mut();
unsafe {
let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
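
`Drop::drop` takes `&mut self`, so no other thread can observe these atomics; `get_mut()` yields the plain value directly, which is both cheaper than a `Relaxed` load and makes the exclusivity explicit. A sketch:

    use std::sync::atomic::AtomicIsize;

    // With exclusive access there is no ordering to reason about:
    // `get_mut` bypasses the atomic machinery entirely.
    fn read_back_index(back: &mut AtomicIsize) -> isize {
        *back.get_mut()
    }
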
@@ -406,7 +407,7 @@ impl<T> Worker<T> {
// Write `task` into the slot.
unsafe {
- buffer.write(b, task);
+ buffer.write(b, MaybeUninit::new(task));
}
atomic::fence(Ordering::Release);
@@ -461,7 +462,7 @@ impl<T> Worker<T> {
unsafe {
// Read the popped task.
let buffer = self.buffer.get();
- let task = buffer.read(f);
+ let task = buffer.read(f).assume_init();
// Shrink the buffer if `len - 1` is less than one fourth of the capacity.
if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
@@ -509,8 +510,8 @@ impl<T> Worker<T> {
)
.is_err()
{
- // Failed. We didn't pop anything.
- mem::forget(task.take());
+ // Failed. We didn't pop anything. Reset to `None`.
+ task.take();
}
// Restore the back index to the original task.
@@ -524,7 +525,7 @@ impl<T> Worker<T> {
}
}
- task
+ task.map(|t| unsafe { t.assume_init() })
}
}
}
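
With `task` now held as `Option<MaybeUninit<T>>`, dropping it on the failure path runs no destructor, so the old `mem::forget(task.take())` dance collapses to a plain `task.take()`, and `assume_init` is deferred until the compare-exchange has confirmed ownership. A minimal sketch of that tail (hypothetical helper):

    use std::mem::MaybeUninit;

    fn finish_pop<T>(task: Option<MaybeUninit<T>>, won_race: bool) -> Option<T> {
        if won_race {
            // SAFETY (sketch): the caller's CAS proved the slot was ours.
            task.map(|t| unsafe { t.assume_init() })
        } else {
            None // the MaybeUninit is dropped; no destructor runs
        }
    }
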
@@ -661,12 +662,11 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it.
- mem::forget(task);
return Steal::Retry;
}
// Return the stolen task.
- Steal::Success(task)
+ Steal::Success(unsafe { task.assume_init() })
}
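
The `mem::forget(task)` calls removed in this and the following hunks became dead weight for the same reason: `task` is now a `MaybeUninit<T>`, and dropping a `MaybeUninit<T>` never runs `T`'s destructor. A one-line sketch:

    use std::mem::MaybeUninit;

    // Letting the value fall out of scope is already a no-op for the
    // possibly-shared inner T; nothing needs to be forgotten.
    fn abandon<T>(task: MaybeUninit<T>) {
        drop(task);
    }
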
/// Steals a batch of tasks and pushes them into another worker.
@@ -821,7 +821,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
- mem::forget(task);
batch_size = i;
break;
}
@@ -975,7 +974,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it.
- mem::forget(task);
return Steal::Retry;
}
@@ -992,7 +990,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it.
- mem::forget(task);
return Steal::Retry;
}
@@ -1037,7 +1034,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
- mem::forget(tmp);
batch_size = i;
break;
}
@@ -1077,7 +1073,7 @@ impl<T> Stealer<T> {
dest.inner.back.store(dest_b, Ordering::Release);
// Return with success.
- Steal::Success(task)
+ Steal::Success(unsafe { task.assume_init() })
}
}
@@ -1123,6 +1119,11 @@ struct Slot<T> {
}
impl<T> Slot<T> {
+ const UNINIT: Self = Self {
+ task: UnsafeCell::new(MaybeUninit::uninit()),
+ state: AtomicUsize::new(0),
+ };
+
/// Waits until a task is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
@@ -1146,13 +1147,10 @@ struct Block<T> {
impl<T> Block<T> {
/// Creates an empty block that starts at `start_index`.
fn new() -> Block<T> {
- // SAFETY: This is safe because:
- // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
- // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
- // [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
- // holds a MaybeUninit.
- // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
- unsafe { MaybeUninit::zeroed().assume_init() }
+ Self {
+ next: AtomicPtr::new(ptr::null_mut()),
+ slots: [Slot::UNINIT; BLOCK_CAP],
+ }
}
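
The replacement for `MaybeUninit::zeroed().assume_init()` leans on `const fn` constructors: an associated `const` may seed a `[C; N]` repeat expression even for a non-`Copy` type, so the block is built field by field with no `unsafe` and no reliance on every field tolerating the all-zero bit pattern. A standalone sketch with assumed names:

    use std::cell::UnsafeCell;
    use std::mem::MaybeUninit;
    use std::ptr;
    use std::sync::atomic::{AtomicPtr, AtomicUsize};

    struct DemoSlot {
        task: UnsafeCell<MaybeUninit<u64>>,
        state: AtomicUsize,
    }

    impl DemoSlot {
        // `UnsafeCell::new`, `MaybeUninit::uninit`, and `AtomicUsize::new`
        // are all `const fn`, so this is a legal associated const.
        const UNINIT: Self = Self {
            task: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(0),
        };
    }

    struct DemoBlock {
        next: AtomicPtr<DemoBlock>,
        slots: [DemoSlot; 8],
    }

    impl DemoBlock {
        fn new() -> DemoBlock {
            DemoBlock {
                next: AtomicPtr::new(ptr::null_mut()),
                // A const item path in a repeat expression does not
                // require `DemoSlot: Copy`.
                slots: [DemoSlot::UNINIT; 8],
            }
        }
    }
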
/// Waits until the next pointer is set.
@@ -1535,7 +1533,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
- let task = slot.task.get().read().assume_init();
+ let task = slot.task.get().read();
// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
@@ -1547,7 +1545,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
- let task = slot.task.get().read().assume_init();
+ let task = slot.task.get().read();
// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
@@ -1689,7 +1687,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
- let task = slot.task.get().read().assume_init();
+ let task = slot.task.get().read();
match dest.flavor {
Flavor::Fifo => {
@@ -1698,7 +1696,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
- let task = slot.task.get().read().assume_init();
+ let task = slot.task.get().read();
// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
@@ -1711,7 +1709,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
- let task = slot.task.get().read().assume_init();
+ let task = slot.task.get().read();
// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
@@ -1744,7 +1742,7 @@ impl<T> Injector<T> {
}
}
- Steal::Success(task)
+ Steal::Success(task.assume_init())
}
}
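
Throughout these injector hunks, tasks copied into the destination buffer stay `MaybeUninit<T>` end to end; only the single task handed back to the caller is materialized. A sketch of the hand-back step (assumed helper):

    use std::mem::MaybeUninit;

    // `assume_init` is the last thing that happens, and only on the
    // success path; a failed steal just drops the MaybeUninit.
    unsafe fn hand_back<T>(task: MaybeUninit<T>, confirmed: bool) -> Option<T> {
        if confirmed {
            Some(task.assume_init())
        } else {
            None
        }
    }
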
@@ -1820,9 +1818,9 @@ impl<T> Injector<T> {
impl<T> Drop for Injector<T> {
fn drop(&mut self) {
- let mut head = self.head.index.load(Ordering::Relaxed);
- let mut tail = self.tail.index.load(Ordering::Relaxed);
- let mut block = self.head.block.load(Ordering::Relaxed);
+ let mut head = *self.head.index.get_mut();
+ let mut tail = *self.tail.index.get_mut();
+ let mut block = *self.head.block.get_mut();
// Erase the lower bits.
head &= !((1 << SHIFT) - 1);
@@ -1840,7 +1838,7 @@ impl<T> Drop for Injector<T> {
p.as_mut_ptr().drop_in_place();
} else {
// Deallocate the block and move to the next one.
- let next = (*block).next.load(Ordering::Relaxed);
+ let next = *(*block).next.get_mut();
drop(Box::from_raw(block));
block = next;
}
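
Same `get_mut()` pattern as in `Inner::drop`: with `&mut self`, the block list can be torn down with plain reads. A standalone sketch of the teardown loop with assumed names:

    use std::sync::atomic::AtomicPtr;

    struct DemoNode {
        next: AtomicPtr<DemoNode>,
    }

    // SAFETY contract (sketch): `block` is either null or a pointer
    // obtained from `Box::into_raw`, and nothing else references the chain.
    unsafe fn free_chain(mut block: *mut DemoNode) {
        while !block.is_null() {
            let next = *(*block).next.get_mut();
            drop(Box::from_raw(block));
            block = next;
        }
    }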