author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-17 12:18:25 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-17 12:18:25 +0000
commit     5363f350887b1e5b5dd21a86f88c8af9d7fea6da (patch)
tree       35ca005eb6e0e9a1ba3bb5dbc033209ad445dc17 /library/std/src/sync
parent     Adding debian version 1.66.0+dfsg1-1. (diff)
Merging upstream version 1.67.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/std/src/sync')
-rw-r--r--  library/std/src/sync/condvar.rs               |   2
-rw-r--r--  library/std/src/sync/lazy_lock.rs             |   4
-rw-r--r--  library/std/src/sync/mod.rs                   |   1
-rw-r--r--  library/std/src/sync/mpmc/array.rs            | 513
-rw-r--r--  library/std/src/sync/mpmc/context.rs          | 155
-rw-r--r--  library/std/src/sync/mpmc/counter.rs          | 137
-rw-r--r--  library/std/src/sync/mpmc/error.rs            |  46
-rw-r--r--  library/std/src/sync/mpmc/list.rs             | 638
-rw-r--r--  library/std/src/sync/mpmc/mod.rs              | 430
-rw-r--r--  library/std/src/sync/mpmc/select.rs           |  71
-rw-r--r--  library/std/src/sync/mpmc/utils.rs            | 143
-rw-r--r--  library/std/src/sync/mpmc/waker.rs            | 204
-rw-r--r--  library/std/src/sync/mpmc/zero.rs             | 318
-rw-r--r--  library/std/src/sync/mpsc/blocking.rs         |  82
-rw-r--r--  library/std/src/sync/mpsc/cache_aligned.rs    |  25
-rw-r--r--  library/std/src/sync/mpsc/mod.rs              | 473
-rw-r--r--  library/std/src/sync/mpsc/mpsc_queue.rs       | 117
-rw-r--r--  library/std/src/sync/mpsc/mpsc_queue/tests.rs |  47
-rw-r--r--  library/std/src/sync/mpsc/oneshot.rs          | 315
-rw-r--r--  library/std/src/sync/mpsc/shared.rs           | 501
-rw-r--r--  library/std/src/sync/mpsc/spsc_queue.rs       | 236
-rw-r--r--  library/std/src/sync/mpsc/spsc_queue/tests.rs | 102
-rw-r--r--  library/std/src/sync/mpsc/stream.rs           | 457
-rw-r--r--  library/std/src/sync/mpsc/sync.rs             | 495
-rw-r--r--  library/std/src/sync/mpsc/sync_tests.rs       |   8
-rw-r--r--  library/std/src/sync/mpsc/tests.rs            |  15
-rw-r--r--  library/std/src/sync/mutex.rs                 |  16
-rw-r--r--  library/std/src/sync/once_lock.rs             |   6
-rw-r--r--  library/std/src/sync/rwlock.rs                |  12
29 files changed, 2730 insertions, 2839 deletions
diff --git a/library/std/src/sync/condvar.rs b/library/std/src/sync/condvar.rs
index eb1e7135a..76a1b4a2a 100644
--- a/library/std/src/sync/condvar.rs
+++ b/library/std/src/sync/condvar.rs
@@ -3,7 +3,7 @@ mod tests;
use crate::fmt;
use crate::sync::{mutex, poison, LockResult, MutexGuard, PoisonError};
-use crate::sys_common::condvar as sys;
+use crate::sys::locks as sys;
use crate::time::{Duration, Instant};
/// A type indicating whether a timed wait on a condition variable returned
diff --git a/library/std/src/sync/lazy_lock.rs b/library/std/src/sync/lazy_lock.rs
index 535cc1c42..c8d3289ca 100644
--- a/library/std/src/sync/lazy_lock.rs
+++ b/library/std/src/sync/lazy_lock.rs
@@ -6,7 +6,9 @@ use crate::sync::OnceLock;
/// A value which is initialized on the first access.
///
-/// This type is a thread-safe `Lazy`, and can be used in statics.
+/// This type is a thread-safe [`LazyCell`], and can be used in statics.
+///
+/// [`LazyCell`]: crate::cell::LazyCell
///
/// # Examples
///
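
The hunk's context ends at the doc comment's own `# Examples` heading. For reference, a minimal sketch of the documented use (assuming a nightly toolchain, since `LazyLock` is still unstable behind the `once_cell` feature in 1.67):

#![feature(once_cell)]

use std::collections::HashMap;
use std::sync::LazyLock;

// Computed on first access, from whichever thread touches it first.
static HASHMAP: LazyLock<HashMap<i32, &'static str>> = LazyLock::new(|| {
    let mut m = HashMap::new();
    m.insert(0, "foo");
    m
});

fn main() {
    // The closure runs exactly once, on the first dereference.
    println!("{:?}", HASHMAP.get(&0));
}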
diff --git a/library/std/src/sync/mod.rs b/library/std/src/sync/mod.rs
index 7b507a169..4fee8d3e9 100644
--- a/library/std/src/sync/mod.rs
+++ b/library/std/src/sync/mod.rs
@@ -182,6 +182,7 @@ pub mod mpsc;
mod barrier;
mod condvar;
mod lazy_lock;
+mod mpmc;
mod mutex;
mod once;
mod once_lock;
diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs
new file mode 100644
index 000000000..c1e3e48b0
--- /dev/null
+++ b/library/std/src/sync/mpmc/array.rs
@@ -0,0 +1,513 @@
+//! Bounded channel based on a preallocated array.
+//!
+//! This flavor has a fixed, positive capacity.
+//!
+//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
+//!
+//! Source:
+//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
+//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
+
+use super::context::Context;
+use super::error::*;
+use super::select::{Operation, Selected, Token};
+use super::utils::{Backoff, CachePadded};
+use super::waker::SyncWaker;
+
+use crate::cell::UnsafeCell;
+use crate::mem::MaybeUninit;
+use crate::ptr;
+use crate::sync::atomic::{self, AtomicUsize, Ordering};
+use crate::time::Instant;
+
+/// A slot in a channel.
+struct Slot<T> {
+ /// The current stamp.
+ stamp: AtomicUsize,
+
+ /// The message in this slot.
+ msg: UnsafeCell<MaybeUninit<T>>,
+}
+
+/// The token type for the array flavor.
+#[derive(Debug)]
+pub(crate) struct ArrayToken {
+ /// Slot to read from or write to.
+ slot: *const u8,
+
+ /// Stamp to store into the slot after reading or writing.
+ stamp: usize,
+}
+
+impl Default for ArrayToken {
+ #[inline]
+ fn default() -> Self {
+ ArrayToken { slot: ptr::null(), stamp: 0 }
+ }
+}
+
+/// Bounded channel based on a preallocated array.
+pub(crate) struct Channel<T> {
+ /// The head of the channel.
+ ///
+ /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
+ /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+ /// represent the lap. The mark bit in the head is always zero.
+ ///
+ /// Messages are popped from the head of the channel.
+ head: CachePadded<AtomicUsize>,
+
+ /// The tail of the channel.
+ ///
+ /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
+ /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+ /// represent the lap. The mark bit indicates that the channel is disconnected.
+ ///
+ /// Messages are pushed into the tail of the channel.
+ tail: CachePadded<AtomicUsize>,
+
+ /// The buffer holding slots.
+ buffer: Box<[Slot<T>]>,
+
+ /// The channel capacity.
+ cap: usize,
+
+ /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
+ one_lap: usize,
+
+ /// If this bit is set in the tail, that means the channel is disconnected.
+ mark_bit: usize,
+
+ /// Senders waiting while the channel is full.
+ senders: SyncWaker,
+
+ /// Receivers waiting while the channel is empty and not disconnected.
+ receivers: SyncWaker,
+}
+
+impl<T> Channel<T> {
+ /// Creates a bounded channel of capacity `cap`.
+ pub(crate) fn with_capacity(cap: usize) -> Self {
+ assert!(cap > 0, "capacity must be positive");
+
+ // Compute constants `mark_bit` and `one_lap`.
+ let mark_bit = (cap + 1).next_power_of_two();
+ let one_lap = mark_bit * 2;
+
+ // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
+ let head = 0;
+ // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
+ let tail = 0;
+
+ // Allocate a buffer of `cap` slots initialized
+ // with stamps.
+ let buffer: Box<[Slot<T>]> = (0..cap)
+ .map(|i| {
+ // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
+ Slot { stamp: AtomicUsize::new(i), msg: UnsafeCell::new(MaybeUninit::uninit()) }
+ })
+ .collect();
+
+ Channel {
+ buffer,
+ cap,
+ one_lap,
+ mark_bit,
+ head: CachePadded::new(AtomicUsize::new(head)),
+ tail: CachePadded::new(AtomicUsize::new(tail)),
+ senders: SyncWaker::new(),
+ receivers: SyncWaker::new(),
+ }
+ }
+
+ /// Attempts to reserve a slot for sending a message.
+ fn start_send(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.load(Ordering::Relaxed);
+
+ loop {
+ // Check if the channel is disconnected.
+ if tail & self.mark_bit != 0 {
+ token.array.slot = ptr::null();
+ token.array.stamp = 0;
+ return true;
+ }
+
+ // Deconstruct the tail.
+ let index = tail & (self.mark_bit - 1);
+ let lap = tail & !(self.one_lap - 1);
+
+ // Inspect the corresponding slot.
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
+ let stamp = slot.stamp.load(Ordering::Acquire);
+
+ // If the tail and the stamp match, we may attempt to push.
+ if tail == stamp {
+ let new_tail = if index + 1 < self.cap {
+ // Same lap, incremented index.
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+ tail + 1
+ } else {
+ // One lap forward, index wraps around to zero.
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+ lap.wrapping_add(self.one_lap)
+ };
+
+ // Try moving the tail.
+ match self.tail.compare_exchange_weak(
+ tail,
+ new_tail,
+ Ordering::SeqCst,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => {
+ // Prepare the token for the follow-up call to `write`.
+ token.array.slot = slot as *const Slot<T> as *const u8;
+ token.array.stamp = tail + 1;
+ return true;
+ }
+ Err(_) => {
+ backoff.spin_light();
+ tail = self.tail.load(Ordering::Relaxed);
+ }
+ }
+ } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
+ atomic::fence(Ordering::SeqCst);
+ let head = self.head.load(Ordering::Relaxed);
+
+ // If the head lags one lap behind the tail as well...
+ if head.wrapping_add(self.one_lap) == tail {
+ // ...then the channel is full.
+ return false;
+ }
+
+ backoff.spin_light();
+ tail = self.tail.load(Ordering::Relaxed);
+ } else {
+ // Snooze because we need to wait for the stamp to get updated.
+ backoff.spin_heavy();
+ tail = self.tail.load(Ordering::Relaxed);
+ }
+ }
+ }
+
+ /// Writes a message into the channel.
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no slot, the channel is disconnected.
+ if token.array.slot.is_null() {
+ return Err(msg);
+ }
+
+ let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+
+ // Write the message into the slot and update the stamp.
+ slot.msg.get().write(MaybeUninit::new(msg));
+ slot.stamp.store(token.array.stamp, Ordering::Release);
+
+ // Wake a sleeping receiver.
+ self.receivers.notify();
+ Ok(())
+ }
+
+ /// Attempts to reserve a slot for receiving a message.
+ fn start_recv(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut head = self.head.load(Ordering::Relaxed);
+
+ loop {
+ // Deconstruct the head.
+ let index = head & (self.mark_bit - 1);
+ let lap = head & !(self.one_lap - 1);
+
+ // Inspect the corresponding slot.
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
+ let stamp = slot.stamp.load(Ordering::Acquire);
+
+ // If the stamp is ahead of the head by 1, we may attempt to pop.
+ if head + 1 == stamp {
+ let new = if index + 1 < self.cap {
+ // Same lap, incremented index.
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+ head + 1
+ } else {
+ // One lap forward, index wraps around to zero.
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+ lap.wrapping_add(self.one_lap)
+ };
+
+ // Try moving the head.
+ match self.head.compare_exchange_weak(
+ head,
+ new,
+ Ordering::SeqCst,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => {
+ // Prepare the token for the follow-up call to `read`.
+ token.array.slot = slot as *const Slot<T> as *const u8;
+ token.array.stamp = head.wrapping_add(self.one_lap);
+ return true;
+ }
+ Err(_) => {
+ backoff.spin_light();
+ head = self.head.load(Ordering::Relaxed);
+ }
+ }
+ } else if stamp == head {
+ atomic::fence(Ordering::SeqCst);
+ let tail = self.tail.load(Ordering::Relaxed);
+
+ // If the tail equals the head, that means the channel is empty.
+ if (tail & !self.mark_bit) == head {
+ // If the channel is disconnected...
+ if tail & self.mark_bit != 0 {
+ // ...then receive an error.
+ token.array.slot = ptr::null();
+ token.array.stamp = 0;
+ return true;
+ } else {
+ // Otherwise, the receive operation is not ready.
+ return false;
+ }
+ }
+
+ backoff.spin_light();
+ head = self.head.load(Ordering::Relaxed);
+ } else {
+ // Snooze because we need to wait for the stamp to get updated.
+ backoff.spin_heavy();
+ head = self.head.load(Ordering::Relaxed);
+ }
+ }
+ }
+
+ /// Reads a message from the channel.
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ if token.array.slot.is_null() {
+ // The channel is disconnected.
+ return Err(());
+ }
+
+ let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+
+ // Read the message from the slot and update the stamp.
+ let msg = slot.msg.get().read().assume_init();
+ slot.stamp.store(token.array.stamp, Ordering::Release);
+
+ // Wake a sleeping sender.
+ self.senders.notify();
+ Ok(msg)
+ }
+
+ /// Attempts to send a message into the channel.
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ let token = &mut Token::default();
+ if self.start_send(token) {
+ unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
+ } else {
+ Err(TrySendError::Full(msg))
+ }
+ }
+
+ /// Sends a message into the channel.
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ loop {
+ // Try sending a message several times.
+ let backoff = Backoff::new();
+ loop {
+ if self.start_send(token) {
+ let res = unsafe { self.write(token, msg) };
+ return res.map_err(SendTimeoutError::Disconnected);
+ }
+
+ if backoff.is_completed() {
+ break;
+ } else {
+ backoff.spin_light();
+ }
+ }
+
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(SendTimeoutError::Timeout(msg));
+ }
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a receiver wakes us up.
+ let oper = Operation::hook(token);
+ self.senders.register(oper, cx);
+
+ // Has the channel become ready just now?
+ if !self.is_full() || self.is_disconnected() {
+ let _ = cx.try_select(Selected::Aborted);
+ }
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted | Selected::Disconnected => {
+ self.senders.unregister(oper).unwrap();
+ }
+ Selected::Operation(_) => {}
+ }
+ });
+ }
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+
+ if self.start_recv(token) {
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ loop {
+ if self.start_recv(token) {
+ let res = unsafe { self.read(token) };
+ return res.map_err(|_| RecvTimeoutError::Disconnected);
+ }
+
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(RecvTimeoutError::Timeout);
+ }
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a sender wakes us up.
+ let oper = Operation::hook(token);
+ self.receivers.register(oper, cx);
+
+ // Has the channel become ready just now?
+ if !self.is_empty() || self.is_disconnected() {
+ let _ = cx.try_select(Selected::Aborted);
+ }
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted | Selected::Disconnected => {
+ self.receivers.unregister(oper).unwrap();
+ // If the channel was disconnected, we still have to check for remaining
+ // messages.
+ }
+ Selected::Operation(_) => {}
+ }
+ });
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub(crate) fn len(&self) -> usize {
+ loop {
+ // Load the tail, then load the head.
+ let tail = self.tail.load(Ordering::SeqCst);
+ let head = self.head.load(Ordering::SeqCst);
+
+ // If the tail didn't change, we've got consistent values to work with.
+ if self.tail.load(Ordering::SeqCst) == tail {
+ let hix = head & (self.mark_bit - 1);
+ let tix = tail & (self.mark_bit - 1);
+
+ return if hix < tix {
+ tix - hix
+ } else if hix > tix {
+ self.cap - hix + tix
+ } else if (tail & !self.mark_bit) == head {
+ 0
+ } else {
+ self.cap
+ };
+ }
+ }
+ }
+
+ /// Returns the capacity of the channel.
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ Some(self.cap)
+ }
+
+ /// Disconnects the channel and wakes up all blocked senders and receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect(&self) -> bool {
+ let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
+
+ if tail & self.mark_bit == 0 {
+ self.senders.disconnect();
+ self.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Returns `true` if the channel is disconnected.
+ pub(crate) fn is_disconnected(&self) -> bool {
+ self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ let head = self.head.load(Ordering::SeqCst);
+ let tail = self.tail.load(Ordering::SeqCst);
+
+ // Is the tail equal to the head?
+ //
+ // Note: If the head changes just before we load the tail, that means there was a moment
+ // when the channel was not empty, so it is safe to just return `false`.
+ (tail & !self.mark_bit) == head
+ }
+
+ /// Returns `true` if the channel is full.
+ pub(crate) fn is_full(&self) -> bool {
+ let tail = self.tail.load(Ordering::SeqCst);
+ let head = self.head.load(Ordering::SeqCst);
+
+ // Is the head lagging one lap behind tail?
+ //
+ // Note: If the tail changes just before we load the head, that means there was a moment
+ // when the channel was not full, so it is safe to just return `false`.
+ head.wrapping_add(self.one_lap) == tail & !self.mark_bit
+ }
+}
+
+impl<T> Drop for Channel<T> {
+ fn drop(&mut self) {
+ // Get the index of the head.
+ let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
+
+ // Loop over all slots that hold a message and drop them.
+ for i in 0..self.len() {
+ // Compute the index of the next slot holding a message.
+ let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap };
+
+ unsafe {
+ debug_assert!(index < self.buffer.len());
+ let slot = self.buffer.get_unchecked_mut(index);
+ let msg = &mut *slot.msg.get();
+ msg.as_mut_ptr().drop_in_place();
+ }
+ }
+ }
+}
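
A minimal standalone sketch (hypothetical values, not part of the patch) of the stamp encoding that `with_capacity`, `start_send`, and `start_recv` above rely on:

fn main() {
    // The constants `with_capacity` computes for, say, cap = 5.
    let cap: usize = 5;
    let mark_bit = (cap + 1).next_power_of_two(); // 8
    let one_lap = mark_bit * 2;                   // 16

    // A stamp of `{ lap: 3, mark: 0, index: 2 }`.
    let stamp = 3 * one_lap + 2;

    // Deconstructed the way `start_send`/`start_recv` do it.
    let index = stamp & (mark_bit - 1);       // 2
    let lap = stamp & !(one_lap - 1);         // 48 == 3 * one_lap
    let disconnected = stamp & mark_bit != 0; // false (tail only)
    assert_eq!((index, lap, disconnected), (2, 48, false));
}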
diff --git a/library/std/src/sync/mpmc/context.rs b/library/std/src/sync/mpmc/context.rs
new file mode 100644
index 000000000..bbfc6ce00
--- /dev/null
+++ b/library/std/src/sync/mpmc/context.rs
@@ -0,0 +1,155 @@
+//! Thread-local channel context.
+
+use super::select::Selected;
+use super::waker::current_thread_id;
+
+use crate::cell::Cell;
+use crate::ptr;
+use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
+use crate::sync::Arc;
+use crate::thread::{self, Thread};
+use crate::time::Instant;
+
+/// Thread-local context.
+#[derive(Debug, Clone)]
+pub struct Context {
+ inner: Arc<Inner>,
+}
+
+/// Inner representation of `Context`.
+#[derive(Debug)]
+struct Inner {
+ /// Selected operation.
+ select: AtomicUsize,
+
+ /// A slot into which another thread may store a pointer to its `Packet`.
+ packet: AtomicPtr<()>,
+
+ /// Thread handle.
+ thread: Thread,
+
+ /// Thread id.
+ thread_id: usize,
+}
+
+impl Context {
+ /// Creates a new context for the duration of the closure.
+ #[inline]
+ pub fn with<F, R>(f: F) -> R
+ where
+ F: FnOnce(&Context) -> R,
+ {
+ thread_local! {
+ /// Cached thread-local context.
+ static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
+ }
+
+ let mut f = Some(f);
+ let mut f = |cx: &Context| -> R {
+ let f = f.take().unwrap();
+ f(cx)
+ };
+
+ CONTEXT
+ .try_with(|cell| match cell.take() {
+ None => f(&Context::new()),
+ Some(cx) => {
+ cx.reset();
+ let res = f(&cx);
+ cell.set(Some(cx));
+ res
+ }
+ })
+ .unwrap_or_else(|_| f(&Context::new()))
+ }
+
+ /// Creates a new `Context`.
+ #[cold]
+ fn new() -> Context {
+ Context {
+ inner: Arc::new(Inner {
+ select: AtomicUsize::new(Selected::Waiting.into()),
+ packet: AtomicPtr::new(ptr::null_mut()),
+ thread: thread::current(),
+ thread_id: current_thread_id(),
+ }),
+ }
+ }
+
+ /// Resets `select` and `packet`.
+ #[inline]
+ fn reset(&self) {
+ self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
+ self.inner.packet.store(ptr::null_mut(), Ordering::Release);
+ }
+
+ /// Attempts to select an operation.
+ ///
+ /// On failure, the previously selected operation is returned.
+ #[inline]
+ pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
+ self.inner
+ .select
+ .compare_exchange(
+ Selected::Waiting.into(),
+ select.into(),
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ )
+ .map(|_| ())
+ .map_err(|e| e.into())
+ }
+
+ /// Stores a packet.
+ ///
+ /// This method must be called after `try_select` succeeds and there is a packet to provide.
+ #[inline]
+ pub fn store_packet(&self, packet: *mut ()) {
+ if !packet.is_null() {
+ self.inner.packet.store(packet, Ordering::Release);
+ }
+ }
+
+ /// Waits until an operation is selected and returns it.
+ ///
+ /// If the deadline is reached, `Selected::Aborted` will be selected.
+ #[inline]
+ pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
+ loop {
+ // Check whether an operation has been selected.
+ let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
+ if sel != Selected::Waiting {
+ return sel;
+ }
+
+ // If there's a deadline, park the current thread until the deadline is reached.
+ if let Some(end) = deadline {
+ let now = Instant::now();
+
+ if now < end {
+ thread::park_timeout(end - now);
+ } else {
+ // The deadline has been reached. Try aborting select.
+ return match self.try_select(Selected::Aborted) {
+ Ok(()) => Selected::Aborted,
+ Err(s) => s,
+ };
+ }
+ } else {
+ thread::park();
+ }
+ }
+ }
+
+ /// Unparks the thread this context belongs to.
+ #[inline]
+ pub fn unpark(&self) {
+ self.inner.thread.unpark();
+ }
+
+ /// Returns the id of the thread this context belongs to.
+ #[inline]
+ pub fn thread_id(&self) -> usize {
+ self.inner.thread_id
+ }
+}
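
`Context::wait_until` combines an atomic selection slot with thread parking. A reduced sketch of the same protocol using only public APIs (the two constants and the setup are illustrative, not the module's actual types):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

const WAITING: usize = 0;
const SELECTED: usize = 1;

fn main() {
    let select = Arc::new(AtomicUsize::new(WAITING));
    let waiter = thread::current();

    let s = Arc::clone(&select);
    let handle = thread::spawn(move || {
        // Another thread "selects" the operation with a CAS...
        if s.compare_exchange(WAITING, SELECTED, Ordering::AcqRel, Ordering::Acquire).is_ok() {
            // ...then unparks the waiter, as `Context::unpark` does.
            waiter.unpark();
        }
    });

    // `wait_until` in miniature: park until selected or the deadline passes.
    let deadline = Instant::now() + Duration::from_secs(1);
    while select.load(Ordering::Acquire) == WAITING {
        let now = Instant::now();
        if now >= deadline {
            break;
        }
        thread::park_timeout(deadline - now);
    }

    handle.join().unwrap();
}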
diff --git a/library/std/src/sync/mpmc/counter.rs b/library/std/src/sync/mpmc/counter.rs
new file mode 100644
index 000000000..a5a6bdc67
--- /dev/null
+++ b/library/std/src/sync/mpmc/counter.rs
@@ -0,0 +1,137 @@
+use crate::ops;
+use crate::process;
+use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+/// Reference counter internals.
+struct Counter<C> {
+ /// The number of senders associated with the channel.
+ senders: AtomicUsize,
+
+ /// The number of receivers associated with the channel.
+ receivers: AtomicUsize,
+
+ /// Set to `true` if the last sender or the last receiver reference deallocates the channel.
+ destroy: AtomicBool,
+
+ /// The internal channel.
+ chan: C,
+}
+
+/// Wraps a channel into the reference counter.
+pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
+ let counter = Box::into_raw(Box::new(Counter {
+ senders: AtomicUsize::new(1),
+ receivers: AtomicUsize::new(1),
+ destroy: AtomicBool::new(false),
+ chan,
+ }));
+ let s = Sender { counter };
+ let r = Receiver { counter };
+ (s, r)
+}
+
+/// The sending side.
+pub(crate) struct Sender<C> {
+ counter: *mut Counter<C>,
+}
+
+impl<C> Sender<C> {
+ /// Returns the internal `Counter`.
+ fn counter(&self) -> &Counter<C> {
+ unsafe { &*self.counter }
+ }
+
+ /// Acquires another sender reference.
+ pub(crate) fn acquire(&self) -> Sender<C> {
+ let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
+
+ // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
+ // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
+ // just abort when the count becomes very large.
+ if count > isize::MAX as usize {
+ process::abort();
+ }
+
+ Sender { counter: self.counter }
+ }
+
+ /// Releases the sender reference.
+ ///
+ /// Function `disconnect` will be called if this is the last sender reference.
+ pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+ if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
+ disconnect(&self.counter().chan);
+
+ if self.counter().destroy.swap(true, Ordering::AcqRel) {
+ drop(Box::from_raw(self.counter));
+ }
+ }
+ }
+}
+
+impl<C> ops::Deref for Sender<C> {
+ type Target = C;
+
+ fn deref(&self) -> &C {
+ &self.counter().chan
+ }
+}
+
+impl<C> PartialEq for Sender<C> {
+ fn eq(&self, other: &Sender<C>) -> bool {
+ self.counter == other.counter
+ }
+}
+
+/// The receiving side.
+pub(crate) struct Receiver<C> {
+ counter: *mut Counter<C>,
+}
+
+impl<C> Receiver<C> {
+ /// Returns the internal `Counter`.
+ fn counter(&self) -> &Counter<C> {
+ unsafe { &*self.counter }
+ }
+
+ /// Acquires another receiver reference.
+ pub(crate) fn acquire(&self) -> Receiver<C> {
+ let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
+
+ // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
+ // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
+ // just abort when the count becomes very large.
+ if count > isize::MAX as usize {
+ process::abort();
+ }
+
+ Receiver { counter: self.counter }
+ }
+
+ /// Releases the receiver reference.
+ ///
+ /// Function `disconnect` will be called if this is the last receiver reference.
+ pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+ if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
+ disconnect(&self.counter().chan);
+
+ if self.counter().destroy.swap(true, Ordering::AcqRel) {
+ drop(Box::from_raw(self.counter));
+ }
+ }
+ }
+}
+
+impl<C> ops::Deref for Receiver<C> {
+ type Target = C;
+
+ fn deref(&self) -> &C {
+ &self.counter().chan
+ }
+}
+
+impl<C> PartialEq for Receiver<C> {
+ fn eq(&self, other: &Receiver<C>) -> bool {
+ self.counter == other.counter
+ }
+}
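
The overflow guard in `acquire` above is the same one `Arc` uses: leaked clones (via `mem::forget`) can keep bumping the count without ever releasing it, and a wrapped counter would later cause a premature free. The pattern in isolation (a sketch, not the patch's code):

use std::process;
use std::sync::atomic::{AtomicUsize, Ordering};

fn acquire(counter: &AtomicUsize) {
    let count = counter.fetch_add(1, Ordering::Relaxed);
    // Far more references than could ever be live at once: abort
    // rather than risk the counter wrapping around.
    if count > isize::MAX as usize {
        process::abort();
    }
}

fn main() {
    let senders = AtomicUsize::new(1);
    acquire(&senders);
    assert_eq!(senders.load(Ordering::Relaxed), 2);
}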
diff --git a/library/std/src/sync/mpmc/error.rs b/library/std/src/sync/mpmc/error.rs
new file mode 100644
index 000000000..1b8a1f387
--- /dev/null
+++ b/library/std/src/sync/mpmc/error.rs
@@ -0,0 +1,46 @@
+use crate::error;
+use crate::fmt;
+
+pub use crate::sync::mpsc::{RecvError, RecvTimeoutError, SendError, TryRecvError, TrySendError};
+
+/// An error returned from the [`send_timeout`] method.
+///
+/// The error contains the message being sent so it can be recovered.
+///
+/// [`send_timeout`]: super::Sender::send_timeout
+#[derive(PartialEq, Eq, Clone, Copy)]
+pub enum SendTimeoutError<T> {
+ /// The message could not be sent because the channel is full and the operation timed out.
+ ///
+ /// If this is a zero-capacity channel, then the error indicates that there was no receiver
+ /// available to receive the message and the operation timed out.
+ Timeout(T),
+
+ /// The message could not be sent because the channel is disconnected.
+ Disconnected(T),
+}
+
+impl<T> fmt::Debug for SendTimeoutError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "SendTimeoutError(..)".fmt(f)
+ }
+}
+
+impl<T> fmt::Display for SendTimeoutError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {
+ SendTimeoutError::Timeout(..) => "timed out waiting on send operation".fmt(f),
+ SendTimeoutError::Disconnected(..) => "sending on a disconnected channel".fmt(f),
+ }
+ }
+}
+
+impl<T: Send> error::Error for SendTimeoutError<T> {}
+
+impl<T> From<SendError<T>> for SendTimeoutError<T> {
+ fn from(err: SendError<T>) -> SendTimeoutError<T> {
+ match err {
+ SendError(e) => SendTimeoutError::Disconnected(e),
+ }
+ }
+}
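
`SendTimeoutError` stays private with the module, but the re-exported error types behave the same way through the public `mpsc` API. For instance, `try_send` on a bounded channel reports both failure modes and hands the message back:

use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    let (tx, rx) = sync_channel::<i32>(1);
    tx.try_send(1).unwrap();

    // The buffer is full, so the message comes back in the error.
    match tx.try_send(2) {
        Err(TrySendError::Full(msg)) => assert_eq!(msg, 2),
        other => panic!("expected Full, got {other:?}"),
    }

    // With the receiver gone, disconnection is reported instead.
    drop(rx);
    assert!(matches!(tx.try_send(3), Err(TrySendError::Disconnected(3))));
}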
diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs
new file mode 100644
index 000000000..ec6c0726a
--- /dev/null
+++ b/library/std/src/sync/mpmc/list.rs
@@ -0,0 +1,638 @@
+//! Unbounded channel implemented as a linked list.
+
+use super::context::Context;
+use super::error::*;
+use super::select::{Operation, Selected, Token};
+use super::utils::{Backoff, CachePadded};
+use super::waker::SyncWaker;
+
+use crate::cell::UnsafeCell;
+use crate::marker::PhantomData;
+use crate::mem::MaybeUninit;
+use crate::ptr;
+use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
+use crate::time::Instant;
+
+// Bits indicating the state of a slot:
+// * If a message has been written into the slot, `WRITE` is set.
+// * If a message has been read from the slot, `READ` is set.
+// * If the block is being destroyed, `DESTROY` is set.
+const WRITE: usize = 1;
+const READ: usize = 2;
+const DESTROY: usize = 4;
+
+// Each block covers one "lap" of indices.
+const LAP: usize = 32;
+// The maximum number of messages a block can hold.
+const BLOCK_CAP: usize = LAP - 1;
+// How many lower bits are reserved for metadata.
+const SHIFT: usize = 1;
+// Has two different purposes:
+// * If set in head, indicates that the block is not the last one.
+// * If set in tail, indicates that the channel is disconnected.
+const MARK_BIT: usize = 1;
+
+/// A slot in a block.
+struct Slot<T> {
+ /// The message.
+ msg: UnsafeCell<MaybeUninit<T>>,
+
+ /// The state of the slot.
+ state: AtomicUsize,
+}
+
+impl<T> Slot<T> {
+ /// Waits until a message is written into the slot.
+ fn wait_write(&self) {
+ let backoff = Backoff::new();
+ while self.state.load(Ordering::Acquire) & WRITE == 0 {
+ backoff.spin_heavy();
+ }
+ }
+}
+
+/// A block in a linked list.
+///
+/// Each block in the list can hold up to `BLOCK_CAP` messages.
+struct Block<T> {
+ /// The next block in the linked list.
+ next: AtomicPtr<Block<T>>,
+
+ /// Slots for messages.
+ slots: [Slot<T>; BLOCK_CAP],
+}
+
+impl<T> Block<T> {
+ /// Creates an empty block.
+ fn new() -> Block<T> {
+ // SAFETY: This is safe because:
+ // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
+ // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
+ // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
+ // holds a MaybeUninit.
+ // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
+ unsafe { MaybeUninit::zeroed().assume_init() }
+ }
+
+ /// Waits until the next pointer is set.
+ fn wait_next(&self) -> *mut Block<T> {
+ let backoff = Backoff::new();
+ loop {
+ let next = self.next.load(Ordering::Acquire);
+ if !next.is_null() {
+ return next;
+ }
+ backoff.spin_heavy();
+ }
+ }
+
+ /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
+ unsafe fn destroy(this: *mut Block<T>, start: usize) {
+ // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
+ // begun destruction of the block.
+ for i in start..BLOCK_CAP - 1 {
+ let slot = (*this).slots.get_unchecked(i);
+
+ // Mark the `DESTROY` bit if a thread is still using the slot.
+ if slot.state.load(Ordering::Acquire) & READ == 0
+ && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
+ {
+ // If a thread is still using the slot, it will continue destruction of the block.
+ return;
+ }
+ }
+
+ // No thread is using the block, now it is safe to destroy it.
+ drop(Box::from_raw(this));
+ }
+}
+
+/// A position in a channel.
+#[derive(Debug)]
+struct Position<T> {
+ /// The index in the channel.
+ index: AtomicUsize,
+
+ /// The block in the linked list.
+ block: AtomicPtr<Block<T>>,
+}
+
+/// The token type for the list flavor.
+#[derive(Debug)]
+pub(crate) struct ListToken {
+ /// The block of slots.
+ block: *const u8,
+
+ /// The offset into the block.
+ offset: usize,
+}
+
+impl Default for ListToken {
+ #[inline]
+ fn default() -> Self {
+ ListToken { block: ptr::null(), offset: 0 }
+ }
+}
+
+/// Unbounded channel implemented as a linked list.
+///
+/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
+/// represented as numbers of type `usize` and wrap on overflow.
+///
+/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
+/// improve cache efficiency.
+pub(crate) struct Channel<T> {
+ /// The head of the channel.
+ head: CachePadded<Position<T>>,
+
+ /// The tail of the channel.
+ tail: CachePadded<Position<T>>,
+
+ /// Receivers waiting while the channel is empty and not disconnected.
+ receivers: SyncWaker,
+
+ /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
+ _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+ /// Creates a new unbounded channel.
+ pub(crate) fn new() -> Self {
+ Channel {
+ head: CachePadded::new(Position {
+ block: AtomicPtr::new(ptr::null_mut()),
+ index: AtomicUsize::new(0),
+ }),
+ tail: CachePadded::new(Position {
+ block: AtomicPtr::new(ptr::null_mut()),
+ index: AtomicUsize::new(0),
+ }),
+ receivers: SyncWaker::new(),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Attempts to reserve a slot for sending a message.
+ fn start_send(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.index.load(Ordering::Acquire);
+ let mut block = self.tail.block.load(Ordering::Acquire);
+ let mut next_block = None;
+
+ loop {
+ // Check if the channel is disconnected.
+ if tail & MARK_BIT != 0 {
+ token.list.block = ptr::null();
+ return true;
+ }
+
+ // Calculate the offset of the index into the block.
+ let offset = (tail >> SHIFT) % LAP;
+
+ // If we reached the end of the block, wait until the next one is installed.
+ if offset == BLOCK_CAP {
+ backoff.spin_heavy();
+ tail = self.tail.index.load(Ordering::Acquire);
+ block = self.tail.block.load(Ordering::Acquire);
+ continue;
+ }
+
+ // If we're going to have to install the next block, allocate it in advance in order to
+ // make the wait for other threads as short as possible.
+ if offset + 1 == BLOCK_CAP && next_block.is_none() {
+ next_block = Some(Box::new(Block::<T>::new()));
+ }
+
+ // If this is the first message to be sent into the channel, we need to allocate the
+ // first block and install it.
+ if block.is_null() {
+ let new = Box::into_raw(Box::new(Block::<T>::new()));
+
+ if self
+ .tail
+ .block
+ .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ self.head.block.store(new, Ordering::Release);
+ block = new;
+ } else {
+ next_block = unsafe { Some(Box::from_raw(new)) };
+ tail = self.tail.index.load(Ordering::Acquire);
+ block = self.tail.block.load(Ordering::Acquire);
+ continue;
+ }
+ }
+
+ let new_tail = tail + (1 << SHIFT);
+
+ // Try advancing the tail forward.
+ match self.tail.index.compare_exchange_weak(
+ tail,
+ new_tail,
+ Ordering::SeqCst,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => unsafe {
+ // If we've reached the end of the block, install the next one.
+ if offset + 1 == BLOCK_CAP {
+ let next_block = Box::into_raw(next_block.unwrap());
+ self.tail.block.store(next_block, Ordering::Release);
+ self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
+ (*block).next.store(next_block, Ordering::Release);
+ }
+
+ token.list.block = block as *const u8;
+ token.list.offset = offset;
+ return true;
+ },
+ Err(_) => {
+ backoff.spin_light();
+ tail = self.tail.index.load(Ordering::Acquire);
+ block = self.tail.block.load(Ordering::Acquire);
+ }
+ }
+ }
+ }
+
+ /// Writes a message into the channel.
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no slot, the channel is disconnected.
+ if token.list.block.is_null() {
+ return Err(msg);
+ }
+
+ // Write the message into the slot.
+ let block = token.list.block as *mut Block<T>;
+ let offset = token.list.offset;
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.msg.get().write(MaybeUninit::new(msg));
+ slot.state.fetch_or(WRITE, Ordering::Release);
+
+ // Wake a sleeping receiver.
+ self.receivers.notify();
+ Ok(())
+ }
+
+ /// Attempts to reserve a slot for receiving a message.
+ fn start_recv(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut head = self.head.index.load(Ordering::Acquire);
+ let mut block = self.head.block.load(Ordering::Acquire);
+
+ loop {
+ // Calculate the offset of the index into the block.
+ let offset = (head >> SHIFT) % LAP;
+
+ // If we reached the end of the block, wait until the next one is installed.
+ if offset == BLOCK_CAP {
+ backoff.spin_heavy();
+ head = self.head.index.load(Ordering::Acquire);
+ block = self.head.block.load(Ordering::Acquire);
+ continue;
+ }
+
+ let mut new_head = head + (1 << SHIFT);
+
+ if new_head & MARK_BIT == 0 {
+ atomic::fence(Ordering::SeqCst);
+ let tail = self.tail.index.load(Ordering::Relaxed);
+
+ // If the tail equals the head, that means the channel is empty.
+ if head >> SHIFT == tail >> SHIFT {
+ // If the channel is disconnected...
+ if tail & MARK_BIT != 0 {
+ // ...then receive an error.
+ token.list.block = ptr::null();
+ return true;
+ } else {
+ // Otherwise, the receive operation is not ready.
+ return false;
+ }
+ }
+
+ // If head and tail are not in the same block, set `MARK_BIT` in head.
+ if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
+ new_head |= MARK_BIT;
+ }
+ }
+
+ // The block can be null here only if the first message is being sent into the channel.
+ // In that case, just wait until it gets initialized.
+ if block.is_null() {
+ backoff.spin_heavy();
+ head = self.head.index.load(Ordering::Acquire);
+ block = self.head.block.load(Ordering::Acquire);
+ continue;
+ }
+
+ // Try moving the head index forward.
+ match self.head.index.compare_exchange_weak(
+ head,
+ new_head,
+ Ordering::SeqCst,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => unsafe {
+ // If we've reached the end of the block, move to the next one.
+ if offset + 1 == BLOCK_CAP {
+ let next = (*block).wait_next();
+ let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
+ if !(*next).next.load(Ordering::Relaxed).is_null() {
+ next_index |= MARK_BIT;
+ }
+
+ self.head.block.store(next, Ordering::Release);
+ self.head.index.store(next_index, Ordering::Release);
+ }
+
+ token.list.block = block as *const u8;
+ token.list.offset = offset;
+ return true;
+ },
+ Err(_) => {
+ backoff.spin_light();
+ head = self.head.index.load(Ordering::Acquire);
+ block = self.head.block.load(Ordering::Acquire);
+ }
+ }
+ }
+ }
+
+ /// Reads a message from the channel.
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ if token.list.block.is_null() {
+ // The channel is disconnected.
+ return Err(());
+ }
+
+ // Read the message.
+ let block = token.list.block as *mut Block<T>;
+ let offset = token.list.offset;
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.wait_write();
+ let msg = slot.msg.get().read().assume_init();
+
+ // Destroy the block if we've reached the end, or if another thread wanted to destroy but
+ // couldn't because we were busy reading from the slot.
+ if offset + 1 == BLOCK_CAP {
+ Block::destroy(block, 0);
+ } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
+ Block::destroy(block, offset + 1);
+ }
+
+ Ok(msg)
+ }
+
+ /// Attempts to send a message into the channel.
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ self.send(msg, None).map_err(|err| match err {
+ SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
+ SendTimeoutError::Timeout(_) => unreachable!(),
+ })
+ }
+
+ /// Sends a message into the channel.
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ _deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ assert!(self.start_send(token));
+ unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+
+ if self.start_recv(token) {
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ loop {
+ if self.start_recv(token) {
+ unsafe {
+ return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
+ }
+ }
+
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(RecvTimeoutError::Timeout);
+ }
+ }
+
+ // Prepare for blocking until a sender wakes us up.
+ Context::with(|cx| {
+ let oper = Operation::hook(token);
+ self.receivers.register(oper, cx);
+
+ // Has the channel become ready just now?
+ if !self.is_empty() || self.is_disconnected() {
+ let _ = cx.try_select(Selected::Aborted);
+ }
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted | Selected::Disconnected => {
+ self.receivers.unregister(oper).unwrap();
+ // If the channel was disconnected, we still have to check for remaining
+ // messages.
+ }
+ Selected::Operation(_) => {}
+ }
+ });
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub(crate) fn len(&self) -> usize {
+ loop {
+ // Load the tail index, then load the head index.
+ let mut tail = self.tail.index.load(Ordering::SeqCst);
+ let mut head = self.head.index.load(Ordering::SeqCst);
+
+ // If the tail index didn't change, we've got consistent indices to work with.
+ if self.tail.index.load(Ordering::SeqCst) == tail {
+ // Erase the lower bits.
+ tail &= !((1 << SHIFT) - 1);
+ head &= !((1 << SHIFT) - 1);
+
+ // Fix up indices if they fall onto block ends.
+ if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
+ tail = tail.wrapping_add(1 << SHIFT);
+ }
+ if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Rotate indices so that head falls into the first block.
+ let lap = (head >> SHIFT) / LAP;
+ tail = tail.wrapping_sub((lap * LAP) << SHIFT);
+ head = head.wrapping_sub((lap * LAP) << SHIFT);
+
+ // Remove the lower bits.
+ tail >>= SHIFT;
+ head >>= SHIFT;
+
+ // Return the difference minus the number of blocks between tail and head.
+ return tail - head - tail / LAP;
+ }
+ }
+ }
+
+ /// Returns the capacity of the channel.
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ None
+ }
+
+ /// Disconnects senders and wakes up all blocked receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect_senders(&self) -> bool {
+ let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+ if tail & MARK_BIT == 0 {
+ self.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Disconnects receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect_receivers(&self) -> bool {
+ let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+ if tail & MARK_BIT == 0 {
+ // If receivers are dropped first, discard all messages to free
+ // memory eagerly.
+ self.discard_all_messages();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Discards all messages.
+ ///
+ /// This method should only be called when all receivers are dropped.
+ fn discard_all_messages(&self) {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.index.load(Ordering::Acquire);
+ loop {
+ let offset = (tail >> SHIFT) % LAP;
+ if offset != BLOCK_CAP {
+ break;
+ }
+
+            // New updates to tail will be rejected by MARK_BIT and aborted unless it's
+            // at a block boundary. We need to wait for those updates to take effect,
+            // otherwise there can be memory leaks.
+ backoff.spin_heavy();
+ tail = self.tail.index.load(Ordering::Acquire);
+ }
+
+ let mut head = self.head.index.load(Ordering::Acquire);
+ let mut block = self.head.block.load(Ordering::Acquire);
+
+ unsafe {
+ // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+ while head >> SHIFT != tail >> SHIFT {
+ let offset = (head >> SHIFT) % LAP;
+
+ if offset < BLOCK_CAP {
+ // Drop the message in the slot.
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.wait_write();
+ let p = &mut *slot.msg.get();
+ p.as_mut_ptr().drop_in_place();
+ } else {
+ (*block).wait_next();
+ // Deallocate the block and move to the next one.
+ let next = (*block).next.load(Ordering::Acquire);
+ drop(Box::from_raw(block));
+ block = next;
+ }
+
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Deallocate the last remaining block.
+ if !block.is_null() {
+ drop(Box::from_raw(block));
+ }
+ }
+ head &= !MARK_BIT;
+ self.head.block.store(ptr::null_mut(), Ordering::Release);
+ self.head.index.store(head, Ordering::Release);
+ }
+
+ /// Returns `true` if the channel is disconnected.
+ pub(crate) fn is_disconnected(&self) -> bool {
+ self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ let head = self.head.index.load(Ordering::SeqCst);
+ let tail = self.tail.index.load(Ordering::SeqCst);
+ head >> SHIFT == tail >> SHIFT
+ }
+
+ /// Returns `true` if the channel is full.
+ pub(crate) fn is_full(&self) -> bool {
+ false
+ }
+}
+
+impl<T> Drop for Channel<T> {
+ fn drop(&mut self) {
+ let mut head = self.head.index.load(Ordering::Relaxed);
+ let mut tail = self.tail.index.load(Ordering::Relaxed);
+ let mut block = self.head.block.load(Ordering::Relaxed);
+
+ // Erase the lower bits.
+ head &= !((1 << SHIFT) - 1);
+ tail &= !((1 << SHIFT) - 1);
+
+ unsafe {
+ // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+ while head != tail {
+ let offset = (head >> SHIFT) % LAP;
+
+ if offset < BLOCK_CAP {
+ // Drop the message in the slot.
+ let slot = (*block).slots.get_unchecked(offset);
+ let p = &mut *slot.msg.get();
+ p.as_mut_ptr().drop_in_place();
+ } else {
+ // Deallocate the block and move to the next one.
+ let next = (*block).next.load(Ordering::Relaxed);
+ drop(Box::from_raw(block));
+ block = next;
+ }
+
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Deallocate the last remaining block.
+ if !block.is_null() {
+ drop(Box::from_raw(block));
+ }
+ }
+ }
+}
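
A minimal standalone sketch, mirroring the constants defined above, of how the list flavor maps a message's sequence number to a block and offset:

fn main() {
    const LAP: usize = 32;
    const BLOCK_CAP: usize = LAP - 1; // 31
    const SHIFT: usize = 1;
    const MARK_BIT: usize = 1;

    // The fifth message ever sent gets sequence number 4; the low
    // SHIFT bit is reserved for MARK_BIT metadata.
    let index: usize = 4 << SHIFT;
    assert_eq!(index & MARK_BIT, 0); // not disconnected

    // Position within the linked list of blocks.
    let offset = (index >> SHIFT) % LAP; // 4
    let block = (index >> SHIFT) / LAP;  // block number 0
    assert_eq!((block, offset), (0, 4));

    // Offset BLOCK_CAP (31) never holds a message: `start_send` and
    // `start_recv` treat it as "end of block, wait for the next one".
    assert!(offset < BLOCK_CAP);
}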
diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs
new file mode 100644
index 000000000..7a602cecd
--- /dev/null
+++ b/library/std/src/sync/mpmc/mod.rs
@@ -0,0 +1,430 @@
+//! Multi-producer multi-consumer channels.
+
+// This module is not currently exposed publicly, but is used
+// as the implementation for the channels in `sync::mpsc`. The
+// implementation comes from the crossbeam-channel crate:
+//
+// Copyright (c) 2019 The Crossbeam Project Developers
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+mod array;
+mod context;
+mod counter;
+mod error;
+mod list;
+mod select;
+mod utils;
+mod waker;
+mod zero;
+
+use crate::fmt;
+use crate::panic::{RefUnwindSafe, UnwindSafe};
+use crate::time::{Duration, Instant};
+pub use error::*;
+
+/// Creates a channel of unbounded capacity.
+///
+/// This channel has a growable buffer that can hold any number of messages at a time.
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let (s, r) = counter::new(list::Channel::new());
+ let s = Sender { flavor: SenderFlavor::List(s) };
+ let r = Receiver { flavor: ReceiverFlavor::List(r) };
+ (s, r)
+}
+
+/// Creates a channel of bounded capacity.
+///
+/// This channel has a buffer that can hold at most `cap` messages at a time.
+///
+/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
+/// receive operations must appear at the same time in order to pair up and pass the message over.
+pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
+ if cap == 0 {
+ let (s, r) = counter::new(zero::Channel::new());
+ let s = Sender { flavor: SenderFlavor::Zero(s) };
+ let r = Receiver { flavor: ReceiverFlavor::Zero(r) };
+ (s, r)
+ } else {
+ let (s, r) = counter::new(array::Channel::with_capacity(cap));
+ let s = Sender { flavor: SenderFlavor::Array(s) };
+ let r = Receiver { flavor: ReceiverFlavor::Array(r) };
+ (s, r)
+ }
+}
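
The zero-capacity case described above is observable through the public `mpsc` wrapper (which this module now backs): a send blocks until a receive is ready to pair with it.

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // A rendezvous channel: no buffer at all.
    let (tx, rx) = sync_channel::<&str>(0);

    let t = thread::spawn(move || {
        // Blocks here until the main thread calls `recv`.
        tx.send("ping").unwrap();
    });

    assert_eq!(rx.recv().unwrap(), "ping");
    t.join().unwrap();
}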
+
+/// The sending side of a channel.
+pub struct Sender<T> {
+ flavor: SenderFlavor<T>,
+}
+
+/// Sender flavors.
+enum SenderFlavor<T> {
+ /// Bounded channel based on a preallocated array.
+ Array(counter::Sender<array::Channel<T>>),
+
+ /// Unbounded channel implemented as a linked list.
+ List(counter::Sender<list::Channel<T>>),
+
+ /// Zero-capacity channel.
+ Zero(counter::Sender<zero::Channel<T>>),
+}
+
+unsafe impl<T: Send> Send for Sender<T> {}
+unsafe impl<T: Send> Sync for Sender<T> {}
+
+impl<T> UnwindSafe for Sender<T> {}
+impl<T> RefUnwindSafe for Sender<T> {}
+
+impl<T> Sender<T> {
+ /// Attempts to send a message into the channel without blocking.
+ ///
+ /// This method will either send a message into the channel immediately or return an error if
+ /// the channel is full or disconnected. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will send the message only if there
+ /// happens to be a receive operation on the other side of the channel at the same time.
+ pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.try_send(msg),
+ SenderFlavor::List(chan) => chan.try_send(msg),
+ SenderFlavor::Zero(chan) => chan.try_send(msg),
+ }
+ }
+
+ /// Blocks the current thread until a message is sent or the channel is disconnected.
+ ///
+ /// If the channel is full and not disconnected, this call will block until the send operation
+ /// can proceed. If the channel becomes disconnected, this call will wake up and return an
+ /// error. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a receive operation to
+ /// appear on the other side of the channel.
+ pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.send(msg, None),
+ SenderFlavor::List(chan) => chan.send(msg, None),
+ SenderFlavor::Zero(chan) => chan.send(msg, None),
+ }
+ .map_err(|err| match err {
+ SendTimeoutError::Disconnected(msg) => SendError(msg),
+ SendTimeoutError::Timeout(_) => unreachable!(),
+ })
+ }
+}
+
+// The methods below are not used by `sync::mpsc`, but
+// are useful and we'll likely want to expose them
+// eventually
+#[allow(unused)]
+impl<T> Sender<T> {
+ /// Waits for a message to be sent into the channel, but only for a limited time.
+ ///
+ /// If the channel is full and not disconnected, this call will block until the send operation
+ /// can proceed or the operation times out. If the channel becomes disconnected, this call will
+ /// wake up and return an error. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a receive operation to
+ /// appear on the other side of the channel.
+ pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.send_deadline(msg, deadline),
+ // So far in the future that it's practically the same as waiting indefinitely.
+ None => self.send(msg).map_err(SendTimeoutError::from),
+ }
+ }
+
+ /// Waits for a message to be sent into the channel, but only until a given deadline.
+ ///
+ /// If the channel is full and not disconnected, this call will block until the send operation
+ /// can proceed or the operation times out. If the channel becomes disconnected, this call will
+ /// wake up and return an error. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a receive operation to
+ /// appear on the other side of the channel.
+ pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
+ SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
+ SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
+ }
+ }
+
+ /// Returns `true` if the channel is empty.
+ ///
+ /// Note: Zero-capacity channels are always empty.
+ pub fn is_empty(&self) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.is_empty(),
+ SenderFlavor::List(chan) => chan.is_empty(),
+ SenderFlavor::Zero(chan) => chan.is_empty(),
+ }
+ }
+
+ /// Returns `true` if the channel is full.
+ ///
+ /// Note: Zero-capacity channels are always full.
+ pub fn is_full(&self) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.is_full(),
+ SenderFlavor::List(chan) => chan.is_full(),
+ SenderFlavor::Zero(chan) => chan.is_full(),
+ }
+ }
+
+ /// Returns the number of messages in the channel.
+ pub fn len(&self) -> usize {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.len(),
+ SenderFlavor::List(chan) => chan.len(),
+ SenderFlavor::Zero(chan) => chan.len(),
+ }
+ }
+
+ /// If the channel is bounded, returns its capacity.
+ pub fn capacity(&self) -> Option<usize> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.capacity(),
+ SenderFlavor::List(chan) => chan.capacity(),
+ SenderFlavor::Zero(chan) => chan.capacity(),
+ }
+ }
+
+ /// Returns `true` if senders belong to the same channel.
+ pub fn same_channel(&self, other: &Sender<T>) -> bool {
+ match (&self.flavor, &other.flavor) {
+ (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
+ (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
+ (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
+ _ => false,
+ }
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ unsafe {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+ SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
+ SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
+ }
+ }
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ let flavor = match &self.flavor {
+ SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
+ SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
+ SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
+ };
+
+ Sender { flavor }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Sender { .. }")
+ }
+}
+
+/// The receiving side of a channel.
+pub struct Receiver<T> {
+ flavor: ReceiverFlavor<T>,
+}
+
+/// Receiver flavors.
+enum ReceiverFlavor<T> {
+ /// Bounded channel based on a preallocated array.
+ Array(counter::Receiver<array::Channel<T>>),
+
+ /// Unbounded channel implemented as a linked list.
+ List(counter::Receiver<list::Channel<T>>),
+
+ /// Zero-capacity channel.
+ Zero(counter::Receiver<zero::Channel<T>>),
+}
+
+unsafe impl<T: Send> Send for Receiver<T> {}
+unsafe impl<T: Send> Sync for Receiver<T> {}
+
+impl<T> UnwindSafe for Receiver<T> {}
+impl<T> RefUnwindSafe for Receiver<T> {}
+
+impl<T> Receiver<T> {
+ /// Attempts to receive a message from the channel without blocking.
+ ///
+ /// This method will either receive a message from the channel immediately or return an error
+ /// if the channel is empty.
+ ///
+ /// If called on a zero-capacity channel, this method will receive a message only if there
+ /// happens to be a send operation on the other side of the channel at the same time.
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.try_recv(),
+ ReceiverFlavor::List(chan) => chan.try_recv(),
+ ReceiverFlavor::Zero(chan) => chan.try_recv(),
+ }
+ }
+
+ /// Blocks the current thread until a message is received or the channel is empty and
+ /// disconnected.
+ ///
+ /// If the channel is empty and not disconnected, this call will block until the receive
+ /// operation can proceed. If the channel is empty and becomes disconnected, this call will
+ /// wake up and return an error.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+ /// on the other side of the channel.
+ pub fn recv(&self) -> Result<T, RecvError> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.recv(None),
+ ReceiverFlavor::List(chan) => chan.recv(None),
+ ReceiverFlavor::Zero(chan) => chan.recv(None),
+ }
+ .map_err(|_| RecvError)
+ }
+
+ /// Waits for a message to be received from the channel, but only for a limited time.
+ ///
+ /// If the channel is empty and not disconnected, this call will block until the receive
+ /// operation can proceed or the operation times out. If the channel is empty and becomes
+ /// disconnected, this call will wake up and return an error.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+ /// on the other side of the channel.
+ pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.recv_deadline(deadline),
+ // So far in the future that it's practically the same as waiting indefinitely.
+ None => self.recv().map_err(RecvTimeoutError::from),
+ }
+ }
+
+ /// Waits for a message to be received from the channel, but only until a given deadline.
+ ///
+ /// If the channel is empty and not disconnected, this call will block until the receive
+ /// operation can proceed or the operation times out. If the channel is empty and becomes
+ /// disconnected, this call will wake up and return an error.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+ /// on the other side of the channel.
+ pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
+ ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
+ ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
+ }
+ }
+}
+
+// The methods below are not used by `sync::mpsc`, but they are useful and
+// we will likely want to expose them publicly eventually.
+#[allow(unused)]
+impl<T> Receiver<T> {
+ /// Returns `true` if the channel is empty.
+ ///
+ /// Note: Zero-capacity channels are always empty.
+ pub fn is_empty(&self) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.is_empty(),
+ ReceiverFlavor::List(chan) => chan.is_empty(),
+ ReceiverFlavor::Zero(chan) => chan.is_empty(),
+ }
+ }
+
+ /// Returns `true` if the channel is full.
+ ///
+ /// Note: Zero-capacity channels are always full.
+ pub fn is_full(&self) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.is_full(),
+ ReceiverFlavor::List(chan) => chan.is_full(),
+ ReceiverFlavor::Zero(chan) => chan.is_full(),
+ }
+ }
+
+ /// Returns the number of messages in the channel.
+ pub fn len(&self) -> usize {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.len(),
+ ReceiverFlavor::List(chan) => chan.len(),
+ ReceiverFlavor::Zero(chan) => chan.len(),
+ }
+ }
+
+ /// If the channel is bounded, returns its capacity.
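+ ///
+ /// A sketch of the expected results (illustrative; crate-internal API):
+ ///
+ /// ```ignore (private API)
+ /// let (_s, r) = sync_channel::<i32>(8);
+ /// assert_eq!(r.capacity(), Some(8));
+ ///
+ /// let (_s, r) = channel::<i32>();
+ /// assert_eq!(r.capacity(), None); // unbounded
+ /// ```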
+ pub fn capacity(&self) -> Option<usize> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.capacity(),
+ ReceiverFlavor::List(chan) => chan.capacity(),
+ ReceiverFlavor::Zero(chan) => chan.capacity(),
+ }
+ }
+
+ /// Returns `true` if receivers belong to the same channel.
+ pub fn same_channel(&self, other: &Receiver<T>) -> bool {
+ match (&self.flavor, &other.flavor) {
+ (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
+ (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
+ (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
+ _ => false,
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ unsafe {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+ ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
+ ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
+ }
+ }
+ }
+}
+
+impl<T> Clone for Receiver<T> {
+ fn clone(&self) -> Self {
+ let flavor = match &self.flavor {
+ ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
+ ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
+ ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
+ };
+
+ Receiver { flavor }
+ }
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Receiver { .. }")
+ }
+}
diff --git a/library/std/src/sync/mpmc/select.rs b/library/std/src/sync/mpmc/select.rs
new file mode 100644
index 000000000..56a83fee2
--- /dev/null
+++ b/library/std/src/sync/mpmc/select.rs
@@ -0,0 +1,71 @@
+/// Temporary data that gets initialized during a blocking operation, and is consumed by
+/// `read` or `write`.
+///
+/// Each field contains data associated with a specific channel flavor.
+#[derive(Debug, Default)]
+pub struct Token {
+ pub(crate) array: super::array::ArrayToken,
+ pub(crate) list: super::list::ListToken,
+ #[allow(dead_code)]
+ pub(crate) zero: super::zero::ZeroToken,
+}
+
+/// Identifier associated with an operation by a specific thread on a specific channel.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct Operation(usize);
+
+impl Operation {
+ /// Creates an operation identifier from a mutable reference.
+ ///
+ /// This function essentially just turns the address of the reference into a number. The
+ /// reference should point to a variable that is specific to the thread and the operation,
+ /// and is alive for the entire duration of a blocking operation.
+ #[inline]
+ pub fn hook<T>(r: &mut T) -> Operation {
+ let val = r as *mut T as usize;
+ // Make sure that the pointer address doesn't equal the numerical representation of
+ // `Selected::{Waiting, Aborted, Disconnected}`.
+ assert!(val > 2);
+ Operation(val)
+ }
+}
+
+/// Current state of a blocking operation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Selected {
+ /// Still waiting for an operation.
+ Waiting,
+
+ /// The attempt to block the current thread has been aborted.
+ Aborted,
+
+ /// An operation became ready because a channel is disconnected.
+ Disconnected,
+
+ /// An operation became ready because a message can be sent or received.
+ Operation(Operation),
+}
+
+impl From<usize> for Selected {
+ #[inline]
+ fn from(val: usize) -> Selected {
+ match val {
+ 0 => Selected::Waiting,
+ 1 => Selected::Aborted,
+ 2 => Selected::Disconnected,
+ oper => Selected::Operation(Operation(oper)),
+ }
+ }
+}
+
+impl Into<usize> for Selected {
+ #[inline]
+ fn into(self) -> usize {
+ match self {
+ Selected::Waiting => 0,
+ Selected::Aborted => 1,
+ Selected::Disconnected => 2,
+ Selected::Operation(Operation(val)) => val,
+ }
+ }
+}
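+
+// The `usize` encoding round-trips through `From`/`Into`, and any value
+// above 2 is interpreted as an `Operation` handle. An illustrative sketch of
+// that invariant (not exercised by the library's own test suite):
+#[cfg(test)]
+mod tests {
+ use super::{Operation, Selected};
+
+ #[test]
+ fn selected_roundtrips_through_usize() {
+ assert_eq!(Selected::from(0usize), Selected::Waiting);
+ assert_eq!(Selected::from(1usize), Selected::Aborted);
+ assert_eq!(Selected::from(2usize), Selected::Disconnected);
+ // Anything above 2 encodes an operation (see `Operation::hook`).
+ assert_eq!(Selected::from(7usize), Selected::Operation(Operation(7)));
+ let n: usize = Selected::Aborted.into();
+ assert_eq!(n, 1);
+ }
+}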
diff --git a/library/std/src/sync/mpmc/utils.rs b/library/std/src/sync/mpmc/utils.rs
new file mode 100644
index 000000000..cfe42750d
--- /dev/null
+++ b/library/std/src/sync/mpmc/utils.rs
@@ -0,0 +1,143 @@
+use crate::cell::Cell;
+use crate::ops::{Deref, DerefMut};
+
+/// Pads and aligns a value to the length of a cache line.
+#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
+// Starting from Intel's Sandy Bridge, the spatial prefetcher pulls pairs of 64-byte cache
+// lines at a time, so we have to align to 128 bytes rather than 64.
+//
+// Sources:
+// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
+// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
+//
+// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
+//
+// Sources:
+// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
+//
+// powerpc64 has 128-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
+#[cfg_attr(
+ any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64",),
+ repr(align(128))
+)]
+// arm, mips, mips64, and riscv64 have 32-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
+#[cfg_attr(
+ any(
+ target_arch = "arm",
+ target_arch = "mips",
+ target_arch = "mips64",
+ target_arch = "riscv64",
+ ),
+ repr(align(32))
+)]
+// s390x has 256-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
+#[cfg_attr(target_arch = "s390x", repr(align(256)))]
+// x86 and wasm have 64-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
+//
+// All others are assumed to have 64-byte cache line size.
+#[cfg_attr(
+ not(any(
+ target_arch = "x86_64",
+ target_arch = "aarch64",
+ target_arch = "powerpc64",
+ target_arch = "arm",
+ target_arch = "mips",
+ target_arch = "mips64",
+ target_arch = "riscv64",
+ target_arch = "s390x",
+ )),
+ repr(align(64))
+)]
+pub struct CachePadded<T> {
+ value: T,
+}
+
+impl<T> CachePadded<T> {
+ /// Pads and aligns a value to the length of a cache line.
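+ ///
+ /// A sketch of the intent (illustrative; crate-internal type): two atomics
+ /// wrapped this way are kept on separate cache lines, avoiding false
+ /// sharing between threads that update them independently.
+ ///
+ /// ```ignore (private API)
+ /// use crate::sync::atomic::AtomicUsize;
+ ///
+ /// struct Counters {
+ ///     head: CachePadded<AtomicUsize>,
+ ///     tail: CachePadded<AtomicUsize>,
+ /// }
+ /// ```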
+ pub fn new(value: T) -> CachePadded<T> {
+ CachePadded::<T> { value }
+ }
+}
+
+impl<T> Deref for CachePadded<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ &self.value
+ }
+}
+
+impl<T> DerefMut for CachePadded<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ &mut self.value
+ }
+}
+
+const SPIN_LIMIT: u32 = 6;
+
+/// Performs quadratic backoff in spin loops.
+pub struct Backoff {
+ step: Cell<u32>,
+}
+
+impl Backoff {
+ /// Creates a new `Backoff`.
+ pub fn new() -> Self {
+ Backoff { step: Cell::new(0) }
+ }
+
+ /// Backs off using lightweight spinning.
+ ///
+ /// This method should be used for:
+ /// - Retrying an operation because another thread made progress, i.e. on CAS failure.
+ /// - Waiting for an operation to complete by spinning optimistically for a few iterations
+ /// before falling back to parking the thread (see `Backoff::is_completed`).
+ #[inline]
+ pub fn spin_light(&self) {
+ let step = self.step.get().min(SPIN_LIMIT);
+ for _ in 0..step.pow(2) {
+ crate::hint::spin_loop();
+ }
+
+ self.step.set(self.step.get() + 1);
+ }
+
+ /// Backs off using heavyweight spinning.
+ ///
+ /// This method should be used in blocking loops where parking the thread is not an option.
+ #[inline]
+ pub fn spin_heavy(&self) {
+ if self.step.get() <= SPIN_LIMIT {
+ for _ in 0..self.step.get().pow(2) {
+ crate::hint::spin_loop()
+ }
+ } else {
+ crate::thread::yield_now();
+ }
+
+ self.step.set(self.step.get() + 1);
+ }
+
+ /// Returns `true` if quadratic backoff has completed and parking the thread is advised.
+ #[inline]
+ pub fn is_completed(&self) -> bool {
+ self.step.get() > SPIN_LIMIT
+ }
+}
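+
+// How the two spinning strategies are meant to compose, sketched against a
+// hypothetical `try_op`/`park` pair (illustrative only):
+#[allow(dead_code)]
+fn backoff_sketch(mut try_op: impl FnMut() -> bool, park: impl Fn()) {
+ let backoff = Backoff::new();
+ while !try_op() {
+ if backoff.is_completed() {
+ // Quadratic spinning is exhausted; park instead of burning CPU.
+ park();
+ } else {
+ backoff.spin_light();
+ }
+ }
+}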
diff --git a/library/std/src/sync/mpmc/waker.rs b/library/std/src/sync/mpmc/waker.rs
new file mode 100644
index 000000000..4912ca4f8
--- /dev/null
+++ b/library/std/src/sync/mpmc/waker.rs
@@ -0,0 +1,204 @@
+//! Waking mechanism for threads blocked on channel operations.
+
+use super::context::Context;
+use super::select::{Operation, Selected};
+
+use crate::ptr;
+use crate::sync::atomic::{AtomicBool, Ordering};
+use crate::sync::Mutex;
+
+/// Represents a thread blocked on a specific channel operation.
+pub(crate) struct Entry {
+ /// The operation.
+ pub(crate) oper: Operation,
+
+ /// Optional packet.
+ pub(crate) packet: *mut (),
+
+ /// Context associated with the thread owning this operation.
+ pub(crate) cx: Context,
+}
+
+/// A queue of threads blocked on channel operations.
+///
+/// This data structure is used by threads to register blocking operations and get woken up once
+/// an operation becomes ready.
+pub(crate) struct Waker {
+ /// A list of select operations.
+ selectors: Vec<Entry>,
+
+ /// A list of operations waiting to be ready.
+ observers: Vec<Entry>,
+}
+
+impl Waker {
+ /// Creates a new `Waker`.
+ #[inline]
+ pub(crate) fn new() -> Self {
+ Waker { selectors: Vec::new(), observers: Vec::new() }
+ }
+
+ /// Registers a select operation.
+ #[inline]
+ pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
+ self.register_with_packet(oper, ptr::null_mut(), cx);
+ }
+
+ /// Registers a select operation and a packet.
+ #[inline]
+ pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
+ self.selectors.push(Entry { oper, packet, cx: cx.clone() });
+ }
+
+ /// Unregisters a select operation.
+ #[inline]
+ pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
+ if let Some((i, _)) =
+ self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper)
+ {
+ let entry = self.selectors.remove(i);
+ Some(entry)
+ } else {
+ None
+ }
+ }
+
+ /// Attempts to find another thread's entry, select the operation, and wake it up.
+ #[inline]
+ pub(crate) fn try_select(&mut self) -> Option<Entry> {
+ self.selectors
+ .iter()
+ .position(|selector| {
+ // Does the entry belong to a different thread?
+ selector.cx.thread_id() != current_thread_id()
+ && selector // Try selecting this operation.
+ .cx
+ .try_select(Selected::Operation(selector.oper))
+ .is_ok()
+ && {
+ // Provide the packet.
+ selector.cx.store_packet(selector.packet);
+ // Wake the thread up.
+ selector.cx.unpark();
+ true
+ }
+ })
+ // Remove the entry from the queue to keep it clean and improve
+ // performance.
+ .map(|pos| self.selectors.remove(pos))
+ }
+
+ /// Notifies all operations waiting to be ready.
+ #[inline]
+ pub(crate) fn notify(&mut self) {
+ for entry in self.observers.drain(..) {
+ if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
+ entry.cx.unpark();
+ }
+ }
+ }
+
+ /// Notifies all registered operations that the channel is disconnected.
+ #[inline]
+ pub(crate) fn disconnect(&mut self) {
+ for entry in self.selectors.iter() {
+ if entry.cx.try_select(Selected::Disconnected).is_ok() {
+ // Wake the thread up.
+ //
+ // Here we don't remove the entry from the queue. Registered threads must
+ // unregister from the waker by themselves. They might also want to recover the
+ // packet value and destroy it, if necessary.
+ entry.cx.unpark();
+ }
+ }
+
+ self.notify();
+ }
+}
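+
+// Lifecycle sketch: a blocking thread registers its operation (and packet),
+// parks, and is later woken either by a peer's `try_select`/`notify` or by
+// `disconnect`; on timeout or abort, the blocked thread itself must call
+// `unregister` to remove its entry and recover the packet.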
+
+impl Drop for Waker {
+ #[inline]
+ fn drop(&mut self) {
+ debug_assert_eq!(self.selectors.len(), 0);
+ debug_assert_eq!(self.observers.len(), 0);
+ }
+}
+
+/// A waker that can be shared among threads without locking.
+///
+/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
+pub(crate) struct SyncWaker {
+ /// The inner `Waker`.
+ inner: Mutex<Waker>,
+
+ /// `true` if the waker is empty.
+ is_empty: AtomicBool,
+}
+
+impl SyncWaker {
+ /// Creates a new `SyncWaker`.
+ #[inline]
+ pub(crate) fn new() -> Self {
+ SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) }
+ }
+
+ /// Registers the current thread with an operation.
+ #[inline]
+ pub(crate) fn register(&self, oper: Operation, cx: &Context) {
+ let mut inner = self.inner.lock().unwrap();
+ inner.register(oper, cx);
+ self.is_empty
+ .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
+ }
+
+ /// Unregisters an operation previously registered by the current thread.
+ #[inline]
+ pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
+ let mut inner = self.inner.lock().unwrap();
+ let entry = inner.unregister(oper);
+ self.is_empty
+ .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
+ entry
+ }
+
+ /// Attempts to find one thread (not the current one), select its operation, and wake it up.
+ #[inline]
+ pub(crate) fn notify(&self) {
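+ // Fast path: skip taking the mutex entirely when no thread is registered.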
+ if !self.is_empty.load(Ordering::SeqCst) {
+ let mut inner = self.inner.lock().unwrap();
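+ // Re-check under the lock: registrations may have been removed
+ // between the load above and acquiring `inner`.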
+ if !self.is_empty.load(Ordering::SeqCst) {
+ inner.try_select();
+ inner.notify();
+ self.is_empty.store(
+ inner.selectors.is_empty() && inner.observers.is_empty(),
+ Ordering::SeqCst,
+ );
+ }
+ }
+ }
+
+ /// Notifies all threads that the channel is disconnected.
+ #[inline]
+ pub(crate) fn disconnect(&self) {
+ let mut inner = self.inner.lock().unwrap();
+ inner.disconnect();
+ self.is_empty
+ .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
+ }
+}
+
+impl Drop for SyncWaker {
+ #[inline]
+ fn drop(&mut self) {
+ debug_assert!(self.is_empty.load(Ordering::SeqCst));
+ }
+}
+
+/// Returns a unique id for the current thread.
+#[inline]
+pub fn current_thread_id() -> usize {
+ // `u8` is not `Drop`, so this variable remains accessible during thread
+ // destruction, whereas `thread::current()` would not be.
+ thread_local! { static DUMMY: u8 = 0 }
+ DUMMY.with(|x| (x as *const u8).addr())
+}
diff --git a/library/std/src/sync/mpmc/zero.rs b/library/std/src/sync/mpmc/zero.rs
new file mode 100644
index 000000000..33f768dcb
--- /dev/null
+++ b/library/std/src/sync/mpmc/zero.rs
@@ -0,0 +1,318 @@
+//! Zero-capacity channel.
+//!
+//! This kind of channel is also known as a *rendezvous* channel.
+
+use super::context::Context;
+use super::error::*;
+use super::select::{Operation, Selected, Token};
+use super::utils::Backoff;
+use super::waker::Waker;
+
+use crate::cell::UnsafeCell;
+use crate::marker::PhantomData;
+use crate::sync::atomic::{AtomicBool, Ordering};
+use crate::sync::Mutex;
+use crate::time::Instant;
+use crate::{fmt, ptr};
+
+/// A pointer to a packet.
+pub(crate) struct ZeroToken(*mut ());
+
+impl Default for ZeroToken {
+ fn default() -> Self {
+ Self(ptr::null_mut())
+ }
+}
+
+impl fmt::Debug for ZeroToken {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&(self.0 as usize), f)
+ }
+}
+
+/// A slot for passing one message from a sender to a receiver.
+struct Packet<T> {
+ /// Equals `true` if the packet is allocated on the stack.
+ on_stack: bool,
+
+ /// Equals `true` once the packet is ready for reading or writing.
+ ready: AtomicBool,
+
+ /// The message.
+ msg: UnsafeCell<Option<T>>,
+}
+
+impl<T> Packet<T> {
+ /// Creates an empty packet on the stack.
+ fn empty_on_stack() -> Packet<T> {
+ Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(None) }
+ }
+
+ /// Creates a packet on the stack, containing a message.
+ fn message_on_stack(msg: T) -> Packet<T> {
+ Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(Some(msg)) }
+ }
+
+ /// Waits until the packet becomes ready for reading or writing.
+ fn wait_ready(&self) {
+ let backoff = Backoff::new();
+ while !self.ready.load(Ordering::Acquire) {
+ backoff.spin_heavy();
+ }
+ }
+}
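+
+// The `ready` flag is a two-phase handshake: whichever side writes the
+// message (or finishes reading it) performs a `Release` store, and the
+// peer's `Acquire` loop in `wait_ready` synchronizes with that store, making
+// the message visible or signaling that the packet may be destroyed.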
+
+/// Inner representation of a zero-capacity channel.
+struct Inner {
+ /// Senders waiting to pair up with a receive operation.
+ senders: Waker,
+
+ /// Receivers waiting to pair up with a send operation.
+ receivers: Waker,
+
+ /// Equals `true` when the channel is disconnected.
+ is_disconnected: bool,
+}
+
+/// Zero-capacity channel.
+pub(crate) struct Channel<T> {
+ /// Inner representation of the channel.
+ inner: Mutex<Inner>,
+
+ /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
+ _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+ /// Constructs a new zero-capacity channel.
+ pub(crate) fn new() -> Self {
+ Channel {
+ inner: Mutex::new(Inner {
+ senders: Waker::new(),
+ receivers: Waker::new(),
+ is_disconnected: false,
+ }),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Writes a message into the packet.
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no packet, the channel is disconnected.
+ if token.zero.0.is_null() {
+ return Err(msg);
+ }
+
+ let packet = &*(token.zero.0 as *const Packet<T>);
+ packet.msg.get().write(Some(msg));
+ packet.ready.store(true, Ordering::Release);
+ Ok(())
+ }
+
+ /// Reads a message from the packet.
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ // If there is no packet, the channel is disconnected.
+ if token.zero.0.is_null() {
+ return Err(());
+ }
+
+ let packet = &*(token.zero.0 as *const Packet<T>);
+
+ if packet.on_stack {
+ // The message has been in the packet from the beginning, so there is no need to wait
+ // for it. However, after reading the message, we need to set `ready` to `true` in
+ // order to signal that the packet can be destroyed.
+ let msg = packet.msg.get().replace(None).unwrap();
+ packet.ready.store(true, Ordering::Release);
+ Ok(msg)
+ } else {
+ // Wait until the message becomes available, then read it and destroy the
+ // heap-allocated packet.
+ packet.wait_ready();
+ let msg = packet.msg.get().replace(None).unwrap();
+ drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
+ Ok(msg)
+ }
+ }
+
+ /// Attempts to send a message into the channel.
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe {
+ self.write(token, msg).ok().unwrap();
+ }
+ Ok(())
+ } else if inner.is_disconnected {
+ Err(TrySendError::Disconnected(msg))
+ } else {
+ Err(TrySendError::Full(msg))
+ }
+ }
+
+ /// Sends a message into the channel.
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe {
+ self.write(token, msg).ok().unwrap();
+ }
+ return Ok(());
+ }
+
+ if inner.is_disconnected {
+ return Err(SendTimeoutError::Disconnected(msg));
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a receiver wakes us up.
+ let oper = Operation::hook(token);
+ let mut packet = Packet::<T>::message_on_stack(msg);
+ inner.senders.register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
+ inner.receivers.notify();
+ drop(inner);
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
+ let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+ Err(SendTimeoutError::Timeout(msg))
+ }
+ Selected::Disconnected => {
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
+ let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+ Err(SendTimeoutError::Disconnected(msg))
+ }
+ Selected::Operation(_) => {
+ // Wait until the message is read, then drop the packet.
+ packet.wait_ready();
+ Ok(())
+ }
+ }
+ })
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else if inner.is_disconnected {
+ Err(TryRecvError::Disconnected)
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe {
+ return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
+ }
+ }
+
+ if inner.is_disconnected {
+ return Err(RecvTimeoutError::Disconnected);
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a sender wakes us up.
+ let oper = Operation::hook(token);
+ let mut packet = Packet::<T>::empty_on_stack();
+ inner.receivers.register_with_packet(
+ oper,
+ &mut packet as *mut Packet<T> as *mut (),
+ cx,
+ );
+ inner.senders.notify();
+ drop(inner);
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {
+ self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
+ Err(RecvTimeoutError::Timeout)
+ }
+ Selected::Disconnected => {
+ self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
+ Err(RecvTimeoutError::Disconnected)
+ }
+ Selected::Operation(_) => {
+ // Wait until the message is provided, then read it.
+ packet.wait_ready();
+ unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
+ }
+ }
+ })
+ }
+
+ /// Disconnects the channel and wakes up all blocked senders and receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect(&self) -> bool {
+ let mut inner = self.inner.lock().unwrap();
+
+ if !inner.is_disconnected {
+ inner.is_disconnected = true;
+ inner.senders.disconnect();
+ inner.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub(crate) fn len(&self) -> usize {
+ 0
+ }
+
+ /// Returns the capacity of the channel.
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ Some(0)
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ true
+ }
+
+ /// Returns `true` if the channel is full.
+ pub(crate) fn is_full(&self) -> bool {
+ true
+ }
+}
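+
+// Rendezvous behavior through the crate-internal constructors, sketched (an
+// illustrative test, assuming `sync_channel` from the parent module):
+#[cfg(test)]
+mod tests {
+ #[test]
+ fn try_send_requires_a_waiting_receiver() {
+ let (s, r) = super::super::sync_channel::<i32>(0);
+ // With zero capacity there is no buffer: `try_send` fails unless a
+ // receiver is currently blocked in `recv` on the other side.
+ assert!(s.try_send(1).is_err());
+ drop(r);
+ // After the receiver is gone, sends fail with `Disconnected`.
+ assert!(s.try_send(2).is_err());
+ }
+}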
diff --git a/library/std/src/sync/mpsc/blocking.rs b/library/std/src/sync/mpsc/blocking.rs
deleted file mode 100644
index 021df7b09..000000000
--- a/library/std/src/sync/mpsc/blocking.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-//! Generic support for building blocking abstractions.
-
-use crate::sync::atomic::{AtomicBool, Ordering};
-use crate::sync::Arc;
-use crate::thread::{self, Thread};
-use crate::time::Instant;
-
-struct Inner {
- thread: Thread,
- woken: AtomicBool,
-}
-
-unsafe impl Send for Inner {}
-unsafe impl Sync for Inner {}
-
-#[derive(Clone)]
-pub struct SignalToken {
- inner: Arc<Inner>,
-}
-
-pub struct WaitToken {
- inner: Arc<Inner>,
-}
-
-impl !Send for WaitToken {}
-
-impl !Sync for WaitToken {}
-
-pub fn tokens() -> (WaitToken, SignalToken) {
- let inner = Arc::new(Inner { thread: thread::current(), woken: AtomicBool::new(false) });
- let wait_token = WaitToken { inner: inner.clone() };
- let signal_token = SignalToken { inner };
- (wait_token, signal_token)
-}
-
-impl SignalToken {
- pub fn signal(&self) -> bool {
- let wake = self
- .inner
- .woken
- .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
- .is_ok();
- if wake {
- self.inner.thread.unpark();
- }
- wake
- }
-
- /// Converts to an unsafe raw pointer. Useful for storing in a pipe's state
- /// flag.
- #[inline]
- pub unsafe fn to_raw(self) -> *mut u8 {
- Arc::into_raw(self.inner) as *mut u8
- }
-
- /// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state
- /// flag.
- #[inline]
- pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken {
- SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) }
- }
-}
-
-impl WaitToken {
- pub fn wait(self) {
- while !self.inner.woken.load(Ordering::SeqCst) {
- thread::park()
- }
- }
-
- /// Returns `true` if we wake up normally.
- pub fn wait_max_until(self, end: Instant) -> bool {
- while !self.inner.woken.load(Ordering::SeqCst) {
- let now = Instant::now();
- if now >= end {
- return false;
- }
- thread::park_timeout(end - now)
- }
- true
- }
-}
diff --git a/library/std/src/sync/mpsc/cache_aligned.rs b/library/std/src/sync/mpsc/cache_aligned.rs
deleted file mode 100644
index 9197f0d6e..000000000
--- a/library/std/src/sync/mpsc/cache_aligned.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use crate::ops::{Deref, DerefMut};
-
-#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
-#[cfg_attr(target_arch = "aarch64", repr(align(128)))]
-#[cfg_attr(not(target_arch = "aarch64"), repr(align(64)))]
-pub(super) struct CacheAligned<T>(pub T);
-
-impl<T> Deref for CacheAligned<T> {
- type Target = T;
- fn deref(&self) -> &Self::Target {
- &self.0
- }
-}
-
-impl<T> DerefMut for CacheAligned<T> {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
- }
-}
-
-impl<T> CacheAligned<T> {
- pub(super) fn new(t: T) -> Self {
- CacheAligned(t)
- }
-}
diff --git a/library/std/src/sync/mpsc/mod.rs b/library/std/src/sync/mpsc/mod.rs
index e85a87239..6e3c28f10 100644
--- a/library/std/src/sync/mpsc/mod.rs
+++ b/library/std/src/sync/mpsc/mod.rs
@@ -143,175 +143,16 @@ mod tests;
#[cfg(all(test, not(target_os = "emscripten")))]
mod sync_tests;
-// A description of how Rust's channel implementation works
-//
-// Channels are supposed to be the basic building block for all other
-// concurrent primitives that are used in Rust. As a result, the channel type
-// needs to be highly optimized, flexible, and broad enough for use everywhere.
-//
-// The choice of implementation of all channels is to be built on lock-free data
-// structures. The channels themselves are then consequently also lock-free data
-// structures. As always with lock-free code, this is a very "here be dragons"
-// territory, especially because I'm unaware of any academic papers that have
-// gone to great lengths about channels of these flavors.
-//
-// ## Flavors of channels
-//
-// From the perspective of a consumer of this library, there is only one flavor
-// of channel. This channel can be used as a stream and cloned to allow multiple
-// senders. Under the hood, however, there are actually three flavors of
-// channels in play.
-//
-// * Flavor::Oneshots - these channels are highly optimized for the one-send use
-// case. They contain as few atomics as possible and
-// involve one and exactly one allocation.
-// * Streams - these channels are optimized for the non-shared use case. They
-// use a different concurrent queue that is more tailored for this
-// use case. The initial allocation of this flavor of channel is not
-// optimized.
-// * Shared - this is the most general form of channel that this module offers,
-// a channel with multiple senders. This type is as optimized as it
-// can be, but the previous two types mentioned are much faster for
-// their use-cases.
-//
-// ## Concurrent queues
-//
-// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
-// but recv() obviously blocks. This means that under the hood there must be
-// some shared and concurrent queue holding all of the actual data.
-//
-// With two flavors of channels, two flavors of queues are also used. We have
-// chosen to use queues from a well-known author that are abbreviated as SPSC
-// and MPSC (single producer, single consumer and multiple producer, single
-// consumer). SPSC queues are used for streams while MPSC queues are used for
-// shared channels.
-//
-// ### SPSC optimizations
-//
-// The SPSC queue found online is essentially a linked list of nodes where one
-// half of the nodes are the "queue of data" and the other half of nodes are a
-// cache of unused nodes. The unused nodes are used such that an allocation is
-// not required on every push() and a free doesn't need to happen on every
-// pop().
-//
-// As found online, however, the cache of nodes is of an infinite size. This
-// means that if a channel at one point in its life had 50k items in the queue,
-// then the queue will always have the capacity for 50k items. I believed that
-// this was an unnecessary limitation of the implementation, so I have altered
-// the queue to optionally have a bound on the cache size.
-//
-// By default, streams will have an unbounded SPSC queue with a small-ish cache
-// size. The hope is that the cache is still large enough to have very fast
-// send() operations while not too large such that millions of channels can
-// coexist at once.
-//
-// ### MPSC optimizations
-//
-// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
-// a linked list under the hood to earn its unboundedness, but I have not put
-// forth much effort into having a cache of nodes similar to the SPSC queue.
-//
-// For now, I believe that this is "ok" because shared channels are not the most
-// common type, but soon we may wish to revisit this queue choice and determine
-// another candidate for backend storage of shared channels.
-//
-// ## Overview of the Implementation
-//
-// Now that there's a little background on the concurrent queues used, it's
-// worth going into much more detail about the channels themselves. The basic
-// pseudocode for a send/recv are:
-//
-//
-// send(t) recv()
-// queue.push(t) return if queue.pop()
-// if increment() == -1 deschedule {
-// wakeup() if decrement() > 0
-// cancel_deschedule()
-// }
-// queue.pop()
-//
-// As mentioned before, there are no locks in this implementation, only atomic
-// instructions are used.
-//
-// ### The internal atomic counter
-//
-// Every channel has a shared counter with each half to keep track of the size
-// of the queue. This counter is used to abort descheduling by the receiver and
-// to know when to wake up on the sending side.
-//
-// As seen in the pseudocode, senders will increment this count and receivers
-// will decrement the count. The theory behind this is that if a sender sees a
-// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
-// then it doesn't need to block.
-//
-// The recv() method has a beginning call to pop(), and if successful, it needs
-// to decrement the count. It is a crucial implementation detail that this
-// decrement does *not* happen to the shared counter. If this were the case,
-// then it would be possible for the counter to be very negative when there were
-// no receivers waiting, in which case the senders would have to determine when
-// it was actually appropriate to wake up a receiver.
-//
-// Instead, the "steal count" is kept track of separately (not atomically
-// because it's only used by receivers), and then the decrement() call when
-// descheduling will lump in all of the recent steals into one large decrement.
-//
-// The implication of this is that if a sender sees a -1 count, then there's
-// guaranteed to be a waiter waiting!
-//
-// ## Native Implementation
-//
-// A major goal of these channels is to work seamlessly on and off the runtime.
-// All of the previous race conditions have been worded in terms of
-// scheduler-isms (which is obviously not available without the runtime).
-//
-// For now, native usage of channels (off the runtime) will fall back onto
-// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
-// is still entirely lock-free, the "deschedule" blocks above are surrounded by
-// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
-// condition variable.
-//
-// ## Select
-//
-// Being able to support selection over channels has greatly influenced this
-// design, and not only does selection need to work inside the runtime, but also
-// outside the runtime.
-//
-// The implementation is fairly straightforward. The goal of select() is not to
-// return some data, but only to return which channel can receive data without
-// blocking. The implementation is essentially the entire blocking procedure
-// followed by an increment as soon as it's woken up. The cancellation procedure
-// involves an increment and swapping out of to_wake to acquire ownership of the
-// thread to unblock.
-//
-// Sadly this current implementation requires multiple allocations, so I have
-// seen the throughput of select() be much worse than it should be. I do not
-// believe that there is anything fundamental that needs to change about these
-// channels, however, in order to support a more efficient select().
-//
-// FIXME: Select is now removed, so these factors are ready to be cleaned up!
-//
-// # Conclusion
-//
-// And now that you've seen all the races that I found and attempted to fix,
-// here's the code for you to find some more!
-
-use crate::cell::UnsafeCell;
+// MPSC channels are built as a wrapper around MPMC channels, which
+// were ported from the `crossbeam-channel` crate. MPMC channels are
+// not exposed publicly, but if you are curious about the implementation,
+// that's where everything is.
+
use crate::error;
use crate::fmt;
-use crate::mem;
-use crate::sync::Arc;
+use crate::sync::mpmc;
use crate::time::{Duration, Instant};
-mod blocking;
-mod mpsc_queue;
-mod oneshot;
-mod shared;
-mod spsc_queue;
-mod stream;
-mod sync;
-
-mod cache_aligned;
-
/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
/// This half can only be owned by one thread.
///
@@ -341,7 +182,7 @@ mod cache_aligned;
#[stable(feature = "rust1", since = "1.0.0")]
#[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
pub struct Receiver<T> {
- inner: UnsafeCell<Flavor<T>>,
+ inner: mpmc::Receiver<T>,
}
// The receiver port can be sent from place to place, so long as it
@@ -498,7 +339,7 @@ pub struct IntoIter<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Sender<T> {
- inner: UnsafeCell<Flavor<T>>,
+ inner: mpmc::Sender<T>,
}
// The send port can be sent from place to place, so long as it
@@ -557,7 +398,7 @@ impl<T> !Sync for Sender<T> {}
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
- inner: Arc<sync::Packet<T>>,
+ inner: mpmc::Sender<T>,
}
#[stable(feature = "rust1", since = "1.0.0")]
@@ -643,34 +484,6 @@ pub enum TrySendError<T> {
Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
}
-enum Flavor<T> {
- Oneshot(Arc<oneshot::Packet<T>>),
- Stream(Arc<stream::Packet<T>>),
- Shared(Arc<shared::Packet<T>>),
- Sync(Arc<sync::Packet<T>>),
-}
-
-#[doc(hidden)]
-trait UnsafeFlavor<T> {
- fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
- unsafe fn inner_mut(&self) -> &mut Flavor<T> {
- &mut *self.inner_unsafe().get()
- }
- unsafe fn inner(&self) -> &Flavor<T> {
- &*self.inner_unsafe().get()
- }
-}
-impl<T> UnsafeFlavor<T> for Sender<T> {
- fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
- &self.inner
- }
-}
-impl<T> UnsafeFlavor<T> for Receiver<T> {
- fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
- &self.inner
- }
-}
-
/// Creates a new asynchronous channel, returning the sender/receiver halves.
/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
/// the same order as it was sent, and no [`send`] will block the calling thread
@@ -711,8 +524,8 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
- let a = Arc::new(oneshot::Packet::new());
- (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
+ let (tx, rx) = mpmc::channel();
+ (Sender { inner: tx }, Receiver { inner: rx })
}
/// Creates a new synchronous, bounded channel.
@@ -760,8 +573,8 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
- let a = Arc::new(sync::Packet::new(bound));
- (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
+ let (tx, rx) = mpmc::sync_channel(bound);
+ (SyncSender { inner: tx }, Receiver { inner: rx })
}
////////////////////////////////////////////////////////////////////////////////
@@ -769,10 +582,6 @@ pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
////////////////////////////////////////////////////////////////////////////////
impl<T> Sender<T> {
- fn new(inner: Flavor<T>) -> Sender<T> {
- Sender { inner: UnsafeCell::new(inner) }
- }
-
/// Attempts to send a value on this channel, returning it back if it could
/// not be sent.
///
@@ -802,40 +611,7 @@ impl<T> Sender<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
- let (new_inner, ret) = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => {
- if !p.sent() {
- return p.send(t).map_err(SendError);
- } else {
- let a = Arc::new(stream::Packet::new());
- let rx = Receiver::new(Flavor::Stream(a.clone()));
- match p.upgrade(rx) {
- oneshot::UpSuccess => {
- let ret = a.send(t);
- (a, ret)
- }
- oneshot::UpDisconnected => (a, Err(t)),
- oneshot::UpWoke(token) => {
- // This send cannot panic because the thread is
- // asleep (we're looking at it), so the receiver
- // can't go away.
- a.send(t).ok().unwrap();
- token.signal();
- (a, Ok(()))
- }
- }
- }
- }
- Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
- Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
- Flavor::Sync(..) => unreachable!(),
- };
-
- unsafe {
- let tmp = Sender::new(Flavor::Stream(new_inner));
- mem::swap(self.inner_mut(), tmp.inner_mut());
- }
- ret.map_err(SendError)
+ self.inner.send(t)
}
}
@@ -847,58 +623,13 @@ impl<T> Clone for Sender<T> {
/// (including the original) need to be dropped in order for
/// [`Receiver::recv`] to stop blocking.
fn clone(&self) -> Sender<T> {
- let packet = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => {
- let a = Arc::new(shared::Packet::new());
- {
- let guard = a.postinit_lock();
- let rx = Receiver::new(Flavor::Shared(a.clone()));
- let sleeper = match p.upgrade(rx) {
- oneshot::UpSuccess | oneshot::UpDisconnected => None,
- oneshot::UpWoke(task) => Some(task),
- };
- a.inherit_blocker(sleeper, guard);
- }
- a
- }
- Flavor::Stream(ref p) => {
- let a = Arc::new(shared::Packet::new());
- {
- let guard = a.postinit_lock();
- let rx = Receiver::new(Flavor::Shared(a.clone()));
- let sleeper = match p.upgrade(rx) {
- stream::UpSuccess | stream::UpDisconnected => None,
- stream::UpWoke(task) => Some(task),
- };
- a.inherit_blocker(sleeper, guard);
- }
- a
- }
- Flavor::Shared(ref p) => {
- p.clone_chan();
- return Sender::new(Flavor::Shared(p.clone()));
- }
- Flavor::Sync(..) => unreachable!(),
- };
-
- unsafe {
- let tmp = Sender::new(Flavor::Shared(packet.clone()));
- mem::swap(self.inner_mut(), tmp.inner_mut());
- }
- Sender::new(Flavor::Shared(packet))
+ Sender { inner: self.inner.clone() }
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Sender<T> {
- fn drop(&mut self) {
- match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => p.drop_chan(),
- Flavor::Stream(ref p) => p.drop_chan(),
- Flavor::Shared(ref p) => p.drop_chan(),
- Flavor::Sync(..) => unreachable!(),
- }
- }
+ fn drop(&mut self) {}
}
#[stable(feature = "mpsc_debug", since = "1.8.0")]
@@ -913,10 +644,6 @@ impl<T> fmt::Debug for Sender<T> {
////////////////////////////////////////////////////////////////////////////////
impl<T> SyncSender<T> {
- fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
- SyncSender { inner }
- }
-
/// Sends a value on this synchronous channel.
///
/// This function will *block* until space in the internal buffer becomes
@@ -955,7 +682,7 @@ impl<T> SyncSender<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
- self.inner.send(t).map_err(SendError)
+ self.inner.send(t)
}
/// Attempts to send a value on this channel without blocking.
@@ -1011,21 +738,27 @@ impl<T> SyncSender<T> {
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.inner.try_send(t)
}
+
+ // Attempts to send a value on this channel, returning an error if the
+ // receiving half has hung up, or if the send waits longer than `timeout`.
+ //
+ // This method is currently private and only used for tests.
+ #[allow(unused)]
+ fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), mpmc::SendTimeoutError<T>> {
+ self.inner.send_timeout(t, timeout)
+ }
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for SyncSender<T> {
fn clone(&self) -> SyncSender<T> {
- self.inner.clone_chan();
- SyncSender::new(self.inner.clone())
+ SyncSender { inner: self.inner.clone() }
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for SyncSender<T> {
- fn drop(&mut self) {
- self.inner.drop_chan();
- }
+ fn drop(&mut self) {}
}
#[stable(feature = "mpsc_debug", since = "1.8.0")]
@@ -1040,10 +773,6 @@ impl<T> fmt::Debug for SyncSender<T> {
////////////////////////////////////////////////////////////////////////////////
impl<T> Receiver<T> {
- fn new(inner: Flavor<T>) -> Receiver<T> {
- Receiver { inner: UnsafeCell::new(inner) }
- }
-
/// Attempts to return a pending value on this receiver without blocking.
///
/// This method will never block the caller in order to wait for data to
@@ -1069,35 +798,7 @@ impl<T> Receiver<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_recv(&self) -> Result<T, TryRecvError> {
- loop {
- let new_port = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(oneshot::Empty) => return Err(TryRecvError::Empty),
- Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
- Err(oneshot::Upgraded(rx)) => rx,
- },
- Flavor::Stream(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(stream::Empty) => return Err(TryRecvError::Empty),
- Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
- Err(stream::Upgraded(rx)) => rx,
- },
- Flavor::Shared(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(shared::Empty) => return Err(TryRecvError::Empty),
- Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
- },
- Flavor::Sync(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(sync::Empty) => return Err(TryRecvError::Empty),
- Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
- },
- };
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- }
- }
+ self.inner.try_recv()
}
/// Attempts to wait for a value on this receiver, returning an error if the
@@ -1156,31 +857,7 @@ impl<T> Receiver<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn recv(&self) -> Result<T, RecvError> {
- loop {
- let new_port = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(oneshot::Disconnected) => return Err(RecvError),
- Err(oneshot::Upgraded(rx)) => rx,
- Err(oneshot::Empty) => unreachable!(),
- },
- Flavor::Stream(ref p) => match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(stream::Disconnected) => return Err(RecvError),
- Err(stream::Upgraded(rx)) => rx,
- Err(stream::Empty) => unreachable!(),
- },
- Flavor::Shared(ref p) => match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(shared::Disconnected) => return Err(RecvError),
- Err(shared::Empty) => unreachable!(),
- },
- Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
- };
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- }
- }
+ self.inner.recv()
}
/// Attempts to wait for a value on this receiver, returning an error if the
@@ -1198,34 +875,6 @@ impl<T> Receiver<T> {
/// However, since channels are buffered, messages sent before the disconnect
/// will still be properly received.
///
- /// # Known Issues
- ///
- /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
- /// to panic unexpectedly with the following example:
- ///
- /// ```no_run
- /// use std::sync::mpsc::channel;
- /// use std::thread;
- /// use std::time::Duration;
- ///
- /// let (tx, rx) = channel::<String>();
- ///
- /// thread::spawn(move || {
- /// let d = Duration::from_millis(10);
- /// loop {
- /// println!("recv");
- /// let _r = rx.recv_timeout(d);
- /// }
- /// });
- ///
- /// thread::sleep(Duration::from_millis(100));
- /// let _c1 = tx.clone();
- ///
- /// thread::sleep(Duration::from_secs(1));
- /// ```
- ///
- /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
- ///
/// # Examples
///
/// Successfully receiving value before encountering timeout:
@@ -1268,17 +917,7 @@ impl<T> Receiver<T> {
/// ```
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
- // Do an optimistic try_recv to avoid the performance impact of
- // Instant::now() in the full-channel case.
- match self.try_recv() {
- Ok(result) => Ok(result),
- Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
- Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
- Some(deadline) => self.recv_deadline(deadline),
- // So far in the future that it's practically the same as waiting indefinitely.
- None => self.recv().map_err(RecvTimeoutError::from),
- },
- }
+ self.inner.recv_timeout(timeout)
}
/// Attempts to wait for a value on this receiver, returning an error if the
@@ -1339,46 +978,7 @@ impl<T> Receiver<T> {
/// ```
#[unstable(feature = "deadline_api", issue = "46316")]
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
- use self::RecvTimeoutError::*;
-
- loop {
- let port_or_empty = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(oneshot::Disconnected) => return Err(Disconnected),
- Err(oneshot::Upgraded(rx)) => Some(rx),
- Err(oneshot::Empty) => None,
- },
- Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(stream::Disconnected) => return Err(Disconnected),
- Err(stream::Upgraded(rx)) => Some(rx),
- Err(stream::Empty) => None,
- },
- Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(shared::Disconnected) => return Err(Disconnected),
- Err(shared::Empty) => None,
- },
- Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(sync::Disconnected) => return Err(Disconnected),
- Err(sync::Empty) => None,
- },
- };
-
- if let Some(new_port) = port_or_empty {
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- }
- }
-
- // If we're already passed the deadline, and we're here without
- // data, return a timeout, else try again.
- if Instant::now() >= deadline {
- return Err(Timeout);
- }
- }
+ self.inner.recv_deadline(deadline)
}
/// Returns an iterator that will block waiting for messages, but never
@@ -1499,14 +1099,7 @@ impl<T> IntoIterator for Receiver<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Receiver<T> {
- fn drop(&mut self) {
- match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => p.drop_port(),
- Flavor::Stream(ref p) => p.drop_port(),
- Flavor::Shared(ref p) => p.drop_port(),
- Flavor::Sync(ref p) => p.drop_port(),
- }
- }
+ fn drop(&mut self) {}
}
#[stable(feature = "mpsc_debug", since = "1.8.0")]
diff --git a/library/std/src/sync/mpsc/mpsc_queue.rs b/library/std/src/sync/mpsc/mpsc_queue.rs
deleted file mode 100644
index cdd64a5de..000000000
--- a/library/std/src/sync/mpsc/mpsc_queue.rs
+++ /dev/null
@@ -1,117 +0,0 @@
-//! A mostly lock-free multi-producer, single consumer queue.
-//!
-//! This module contains an implementation of a concurrent MPSC queue. This
-//! queue can be used to share data between threads, and is also used as the
-//! building block of channels in rust.
-//!
-//! Note that the current implementation of this queue has a caveat with the
-//! `pop` method; see that method for more information. Due to this
-//! caveat, this queue might not be appropriate for all use-cases.
-
-// https://www.1024cores.net/home/lock-free-algorithms
-// /queues/non-intrusive-mpsc-node-based-queue
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod tests;
-
-pub use self::PopResult::*;
-
-use core::cell::UnsafeCell;
-use core::ptr;
-
-use crate::boxed::Box;
-use crate::sync::atomic::{AtomicPtr, Ordering};
-
-/// A result of the `pop` function.
-pub enum PopResult<T> {
- /// Some data has been popped
- Data(T),
- /// The queue is empty
- Empty,
- /// The queue is in an inconsistent state. Popping data should succeed, but
- /// some pushers have yet to make enough progress in order to allow a pop to
- /// succeed. It is recommended that a pop() occur "in the near future" in
- /// order to see if the sender has made progress or not
- Inconsistent,
-}
-
-struct Node<T> {
- next: AtomicPtr<Node<T>>,
- value: Option<T>,
-}
-
-/// The multi-producer single-consumer structure. This is not cloneable, but it
-/// may be safely shared so long as it is guaranteed that there is only one
-/// popper at a time (many pushers are allowed).
-pub struct Queue<T> {
- head: AtomicPtr<Node<T>>,
- tail: UnsafeCell<*mut Node<T>>,
-}
-
-unsafe impl<T: Send> Send for Queue<T> {}
-unsafe impl<T: Send> Sync for Queue<T> {}
-
-impl<T> Node<T> {
- unsafe fn new(v: Option<T>) -> *mut Node<T> {
- Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v })
- }
-}
-
-impl<T> Queue<T> {
- /// Creates a new queue that is safe to share among multiple producers and
- /// one consumer.
- pub fn new() -> Queue<T> {
- let stub = unsafe { Node::new(None) };
- Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) }
- }
-
- /// Pushes a new value onto this queue.
- pub fn push(&self, t: T) {
- unsafe {
- let n = Node::new(Some(t));
- let prev = self.head.swap(n, Ordering::AcqRel);
- (*prev).next.store(n, Ordering::Release);
- }
- }
-
- /// Pops some data from this queue.
- ///
- /// Note that the current implementation means that this function cannot
- /// return `Option<T>`. It is possible for this queue to be in an
- /// inconsistent state where many pushes have succeeded and completely
- /// finished, but pops cannot return `Some(t)`. This inconsistent state
- /// happens when a pusher is pre-empted at an inopportune moment.
- ///
- /// This inconsistent state means that this queue does indeed have data, but
- /// it does not currently have access to it at this time.
- pub fn pop(&self) -> PopResult<T> {
- unsafe {
- let tail = *self.tail.get();
- let next = (*tail).next.load(Ordering::Acquire);
-
- if !next.is_null() {
- *self.tail.get() = next;
- assert!((*tail).value.is_none());
- assert!((*next).value.is_some());
- let ret = (*next).value.take().unwrap();
- let _: Box<Node<T>> = Box::from_raw(tail);
- return Data(ret);
- }
-
- if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent }
- }
- }
-}
-
-impl<T> Drop for Queue<T> {
- fn drop(&mut self) {
- unsafe {
- let mut cur = *self.tail.get();
- while !cur.is_null() {
- let next = (*cur).next.load(Ordering::Relaxed);
- let _: Box<Node<T>> = Box::from_raw(cur);
- cur = next;
- }
- }
- }
-}
diff --git a/library/std/src/sync/mpsc/mpsc_queue/tests.rs b/library/std/src/sync/mpsc/mpsc_queue/tests.rs
deleted file mode 100644
index 34b2a9a98..000000000
--- a/library/std/src/sync/mpsc/mpsc_queue/tests.rs
+++ /dev/null
@@ -1,47 +0,0 @@
-use super::{Data, Empty, Inconsistent, Queue};
-use crate::sync::mpsc::channel;
-use crate::sync::Arc;
-use crate::thread;
-
-#[test]
-fn test_full() {
- let q: Queue<Box<_>> = Queue::new();
- q.push(Box::new(1));
- q.push(Box::new(2));
-}
-
-#[test]
-fn test() {
- let nthreads = 8;
- let nmsgs = if cfg!(miri) { 100 } else { 1000 };
- let q = Queue::new();
- match q.pop() {
- Empty => {}
- Inconsistent | Data(..) => panic!(),
- }
- let (tx, rx) = channel();
- let q = Arc::new(q);
-
- for _ in 0..nthreads {
- let tx = tx.clone();
- let q = q.clone();
- thread::spawn(move || {
- for i in 0..nmsgs {
- q.push(i);
- }
- tx.send(()).unwrap();
- });
- }
-
- let mut i = 0;
- while i < nthreads * nmsgs {
- match q.pop() {
- Empty | Inconsistent => {}
- Data(_) => i += 1,
- }
- }
- drop(tx);
- for _ in 0..nthreads {
- rx.recv().unwrap();
- }
-}
diff --git a/library/std/src/sync/mpsc/oneshot.rs b/library/std/src/sync/mpsc/oneshot.rs
deleted file mode 100644
index 0e259b8ae..000000000
--- a/library/std/src/sync/mpsc/oneshot.rs
+++ /dev/null
@@ -1,315 +0,0 @@
-/// Oneshot channels/ports
-///
- /// This is the initial flavor of channels/ports used for the comm module. This is
-/// an optimization for the one-use case of a channel. The major optimization of
-/// this type is to have one and exactly one allocation when the chan/port pair
-/// is created.
-///
-/// Another possible optimization would be to not use an Arc box because
-/// in theory we know when the shared packet can be deallocated (no real need
- /// for the atomic reference counting), but I was having trouble figuring out
- /// how to destroy the data early in a drop of a Port.
-///
-/// # Implementation
-///
-/// Oneshots are implemented around one atomic usize variable. This variable
-/// indicates both the state of the port/chan but also contains any threads
-/// blocked on the port. All atomic operations happen on this one word.
-///
-/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
- /// on behalf of the channel side of things (it can be thought of as
-/// consuming the port). This upgrade is then also stored in the shared packet.
-/// The one caveat to consider is that when a port sees a disconnected channel
-/// it must check for data because there is no "data plus upgrade" state.
-pub use self::Failure::*;
-use self::MyUpgrade::*;
-pub use self::UpgradeResult::*;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::sync::atomic::{AtomicPtr, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::Receiver;
-use crate::time::Instant;
-
-// Various states you can find a port in.
-const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
-const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
-const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
-// Any other value represents a pointer to a SignalToken value. The
-// protocol ensures that when the state moves *to* a pointer,
-// ownership of the token is given to the packet, and when the state
-// moves *from* a pointer, ownership of the token is transferred to
-// whoever changed the state.
-
-pub struct Packet<T> {
- // Internal state of the chan/port pair (stores the blocked thread as well)
- state: AtomicPtr<u8>,
- // One-shot data slot location
- data: UnsafeCell<Option<T>>,
- // when used for the second time, a oneshot channel must be upgraded, and
- // this contains the slot for the upgrade
- upgrade: UnsafeCell<MyUpgrade<T>>,
-}
-
-pub enum Failure<T> {
- Empty,
- Disconnected,
- Upgraded(Receiver<T>),
-}
-
-pub enum UpgradeResult {
- UpSuccess,
- UpDisconnected,
- UpWoke(SignalToken),
-}
-
-enum MyUpgrade<T> {
- NothingSent,
- SendUsed,
- GoUp(Receiver<T>),
-}
-
-impl<T> Packet<T> {
- pub fn new() -> Packet<T> {
- Packet {
- data: UnsafeCell::new(None),
- upgrade: UnsafeCell::new(NothingSent),
- state: AtomicPtr::new(EMPTY),
- }
- }
-
- pub fn send(&self, t: T) -> Result<(), T> {
- unsafe {
- // Sanity check
- match *self.upgrade.get() {
- NothingSent => {}
- _ => panic!("sending on a oneshot that's already sent on "),
- }
- assert!((*self.data.get()).is_none());
- ptr::write(self.data.get(), Some(t));
- ptr::write(self.upgrade.get(), SendUsed);
-
- match self.state.swap(DATA, Ordering::SeqCst) {
- // Sent the data, no one was waiting
- EMPTY => Ok(()),
-
- // Couldn't send the data, the port hung up first. Return the data
- // back up the stack.
- DISCONNECTED => {
- self.state.swap(DISCONNECTED, Ordering::SeqCst);
- ptr::write(self.upgrade.get(), NothingSent);
- Err((&mut *self.data.get()).take().unwrap())
- }
-
- // Not possible, these are one-use channels
- DATA => unreachable!(),
-
- // There is a thread waiting on the other end. We leave the 'DATA'
- // state inside so it'll pick it up on the other end.
- ptr => {
- SignalToken::from_raw(ptr).signal();
- Ok(())
- }
- }
- }
- }
-
- // Just tests whether this channel has been sent on or not; this is only
- // safe to use from the sender.
- pub fn sent(&self) -> bool {
- unsafe { !matches!(*self.upgrade.get(), NothingSent) }
- }
-
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
- // Attempt to not block the thread (it's a little expensive). If it looks
- // like we're not empty, then immediately go through to `try_recv`.
- if self.state.load(Ordering::SeqCst) == EMPTY {
- let (wait_token, signal_token) = blocking::tokens();
- let ptr = unsafe { signal_token.to_raw() };
-
- // race with senders to enter the blocking state
- if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
- if let Some(deadline) = deadline {
- let timed_out = !wait_token.wait_max_until(deadline);
- // Try to reset the state
- if timed_out {
- self.abort_selection().map_err(Upgraded)?;
- }
- } else {
- wait_token.wait();
- debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
- }
- } else {
- // drop the signal token, since we never blocked
- drop(unsafe { SignalToken::from_raw(ptr) });
- }
- }
-
- self.try_recv()
- }
-
- pub fn try_recv(&self) -> Result<T, Failure<T>> {
- unsafe {
- match self.state.load(Ordering::SeqCst) {
- EMPTY => Err(Empty),
-
- // We saw some data on the channel, but the channel can be used
- // again to send us an upgrade. As a result, we need to re-insert
- // into the channel that there's no data available (otherwise we'll
- // just see DATA next time). This is done as a cmpxchg because if
- // the state changes under our feet we'd rather just see that state
- // change.
- DATA => {
- let _ = self.state.compare_exchange(
- DATA,
- EMPTY,
- Ordering::SeqCst,
- Ordering::SeqCst,
- );
- match (&mut *self.data.get()).take() {
- Some(data) => Ok(data),
- None => unreachable!(),
- }
- }
-
- // There's no guarantee that we receive before an upgrade happens,
- // and an upgrade flags the channel as disconnected, so when we see
- // this we first need to check if there's data available and *then*
- // we go through and process the upgrade.
- DISCONNECTED => match (&mut *self.data.get()).take() {
- Some(data) => Ok(data),
- None => match ptr::replace(self.upgrade.get(), SendUsed) {
- SendUsed | NothingSent => Err(Disconnected),
- GoUp(upgrade) => Err(Upgraded(upgrade)),
- },
- },
-
- // We are the sole receiver; there cannot be a blocking
- // receiver already.
- _ => unreachable!(),
- }
- }
- }
-
- // Returns whether the upgrade was completed. If the upgrade wasn't
- // completed, then the port couldn't get sent to the other half (it will
- // never receive it).
- pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
- unsafe {
- let prev = match *self.upgrade.get() {
- NothingSent => NothingSent,
- SendUsed => SendUsed,
- _ => panic!("upgrading again"),
- };
- ptr::write(self.upgrade.get(), GoUp(up));
-
- match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
- // If the channel is empty or has data on it, then we're good to go.
- // Senders will check the data before the upgrade (in case we
- // plastered over the DATA state).
- DATA | EMPTY => UpSuccess,
-
- // If the other end is already disconnected, then we failed the
- // upgrade. Be sure to trash the port we were given.
- DISCONNECTED => {
- ptr::replace(self.upgrade.get(), prev);
- UpDisconnected
- }
-
- // If someone's waiting, we gotta wake them up
- ptr => UpWoke(SignalToken::from_raw(ptr)),
- }
- }
- }
-
- pub fn drop_chan(&self) {
- match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
- DATA | DISCONNECTED | EMPTY => {}
-
- // If someone's waiting, we gotta wake them up
- ptr => unsafe {
- SignalToken::from_raw(ptr).signal();
- },
- }
- }
-
- pub fn drop_port(&self) {
- match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
- // An empty channel has nothing to do, and a remotely disconnected
- // channel also has nothing to do because we're about to run the drop
- // glue
- DISCONNECTED | EMPTY => {}
-
- // There's data on the channel, so make sure we destroy it promptly.
- // This is why not using an arc is a little difficult (need the box
- // to stay valid while we take the data).
- DATA => unsafe {
- (&mut *self.data.get()).take().unwrap();
- },
-
- // We're the only ones that can block on this port
- _ => unreachable!(),
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // select implementation
- ////////////////////////////////////////////////////////////////////////////
-
- // Remove a previous selecting thread from this port. This ensures that the
- // blocked thread will no longer be visible to any other threads.
- //
- // The return value indicates whether there's data on this port.
- pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
- let state = match self.state.load(Ordering::SeqCst) {
- // Each of these states means that no further activity will happen
- // with regard to aborting selection
- s @ (EMPTY | DATA | DISCONNECTED) => s,
-
- // If we've got a blocked thread, then use an atomic to gain ownership
- // of it (may fail)
- ptr => self
- .state
- .compare_exchange(ptr, EMPTY, Ordering::SeqCst, Ordering::SeqCst)
- .unwrap_or_else(|x| x),
- };
-
- // Now that we've got ownership of our state, figure out what to do
- // about it.
- match state {
- EMPTY => unreachable!(),
- // our thread used for select was stolen
- DATA => Ok(true),
-
- // If the other end has hung up, then we have complete ownership
- // of the port. First, check if there was data waiting for us. This
- // is possible if the other end sent something and then hung up.
- //
- // We then need to check to see if there was an upgrade requested,
- // and if so, the upgraded port needs to have its selection aborted.
- DISCONNECTED => unsafe {
- if (*self.data.get()).is_some() {
- Ok(true)
- } else {
- match ptr::replace(self.upgrade.get(), SendUsed) {
- GoUp(port) => Err(port),
- _ => Ok(true),
- }
- }
- },
-
- // We woke ourselves up from select.
- ptr => unsafe {
- drop(SignalToken::from_raw(ptr));
- Ok(false)
- },
- }
- }
-}
-
-impl<T> Drop for Packet<T> {
- fn drop(&mut self) {
- assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
- }
-}
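
The one-word state machine described in the deleted doc comment above can be modeled safely. The following is a simplified sketch (illustration only, not the real `Packet`; `Oneshot` and its fields are hypothetical names). The real code packs a `SignalToken` pointer into the same word; here only the three named states are modeled, with the payload kept in a `Mutex` for clarity.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    const EMPTY: usize = 0;        // no data, no blocked receiver
    const DATA: usize = 1;         // data ready for the receiver
    const DISCONNECTED: usize = 2; // the other side hung up (or upgraded)

    struct Oneshot<T> {
        state: AtomicUsize,
        slot: Mutex<Option<T>>,
    }

    impl<T> Oneshot<T> {
        fn new() -> Self {
            Oneshot { state: AtomicUsize::new(EMPTY), slot: Mutex::new(None) }
        }

        fn send(&self, t: T) -> Result<(), T> {
            *self.slot.lock().unwrap() = Some(t);
            match self.state.swap(DATA, Ordering::SeqCst) {
                EMPTY => Ok(()), // sent; no one was waiting
                DISCONNECTED => {
                    // Receiver hung up first: restore the state, return the data.
                    self.state.store(DISCONNECTED, Ordering::SeqCst);
                    Err(self.slot.lock().unwrap().take().unwrap())
                }
                _ => unreachable!("one-use channel"),
            }
        }

        fn try_recv(&self) -> Option<T> {
            // cmpxchg DATA -> EMPTY so a later upgrade/disconnect stays visible.
            match self.state.compare_exchange(DATA, EMPTY, Ordering::SeqCst, Ordering::SeqCst) {
                Ok(_) => self.slot.lock().unwrap().take(),
                Err(_) => None,
            }
        }
    }

    fn main() {
        let c = Oneshot::new();
        assert!(c.try_recv().is_none());
        c.send(42).unwrap();
        assert_eq!(c.try_recv(), Some(42));
    }
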
diff --git a/library/std/src/sync/mpsc/shared.rs b/library/std/src/sync/mpsc/shared.rs
deleted file mode 100644
index 51917bd96..000000000
--- a/library/std/src/sync/mpsc/shared.rs
+++ /dev/null
@@ -1,501 +0,0 @@
-/// Shared channels.
-///
-/// This is the flavor of channels which are not necessarily optimized for any
-/// particular use case, but are the most general in how they are used. Shared
- /// channels are cloneable, allowing for multiple senders.
-///
-/// High level implementation details can be found in the comment of the parent
- /// module. You'll also note that the implementations of the shared and
- /// stream channels are quite similar, and this is no coincidence!
-pub use self::Failure::*;
-use self::StartResult::*;
-
-use core::cmp;
-use core::intrinsics::abort;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::mpsc_queue as mpsc;
-use crate::sync::{Mutex, MutexGuard};
-use crate::thread;
-use crate::time::Instant;
-
-const DISCONNECTED: isize = isize::MIN;
-const FUDGE: isize = 1024;
-const MAX_REFCOUNT: usize = (isize::MAX) as usize;
-#[cfg(test)]
-const MAX_STEALS: isize = 5;
-#[cfg(not(test))]
-const MAX_STEALS: isize = 1 << 20;
-const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
-
-pub struct Packet<T> {
- queue: mpsc::Queue<T>,
- cnt: AtomicIsize, // How many items are on this channel
- steals: UnsafeCell<isize>, // How many times has a port received without blocking?
- to_wake: AtomicPtr<u8>, // SignalToken for wake up
-
- // The number of channels which are currently using this packet.
- channels: AtomicUsize,
-
- // See the discussion in Port::drop and the channel send methods for what
- // these are used for
- port_dropped: AtomicBool,
- sender_drain: AtomicIsize,
-
- // this lock protects various portions of this implementation during
- // select()
- select_lock: Mutex<()>,
-}
-
-pub enum Failure {
- Empty,
- Disconnected,
-}
-
-#[derive(PartialEq, Eq)]
-enum StartResult {
- Installed,
- Abort,
-}
-
-impl<T> Packet<T> {
- // Creation of a packet *must* be followed by a call to postinit_lock
- // and later by inherit_blocker
- pub fn new() -> Packet<T> {
- Packet {
- queue: mpsc::Queue::new(),
- cnt: AtomicIsize::new(0),
- steals: UnsafeCell::new(0),
- to_wake: AtomicPtr::new(EMPTY),
- channels: AtomicUsize::new(2),
- port_dropped: AtomicBool::new(false),
- sender_drain: AtomicIsize::new(0),
- select_lock: Mutex::new(()),
- }
- }
-
- // This function should be used after a newly created Packet
- // has been wrapped with an Arc.
- // Otherwise the mutex data will be duplicated while cloning,
- // and that could cause problems on platforms where it is
- // represented by an opaque data structure.
- pub fn postinit_lock(&self) -> MutexGuard<'_, ()> {
- self.select_lock.lock().unwrap()
- }
-
- // This function is used at the creation of a shared packet to inherit a
- // previously blocked thread. This is done to prevent spurious wakeups of
- // threads in select().
- //
- // This can only be called at channel-creation time
- pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
- if let Some(token) = token {
- assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
- self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst);
- self.cnt.store(-1, Ordering::SeqCst);
-
- // This store is a little sketchy. What's happening here is that
- // we're transferring a blocker from a oneshot or stream channel to
- // this shared channel. In doing so, we never spuriously wake them
- // up and rather only wake them up at the appropriate time. This
- // implementation of shared channels assumes that any blocking
- // recv() will undo the increment of steals performed in try_recv()
- // once the recv is complete. This thread that we're inheriting,
- // however, is not in the middle of recv. Hence, the first time we
- // wake them up, they're going to wake up from their old port, move
- // on to the upgraded port, and then call the blocking recv() function.
- //
- // When calling this function, they'll find there's data immediately
- // available, counting it as a steal. This in fact wasn't a steal
- // because we appropriately blocked them waiting for data.
- //
- // To offset this bad increment, we initially set the steal count to
- // -1. You'll find some special code in abort_selection() as well to
- // ensure that this -1 steal count doesn't escape too far.
- unsafe {
- *self.steals.get() = -1;
- }
- }
-
- // When the shared packet is constructed, we grabbed this lock. The
- // purpose of this lock is to ensure that abort_selection() doesn't
- // interfere with this method. After we unlock this lock, we're
- // signifying that we're done modifying self.cnt and self.to_wake and
- // the port is ready for the world to continue using it.
- drop(guard);
- }
-
- pub fn send(&self, t: T) -> Result<(), T> {
- // See Port::drop for what's going on
- if self.port_dropped.load(Ordering::SeqCst) {
- return Err(t);
- }
-
- // Note that the multiple sender case is a little trickier
- // semantically than the single sender case. The logic for
- // incrementing is "add and if disconnected store disconnected".
- // This could end up leading some senders to believe that there
- // wasn't a disconnect if in fact there was a disconnect. This means
- // that while one thread is attempting to re-store the disconnected
- // states, other threads could walk through merrily incrementing
- // this very-negative disconnected count. To prevent senders from
- // spuriously attempting to send when the channel is actually
- // disconnected, the count has a ranged check here.
- //
- // This is also done for another reason. Remember that the return
- // value of this function is:
- //
- // `true` == the data *may* be received, this essentially has no
- // meaning
- // `false` == the data will *never* be received, this has a lot of
- // meaning
- //
- // In the SPSC case, we have a check of 'queue.is_empty()' to see
- // whether the data was actually received, but this same condition
- // means nothing in a multi-producer context. As a result, this
- // preflight check serves as the definitive "this will never be
- // received". Once we get beyond this check, we have permanently
- // entered the realm of "this may be received".
- if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
- return Err(t);
- }
-
- self.queue.push(t);
- match self.cnt.fetch_add(1, Ordering::SeqCst) {
- -1 => {
- self.take_to_wake().signal();
- }
-
- // In this case, we have possibly failed to send our data, and
- // we need to consider re-popping the data in order to fully
- // destroy it. We must arbitrate among the multiple senders,
- // however, because the queues that we're using are
- // single-consumer queues. In order to do this, all exiting
- // pushers will use an atomic count in order to count those
- // flowing through. Pushers who see 0 are required to drain as
- // much as possible, and then can only exit when they are the
- // only pusher (otherwise they must try again).
- n if n < DISCONNECTED + FUDGE => {
- // see the comment in 'try' for a shared channel for why this
- // window of "not disconnected" is ok.
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
-
- if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
- loop {
- // drain the queue, for info on the thread yield see the
- // discussion in try_recv
- loop {
- match self.queue.pop() {
- mpsc::Data(..) => {}
- mpsc::Empty => break,
- mpsc::Inconsistent => thread::yield_now(),
- }
- }
- // maybe we're done, if we're not the last ones
- // here, then we need to go try again.
- if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
- break;
- }
- }
-
- // At this point, there may still be data on the queue,
- // but only if the count hasn't been incremented and
- // some other sender hasn't finished pushing data just
- // yet. That sender in question will drain its own data.
- }
- }
-
- // Can't make any assumptions about this case like in the SPSC case.
- _ => {}
- }
-
- Ok(())
- }
-
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
- // This code is essentially the exact same as that found in the stream
- // case (see stream.rs)
- match self.try_recv() {
- Err(Empty) => {}
- data => return data,
- }
-
- let (wait_token, signal_token) = blocking::tokens();
- if self.decrement(signal_token) == Installed {
- if let Some(deadline) = deadline {
- let timed_out = !wait_token.wait_max_until(deadline);
- if timed_out {
- self.abort_selection(false);
- }
- } else {
- wait_token.wait();
- }
- }
-
- match self.try_recv() {
- data @ Ok(..) => unsafe {
- *self.steals.get() -= 1;
- data
- },
- data => data,
- }
- }
-
- // Essentially the exact same thing as the stream decrement function.
- // Returns true if blocking should proceed.
- fn decrement(&self, token: SignalToken) -> StartResult {
- unsafe {
- assert_eq!(
- self.to_wake.load(Ordering::SeqCst),
- EMPTY,
- "This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
- );
- let ptr = token.to_raw();
- self.to_wake.store(ptr, Ordering::SeqCst);
-
- let steals = ptr::replace(self.steals.get(), 0);
-
- match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
- DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
- }
- // If we factor in our steals and notice that the channel has no
- // data, we successfully sleep
- n => {
- assert!(n >= 0);
- if n - steals <= 0 {
- return Installed;
- }
- }
- }
-
- self.to_wake.store(EMPTY, Ordering::SeqCst);
- drop(SignalToken::from_raw(ptr));
- Abort
- }
- }
-
- pub fn try_recv(&self) -> Result<T, Failure> {
- let ret = match self.queue.pop() {
- mpsc::Data(t) => Some(t),
- mpsc::Empty => None,
-
- // This is a bit of an interesting case. The channel is reported as
- // having data available, but our pop() has failed due to the queue
- // being in an inconsistent state. This means that there is some
- // pusher somewhere which has yet to complete, but we are guaranteed
- // that a pop will eventually succeed. In this case, we spin in a
- // yield loop because the remote sender should finish their enqueue
- // operation "very quickly".
- //
- // Avoiding this yield loop would require a different queue
- // abstraction which provides the guarantee that after M pushes have
- // succeeded, at least M pops will succeed. The current queues
- // guarantee that if there are N active pushes, you can pop N times
- // once all N have finished.
- mpsc::Inconsistent => {
- let data;
- loop {
- thread::yield_now();
- match self.queue.pop() {
- mpsc::Data(t) => {
- data = t;
- break;
- }
- mpsc::Empty => panic!("inconsistent => empty"),
- mpsc::Inconsistent => {}
- }
- }
- Some(data)
- }
- };
- match ret {
- // See the discussion in the stream implementation for why we
- // might decrement steals.
- Some(data) => unsafe {
- if *self.steals.get() > MAX_STEALS {
- match self.cnt.swap(0, Ordering::SeqCst) {
- DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
- }
- n => {
- let m = cmp::min(n, *self.steals.get());
- *self.steals.get() -= m;
- self.bump(n - m);
- }
- }
- assert!(*self.steals.get() >= 0);
- }
- *self.steals.get() += 1;
- Ok(data)
- },
-
- // See the discussion in the stream implementation for why we try
- // again.
- None => {
- match self.cnt.load(Ordering::SeqCst) {
- n if n != DISCONNECTED => Err(Empty),
- _ => {
- match self.queue.pop() {
- mpsc::Data(t) => Ok(t),
- mpsc::Empty => Err(Disconnected),
- // with no senders, an inconsistency is impossible.
- mpsc::Inconsistent => unreachable!(),
- }
- }
- }
- }
- }
- }
-
- // Prepares this shared packet for a channel clone, essentially just bumping
- // a refcount.
- pub fn clone_chan(&self) {
- let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
-
- // See comments on Arc::clone() on why we do this (for `mem::forget`).
- if old_count > MAX_REFCOUNT {
- abort();
- }
- }
-
- // Decrement the reference count on a channel. This is called whenever a
- // Chan is dropped and may end up waking up a receiver. It's the receiver's
- // responsibility on the other end to figure out that we've disconnected.
- pub fn drop_chan(&self) {
- match self.channels.fetch_sub(1, Ordering::SeqCst) {
- 1 => {}
- n if n > 1 => return,
- n => panic!("bad number of channels left {n}"),
- }
-
- match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
- -1 => {
- self.take_to_wake().signal();
- }
- DISCONNECTED => {}
- n => {
- assert!(n >= 0);
- }
- }
- }
-
- // See the long discussion inside of stream.rs for why the queue is drained,
- // and why it is done in this fashion.
- pub fn drop_port(&self) {
- self.port_dropped.store(true, Ordering::SeqCst);
- let mut steals = unsafe { *self.steals.get() };
- while {
- match self.cnt.compare_exchange(
- steals,
- DISCONNECTED,
- Ordering::SeqCst,
- Ordering::SeqCst,
- ) {
- Ok(_) => false,
- Err(old) => old != DISCONNECTED,
- }
- } {
- // See the discussion in 'try_recv' for why we yield
- // control of this thread.
- loop {
- match self.queue.pop() {
- mpsc::Data(..) => {
- steals += 1;
- }
- mpsc::Empty | mpsc::Inconsistent => break,
- }
- }
- }
- }
-
- // Consumes ownership of the 'to_wake' field.
- fn take_to_wake(&self) -> SignalToken {
- let ptr = self.to_wake.load(Ordering::SeqCst);
- self.to_wake.store(EMPTY, Ordering::SeqCst);
- assert!(ptr != EMPTY);
- unsafe { SignalToken::from_raw(ptr) }
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // select implementation
- ////////////////////////////////////////////////////////////////////////////
-
- // increment the count on the channel (used for selection)
- fn bump(&self, amt: isize) -> isize {
- match self.cnt.fetch_add(amt, Ordering::SeqCst) {
- DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
- DISCONNECTED
- }
- n => n,
- }
- }
-
- // Cancels a previous thread waiting on this port, returning whether there's
- // data on the port.
- //
- // This is similar to the stream implementation (hence fewer comments), but
- // uses a different value for the "steals" variable.
- pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
- // Before we do anything else, we bounce on this lock. The reason for
- // doing this is to ensure that any upgrade-in-progress is gone and
- // done with. Without this bounce, we can race with inherit_blocker
- // about looking at and dealing with to_wake. Once we have acquired the
- // lock, we are guaranteed that inherit_blocker is done.
- {
- let _guard = self.select_lock.lock().unwrap();
- }
-
- // Like the stream implementation, we want to make sure that the count
- // on the channel goes non-negative. We don't know how negative the
- // stream currently is, so instead of using a steal value of 1, we load
- // the channel count and figure out what we should do to make it
- // positive.
- let steals = {
- let cnt = self.cnt.load(Ordering::SeqCst);
- if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 }
- };
- let prev = self.bump(steals + 1);
-
- if prev == DISCONNECTED {
- assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
- true
- } else {
- let cur = prev + steals + 1;
- assert!(cur >= 0);
- if prev < 0 {
- drop(self.take_to_wake());
- } else {
- while self.to_wake.load(Ordering::SeqCst) != EMPTY {
- thread::yield_now();
- }
- }
- unsafe {
- // if the number of steals is -1, it was the pre-emptive -1 steal
- // count from when we inherited a blocker. This is fine because
- // we're just going to overwrite it with a real value.
- let old = self.steals.get();
- assert!(*old == 0 || *old == -1);
- *old = steals;
- prev >= 0
- }
- }
- }
-}
-
-impl<T> Drop for Packet<T> {
- fn drop(&mut self) {
- // Note that this load is not only an assert for correctness about
- // disconnection, but also a proper fence before the read of
- // `to_wake`, so this assert cannot be removed without also removing
- // the `to_wake` assert.
- assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
- assert_eq!(self.channels.load(Ordering::SeqCst), 0);
- }
-}
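
The "very negative means disconnected" trick in the deleted send path above is worth a concrete sketch. `DISCONNECTED` is `isize::MIN`; racing senders may `fetch_add(1)` a few times before the disconnecting thread re-stores the flag, so the preflight test accepts anything within a `FUDGE`-sized window rather than testing exact equality. A minimal, runnable demonstration of that arithmetic (illustration only, single-threaded for determinism):

    use std::sync::atomic::{AtomicIsize, Ordering};

    const DISCONNECTED: isize = isize::MIN;
    const FUDGE: isize = 1024;

    fn main() {
        let cnt = AtomicIsize::new(DISCONNECTED);

        // A handful of senders sneak in increments before anyone re-stores the flag.
        for _ in 0..10 {
            cnt.fetch_add(1, Ordering::SeqCst);
        }

        // The ranged check still classifies the channel as disconnected...
        assert!(cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE);

        // ...and whoever notices re-plasters the canonical value, as send() does.
        cnt.store(DISCONNECTED, Ordering::SeqCst);
        assert_eq!(cnt.load(Ordering::SeqCst), DISCONNECTED);
    }
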
diff --git a/library/std/src/sync/mpsc/spsc_queue.rs b/library/std/src/sync/mpsc/spsc_queue.rs
deleted file mode 100644
index 7e745eb31..000000000
--- a/library/std/src/sync/mpsc/spsc_queue.rs
+++ /dev/null
@@ -1,236 +0,0 @@
-//! A single-producer single-consumer concurrent queue
-//!
-//! This module contains the implementation of an SPSC queue which can be used
-//! concurrently between two threads. This data structure is safe to use and
-//! enforces the semantics that there is one pusher and one popper.
-
-// https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
-
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod tests;
-
-use core::cell::UnsafeCell;
-use core::ptr;
-
-use crate::boxed::Box;
-use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
-
-use super::cache_aligned::CacheAligned;
-
-// Node within the linked list queue of messages to send
-struct Node<T> {
- // FIXME: this could be an uninitialized T if we're careful enough, and
- // that would reduce memory usage (and be a bit faster).
- // is it worth it?
- value: Option<T>, // nullable for re-use of nodes
- cached: bool, // This node goes into the node cache
- next: AtomicPtr<Node<T>>, // next node in the queue
-}
-
-/// The single-producer single-consumer queue. This structure is not cloneable,
-/// but it can be safely shared in an Arc if it is guaranteed that there
-/// is only one popper and one pusher touching the queue at any one point in
-/// time.
-pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
- // consumer fields
- consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
-
- // producer fields
- producer: CacheAligned<Producer<T, ProducerAddition>>,
-}
-
-struct Consumer<T, Addition> {
- tail: UnsafeCell<*mut Node<T>>, // where to pop from
- tail_prev: AtomicPtr<Node<T>>, // where to pop from
- cache_bound: usize, // maximum cache size
- cached_nodes: AtomicUsize, // number of nodes marked as cacheable
- addition: Addition,
-}
-
-struct Producer<T, Addition> {
- head: UnsafeCell<*mut Node<T>>, // where to push to
- first: UnsafeCell<*mut Node<T>>, // where to get new nodes from
- tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
- addition: Addition,
-}
-
-unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}
-
-unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}
-
-impl<T> Node<T> {
- fn new() -> *mut Node<T> {
- Box::into_raw(box Node {
- value: None,
- cached: false,
- next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
- })
- }
-}
-
-impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
- /// Creates a new queue with the given additional elements in the producer
- /// and consumer portions of the queue.
- ///
- /// Due to the performance implications of cache contention,
- /// we wish to keep fields used mainly by the producer on a separate cache
- /// line from those used by the consumer.
- /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
- /// allocate one for small fields, so we allow users to insert additional
- /// fields into the cache lines already allocated by this queue for the
- /// producer and consumer.
- ///
- /// This is unsafe as the type system doesn't enforce a single
- /// consumer-producer relationship. It also allows the consumer to `pop`
- /// items while there is a `peek` active due to all methods having a
- /// non-mutable receiver.
- ///
- /// # Arguments
- ///
- /// * `bound` - This queue implementation is implemented with a linked
- /// list, and this means that a push is always a malloc. In
- /// order to amortize this cost, an internal cache of nodes is
- /// maintained to prevent a malloc from always being
- /// necessary. This bound is the limit on the size of the
- /// cache (if desired). If the value is 0, then the cache has
- /// no bound. Otherwise, the cache will never grow larger than
- /// `bound` (although the queue itself could be much larger).
- pub unsafe fn with_additions(
- bound: usize,
- producer_addition: ProducerAddition,
- consumer_addition: ConsumerAddition,
- ) -> Self {
- let n1 = Node::new();
- let n2 = Node::new();
- (*n1).next.store(n2, Ordering::Relaxed);
- Queue {
- consumer: CacheAligned::new(Consumer {
- tail: UnsafeCell::new(n2),
- tail_prev: AtomicPtr::new(n1),
- cache_bound: bound,
- cached_nodes: AtomicUsize::new(0),
- addition: consumer_addition,
- }),
- producer: CacheAligned::new(Producer {
- head: UnsafeCell::new(n2),
- first: UnsafeCell::new(n1),
- tail_copy: UnsafeCell::new(n1),
- addition: producer_addition,
- }),
- }
- }
-
- /// Pushes a new value onto this queue. Note that to use this function
- /// safely, it must be externally guaranteed that there is only one pusher.
- pub fn push(&self, t: T) {
- unsafe {
- // Acquire a node (which either uses a cached one or allocates a new
- // one), and then append this to the 'head' node.
- let n = self.alloc();
- assert!((*n).value.is_none());
- (*n).value = Some(t);
- (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
- (**self.producer.head.get()).next.store(n, Ordering::Release);
- *(&self.producer.head).get() = n;
- }
- }
-
- unsafe fn alloc(&self) -> *mut Node<T> {
- // First try to see if we can consume the 'first' node for our uses.
- if *self.producer.first.get() != *self.producer.tail_copy.get() {
- let ret = *self.producer.first.get();
- *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
- return ret;
- }
- // If the above fails, then update our copy of the tail and try
- // again.
- *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
- if *self.producer.first.get() != *self.producer.tail_copy.get() {
- let ret = *self.producer.first.get();
- *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
- return ret;
- }
- // If all of that fails, then we have to allocate a new node
- // (there's nothing in the node cache).
- Node::new()
- }
-
- /// Attempts to pop a value from this queue. Remember that to use this type
- /// safely you must ensure that there is only one popper at a time.
- pub fn pop(&self) -> Option<T> {
- unsafe {
- // The `tail` node is not actually a used node, but rather a
- // sentinel from where we should start popping from. Hence, look at
- // tail's next field and see if we can use it. If we do a pop, then
- // the current tail node is a candidate for going into the cache.
- let tail = *self.consumer.tail.get();
- let next = (*tail).next.load(Ordering::Acquire);
- if next.is_null() {
- return None;
- }
- assert!((*next).value.is_some());
- let ret = (*next).value.take();
-
- *self.consumer.0.tail.get() = next;
- if self.consumer.cache_bound == 0 {
- self.consumer.tail_prev.store(tail, Ordering::Release);
- } else {
- let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
- if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
- self.consumer.cached_nodes.store(cached_nodes + 1, Ordering::Relaxed);
- (*tail).cached = true;
- }
-
- if (*tail).cached {
- self.consumer.tail_prev.store(tail, Ordering::Release);
- } else {
- (*self.consumer.tail_prev.load(Ordering::Relaxed))
- .next
- .store(next, Ordering::Relaxed);
- // We have successfully erased all references to 'tail', so
- // now we can safely drop it.
- let _: Box<Node<T>> = Box::from_raw(tail);
- }
- }
- ret
- }
- }
-
- /// Attempts to peek at the head of the queue, returning `None` if the queue
- /// has no data currently
- ///
- /// # Warning
- /// The reference returned is invalid if it is not used before the consumer
- /// pops the value off the queue. If the producer then pushes another value
- /// onto the queue, it will overwrite the value pointed to by the reference.
- pub fn peek(&self) -> Option<&mut T> {
- // This is essentially the same as above with all the popping bits
- // stripped out.
- unsafe {
- let tail = *self.consumer.tail.get();
- let next = (*tail).next.load(Ordering::Acquire);
- if next.is_null() { None } else { (*next).value.as_mut() }
- }
- }
-
- pub fn producer_addition(&self) -> &ProducerAddition {
- &self.producer.addition
- }
-
- pub fn consumer_addition(&self) -> &ConsumerAddition {
- &self.consumer.addition
- }
-}
-
-impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
- fn drop(&mut self) {
- unsafe {
- let mut cur = *self.producer.first.get();
- while !cur.is_null() {
- let next = (*cur).next.load(Ordering::Relaxed);
- let _n: Box<Node<T>> = Box::from_raw(cur);
- cur = next;
- }
- }
- }
-}
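
The cache-line separation that `with_additions` documents above generalizes beyond this queue. The following is a small, self-contained sketch of the idea (hypothetical `CacheAligned` and `Counters` types, not the std-internal ones; `#[repr(align(64))]` assumes a 64-byte line, as the deleted comment does): producer-written and consumer-written fields are padded onto different cache lines so the two threads do not false-share.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    #[repr(align(64))]
    struct CacheAligned<T>(T);

    struct Counters {
        produced: CacheAligned<AtomicUsize>, // touched only by the producer
        consumed: CacheAligned<AtomicUsize>, // touched only by the consumer
    }

    fn main() {
        let c = Arc::new(Counters {
            produced: CacheAligned(AtomicUsize::new(0)),
            consumed: CacheAligned(AtomicUsize::new(0)),
        });
        let p = Arc::clone(&c);
        let producer = thread::spawn(move || {
            for _ in 0..1000 {
                p.produced.0.fetch_add(1, Ordering::Relaxed);
            }
        });
        for _ in 0..1000 {
            c.consumed.0.fetch_add(1, Ordering::Relaxed);
        }
        producer.join().unwrap();
        assert_eq!(c.produced.0.load(Ordering::Relaxed), 1000);
    }
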
diff --git a/library/std/src/sync/mpsc/spsc_queue/tests.rs b/library/std/src/sync/mpsc/spsc_queue/tests.rs
deleted file mode 100644
index eb6d5c2cf..000000000
--- a/library/std/src/sync/mpsc/spsc_queue/tests.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-use super::Queue;
-use crate::sync::mpsc::channel;
-use crate::sync::Arc;
-use crate::thread;
-
-#[test]
-fn smoke() {
- unsafe {
- let queue = Queue::with_additions(0, (), ());
- queue.push(1);
- queue.push(2);
- assert_eq!(queue.pop(), Some(1));
- assert_eq!(queue.pop(), Some(2));
- assert_eq!(queue.pop(), None);
- queue.push(3);
- queue.push(4);
- assert_eq!(queue.pop(), Some(3));
- assert_eq!(queue.pop(), Some(4));
- assert_eq!(queue.pop(), None);
- }
-}
-
-#[test]
-fn peek() {
- unsafe {
- let queue = Queue::with_additions(0, (), ());
- queue.push(vec![1]);
-
- // Ensure the borrowchecker works
- match queue.peek() {
- Some(vec) => {
- assert_eq!(&*vec, &[1]);
- }
- None => unreachable!(),
- }
-
- match queue.pop() {
- Some(vec) => {
- assert_eq!(&*vec, &[1]);
- }
- None => unreachable!(),
- }
- }
-}
-
-#[test]
-fn drop_full() {
- unsafe {
- let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
- q.push(Box::new(1));
- q.push(Box::new(2));
- }
-}
-
-#[test]
-fn smoke_bound() {
- unsafe {
- let q = Queue::with_additions(0, (), ());
- q.push(1);
- q.push(2);
- assert_eq!(q.pop(), Some(1));
- assert_eq!(q.pop(), Some(2));
- assert_eq!(q.pop(), None);
- q.push(3);
- q.push(4);
- assert_eq!(q.pop(), Some(3));
- assert_eq!(q.pop(), Some(4));
- assert_eq!(q.pop(), None);
- }
-}
-
-#[test]
-fn stress() {
- unsafe {
- stress_bound(0);
- stress_bound(1);
- }
-
- unsafe fn stress_bound(bound: usize) {
- let count = if cfg!(miri) { 1000 } else { 100000 };
- let q = Arc::new(Queue::with_additions(bound, (), ()));
-
- let (tx, rx) = channel();
- let q2 = q.clone();
- let _t = thread::spawn(move || {
- for _ in 0..count {
- loop {
- match q2.pop() {
- Some(1) => break,
- Some(_) => panic!(),
- None => {}
- }
- }
- }
- tx.send(()).unwrap();
- });
- for _ in 0..count {
- q.push(1);
- }
- rx.recv().unwrap();
- }
-}
diff --git a/library/std/src/sync/mpsc/stream.rs b/library/std/src/sync/mpsc/stream.rs
deleted file mode 100644
index 4592e9141..000000000
--- a/library/std/src/sync/mpsc/stream.rs
+++ /dev/null
@@ -1,457 +0,0 @@
-/// Stream channels
-///
-/// This is the flavor of channels which are optimized for one sender and one
-/// receiver. The sender will be upgraded to a shared channel if the channel is
-/// cloned.
-///
-/// High level implementation details can be found in the comment of the parent
-/// module.
-pub use self::Failure::*;
-use self::Message::*;
-pub use self::UpgradeResult::*;
-
-use core::cmp;
-
-use crate::cell::UnsafeCell;
-use crate::ptr;
-use crate::thread;
-use crate::time::Instant;
-
-use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken};
-use crate::sync::mpsc::spsc_queue as spsc;
-use crate::sync::mpsc::Receiver;
-
-const DISCONNECTED: isize = isize::MIN;
-#[cfg(test)]
-const MAX_STEALS: isize = 5;
-#[cfg(not(test))]
-const MAX_STEALS: isize = 1 << 20;
-const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
-
-pub struct Packet<T> {
- // internal queue for all messages
- queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
-}
-
-struct ProducerAddition {
- cnt: AtomicIsize, // How many items are on this channel
- to_wake: AtomicPtr<u8>, // SignalToken for the blocked thread to wake up
-
- port_dropped: AtomicBool, // flag if the channel has been destroyed.
-}
-
-struct ConsumerAddition {
- steals: UnsafeCell<isize>, // How many times has a port received without blocking?
-}
-
-pub enum Failure<T> {
- Empty,
- Disconnected,
- Upgraded(Receiver<T>),
-}
-
-pub enum UpgradeResult {
- UpSuccess,
- UpDisconnected,
- UpWoke(SignalToken),
-}
-
-// Any message could contain an "upgrade request" to a new shared port, so the
- // internal queue is not a queue of T, but rather of Message<T>.
-enum Message<T> {
- Data(T),
- GoUp(Receiver<T>),
-}
-
-impl<T> Packet<T> {
- pub fn new() -> Packet<T> {
- Packet {
- queue: unsafe {
- spsc::Queue::with_additions(
- 128,
- ProducerAddition {
- cnt: AtomicIsize::new(0),
- to_wake: AtomicPtr::new(EMPTY),
-
- port_dropped: AtomicBool::new(false),
- },
- ConsumerAddition { steals: UnsafeCell::new(0) },
- )
- },
- }
- }
-
- pub fn send(&self, t: T) -> Result<(), T> {
- // If the other port has deterministically gone away, then definitely
- // must return the data back up the stack. Otherwise, the data is
- // considered as being sent.
- if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
- return Err(t);
- }
-
- match self.do_send(Data(t)) {
- UpSuccess | UpDisconnected => {}
- UpWoke(token) => {
- token.signal();
- }
- }
- Ok(())
- }
-
- pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
- // If the port has gone away, then there's no need to proceed any
- // further.
- if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
- return UpDisconnected;
- }
-
- self.do_send(GoUp(up))
- }
-
- fn do_send(&self, t: Message<T>) -> UpgradeResult {
- self.queue.push(t);
- match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
- // As described in the mod's doc comment, -1 == wakeup
- -1 => UpWoke(self.take_to_wake()),
- // As described before, SPSC queues must be >= -2
- -2 => UpSuccess,
-
- // Be sure to preserve the disconnected state, and the return value
- // in this case is going to be whether our data was received or not.
- // This manifests itself in whether we have an empty queue or not.
- //
- // Primarily, we are required to drain the queue here because the port
- // will never remove this data. We can only have at most one item to
- // drain (the port drains the rest).
- DISCONNECTED => {
- self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
- let first = self.queue.pop();
- let second = self.queue.pop();
- assert!(second.is_none());
-
- match first {
- Some(..) => UpSuccess, // we failed to send the data
- None => UpDisconnected, // we successfully sent data
- }
- }
-
- // Otherwise we just sent some data on a non-waiting queue, so just
- // make sure the world is sane and carry on!
- n => {
- assert!(n >= 0);
- UpSuccess
- }
- }
- }
-
- // Consumes ownership of the 'to_wake' field.
- fn take_to_wake(&self) -> SignalToken {
- let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
- self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
- assert!(ptr != EMPTY);
- unsafe { SignalToken::from_raw(ptr) }
- }
-
- // Decrements the count on the channel for a sleeper, returning the sleeper
- // back if it shouldn't sleep. Note that this is the location where we take
- // steals into account.
- fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
- let ptr = unsafe { token.to_raw() };
- self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
-
- let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
-
- match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
- DISCONNECTED => {
- self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
- }
- // If we factor in our steals and notice that the channel has no
- // data, we successfully sleep
- n => {
- assert!(n >= 0);
- if n - steals <= 0 {
- return Ok(());
- }
- }
- }
-
- self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
- Err(unsafe { SignalToken::from_raw(ptr) })
- }
-
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
- // Optimistic preflight check (scheduling is expensive).
- match self.try_recv() {
- Err(Empty) => {}
- data => return data,
- }
-
- // Welp, our channel has no data. Deschedule the current thread and
- // initiate the blocking protocol.
- let (wait_token, signal_token) = blocking::tokens();
- if self.decrement(signal_token).is_ok() {
- if let Some(deadline) = deadline {
- let timed_out = !wait_token.wait_max_until(deadline);
- if timed_out {
- self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?;
- }
- } else {
- wait_token.wait();
- }
- }
-
- match self.try_recv() {
- // Messages which actually popped from the queue shouldn't count as
- // a steal, so offset the decrement here (we already have our
- // "steal" factored into the channel count above).
- data @ (Ok(..) | Err(Upgraded(..))) => unsafe {
- *self.queue.consumer_addition().steals.get() -= 1;
- data
- },
-
- data => data,
- }
- }
-
- pub fn try_recv(&self) -> Result<T, Failure<T>> {
- match self.queue.pop() {
- // If we stole some data, record to that effect (this will be
- // factored into cnt later on).
- //
- // Note that we don't allow steals to grow without bound in order to
- // prevent eventual overflow of either steals or cnt as an overflow
- // would have catastrophic results. Sometimes, steals > cnt, but
- // other times cnt > steals, so we don't know the relation between
- // steals and cnt. This code path is executed only rarely, so we do
- // a pretty slow operation, of swapping 0 into cnt, taking steals
- // down as much as possible (without going negative), and then
- // adding back in whatever we couldn't factor into steals.
- Some(data) => unsafe {
- if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
- match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
- DISCONNECTED => {
- self.queue
- .producer_addition()
- .cnt
- .store(DISCONNECTED, Ordering::SeqCst);
- }
- n => {
- let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
- *self.queue.consumer_addition().steals.get() -= m;
- self.bump(n - m);
- }
- }
- assert!(*self.queue.consumer_addition().steals.get() >= 0);
- }
- *self.queue.consumer_addition().steals.get() += 1;
- match data {
- Data(t) => Ok(t),
- GoUp(up) => Err(Upgraded(up)),
- }
- },
-
- None => {
- match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
- n if n != DISCONNECTED => Err(Empty),
-
- // This is a little bit of a tricky case. We failed to pop
- // data above, and then we have viewed that the channel is
- // disconnected. In this window more data could have been
- // sent on the channel. It doesn't really make sense to
- // return that the channel is disconnected when there's
- // actually data on it, so be extra sure there's no data by
- // popping one more time.
- //
- // We can ignore steals because the other end is
- // disconnected and we'll never need to really factor in our
- // steals again.
- _ => match self.queue.pop() {
- Some(Data(t)) => Ok(t),
- Some(GoUp(up)) => Err(Upgraded(up)),
- None => Err(Disconnected),
- },
- }
- }
- }
- }
-
- pub fn drop_chan(&self) {
- // Dropping a channel is pretty simple, we just flag it as disconnected
- // and then wakeup a blocker if there is one.
- match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
- -1 => {
- self.take_to_wake().signal();
- }
- DISCONNECTED => {}
- n => {
- assert!(n >= 0);
- }
- }
- }
-
- pub fn drop_port(&self) {
- // Dropping a port seems like a fairly trivial thing. In theory all we
- // need to do is flag that we're disconnected and then everything else
- // can take over (we don't have anyone to wake up).
- //
- // The catch for Ports is that we want to drop the entire contents of
- // the queue. There are multiple reasons for having this property, the
- // largest of which is that if another chan is waiting in this channel
- // (but not received yet), then waiting on that port will cause a
- // deadlock.
- //
- // So if we accept that we must now destroy the entire contents of the
- // queue, this code may make a bit more sense. The tricky part is that
- // we can't let any in-flight sends go un-dropped, we have to make sure
- // *everything* is dropped and nothing new will come onto the channel.
-
- // The first thing we do is set a flag saying that we're done for. All
- // sends are gated on this flag, so we're immediately guaranteed that
- // there are a bounded number of active sends that we'll have to deal
- // with.
- self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
-
- // Now that we're guaranteed to deal with a bounded number of senders,
- // we need to drain the queue. This draining process happens atomically
- // with respect to the "count" of the channel. If the count is nonzero
- // (with steals taken into account), then there must be data on the
- // channel. In this case we drain everything and then try again. We will
- // continue to fail while active senders send data while we're dropping
- // data, but eventually we're guaranteed to break out of this loop
- // (because there is a bounded number of senders).
- let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
- while {
- match self.queue.producer_addition().cnt.compare_exchange(
- steals,
- DISCONNECTED,
- Ordering::SeqCst,
- Ordering::SeqCst,
- ) {
- Ok(_) => false,
- Err(old) => old != DISCONNECTED,
- }
- } {
- while self.queue.pop().is_some() {
- steals += 1;
- }
- }
-
- // At this point in time, we have gated all future senders from sending,
- // and we have flagged the channel as being disconnected. The senders
- // still have some responsibility, however, because some sends might not
- // complete until after we flag the disconnection. There are more
- // details in the sending methods that see DISCONNECTED.
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // select implementation
- ////////////////////////////////////////////////////////////////////////////
-
- // increment the count on the channel (used for selection)
- fn bump(&self, amt: isize) -> isize {
- match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
- DISCONNECTED => {
- self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
- DISCONNECTED
- }
- n => n,
- }
- }
-
- // Removes a previous thread from being blocked in this port
- pub fn abort_selection(&self, was_upgrade: bool) -> Result<bool, Receiver<T>> {
- // If we're aborting selection after upgrading from a oneshot, then
- // we're guaranteed that no one is waiting. The only way that we could
- // have seen the upgrade is if data was actually sent on the channel
- // half again. For us, this means that there is guaranteed to be data on
- // this channel. Furthermore, we're guaranteed that there was no
- // start_selection previously, so there's no need to modify `self.cnt`
- // at all.
- //
- // Hence, because of these invariants, we immediately return `Ok(true)`.
- // Note that the data might not actually be sent on the channel just yet.
- // The other end could have flagged the upgrade but not sent data to
- // this end. This is fine because we know it's a small bounded window
- // of time until the data is actually sent.
- if was_upgrade {
- assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
- return Ok(true);
- }
-
- // We want to make sure that the count on the channel goes non-negative,
- // and in the stream case we can have at most one steal, so just assume
- // that we had one steal.
- let steals = 1;
- let prev = self.bump(steals + 1);
-
- // If we were previously disconnected, then we know for sure that there
- // is no thread in to_wake, so just keep going
- let has_data = if prev == DISCONNECTED {
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
- true // there is data, that data is that we're disconnected
- } else {
- let cur = prev + steals + 1;
- assert!(cur >= 0);
-
- // If the previous count was negative, then we just made things go
- // positive, hence we passed the -1 boundary and we're responsible
- // for removing the to_wake() field and trashing it.
- //
- // If the previous count was positive then we're in a tougher
- // situation. A possible race is that a sender just incremented
- // through -1 (meaning it's going to try to wake a thread up), but it
- // hasn't yet read the to_wake. In order to prevent a future recv()
- // from waking up too early (this sender picking up the plastered
- // over to_wake), we spin loop here waiting for to_wake to be 0.
- // Note that this entire select() implementation needs an overhaul,
- // and this is *not* the worst part of it, so this is not done as a
- // final solution but rather out of necessity for now to get
- // something working.
- if prev < 0 {
- drop(self.take_to_wake());
- } else {
- while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY {
- thread::yield_now();
- }
- }
- unsafe {
- assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
- *self.queue.consumer_addition().steals.get() = steals;
- }
-
- // if we were previously positive, then there's surely data to
- // receive
- prev >= 0
- };
-
- // Now that we've determined that this queue "has data", we peek at the
- // queue to see if the data is an upgrade or not. If it's an upgrade,
- // then we need to destroy this port and abort selection on the
- // upgraded port.
- if has_data {
- match self.queue.peek() {
- Some(&mut GoUp(..)) => match self.queue.pop() {
- Some(GoUp(port)) => Err(port),
- _ => unreachable!(),
- },
- _ => Ok(true),
- }
- } else {
- Ok(false)
- }
- }
-}
-
-impl<T> Drop for Packet<T> {
- fn drop(&mut self) {
- // Note that this load is not only an assert for correctness about
- // disconnection, but also a proper fence before the read of
- // `to_wake`, so this assert cannot be removed without also removing
- // the `to_wake` assert.
- assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
- assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
- }
-}
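
The steals/cnt rebalancing that `try_recv` above performs when `steals > MAX_STEALS` is just bounded-counter arithmetic: swap `cnt` to 0, fold as much of it as possible into `steals`, and add the remainder back via `bump(n - m)`. A minimal sketch of that arithmetic (the `rebalance` helper is hypothetical; the real code does the swap and bump atomically):

    fn rebalance(cnt: isize, steals: isize) -> (isize, isize) {
        // `cnt` was swapped out atomically in the real code; here it's a plain value.
        let m = cnt.min(steals);
        let new_steals = steals - m;
        let new_cnt = cnt - m; // added back via bump(n - m) in the real code
        (new_cnt, new_steals)
    }

    fn main() {
        // e.g. the receiver has 6 steals recorded while the sender pushed 4 more:
        assert_eq!(rebalance(4, 6), (0, 2));

        // and the opposite case, more pending sends than steals:
        assert_eq!(rebalance(9, 3), (6, 0));
    }

Both outputs stay non-negative and bounded, which is exactly the overflow-prevention property the deleted comment argues for.
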
diff --git a/library/std/src/sync/mpsc/sync.rs b/library/std/src/sync/mpsc/sync.rs
deleted file mode 100644
index 733761671..000000000
--- a/library/std/src/sync/mpsc/sync.rs
+++ /dev/null
@@ -1,495 +0,0 @@
-use self::Blocker::*;
-/// Synchronous channels/ports
-///
-/// This channel implementation differs significantly from the asynchronous
- /// implementations found next to it (oneshot/stream/shared). This is an
-/// implementation of a synchronous, bounded buffer channel.
-///
-/// Each channel is created with some amount of backing buffer, and sends will
-/// *block* until buffer space becomes available. A buffer size of 0 is valid,
-/// which means that every successful send is paired with a successful recv.
-///
- /// This flavor of channels defines a new `send_opt` method for channels,
- /// which sends a message without panicking the thread if it cannot be
- /// delivered.
-///
-/// Another major difference is that send() will *always* return back the data
-/// if it couldn't be sent. This is because it is deterministically known when
-/// the data is received and when it is not received.
-///
-/// Implementation-wise, it can all be summed up with "use a mutex plus some
-/// logic". The mutex used here is an OS native mutex, meaning that no user code
-/// is run inside of the mutex (to prevent context switching). This
-/// implementation shares almost all code for the buffered and unbuffered cases
-/// of a synchronous channel. There are a few branches for the unbuffered case,
-/// but they're mostly just relevant to blocking senders.
-pub use self::Failure::*;
-
-use core::intrinsics::abort;
-use core::mem;
-use core::ptr;
-
-use crate::sync::atomic::{AtomicUsize, Ordering};
-use crate::sync::mpsc::blocking::{self, SignalToken, WaitToken};
-use crate::sync::{Mutex, MutexGuard};
-use crate::time::Instant;
-
-const MAX_REFCOUNT: usize = (isize::MAX) as usize;
-
-pub struct Packet<T> {
- /// The only field outside of the mutex. Done partly for kicks, but mainly
- /// because the other shared channel already had the code implemented
- channels: AtomicUsize,
-
- lock: Mutex<State<T>>,
-}
-
-unsafe impl<T: Send> Send for Packet<T> {}
-
-unsafe impl<T: Send> Sync for Packet<T> {}
-
-struct State<T> {
- disconnected: bool, // Is the channel disconnected yet?
- queue: Queue, // queue of senders waiting to send data
- blocker: Blocker, // currently blocked thread on this channel
- buf: Buffer<T>, // storage for buffered messages
- cap: usize, // capacity of this channel
-
- /// A curious flag used to indicate whether a sender failed or succeeded in
- /// blocking. This is used to transmit information back to the thread that it
- /// must dequeue its message from the buffer because it was not received.
- /// This is only relevant in the 0-buffer case. This obviously cannot be
- /// safely constructed, but it's guaranteed to always have a valid pointer
- /// value.
- canceled: Option<&'static mut bool>,
-}
-
-unsafe impl<T: Send> Send for State<T> {}
-
-/// Possible flavors of threads who can be blocked on this channel.
-enum Blocker {
- BlockedSender(SignalToken),
- BlockedReceiver(SignalToken),
- NoneBlocked,
-}
-
-/// Simple queue for threading threads together. Nodes are stack-allocated, so
- /// this structure is not safe at all.
-struct Queue {
- head: *mut Node,
- tail: *mut Node,
-}
-
-struct Node {
- token: Option<SignalToken>,
- next: *mut Node,
-}
-
-unsafe impl Send for Node {}
-
-/// A simple ring-buffer
-struct Buffer<T> {
- buf: Vec<Option<T>>,
- start: usize,
- size: usize,
-}
-
-#[derive(Debug)]
-pub enum Failure {
- Empty,
- Disconnected,
-}
-
-/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
-/// in the meantime. This re-locks the mutex upon returning.
-fn wait<'a, 'b, T>(
- lock: &'a Mutex<State<T>>,
- mut guard: MutexGuard<'b, State<T>>,
- f: fn(SignalToken) -> Blocker,
-) -> MutexGuard<'a, State<T>> {
- let (wait_token, signal_token) = blocking::tokens();
- match mem::replace(&mut guard.blocker, f(signal_token)) {
- NoneBlocked => {}
- _ => unreachable!(),
- }
- drop(guard); // unlock
- wait_token.wait(); // block
- lock.lock().unwrap() // relock
-}
-
-/// Same as wait, but waiting at most until `deadline`.
-fn wait_timeout_receiver<'a, 'b, T>(
- lock: &'a Mutex<State<T>>,
- deadline: Instant,
- mut guard: MutexGuard<'b, State<T>>,
- success: &mut bool,
-) -> MutexGuard<'a, State<T>> {
- let (wait_token, signal_token) = blocking::tokens();
- match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
- NoneBlocked => {}
- _ => unreachable!(),
- }
- drop(guard); // unlock
- *success = wait_token.wait_max_until(deadline); // block
- let mut new_guard = lock.lock().unwrap(); // relock
- if !*success {
- abort_selection(&mut new_guard);
- }
- new_guard
-}
-
-fn abort_selection<T>(guard: &mut MutexGuard<'_, State<T>>) -> bool {
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => true,
- BlockedSender(token) => {
- guard.blocker = BlockedSender(token);
- true
- }
- BlockedReceiver(token) => {
- drop(token);
- false
- }
- }
-}
-
-/// Wakes up a thread, dropping the lock at the correct time
-fn wakeup<T>(token: SignalToken, guard: MutexGuard<'_, State<T>>) {
- // We need to be careful to wake up the waiting thread *outside* of the mutex
- // in case it incurs a context switch.
- drop(guard);
- token.signal();
-}
-
-impl<T> Packet<T> {
- pub fn new(capacity: usize) -> Packet<T> {
- Packet {
- channels: AtomicUsize::new(1),
- lock: Mutex::new(State {
- disconnected: false,
- blocker: NoneBlocked,
- cap: capacity,
- canceled: None,
- queue: Queue { head: ptr::null_mut(), tail: ptr::null_mut() },
- buf: Buffer {
- buf: (0..capacity + if capacity == 0 { 1 } else { 0 }).map(|_| None).collect(),
- start: 0,
- size: 0,
- },
- }),
- }
- }
-
- // wait until a send slot is available, returning locked access to
- // the channel state.
- fn acquire_send_slot(&self) -> MutexGuard<'_, State<T>> {
- let mut node = Node { token: None, next: ptr::null_mut() };
- loop {
- let mut guard = self.lock.lock().unwrap();
- // are we ready to go?
- if guard.disconnected || guard.buf.size() < guard.buf.capacity() {
- return guard;
- }
- // no room; actually block
- let wait_token = guard.queue.enqueue(&mut node);
- drop(guard);
- wait_token.wait();
- }
- }
-
- pub fn send(&self, t: T) -> Result<(), T> {
- let mut guard = self.acquire_send_slot();
- if guard.disconnected {
- return Err(t);
- }
- guard.buf.enqueue(t);
-
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- // if our capacity is 0, then we need to wait for a receiver to be
- // available to take our data. After waiting, we check again to make
- // sure the port didn't go away in the meantime. If it did, we need
- // to hand back our data.
- NoneBlocked if guard.cap == 0 => {
- let mut canceled = false;
- assert!(guard.canceled.is_none());
- guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
- let mut guard = wait(&self.lock, guard, BlockedSender);
- if canceled { Err(guard.buf.dequeue()) } else { Ok(()) }
- }
-
- // success, we buffered some data
- NoneBlocked => Ok(()),
-
- // success, someone's about to receive our buffered data.
- BlockedReceiver(token) => {
- wakeup(token, guard);
- Ok(())
- }
-
-            BlockedSender(..) => unreachable!("a sender cannot already be blocked here"),
- }
- }
-
- pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
- let mut guard = self.lock.lock().unwrap();
- if guard.disconnected {
- Err(super::TrySendError::Disconnected(t))
- } else if guard.buf.size() == guard.buf.capacity() {
- Err(super::TrySendError::Full(t))
- } else if guard.cap == 0 {
- // With capacity 0, even though we have buffer space we can't
- // transfer the data unless there's a receiver waiting.
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => Err(super::TrySendError::Full(t)),
- BlockedSender(..) => unreachable!(),
- BlockedReceiver(token) => {
- guard.buf.enqueue(t);
- wakeup(token, guard);
- Ok(())
- }
- }
- } else {
- // If the buffer has some space and the capacity isn't 0, then we
- // just enqueue the data for later retrieval, ensuring to wake up
- // any blocked receiver if there is one.
- assert!(guard.buf.size() < guard.buf.capacity());
- guard.buf.enqueue(t);
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- BlockedReceiver(token) => wakeup(token, guard),
- NoneBlocked => {}
- BlockedSender(..) => unreachable!(),
- }
- Ok(())
- }
- }
-
- // Receives a message from this channel
- //
-    // When reading this, remember that there can only ever be one receiver at
-    // a time.
- pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
- let mut guard = self.lock.lock().unwrap();
-
- let mut woke_up_after_waiting = false;
- // Wait for the buffer to have something in it. No need for a
- // while loop because we're the only receiver.
- if !guard.disconnected && guard.buf.size() == 0 {
- if let Some(deadline) = deadline {
- guard =
- wait_timeout_receiver(&self.lock, deadline, guard, &mut woke_up_after_waiting);
- } else {
- guard = wait(&self.lock, guard, BlockedReceiver);
- woke_up_after_waiting = true;
- }
- }
-
- // N.B., channel could be disconnected while waiting, so the order of
- // these conditionals is important.
- if guard.disconnected && guard.buf.size() == 0 {
- return Err(Disconnected);
- }
-
- // Pick up the data, wake up our neighbors, and carry on
- assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
-
- if guard.buf.size() == 0 {
- return Err(Empty);
- }
-
- let ret = guard.buf.dequeue();
- self.wakeup_senders(woke_up_after_waiting, guard);
- Ok(ret)
- }
-
- pub fn try_recv(&self) -> Result<T, Failure> {
- let mut guard = self.lock.lock().unwrap();
-
- // Easy cases first
- if guard.disconnected && guard.buf.size() == 0 {
- return Err(Disconnected);
- }
- if guard.buf.size() == 0 {
- return Err(Empty);
- }
-
- // Be sure to wake up neighbors
- let ret = Ok(guard.buf.dequeue());
- self.wakeup_senders(false, guard);
- ret
- }
-
- // Wake up pending senders after some data has been received
- //
-    // * `waited` - whether the receiver blocked waiting for the data, or just
-    //   picked it up on the way out without waiting
- // * `guard` - the lock guard that is held over this channel's lock
- fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<'_, State<T>>) {
- let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
-
- // If this is a no-buffer channel (cap == 0), then if we didn't wait we
- // need to ACK the sender. If we waited, then the sender waking us up
- // was already the ACK.
- let pending_sender2 = if guard.cap == 0 && !waited {
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => None,
- BlockedReceiver(..) => unreachable!(),
- BlockedSender(token) => {
- guard.canceled.take();
- Some(token)
- }
- }
- } else {
- None
- };
- mem::drop(guard);
-
- // only outside of the lock do we wake up the pending threads
- if let Some(token) = pending_sender1 {
- token.signal();
- }
- if let Some(token) = pending_sender2 {
- token.signal();
- }
- }
-
- // Prepares this shared packet for a channel clone, essentially just bumping
- // a refcount.
- pub fn clone_chan(&self) {
- let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
-
- // See comments on Arc::clone() on why we do this (for `mem::forget`).
- if old_count > MAX_REFCOUNT {
- abort();
- }
- }
-
- pub fn drop_chan(&self) {
- // Only flag the channel as disconnected if we're the last channel
- match self.channels.fetch_sub(1, Ordering::SeqCst) {
- 1 => {}
- _ => return,
- }
-
- // Not much to do other than wake up a receiver if one's there
- let mut guard = self.lock.lock().unwrap();
- if guard.disconnected {
- return;
- }
- guard.disconnected = true;
- match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => {}
- BlockedSender(..) => unreachable!(),
- BlockedReceiver(token) => wakeup(token, guard),
- }
- }
-
- pub fn drop_port(&self) {
- let mut guard = self.lock.lock().unwrap();
-
- if guard.disconnected {
- return;
- }
- guard.disconnected = true;
-
- // If the capacity is 0, then the sender may want its data back after
- // we're disconnected. Otherwise it's now our responsibility to destroy
- // the buffered data. As with many other portions of this code, this
- // needs to be careful to destroy the data *outside* of the lock to
- // prevent deadlock.
- let _data = if guard.cap != 0 { mem::take(&mut guard.buf.buf) } else { Vec::new() };
- let mut queue =
- mem::replace(&mut guard.queue, Queue { head: ptr::null_mut(), tail: ptr::null_mut() });
-
- let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
- NoneBlocked => None,
- BlockedSender(token) => {
- *guard.canceled.take().unwrap() = true;
- Some(token)
- }
- BlockedReceiver(..) => unreachable!(),
- };
- mem::drop(guard);
-
- while let Some(token) = queue.dequeue() {
- token.signal();
- }
- if let Some(token) = waiter {
- token.signal();
- }
- }
-}
-
-impl<T> Drop for Packet<T> {
- fn drop(&mut self) {
- assert_eq!(self.channels.load(Ordering::SeqCst), 0);
- let mut guard = self.lock.lock().unwrap();
- assert!(guard.queue.dequeue().is_none());
- assert!(guard.canceled.is_none());
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Buffer, a simple ring buffer backed by Vec<T>
-////////////////////////////////////////////////////////////////////////////////
-
-impl<T> Buffer<T> {
- fn enqueue(&mut self, t: T) {
- let pos = (self.start + self.size) % self.buf.len();
- self.size += 1;
- let prev = mem::replace(&mut self.buf[pos], Some(t));
- assert!(prev.is_none());
- }
-
- fn dequeue(&mut self) -> T {
- let start = self.start;
- self.size -= 1;
- self.start = (self.start + 1) % self.buf.len();
- let result = &mut self.buf[start];
- result.take().unwrap()
- }
-
- fn size(&self) -> usize {
- self.size
- }
- fn capacity(&self) -> usize {
- self.buf.len()
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// Queue, a simple queue to enqueue threads with (stack-allocated nodes)
-////////////////////////////////////////////////////////////////////////////////
-
-impl Queue {
- fn enqueue(&mut self, node: &mut Node) -> WaitToken {
- let (wait_token, signal_token) = blocking::tokens();
- node.token = Some(signal_token);
- node.next = ptr::null_mut();
-
- if self.tail.is_null() {
- self.head = node as *mut Node;
- self.tail = node as *mut Node;
- } else {
- unsafe {
- (*self.tail).next = node as *mut Node;
- self.tail = node as *mut Node;
- }
- }
-
- wait_token
- }
-
- fn dequeue(&mut self) -> Option<SignalToken> {
- if self.head.is_null() {
- return None;
- }
- let node = self.head;
- self.head = unsafe { (*node).next };
- if self.head.is_null() {
- self.tail = ptr::null_mut();
- }
- unsafe {
- (*node).next = ptr::null_mut();
- Some((*node).token.take().unwrap())
- }
- }
-}
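The file removed above implemented both flavors of `sync_channel` on top of one `Packet`. In the 0-capacity case, `send` registers itself as a `BlockedSender` and parks until a receiver takes the value (or hands it back through the `canceled` flag). A minimal illustration of that rendezvous behavior through the public API, which the replacement mpmc-backed implementation preserves:

use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Capacity 0: every send must rendezvous with a matching recv.
    let (tx, rx) = sync_channel::<i32>(0);
    let t = thread::spawn(move || {
        // Parks here until the main thread calls recv().
        tx.send(42).unwrap();
    });
    assert_eq!(rx.recv().unwrap(), 42);
    t.join().unwrap();
}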
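`clone_chan` above also used the same overflow guard as `Arc::clone`: if the handle refcount is ever driven past `isize::MAX` (only reachable by leaking handles with `mem::forget`), the process aborts before the counter can wrap and cause a use-after-free. A standalone sketch of that pattern, using the public `std::process::abort` in place of the intrinsic; `clone_handle` is a hypothetical name:

use std::process::abort;
use std::sync::atomic::{AtomicUsize, Ordering};

const MAX_REFCOUNT: usize = isize::MAX as usize;

fn clone_handle(channels: &AtomicUsize) {
    let old_count = channels.fetch_add(1, Ordering::SeqCst);
    // Aborting (rather than panicking) guarantees the counter can never
    // wrap, even if unwinding were caught further up the stack.
    if old_count > MAX_REFCOUNT {
        abort();
    }
}

fn main() {
    let channels = AtomicUsize::new(1);
    clone_handle(&channels);
    assert_eq!(channels.load(Ordering::SeqCst), 2);
}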
diff --git a/library/std/src/sync/mpsc/sync_tests.rs b/library/std/src/sync/mpsc/sync_tests.rs
index 63c794369..9d2f92ffc 100644
--- a/library/std/src/sync/mpsc/sync_tests.rs
+++ b/library/std/src/sync/mpsc/sync_tests.rs
@@ -1,5 +1,6 @@
use super::*;
use crate::env;
+use crate::sync::mpmc::SendTimeoutError;
use crate::thread;
use crate::time::Duration;
@@ -42,6 +43,13 @@ fn recv_timeout() {
}
#[test]
+fn send_timeout() {
+ let (tx, _rx) = sync_channel::<i32>(1);
+ assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Ok(()));
+ assert_eq!(tx.send_timeout(1, Duration::from_millis(1)), Err(SendTimeoutError::Timeout(1)));
+}
+
+#[test]
fn smoke_threads() {
let (tx, rx) = sync_channel::<i32>(0);
let _t = thread::spawn(move || {
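The new `send_timeout` test exercises API that is only visible inside std in this release: `SendTimeoutError` is imported from the crate-internal `mpmc` module. The receiving-side counterpart has long been stable and shows the same two failure modes; a small example:

use std::sync::mpsc::{sync_channel, RecvTimeoutError};
use std::time::Duration;

fn main() {
    let (_tx, rx) = sync_channel::<i32>(1);
    // No message but a live sender: the receive times out instead of
    // reporting disconnection.
    assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
}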
diff --git a/library/std/src/sync/mpsc/tests.rs b/library/std/src/sync/mpsc/tests.rs
index f6d0796f6..1e52a4a70 100644
--- a/library/std/src/sync/mpsc/tests.rs
+++ b/library/std/src/sync/mpsc/tests.rs
@@ -706,3 +706,18 @@ fn issue_32114() {
let _ = tx.send(123);
assert_eq!(tx.send(123), Err(SendError(123)));
}
+
+#[test]
+fn issue_39364() {
+ let (tx, rx) = channel::<()>();
+ let t = thread::spawn(move || {
+ thread::sleep(Duration::from_millis(300));
+ let _ = tx.clone();
+ // Don't drop; hand back to caller.
+ tx
+ });
+
+ let _ = rx.recv_timeout(Duration::from_millis(500));
+ let _tx = t.join().unwrap(); // delay dropping until end of test
+ let _ = rx.recv_timeout(Duration::from_millis(500));
+}
diff --git a/library/std/src/sync/mutex.rs b/library/std/src/sync/mutex.rs
index de851c8fb..065045f44 100644
--- a/library/std/src/sync/mutex.rs
+++ b/library/std/src/sync/mutex.rs
@@ -5,7 +5,7 @@ use crate::cell::UnsafeCell;
use crate::fmt;
use crate::ops::{Deref, DerefMut};
use crate::sync::{poison, LockResult, TryLockError, TryLockResult};
-use crate::sys_common::mutex as sys;
+use crate::sys::locks as sys;
/// A mutual exclusion primitive useful for protecting shared data
///
@@ -163,7 +163,7 @@ use crate::sys_common::mutex as sys;
#[stable(feature = "rust1", since = "1.0.0")]
#[cfg_attr(not(test), rustc_diagnostic_item = "Mutex")]
pub struct Mutex<T: ?Sized> {
- inner: sys::MovableMutex,
+ inner: sys::Mutex,
poison: poison::Flag,
data: UnsafeCell<T>,
}
@@ -217,11 +217,7 @@ impl<T> Mutex<T> {
#[rustc_const_stable(feature = "const_locks", since = "1.63.0")]
#[inline]
pub const fn new(t: T) -> Mutex<T> {
- Mutex {
- inner: sys::MovableMutex::new(),
- poison: poison::Flag::new(),
- data: UnsafeCell::new(t),
- }
+ Mutex { inner: sys::Mutex::new(), poison: poison::Flag::new(), data: UnsafeCell::new(t) }
}
}
@@ -264,7 +260,7 @@ impl<T: ?Sized> Mutex<T> {
#[stable(feature = "rust1", since = "1.0.0")]
pub fn lock(&self) -> LockResult<MutexGuard<'_, T>> {
unsafe {
- self.inner.raw_lock();
+ self.inner.lock();
MutexGuard::new(self)
}
}
@@ -526,7 +522,7 @@ impl<T: ?Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
unsafe {
self.lock.poison.done(&self.poison);
- self.lock.inner.raw_unlock();
+ self.lock.inner.unlock();
}
}
}
@@ -545,7 +541,7 @@ impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
}
}
-pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::MovableMutex {
+pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex {
&guard.lock.inner
}
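With the lock stored inline (`sys::Mutex` rather than the `MovableMutex` wrapper), a `Mutex` in a static carries no hidden allocation, and the const constructor this hunk reformats has been stable since 1.63. A brief illustration of the user-facing result:

use std::sync::Mutex;

// No lazy initialization or boxing is needed for a global lock.
static COUNTER: Mutex<u64> = Mutex::new(0);

fn main() {
    *COUNTER.lock().unwrap() += 1;
    assert_eq!(*COUNTER.lock().unwrap(), 1);
}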
diff --git a/library/std/src/sync/once_lock.rs b/library/std/src/sync/once_lock.rs
index 37413ec62..16d1fd2a5 100644
--- a/library/std/src/sync/once_lock.rs
+++ b/library/std/src/sync/once_lock.rs
@@ -7,7 +7,9 @@ use crate::sync::Once;
/// A synchronization primitive which can be written to only once.
///
-/// This type is a thread-safe `OnceCell`.
+/// This type is a thread-safe [`OnceCell`], and can be used in statics.
+///
+/// [`OnceCell`]: crate::cell::OnceCell
///
/// # Examples
///
@@ -33,7 +35,7 @@ use crate::sync::Once;
#[unstable(feature = "once_cell", issue = "74465")]
pub struct OnceLock<T> {
once: Once,
- // Whether or not the value is initialized is tracked by `state_and_queue`.
+ // Whether or not the value is initialized is tracked by `once.is_completed()`.
value: UnsafeCell<MaybeUninit<T>>,
/// `PhantomData` to make sure dropck understands we're dropping T in our Drop impl.
///
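As the updated docs put it, `OnceLock` is the static-friendly, thread-safe sibling of `cell::OnceCell`. It is still behind the `once_cell` feature gate in this release; a brief sketch of the intended usage:

#![feature(once_cell)] // `OnceLock` is unstable in 1.67

use std::sync::OnceLock;

static CONFIG: OnceLock<String> = OnceLock::new();

fn config() -> &'static str {
    // The closure runs at most once, even when called concurrently;
    // later callers see the stored value.
    CONFIG.get_or_init(|| "default".to_owned())
}

fn main() {
    assert_eq!(config(), "default");
}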
diff --git a/library/std/src/sync/rwlock.rs b/library/std/src/sync/rwlock.rs
index 8b3877607..7c409cb3e 100644
--- a/library/std/src/sync/rwlock.rs
+++ b/library/std/src/sync/rwlock.rs
@@ -6,7 +6,7 @@ use crate::fmt;
use crate::ops::{Deref, DerefMut};
use crate::ptr::NonNull;
use crate::sync::{poison, LockResult, TryLockError, TryLockResult};
-use crate::sys_common::rwlock as sys;
+use crate::sys::locks as sys;
/// A reader-writer lock
///
@@ -78,7 +78,7 @@ use crate::sys_common::rwlock as sys;
#[stable(feature = "rust1", since = "1.0.0")]
#[cfg_attr(not(test), rustc_diagnostic_item = "RwLock")]
pub struct RwLock<T: ?Sized> {
- inner: sys::MovableRwLock,
+ inner: sys::RwLock,
poison: poison::Flag,
data: UnsafeCell<T>,
}
@@ -109,7 +109,7 @@ pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
// `NonNull` is also covariant over `T`, just like we would have with `&T`. `NonNull`
// is preferable over `const* T` to allow for niche optimization.
data: NonNull<T>,
- inner_lock: &'a sys::MovableRwLock,
+ inner_lock: &'a sys::RwLock,
}
#[stable(feature = "rust1", since = "1.0.0")]
@@ -158,11 +158,7 @@ impl<T> RwLock<T> {
#[rustc_const_stable(feature = "const_locks", since = "1.63.0")]
#[inline]
pub const fn new(t: T) -> RwLock<T> {
- RwLock {
- inner: sys::MovableRwLock::new(),
- poison: poison::Flag::new(),
- data: UnsafeCell::new(t),
- }
+ RwLock { inner: sys::RwLock::new(), poison: poison::Flag::new(), data: UnsafeCell::new(t) }
}
}
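`RwLock` gets the same treatment as `Mutex`: the `sys::RwLock` is stored inline and the const constructor (also stable since 1.63) now fits on one line. As with `Mutex`, this makes a global reader-writer lock allocation-free:

use std::sync::RwLock;

// Vec::new is const, so the whole static needs no runtime initialization.
static REGISTRY: RwLock<Vec<&'static str>> = RwLock::new(Vec::new());

fn main() {
    REGISTRY.write().unwrap().push("entry");
    assert_eq!(REGISTRY.read().unwrap().len(), 1);
}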