summaryrefslogtreecommitdiffstats
path: root/library/std/src/sync/mpmc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:18:32 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:18:32 +0000
commit4547b622d8d29df964fa2914213088b148c498fc (patch)
tree9fc6b25f3c3add6b745be9a2400a6e96140046e9 /library/std/src/sync/mpmc
parentReleasing progress-linux version 1.66.0+dfsg1-1~progress7.99u1. (diff)
downloadrustc-4547b622d8d29df964fa2914213088b148c498fc.tar.xz
rustc-4547b622d8d29df964fa2914213088b148c498fc.zip
Merging upstream version 1.67.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/std/src/sync/mpmc')
-rw-r--r--library/std/src/sync/mpmc/array.rs513
-rw-r--r--library/std/src/sync/mpmc/context.rs155
-rw-r--r--library/std/src/sync/mpmc/counter.rs137
-rw-r--r--library/std/src/sync/mpmc/error.rs46
-rw-r--r--library/std/src/sync/mpmc/list.rs638
-rw-r--r--library/std/src/sync/mpmc/mod.rs430
-rw-r--r--library/std/src/sync/mpmc/select.rs71
-rw-r--r--library/std/src/sync/mpmc/utils.rs143
-rw-r--r--library/std/src/sync/mpmc/waker.rs204
-rw-r--r--library/std/src/sync/mpmc/zero.rs318
10 files changed, 2655 insertions, 0 deletions
diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs
new file mode 100644
index 000000000..c1e3e48b0
--- /dev/null
+++ b/library/std/src/sync/mpmc/array.rs
@@ -0,0 +1,513 @@
+//! Bounded channel based on a preallocated array.
+//!
+//! This flavor has a fixed, positive capacity.
+//!
+//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
+//!
+//! Source:
+//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
+//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
+
+use super::context::Context;
+use super::error::*;
+use super::select::{Operation, Selected, Token};
+use super::utils::{Backoff, CachePadded};
+use super::waker::SyncWaker;
+
+use crate::cell::UnsafeCell;
+use crate::mem::MaybeUninit;
+use crate::ptr;
+use crate::sync::atomic::{self, AtomicUsize, Ordering};
+use crate::time::Instant;
+
+/// A slot in a channel.
+struct Slot<T> {
+ /// The current stamp.
+ stamp: AtomicUsize,
+
+ /// The message in this slot.
+ msg: UnsafeCell<MaybeUninit<T>>,
+}
+
+/// The token type for the array flavor.
+#[derive(Debug)]
+pub(crate) struct ArrayToken {
+ /// Slot to read from or write to.
+ slot: *const u8,
+
+ /// Stamp to store into the slot after reading or writing.
+ stamp: usize,
+}
+
+impl Default for ArrayToken {
+ #[inline]
+ fn default() -> Self {
+ ArrayToken { slot: ptr::null(), stamp: 0 }
+ }
+}
+
+/// Bounded channel based on a preallocated array.
+pub(crate) struct Channel<T> {
+ /// The head of the channel.
+ ///
+ /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
+ /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+ /// represent the lap. The mark bit in the head is always zero.
+ ///
+ /// Messages are popped from the head of the channel.
+ head: CachePadded<AtomicUsize>,
+
+ /// The tail of the channel.
+ ///
+ /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
+ /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+ /// represent the lap. The mark bit indicates that the channel is disconnected.
+ ///
+ /// Messages are pushed into the tail of the channel.
+ tail: CachePadded<AtomicUsize>,
+
+ /// The buffer holding slots.
+ buffer: Box<[Slot<T>]>,
+
+ /// The channel capacity.
+ cap: usize,
+
+ /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
+ one_lap: usize,
+
+ /// If this bit is set in the tail, that means the channel is disconnected.
+ mark_bit: usize,
+
+ /// Senders waiting while the channel is full.
+ senders: SyncWaker,
+
+ /// Receivers waiting while the channel is empty and not disconnected.
+ receivers: SyncWaker,
+}
+
+impl<T> Channel<T> {
+ /// Creates a bounded channel of capacity `cap`.
+ pub(crate) fn with_capacity(cap: usize) -> Self {
+ assert!(cap > 0, "capacity must be positive");
+
+ // Compute constants `mark_bit` and `one_lap`.
+ let mark_bit = (cap + 1).next_power_of_two();
+ let one_lap = mark_bit * 2;
+
+ // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
+ let head = 0;
+ // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
+ let tail = 0;
+
+ // Allocate a buffer of `cap` slots initialized
+ // with stamps.
+ let buffer: Box<[Slot<T>]> = (0..cap)
+ .map(|i| {
+ // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
+ Slot { stamp: AtomicUsize::new(i), msg: UnsafeCell::new(MaybeUninit::uninit()) }
+ })
+ .collect();
+
+ Channel {
+ buffer,
+ cap,
+ one_lap,
+ mark_bit,
+ head: CachePadded::new(AtomicUsize::new(head)),
+ tail: CachePadded::new(AtomicUsize::new(tail)),
+ senders: SyncWaker::new(),
+ receivers: SyncWaker::new(),
+ }
+ }
+
+ /// Attempts to reserve a slot for sending a message.
+ fn start_send(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.load(Ordering::Relaxed);
+
+ loop {
+ // Check if the channel is disconnected.
+ if tail & self.mark_bit != 0 {
+ token.array.slot = ptr::null();
+ token.array.stamp = 0;
+ return true;
+ }
+
+ // Deconstruct the tail.
+ let index = tail & (self.mark_bit - 1);
+ let lap = tail & !(self.one_lap - 1);
+
+ // Inspect the corresponding slot.
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
+ let stamp = slot.stamp.load(Ordering::Acquire);
+
+ // If the tail and the stamp match, we may attempt to push.
+ if tail == stamp {
+ let new_tail = if index + 1 < self.cap {
+ // Same lap, incremented index.
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+ tail + 1
+ } else {
+ // One lap forward, index wraps around to zero.
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+ lap.wrapping_add(self.one_lap)
+ };
+
+ // Try moving the tail.
+ match self.tail.compare_exchange_weak(
+ tail,
+ new_tail,
+ Ordering::SeqCst,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => {
+ // Prepare the token for the follow-up call to `write`.
+ token.array.slot = slot as *const Slot<T> as *const u8;
+ token.array.stamp = tail + 1;
+ return true;
+ }
+ Err(_) => {
+ backoff.spin_light();
+ tail = self.tail.load(Ordering::Relaxed);
+ }
+ }
+ } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
+ atomic::fence(Ordering::SeqCst);
+ let head = self.head.load(Ordering::Relaxed);
+
+ // If the head lags one lap behind the tail as well...
+ if head.wrapping_add(self.one_lap) == tail {
+ // ...then the channel is full.
+ return false;
+ }
+
+ backoff.spin_light();
+ tail = self.tail.load(Ordering::Relaxed);
+ } else {
+ // Snooze because we need to wait for the stamp to get updated.
+ backoff.spin_heavy();
+ tail = self.tail.load(Ordering::Relaxed);
+ }
+ }
+ }
+
+ /// Writes a message into the channel.
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no slot, the channel is disconnected.
+ if token.array.slot.is_null() {
+ return Err(msg);
+ }
+
+ let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+
+ // Write the message into the slot and update the stamp.
+ slot.msg.get().write(MaybeUninit::new(msg));
+ slot.stamp.store(token.array.stamp, Ordering::Release);
+
+ // Wake a sleeping receiver.
+ self.receivers.notify();
+ Ok(())
+ }
+
+ /// Attempts to reserve a slot for receiving a message.
+ fn start_recv(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut head = self.head.load(Ordering::Relaxed);
+
+ loop {
+ // Deconstruct the head.
+ let index = head & (self.mark_bit - 1);
+ let lap = head & !(self.one_lap - 1);
+
+ // Inspect the corresponding slot.
+ debug_assert!(index < self.buffer.len());
+ let slot = unsafe { self.buffer.get_unchecked(index) };
+ let stamp = slot.stamp.load(Ordering::Acquire);
+
+ // If the stamp is ahead of the head by 1, we may attempt to pop.
+ if head + 1 == stamp {
+ let new = if index + 1 < self.cap {
+ // Same lap, incremented index.
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
+ head + 1
+ } else {
+ // One lap forward, index wraps around to zero.
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
+ lap.wrapping_add(self.one_lap)
+ };
+
+ // Try moving the head.
+ match self.head.compare_exchange_weak(
+ head,
+ new,
+ Ordering::SeqCst,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => {
+ // Prepare the token for the follow-up call to `read`.
+ token.array.slot = slot as *const Slot<T> as *const u8;
+ token.array.stamp = head.wrapping_add(self.one_lap);
+ return true;
+ }
+ Err(_) => {
+ backoff.spin_light();
+ head = self.head.load(Ordering::Relaxed);
+ }
+ }
+ } else if stamp == head {
+ atomic::fence(Ordering::SeqCst);
+ let tail = self.tail.load(Ordering::Relaxed);
+
+ // If the tail equals the head, that means the channel is empty.
+ if (tail & !self.mark_bit) == head {
+ // If the channel is disconnected...
+ if tail & self.mark_bit != 0 {
+ // ...then receive an error.
+ token.array.slot = ptr::null();
+ token.array.stamp = 0;
+ return true;
+ } else {
+ // Otherwise, the receive operation is not ready.
+ return false;
+ }
+ }
+
+ backoff.spin_light();
+ head = self.head.load(Ordering::Relaxed);
+ } else {
+ // Snooze because we need to wait for the stamp to get updated.
+ backoff.spin_heavy();
+ head = self.head.load(Ordering::Relaxed);
+ }
+ }
+ }
+
+ /// Reads a message from the channel.
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ if token.array.slot.is_null() {
+ // The channel is disconnected.
+ return Err(());
+ }
+
+ let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
+
+ // Read the message from the slot and update the stamp.
+ let msg = slot.msg.get().read().assume_init();
+ slot.stamp.store(token.array.stamp, Ordering::Release);
+
+ // Wake a sleeping sender.
+ self.senders.notify();
+ Ok(msg)
+ }
+
+ /// Attempts to send a message into the channel.
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ let token = &mut Token::default();
+ if self.start_send(token) {
+ unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
+ } else {
+ Err(TrySendError::Full(msg))
+ }
+ }
+
+ /// Sends a message into the channel.
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ loop {
+ // Try sending a message several times.
+ let backoff = Backoff::new();
+ loop {
+ if self.start_send(token) {
+ let res = unsafe { self.write(token, msg) };
+ return res.map_err(SendTimeoutError::Disconnected);
+ }
+
+ if backoff.is_completed() {
+ break;
+ } else {
+ backoff.spin_light();
+ }
+ }
+
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(SendTimeoutError::Timeout(msg));
+ }
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a receiver wakes us up.
+ let oper = Operation::hook(token);
+ self.senders.register(oper, cx);
+
+ // Has the channel become ready just now?
+ if !self.is_full() || self.is_disconnected() {
+ let _ = cx.try_select(Selected::Aborted);
+ }
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted | Selected::Disconnected => {
+ self.senders.unregister(oper).unwrap();
+ }
+ Selected::Operation(_) => {}
+ }
+ });
+ }
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+
+ if self.start_recv(token) {
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ loop {
+ if self.start_recv(token) {
+ let res = unsafe { self.read(token) };
+ return res.map_err(|_| RecvTimeoutError::Disconnected);
+ }
+
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(RecvTimeoutError::Timeout);
+ }
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a sender wakes us up.
+ let oper = Operation::hook(token);
+ self.receivers.register(oper, cx);
+
+ // Has the channel become ready just now?
+ if !self.is_empty() || self.is_disconnected() {
+ let _ = cx.try_select(Selected::Aborted);
+ }
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted | Selected::Disconnected => {
+ self.receivers.unregister(oper).unwrap();
+ // If the channel was disconnected, we still have to check for remaining
+ // messages.
+ }
+ Selected::Operation(_) => {}
+ }
+ });
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub(crate) fn len(&self) -> usize {
+ loop {
+ // Load the tail, then load the head.
+ let tail = self.tail.load(Ordering::SeqCst);
+ let head = self.head.load(Ordering::SeqCst);
+
+ // If the tail didn't change, we've got consistent values to work with.
+ if self.tail.load(Ordering::SeqCst) == tail {
+ let hix = head & (self.mark_bit - 1);
+ let tix = tail & (self.mark_bit - 1);
+
+ return if hix < tix {
+ tix - hix
+ } else if hix > tix {
+ self.cap - hix + tix
+ } else if (tail & !self.mark_bit) == head {
+ 0
+ } else {
+ self.cap
+ };
+ }
+ }
+ }
+
+ /// Returns the capacity of the channel.
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ Some(self.cap)
+ }
+
+ /// Disconnects the channel and wakes up all blocked senders and receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect(&self) -> bool {
+ let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
+
+ if tail & self.mark_bit == 0 {
+ self.senders.disconnect();
+ self.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Returns `true` if the channel is disconnected.
+ pub(crate) fn is_disconnected(&self) -> bool {
+ self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ let head = self.head.load(Ordering::SeqCst);
+ let tail = self.tail.load(Ordering::SeqCst);
+
+ // Is the tail equal to the head?
+ //
+ // Note: If the head changes just before we load the tail, that means there was a moment
+ // when the channel was not empty, so it is safe to just return `false`.
+ (tail & !self.mark_bit) == head
+ }
+
+ /// Returns `true` if the channel is full.
+ pub(crate) fn is_full(&self) -> bool {
+ let tail = self.tail.load(Ordering::SeqCst);
+ let head = self.head.load(Ordering::SeqCst);
+
+ // Is the head lagging one lap behind tail?
+ //
+ // Note: If the tail changes just before we load the head, that means there was a moment
+ // when the channel was not full, so it is safe to just return `false`.
+ head.wrapping_add(self.one_lap) == tail & !self.mark_bit
+ }
+}
+
+impl<T> Drop for Channel<T> {
+ fn drop(&mut self) {
+ // Get the index of the head.
+ let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
+
+ // Loop over all slots that hold a message and drop them.
+ for i in 0..self.len() {
+ // Compute the index of the next slot holding a message.
+ let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap };
+
+ unsafe {
+ debug_assert!(index < self.buffer.len());
+ let slot = self.buffer.get_unchecked_mut(index);
+ let msg = &mut *slot.msg.get();
+ msg.as_mut_ptr().drop_in_place();
+ }
+ }
+ }
+}
diff --git a/library/std/src/sync/mpmc/context.rs b/library/std/src/sync/mpmc/context.rs
new file mode 100644
index 000000000..bbfc6ce00
--- /dev/null
+++ b/library/std/src/sync/mpmc/context.rs
@@ -0,0 +1,155 @@
+//! Thread-local channel context.
+
+use super::select::Selected;
+use super::waker::current_thread_id;
+
+use crate::cell::Cell;
+use crate::ptr;
+use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
+use crate::sync::Arc;
+use crate::thread::{self, Thread};
+use crate::time::Instant;
+
+/// Thread-local context.
+#[derive(Debug, Clone)]
+pub struct Context {
+ inner: Arc<Inner>,
+}
+
+/// Inner representation of `Context`.
+#[derive(Debug)]
+struct Inner {
+ /// Selected operation.
+ select: AtomicUsize,
+
+ /// A slot into which another thread may store a pointer to its `Packet`.
+ packet: AtomicPtr<()>,
+
+ /// Thread handle.
+ thread: Thread,
+
+ /// Thread id.
+ thread_id: usize,
+}
+
+impl Context {
+ /// Creates a new context for the duration of the closure.
+ #[inline]
+ pub fn with<F, R>(f: F) -> R
+ where
+ F: FnOnce(&Context) -> R,
+ {
+ thread_local! {
+ /// Cached thread-local context.
+ static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
+ }
+
+ let mut f = Some(f);
+ let mut f = |cx: &Context| -> R {
+ let f = f.take().unwrap();
+ f(cx)
+ };
+
+ CONTEXT
+ .try_with(|cell| match cell.take() {
+ None => f(&Context::new()),
+ Some(cx) => {
+ cx.reset();
+ let res = f(&cx);
+ cell.set(Some(cx));
+ res
+ }
+ })
+ .unwrap_or_else(|_| f(&Context::new()))
+ }
+
+ /// Creates a new `Context`.
+ #[cold]
+ fn new() -> Context {
+ Context {
+ inner: Arc::new(Inner {
+ select: AtomicUsize::new(Selected::Waiting.into()),
+ packet: AtomicPtr::new(ptr::null_mut()),
+ thread: thread::current(),
+ thread_id: current_thread_id(),
+ }),
+ }
+ }
+
+ /// Resets `select` and `packet`.
+ #[inline]
+ fn reset(&self) {
+ self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
+ self.inner.packet.store(ptr::null_mut(), Ordering::Release);
+ }
+
+ /// Attempts to select an operation.
+ ///
+ /// On failure, the previously selected operation is returned.
+ #[inline]
+ pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
+ self.inner
+ .select
+ .compare_exchange(
+ Selected::Waiting.into(),
+ select.into(),
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ )
+ .map(|_| ())
+ .map_err(|e| e.into())
+ }
+
+ /// Stores a packet.
+ ///
+ /// This method must be called after `try_select` succeeds and there is a packet to provide.
+ #[inline]
+ pub fn store_packet(&self, packet: *mut ()) {
+ if !packet.is_null() {
+ self.inner.packet.store(packet, Ordering::Release);
+ }
+ }
+
+ /// Waits until an operation is selected and returns it.
+ ///
+ /// If the deadline is reached, `Selected::Aborted` will be selected.
+ #[inline]
+ pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
+ loop {
+ // Check whether an operation has been selected.
+ let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
+ if sel != Selected::Waiting {
+ return sel;
+ }
+
+ // If there's a deadline, park the current thread until the deadline is reached.
+ if let Some(end) = deadline {
+ let now = Instant::now();
+
+ if now < end {
+ thread::park_timeout(end - now);
+ } else {
+ // The deadline has been reached. Try aborting select.
+ return match self.try_select(Selected::Aborted) {
+ Ok(()) => Selected::Aborted,
+ Err(s) => s,
+ };
+ }
+ } else {
+ thread::park();
+ }
+ }
+ }
+
+ /// Unparks the thread this context belongs to.
+ #[inline]
+ pub fn unpark(&self) {
+ self.inner.thread.unpark();
+ }
+
+ /// Returns the id of the thread this context belongs to.
+ #[inline]
+ pub fn thread_id(&self) -> usize {
+ self.inner.thread_id
+ }
+}
diff --git a/library/std/src/sync/mpmc/counter.rs b/library/std/src/sync/mpmc/counter.rs
new file mode 100644
index 000000000..a5a6bdc67
--- /dev/null
+++ b/library/std/src/sync/mpmc/counter.rs
@@ -0,0 +1,137 @@
+use crate::ops;
+use crate::process;
+use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+/// Reference counter internals.
+struct Counter<C> {
+ /// The number of senders associated with the channel.
+ senders: AtomicUsize,
+
+ /// The number of receivers associated with the channel.
+ receivers: AtomicUsize,
+
+ /// Set to `true` if the last sender or the last receiver reference deallocates the channel.
+ destroy: AtomicBool,
+
+ /// The internal channel.
+ chan: C,
+}
+
+/// Wraps a channel into the reference counter.
+pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
+ let counter = Box::into_raw(Box::new(Counter {
+ senders: AtomicUsize::new(1),
+ receivers: AtomicUsize::new(1),
+ destroy: AtomicBool::new(false),
+ chan,
+ }));
+ let s = Sender { counter };
+ let r = Receiver { counter };
+ (s, r)
+}
+
+/// The sending side.
+pub(crate) struct Sender<C> {
+ counter: *mut Counter<C>,
+}
+
+impl<C> Sender<C> {
+ /// Returns the internal `Counter`.
+ fn counter(&self) -> &Counter<C> {
+ unsafe { &*self.counter }
+ }
+
+ /// Acquires another sender reference.
+ pub(crate) fn acquire(&self) -> Sender<C> {
+ let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
+
+ // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
+ // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
+ // just abort when the count becomes very large.
+ if count > isize::MAX as usize {
+ process::abort();
+ }
+
+ Sender { counter: self.counter }
+ }
+
+ /// Releases the sender reference.
+ ///
+ /// Function `disconnect` will be called if this is the last sender reference.
+ pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+ if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
+ disconnect(&self.counter().chan);
+
+ if self.counter().destroy.swap(true, Ordering::AcqRel) {
+ drop(Box::from_raw(self.counter));
+ }
+ }
+ }
+}
+
+impl<C> ops::Deref for Sender<C> {
+ type Target = C;
+
+ fn deref(&self) -> &C {
+ &self.counter().chan
+ }
+}
+
+impl<C> PartialEq for Sender<C> {
+ fn eq(&self, other: &Sender<C>) -> bool {
+ self.counter == other.counter
+ }
+}
+
+/// The receiving side.
+pub(crate) struct Receiver<C> {
+ counter: *mut Counter<C>,
+}
+
+impl<C> Receiver<C> {
+ /// Returns the internal `Counter`.
+ fn counter(&self) -> &Counter<C> {
+ unsafe { &*self.counter }
+ }
+
+ /// Acquires another receiver reference.
+ pub(crate) fn acquire(&self) -> Receiver<C> {
+ let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
+
+ // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
+ // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
+ // just abort when the count becomes very large.
+ if count > isize::MAX as usize {
+ process::abort();
+ }
+
+ Receiver { counter: self.counter }
+ }
+
+ /// Releases the receiver reference.
+ ///
+ /// Function `disconnect` will be called if this is the last receiver reference.
+ pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
+ if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
+ disconnect(&self.counter().chan);
+
+ if self.counter().destroy.swap(true, Ordering::AcqRel) {
+ drop(Box::from_raw(self.counter));
+ }
+ }
+ }
+}
+
+impl<C> ops::Deref for Receiver<C> {
+ type Target = C;
+
+ fn deref(&self) -> &C {
+ &self.counter().chan
+ }
+}
+
+impl<C> PartialEq for Receiver<C> {
+ fn eq(&self, other: &Receiver<C>) -> bool {
+ self.counter == other.counter
+ }
+}
diff --git a/library/std/src/sync/mpmc/error.rs b/library/std/src/sync/mpmc/error.rs
new file mode 100644
index 000000000..1b8a1f387
--- /dev/null
+++ b/library/std/src/sync/mpmc/error.rs
@@ -0,0 +1,46 @@
+use crate::error;
+use crate::fmt;
+
+pub use crate::sync::mpsc::{RecvError, RecvTimeoutError, SendError, TryRecvError, TrySendError};
+
+/// An error returned from the [`send_timeout`] method.
+///
+/// The error contains the message being sent so it can be recovered.
+///
+/// [`send_timeout`]: super::Sender::send_timeout
+#[derive(PartialEq, Eq, Clone, Copy)]
+pub enum SendTimeoutError<T> {
+ /// The message could not be sent because the channel is full and the operation timed out.
+ ///
+ /// If this is a zero-capacity channel, then the error indicates that there was no receiver
+ /// available to receive the message and the operation timed out.
+ Timeout(T),
+
+ /// The message could not be sent because the channel is disconnected.
+ Disconnected(T),
+}
+
+impl<T> fmt::Debug for SendTimeoutError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ "SendTimeoutError(..)".fmt(f)
+ }
+}
+
+impl<T> fmt::Display for SendTimeoutError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match *self {
+ SendTimeoutError::Timeout(..) => "timed out waiting on send operation".fmt(f),
+ SendTimeoutError::Disconnected(..) => "sending on a disconnected channel".fmt(f),
+ }
+ }
+}
+
+impl<T: Send> error::Error for SendTimeoutError<T> {}
+
+impl<T> From<SendError<T>> for SendTimeoutError<T> {
+ fn from(err: SendError<T>) -> SendTimeoutError<T> {
+ match err {
+ SendError(e) => SendTimeoutError::Disconnected(e),
+ }
+ }
+}
diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs
new file mode 100644
index 000000000..ec6c0726a
--- /dev/null
+++ b/library/std/src/sync/mpmc/list.rs
@@ -0,0 +1,638 @@
+//! Unbounded channel implemented as a linked list.
+
+use super::context::Context;
+use super::error::*;
+use super::select::{Operation, Selected, Token};
+use super::utils::{Backoff, CachePadded};
+use super::waker::SyncWaker;
+
+use crate::cell::UnsafeCell;
+use crate::marker::PhantomData;
+use crate::mem::MaybeUninit;
+use crate::ptr;
+use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
+use crate::time::Instant;
+
+// Bits indicating the state of a slot:
+// * If a message has been written into the slot, `WRITE` is set.
+// * If a message has been read from the slot, `READ` is set.
+// * If the block is being destroyed, `DESTROY` is set.
+const WRITE: usize = 1;
+const READ: usize = 2;
+const DESTROY: usize = 4;
+
+// Each block covers one "lap" of indices.
+const LAP: usize = 32;
+// The maximum number of messages a block can hold.
+const BLOCK_CAP: usize = LAP - 1;
+// How many lower bits are reserved for metadata.
+const SHIFT: usize = 1;
+// Has two different purposes:
+// * If set in head, indicates that the block is not the last one.
+// * If set in tail, indicates that the channel is disconnected.
+const MARK_BIT: usize = 1;
+
+/// A slot in a block.
+struct Slot<T> {
+ /// The message.
+ msg: UnsafeCell<MaybeUninit<T>>,
+
+ /// The state of the slot.
+ state: AtomicUsize,
+}
+
+impl<T> Slot<T> {
+ /// Waits until a message is written into the slot.
+ fn wait_write(&self) {
+ let backoff = Backoff::new();
+ while self.state.load(Ordering::Acquire) & WRITE == 0 {
+ backoff.spin_heavy();
+ }
+ }
+}
+
+/// A block in a linked list.
+///
+/// Each block in the list can hold up to `BLOCK_CAP` messages.
+struct Block<T> {
+ /// The next block in the linked list.
+ next: AtomicPtr<Block<T>>,
+
+ /// Slots for messages.
+ slots: [Slot<T>; BLOCK_CAP],
+}
+
+impl<T> Block<T> {
+ /// Creates an empty block.
+ fn new() -> Block<T> {
+ // SAFETY: This is safe because:
+ // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
+ // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
+ // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
+ // holds a MaybeUninit.
+ // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
+ unsafe { MaybeUninit::zeroed().assume_init() }
+ }
+
+ /// Waits until the next pointer is set.
+ fn wait_next(&self) -> *mut Block<T> {
+ let backoff = Backoff::new();
+ loop {
+ let next = self.next.load(Ordering::Acquire);
+ if !next.is_null() {
+ return next;
+ }
+ backoff.spin_heavy();
+ }
+ }
+
+ /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
+ unsafe fn destroy(this: *mut Block<T>, start: usize) {
+ // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
+ // begun destruction of the block.
+ for i in start..BLOCK_CAP - 1 {
+ let slot = (*this).slots.get_unchecked(i);
+
+ // Mark the `DESTROY` bit if a thread is still using the slot.
+ if slot.state.load(Ordering::Acquire) & READ == 0
+ && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
+ {
+ // If a thread is still using the slot, it will continue destruction of the block.
+ return;
+ }
+ }
+
+ // No thread is using the block, now it is safe to destroy it.
+ drop(Box::from_raw(this));
+ }
+}
+
+/// A position in a channel.
+#[derive(Debug)]
+struct Position<T> {
+ /// The index in the channel.
+ index: AtomicUsize,
+
+ /// The block in the linked list.
+ block: AtomicPtr<Block<T>>,
+}
+
+/// The token type for the list flavor.
+#[derive(Debug)]
+pub(crate) struct ListToken {
+ /// The block of slots.
+ block: *const u8,
+
+ /// The offset into the block.
+ offset: usize,
+}
+
+impl Default for ListToken {
+ #[inline]
+ fn default() -> Self {
+ ListToken { block: ptr::null(), offset: 0 }
+ }
+}
+
+/// Unbounded channel implemented as a linked list.
+///
+/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
+/// represented as numbers of type `usize` and wrap on overflow.
+///
+/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
+/// improve cache efficiency.
+pub(crate) struct Channel<T> {
+ /// The head of the channel.
+ head: CachePadded<Position<T>>,
+
+ /// The tail of the channel.
+ tail: CachePadded<Position<T>>,
+
+ /// Receivers waiting while the channel is empty and not disconnected.
+ receivers: SyncWaker,
+
+ /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
+ _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+ /// Creates a new unbounded channel.
+ pub(crate) fn new() -> Self {
+ Channel {
+ head: CachePadded::new(Position {
+ block: AtomicPtr::new(ptr::null_mut()),
+ index: AtomicUsize::new(0),
+ }),
+ tail: CachePadded::new(Position {
+ block: AtomicPtr::new(ptr::null_mut()),
+ index: AtomicUsize::new(0),
+ }),
+ receivers: SyncWaker::new(),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Attempts to reserve a slot for sending a message.
+ fn start_send(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.index.load(Ordering::Acquire);
+ let mut block = self.tail.block.load(Ordering::Acquire);
+ let mut next_block = None;
+
+ loop {
+ // Check if the channel is disconnected.
+ if tail & MARK_BIT != 0 {
+ token.list.block = ptr::null();
+ return true;
+ }
+
+ // Calculate the offset of the index into the block.
+ let offset = (tail >> SHIFT) % LAP;
+
+ // If we reached the end of the block, wait until the next one is installed.
+ if offset == BLOCK_CAP {
+ backoff.spin_heavy();
+ tail = self.tail.index.load(Ordering::Acquire);
+ block = self.tail.block.load(Ordering::Acquire);
+ continue;
+ }
+
+ // If we're going to have to install the next block, allocate it in advance in order to
+ // make the wait for other threads as short as possible.
+ if offset + 1 == BLOCK_CAP && next_block.is_none() {
+ next_block = Some(Box::new(Block::<T>::new()));
+ }
+
+ // If this is the first message to be sent into the channel, we need to allocate the
+ // first block and install it.
+ if block.is_null() {
+ let new = Box::into_raw(Box::new(Block::<T>::new()));
+
+ if self
+ .tail
+ .block
+ .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
+ .is_ok()
+ {
+ self.head.block.store(new, Ordering::Release);
+ block = new;
+ } else {
+ next_block = unsafe { Some(Box::from_raw(new)) };
+ tail = self.tail.index.load(Ordering::Acquire);
+ block = self.tail.block.load(Ordering::Acquire);
+ continue;
+ }
+ }
+
+ let new_tail = tail + (1 << SHIFT);
+
+ // Try advancing the tail forward.
+ match self.tail.index.compare_exchange_weak(
+ tail,
+ new_tail,
+ Ordering::SeqCst,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => unsafe {
+ // If we've reached the end of the block, install the next one.
+ if offset + 1 == BLOCK_CAP {
+ let next_block = Box::into_raw(next_block.unwrap());
+ self.tail.block.store(next_block, Ordering::Release);
+ self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
+ (*block).next.store(next_block, Ordering::Release);
+ }
+
+ token.list.block = block as *const u8;
+ token.list.offset = offset;
+ return true;
+ },
+ Err(_) => {
+ backoff.spin_light();
+ tail = self.tail.index.load(Ordering::Acquire);
+ block = self.tail.block.load(Ordering::Acquire);
+ }
+ }
+ }
+ }
+
+ /// Writes a message into the channel.
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no slot, the channel is disconnected.
+ if token.list.block.is_null() {
+ return Err(msg);
+ }
+
+ // Write the message into the slot.
+ let block = token.list.block as *mut Block<T>;
+ let offset = token.list.offset;
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.msg.get().write(MaybeUninit::new(msg));
+ slot.state.fetch_or(WRITE, Ordering::Release);
+
+ // Wake a sleeping receiver.
+ self.receivers.notify();
+ Ok(())
+ }
+
+ /// Attempts to reserve a slot for receiving a message.
+ fn start_recv(&self, token: &mut Token) -> bool {
+ let backoff = Backoff::new();
+ let mut head = self.head.index.load(Ordering::Acquire);
+ let mut block = self.head.block.load(Ordering::Acquire);
+
+ loop {
+ // Calculate the offset of the index into the block.
+ let offset = (head >> SHIFT) % LAP;
+
+ // If we reached the end of the block, wait until the next one is installed.
+ if offset == BLOCK_CAP {
+ backoff.spin_heavy();
+ head = self.head.index.load(Ordering::Acquire);
+ block = self.head.block.load(Ordering::Acquire);
+ continue;
+ }
+
+ let mut new_head = head + (1 << SHIFT);
+
+ if new_head & MARK_BIT == 0 {
+ atomic::fence(Ordering::SeqCst);
+ let tail = self.tail.index.load(Ordering::Relaxed);
+
+ // If the tail equals the head, that means the channel is empty.
+ if head >> SHIFT == tail >> SHIFT {
+ // If the channel is disconnected...
+ if tail & MARK_BIT != 0 {
+ // ...then receive an error.
+ token.list.block = ptr::null();
+ return true;
+ } else {
+ // Otherwise, the receive operation is not ready.
+ return false;
+ }
+ }
+
+ // If head and tail are not in the same block, set `MARK_BIT` in head.
+ if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
+ new_head |= MARK_BIT;
+ }
+ }
+
+ // The block can be null here only if the first message is being sent into the channel.
+ // In that case, just wait until it gets initialized.
+ if block.is_null() {
+ backoff.spin_heavy();
+ head = self.head.index.load(Ordering::Acquire);
+ block = self.head.block.load(Ordering::Acquire);
+ continue;
+ }
+
+ // Try moving the head index forward.
+ match self.head.index.compare_exchange_weak(
+ head,
+ new_head,
+ Ordering::SeqCst,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => unsafe {
+ // If we've reached the end of the block, move to the next one.
+ if offset + 1 == BLOCK_CAP {
+ let next = (*block).wait_next();
+ let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
+ if !(*next).next.load(Ordering::Relaxed).is_null() {
+ next_index |= MARK_BIT;
+ }
+
+ self.head.block.store(next, Ordering::Release);
+ self.head.index.store(next_index, Ordering::Release);
+ }
+
+ token.list.block = block as *const u8;
+ token.list.offset = offset;
+ return true;
+ },
+ Err(_) => {
+ backoff.spin_light();
+ head = self.head.index.load(Ordering::Acquire);
+ block = self.head.block.load(Ordering::Acquire);
+ }
+ }
+ }
+ }
+
+ /// Reads a message from the channel.
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ if token.list.block.is_null() {
+ // The channel is disconnected.
+ return Err(());
+ }
+
+ // Read the message.
+ let block = token.list.block as *mut Block<T>;
+ let offset = token.list.offset;
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.wait_write();
+ let msg = slot.msg.get().read().assume_init();
+
+ // Destroy the block if we've reached the end, or if another thread wanted to destroy but
+ // couldn't because we were busy reading from the slot.
+ if offset + 1 == BLOCK_CAP {
+ Block::destroy(block, 0);
+ } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
+ Block::destroy(block, offset + 1);
+ }
+
+ Ok(msg)
+ }
+
+ /// Attempts to send a message into the channel.
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ self.send(msg, None).map_err(|err| match err {
+ SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
+ SendTimeoutError::Timeout(_) => unreachable!(),
+ })
+ }
+
+ /// Sends a message into the channel.
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ _deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ assert!(self.start_send(token));
+ unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+
+ if self.start_recv(token) {
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ loop {
+ if self.start_recv(token) {
+ unsafe {
+ return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
+ }
+ }
+
+ if let Some(d) = deadline {
+ if Instant::now() >= d {
+ return Err(RecvTimeoutError::Timeout);
+ }
+ }
+
+ // Prepare for blocking until a sender wakes us up.
+ Context::with(|cx| {
+ let oper = Operation::hook(token);
+ self.receivers.register(oper, cx);
+
+ // Has the channel become ready just now?
+ if !self.is_empty() || self.is_disconnected() {
+ let _ = cx.try_select(Selected::Aborted);
+ }
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted | Selected::Disconnected => {
+ self.receivers.unregister(oper).unwrap();
+ // If the channel was disconnected, we still have to check for remaining
+ // messages.
+ }
+ Selected::Operation(_) => {}
+ }
+ });
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub(crate) fn len(&self) -> usize {
+ loop {
+ // Load the tail index, then load the head index.
+ let mut tail = self.tail.index.load(Ordering::SeqCst);
+ let mut head = self.head.index.load(Ordering::SeqCst);
+
+ // If the tail index didn't change, we've got consistent indices to work with.
+ if self.tail.index.load(Ordering::SeqCst) == tail {
+ // Erase the lower bits.
+ tail &= !((1 << SHIFT) - 1);
+ head &= !((1 << SHIFT) - 1);
+
+ // Fix up indices if they fall onto block ends.
+ if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
+ tail = tail.wrapping_add(1 << SHIFT);
+ }
+ if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Rotate indices so that head falls into the first block.
+ let lap = (head >> SHIFT) / LAP;
+ tail = tail.wrapping_sub((lap * LAP) << SHIFT);
+ head = head.wrapping_sub((lap * LAP) << SHIFT);
+
+ // Remove the lower bits.
+ tail >>= SHIFT;
+ head >>= SHIFT;
+
+ // Return the difference minus the number of blocks between tail and head.
+ return tail - head - tail / LAP;
+ }
+ }
+ }
+
+ /// Returns the capacity of the channel.
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ None
+ }
+
+ /// Disconnects senders and wakes up all blocked receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect_senders(&self) -> bool {
+ let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+ if tail & MARK_BIT == 0 {
+ self.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Disconnects receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect_receivers(&self) -> bool {
+ let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
+
+ if tail & MARK_BIT == 0 {
+ // If receivers are dropped first, discard all messages to free
+ // memory eagerly.
+ self.discard_all_messages();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Discards all messages.
+ ///
+ /// This method should only be called when all receivers are dropped.
+ fn discard_all_messages(&self) {
+ let backoff = Backoff::new();
+ let mut tail = self.tail.index.load(Ordering::Acquire);
+ loop {
+ let offset = (tail >> SHIFT) % LAP;
+ if offset != BLOCK_CAP {
+ break;
+ }
+
+ // New updates to tail will be rejected by MARK_BIT and aborted unless it's
+ // at boundary. We need to wait for the updates take affect otherwise there
+ // can be memory leaks.
+ backoff.spin_heavy();
+ tail = self.tail.index.load(Ordering::Acquire);
+ }
+
+ let mut head = self.head.index.load(Ordering::Acquire);
+ let mut block = self.head.block.load(Ordering::Acquire);
+
+ unsafe {
+ // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+ while head >> SHIFT != tail >> SHIFT {
+ let offset = (head >> SHIFT) % LAP;
+
+ if offset < BLOCK_CAP {
+ // Drop the message in the slot.
+ let slot = (*block).slots.get_unchecked(offset);
+ slot.wait_write();
+ let p = &mut *slot.msg.get();
+ p.as_mut_ptr().drop_in_place();
+ } else {
+ (*block).wait_next();
+ // Deallocate the block and move to the next one.
+ let next = (*block).next.load(Ordering::Acquire);
+ drop(Box::from_raw(block));
+ block = next;
+ }
+
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Deallocate the last remaining block.
+ if !block.is_null() {
+ drop(Box::from_raw(block));
+ }
+ }
+ head &= !MARK_BIT;
+ self.head.block.store(ptr::null_mut(), Ordering::Release);
+ self.head.index.store(head, Ordering::Release);
+ }
+
+ /// Returns `true` if the channel is disconnected.
+ pub(crate) fn is_disconnected(&self) -> bool {
+ self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ let head = self.head.index.load(Ordering::SeqCst);
+ let tail = self.tail.index.load(Ordering::SeqCst);
+ head >> SHIFT == tail >> SHIFT
+ }
+
+ /// Returns `true` if the channel is full.
+ pub(crate) fn is_full(&self) -> bool {
+ false
+ }
+}
+
+impl<T> Drop for Channel<T> {
+ fn drop(&mut self) {
+ let mut head = self.head.index.load(Ordering::Relaxed);
+ let mut tail = self.tail.index.load(Ordering::Relaxed);
+ let mut block = self.head.block.load(Ordering::Relaxed);
+
+ // Erase the lower bits.
+ head &= !((1 << SHIFT) - 1);
+ tail &= !((1 << SHIFT) - 1);
+
+ unsafe {
+ // Drop all messages between head and tail and deallocate the heap-allocated blocks.
+ while head != tail {
+ let offset = (head >> SHIFT) % LAP;
+
+ if offset < BLOCK_CAP {
+ // Drop the message in the slot.
+ let slot = (*block).slots.get_unchecked(offset);
+ let p = &mut *slot.msg.get();
+ p.as_mut_ptr().drop_in_place();
+ } else {
+ // Deallocate the block and move to the next one.
+ let next = (*block).next.load(Ordering::Relaxed);
+ drop(Box::from_raw(block));
+ block = next;
+ }
+
+ head = head.wrapping_add(1 << SHIFT);
+ }
+
+ // Deallocate the last remaining block.
+ if !block.is_null() {
+ drop(Box::from_raw(block));
+ }
+ }
+ }
+}
diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs
new file mode 100644
index 000000000..7a602cecd
--- /dev/null
+++ b/library/std/src/sync/mpmc/mod.rs
@@ -0,0 +1,430 @@
+//! Multi-producer multi-consumer channels.
+
+// This module is not currently exposed publicly, but is used
+// as the implementation for the channels in `sync::mpsc`. The
+// implementation comes from the crossbeam-channel crate:
+//
+// Copyright (c) 2019 The Crossbeam Project Developers
+//
+// Permission is hereby granted, free of charge, to any
+// person obtaining a copy of this software and associated
+// documentation files (the "Software"), to deal in the
+// Software without restriction, including without
+// limitation the rights to use, copy, modify, merge,
+// publish, distribute, sublicense, and/or sell copies of
+// the Software, and to permit persons to whom the Software
+// is furnished to do so, subject to the following
+// conditions:
+//
+// The above copyright notice and this permission notice
+// shall be included in all copies or substantial portions
+// of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+mod array;
+mod context;
+mod counter;
+mod error;
+mod list;
+mod select;
+mod utils;
+mod waker;
+mod zero;
+
+use crate::fmt;
+use crate::panic::{RefUnwindSafe, UnwindSafe};
+use crate::time::{Duration, Instant};
+pub use error::*;
+
+/// Creates a channel of unbounded capacity.
+///
+/// This channel has a growable buffer that can hold any number of messages at a time.
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let (s, r) = counter::new(list::Channel::new());
+ let s = Sender { flavor: SenderFlavor::List(s) };
+ let r = Receiver { flavor: ReceiverFlavor::List(r) };
+ (s, r)
+}
+
+/// Creates a channel of bounded capacity.
+///
+/// This channel has a buffer that can hold at most `cap` messages at a time.
+///
+/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and
+/// receive operations must appear at the same time in order to pair up and pass the message over.
+pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
+ if cap == 0 {
+ let (s, r) = counter::new(zero::Channel::new());
+ let s = Sender { flavor: SenderFlavor::Zero(s) };
+ let r = Receiver { flavor: ReceiverFlavor::Zero(r) };
+ (s, r)
+ } else {
+ let (s, r) = counter::new(array::Channel::with_capacity(cap));
+ let s = Sender { flavor: SenderFlavor::Array(s) };
+ let r = Receiver { flavor: ReceiverFlavor::Array(r) };
+ (s, r)
+ }
+}
+
+/// The sending side of a channel.
+pub struct Sender<T> {
+ flavor: SenderFlavor<T>,
+}
+
+/// Sender flavors.
+enum SenderFlavor<T> {
+ /// Bounded channel based on a preallocated array.
+ Array(counter::Sender<array::Channel<T>>),
+
+ /// Unbounded channel implemented as a linked list.
+ List(counter::Sender<list::Channel<T>>),
+
+ /// Zero-capacity channel.
+ Zero(counter::Sender<zero::Channel<T>>),
+}
+
+unsafe impl<T: Send> Send for Sender<T> {}
+unsafe impl<T: Send> Sync for Sender<T> {}
+
+impl<T> UnwindSafe for Sender<T> {}
+impl<T> RefUnwindSafe for Sender<T> {}
+
+impl<T> Sender<T> {
+ /// Attempts to send a message into the channel without blocking.
+ ///
+ /// This method will either send a message into the channel immediately or return an error if
+ /// the channel is full or disconnected. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will send the message only if there
+ /// happens to be a receive operation on the other side of the channel at the same time.
+ pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.try_send(msg),
+ SenderFlavor::List(chan) => chan.try_send(msg),
+ SenderFlavor::Zero(chan) => chan.try_send(msg),
+ }
+ }
+
+ /// Blocks the current thread until a message is sent or the channel is disconnected.
+ ///
+ /// If the channel is full and not disconnected, this call will block until the send operation
+ /// can proceed. If the channel becomes disconnected, this call will wake up and return an
+ /// error. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a receive operation to
+ /// appear on the other side of the channel.
+ pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.send(msg, None),
+ SenderFlavor::List(chan) => chan.send(msg, None),
+ SenderFlavor::Zero(chan) => chan.send(msg, None),
+ }
+ .map_err(|err| match err {
+ SendTimeoutError::Disconnected(msg) => SendError(msg),
+ SendTimeoutError::Timeout(_) => unreachable!(),
+ })
+ }
+}
+
+// The methods below are not used by `sync::mpsc`, but
+// are useful and we'll likely want to expose them
+// eventually
+#[allow(unused)]
+impl<T> Sender<T> {
+ /// Waits for a message to be sent into the channel, but only for a limited time.
+ ///
+ /// If the channel is full and not disconnected, this call will block until the send operation
+ /// can proceed or the operation times out. If the channel becomes disconnected, this call will
+ /// wake up and return an error. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a receive operation to
+ /// appear on the other side of the channel.
+ pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.send_deadline(msg, deadline),
+ // So far in the future that it's practically the same as waiting indefinitely.
+ None => self.send(msg).map_err(SendTimeoutError::from),
+ }
+ }
+
+ /// Waits for a message to be sent into the channel, but only until a given deadline.
+ ///
+ /// If the channel is full and not disconnected, this call will block until the send operation
+ /// can proceed or the operation times out. If the channel becomes disconnected, this call will
+ /// wake up and return an error. The returned error contains the original message.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a receive operation to
+ /// appear on the other side of the channel.
+ pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
+ SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
+ SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
+ }
+ }
+
+ /// Returns `true` if the channel is empty.
+ ///
+ /// Note: Zero-capacity channels are always empty.
+ pub fn is_empty(&self) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.is_empty(),
+ SenderFlavor::List(chan) => chan.is_empty(),
+ SenderFlavor::Zero(chan) => chan.is_empty(),
+ }
+ }
+
+ /// Returns `true` if the channel is full.
+ ///
+ /// Note: Zero-capacity channels are always full.
+ pub fn is_full(&self) -> bool {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.is_full(),
+ SenderFlavor::List(chan) => chan.is_full(),
+ SenderFlavor::Zero(chan) => chan.is_full(),
+ }
+ }
+
+ /// Returns the number of messages in the channel.
+ pub fn len(&self) -> usize {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.len(),
+ SenderFlavor::List(chan) => chan.len(),
+ SenderFlavor::Zero(chan) => chan.len(),
+ }
+ }
+
+ /// If the channel is bounded, returns its capacity.
+ pub fn capacity(&self) -> Option<usize> {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.capacity(),
+ SenderFlavor::List(chan) => chan.capacity(),
+ SenderFlavor::Zero(chan) => chan.capacity(),
+ }
+ }
+
+ /// Returns `true` if senders belong to the same channel.
+ pub fn same_channel(&self, other: &Sender<T>) -> bool {
+ match (&self.flavor, &other.flavor) {
+ (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b,
+ (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b,
+ (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b,
+ _ => false,
+ }
+ }
+}
+
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ unsafe {
+ match &self.flavor {
+ SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+ SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
+ SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
+ }
+ }
+ }
+}
+
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Self {
+ let flavor = match &self.flavor {
+ SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
+ SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
+ SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
+ };
+
+ Sender { flavor }
+ }
+}
+
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Sender { .. }")
+ }
+}
+
+/// The receiving side of a channel.
+pub struct Receiver<T> {
+ flavor: ReceiverFlavor<T>,
+}
+
+/// Receiver flavors.
+enum ReceiverFlavor<T> {
+ /// Bounded channel based on a preallocated array.
+ Array(counter::Receiver<array::Channel<T>>),
+
+ /// Unbounded channel implemented as a linked list.
+ List(counter::Receiver<list::Channel<T>>),
+
+ /// Zero-capacity channel.
+ Zero(counter::Receiver<zero::Channel<T>>),
+}
+
+unsafe impl<T: Send> Send for Receiver<T> {}
+unsafe impl<T: Send> Sync for Receiver<T> {}
+
+impl<T> UnwindSafe for Receiver<T> {}
+impl<T> RefUnwindSafe for Receiver<T> {}
+
+impl<T> Receiver<T> {
+ /// Attempts to receive a message from the channel without blocking.
+ ///
+ /// This method will either receive a message from the channel immediately or return an error
+ /// if the channel is empty.
+ ///
+ /// If called on a zero-capacity channel, this method will receive a message only if there
+ /// happens to be a send operation on the other side of the channel at the same time.
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.try_recv(),
+ ReceiverFlavor::List(chan) => chan.try_recv(),
+ ReceiverFlavor::Zero(chan) => chan.try_recv(),
+ }
+ }
+
+ /// Blocks the current thread until a message is received or the channel is empty and
+ /// disconnected.
+ ///
+ /// If the channel is empty and not disconnected, this call will block until the receive
+ /// operation can proceed. If the channel is empty and becomes disconnected, this call will
+ /// wake up and return an error.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+ /// on the other side of the channel.
+ pub fn recv(&self) -> Result<T, RecvError> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.recv(None),
+ ReceiverFlavor::List(chan) => chan.recv(None),
+ ReceiverFlavor::Zero(chan) => chan.recv(None),
+ }
+ .map_err(|_| RecvError)
+ }
+
+ /// Waits for a message to be received from the channel, but only for a limited time.
+ ///
+ /// If the channel is empty and not disconnected, this call will block until the receive
+ /// operation can proceed or the operation times out. If the channel is empty and becomes
+ /// disconnected, this call will wake up and return an error.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+ /// on the other side of the channel.
+ pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
+ match Instant::now().checked_add(timeout) {
+ Some(deadline) => self.recv_deadline(deadline),
+ // So far in the future that it's practically the same as waiting indefinitely.
+ None => self.recv().map_err(RecvTimeoutError::from),
+ }
+ }
+
+ /// Waits for a message to be received from the channel, but only for a limited time.
+ ///
+ /// If the channel is empty and not disconnected, this call will block until the receive
+ /// operation can proceed or the operation times out. If the channel is empty and becomes
+ /// disconnected, this call will wake up and return an error.
+ ///
+ /// If called on a zero-capacity channel, this method will wait for a send operation to appear
+ /// on the other side of the channel.
+ pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
+ ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
+ ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
+ }
+ }
+}
+
+// The methods below are not used by `sync::mpsc`, but
+// are useful and we'll likely want to expose them
+// eventually
+#[allow(unused)]
+impl<T> Receiver<T> {
+ /// Returns `true` if the channel is empty.
+ ///
+ /// Note: Zero-capacity channels are always empty.
+ pub fn is_empty(&self) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.is_empty(),
+ ReceiverFlavor::List(chan) => chan.is_empty(),
+ ReceiverFlavor::Zero(chan) => chan.is_empty(),
+ }
+ }
+
+ /// Returns `true` if the channel is full.
+ ///
+ /// Note: Zero-capacity channels are always full.
+ pub fn is_full(&self) -> bool {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.is_full(),
+ ReceiverFlavor::List(chan) => chan.is_full(),
+ ReceiverFlavor::Zero(chan) => chan.is_full(),
+ }
+ }
+
+ /// Returns the number of messages in the channel.
+ pub fn len(&self) -> usize {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.len(),
+ ReceiverFlavor::List(chan) => chan.len(),
+ ReceiverFlavor::Zero(chan) => chan.len(),
+ }
+ }
+
+ /// If the channel is bounded, returns its capacity.
+ pub fn capacity(&self) -> Option<usize> {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.capacity(),
+ ReceiverFlavor::List(chan) => chan.capacity(),
+ ReceiverFlavor::Zero(chan) => chan.capacity(),
+ }
+ }
+
+ /// Returns `true` if receivers belong to the same channel.
+ pub fn same_channel(&self, other: &Receiver<T>) -> bool {
+ match (&self.flavor, &other.flavor) {
+ (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
+ (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
+ (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
+ _ => false,
+ }
+ }
+}
+
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ unsafe {
+ match &self.flavor {
+ ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
+ ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
+ ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
+ }
+ }
+ }
+}
+
+impl<T> Clone for Receiver<T> {
+ fn clone(&self) -> Self {
+ let flavor = match &self.flavor {
+ ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
+ ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
+ ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
+ };
+
+ Receiver { flavor }
+ }
+}
+
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Receiver { .. }")
+ }
+}
diff --git a/library/std/src/sync/mpmc/select.rs b/library/std/src/sync/mpmc/select.rs
new file mode 100644
index 000000000..56a83fee2
--- /dev/null
+++ b/library/std/src/sync/mpmc/select.rs
@@ -0,0 +1,71 @@
+/// Temporary data that gets initialized during a blocking operation, and is consumed by
+/// `read` or `write`.
+///
+/// Each field contains data associated with a specific channel flavor.
+#[derive(Debug, Default)]
+pub struct Token {
+ pub(crate) array: super::array::ArrayToken,
+ pub(crate) list: super::list::ListToken,
+ #[allow(dead_code)]
+ pub(crate) zero: super::zero::ZeroToken,
+}
+
+/// Identifier associated with an operation by a specific thread on a specific channel.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct Operation(usize);
+
+impl Operation {
+ /// Creates an operation identifier from a mutable reference.
+ ///
+ /// This function essentially just turns the address of the reference into a number. The
+ /// reference should point to a variable that is specific to the thread and the operation,
+ /// and is alive for the entire duration of a blocking operation.
+ #[inline]
+ pub fn hook<T>(r: &mut T) -> Operation {
+ let val = r as *mut T as usize;
+ // Make sure that the pointer address doesn't equal the numerical representation of
+ // `Selected::{Waiting, Aborted, Disconnected}`.
+ assert!(val > 2);
+ Operation(val)
+ }
+}
+
+/// Current state of a blocking operation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Selected {
+ /// Still waiting for an operation.
+ Waiting,
+
+ /// The attempt to block the current thread has been aborted.
+ Aborted,
+
+ /// An operation became ready because a channel is disconnected.
+ Disconnected,
+
+ /// An operation became ready because a message can be sent or received.
+ Operation(Operation),
+}
+
+impl From<usize> for Selected {
+ #[inline]
+ fn from(val: usize) -> Selected {
+ match val {
+ 0 => Selected::Waiting,
+ 1 => Selected::Aborted,
+ 2 => Selected::Disconnected,
+ oper => Selected::Operation(Operation(oper)),
+ }
+ }
+}
+
+impl Into<usize> for Selected {
+ #[inline]
+ fn into(self) -> usize {
+ match self {
+ Selected::Waiting => 0,
+ Selected::Aborted => 1,
+ Selected::Disconnected => 2,
+ Selected::Operation(Operation(val)) => val,
+ }
+ }
+}
diff --git a/library/std/src/sync/mpmc/utils.rs b/library/std/src/sync/mpmc/utils.rs
new file mode 100644
index 000000000..cfe42750d
--- /dev/null
+++ b/library/std/src/sync/mpmc/utils.rs
@@ -0,0 +1,143 @@
+use crate::cell::Cell;
+use crate::ops::{Deref, DerefMut};
+
+/// Pads and aligns a value to the length of a cache line.
+#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
+// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
+// lines at a time, so we have to align to 128 bytes rather than 64.
+//
+// Sources:
+// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
+// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
+//
+// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
+//
+// Sources:
+// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
+//
+// powerpc64 has 128-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
+#[cfg_attr(
+ any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64",),
+ repr(align(128))
+)]
+// arm, mips, mips64, and riscv64 have 32-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
+#[cfg_attr(
+ any(
+ target_arch = "arm",
+ target_arch = "mips",
+ target_arch = "mips64",
+ target_arch = "riscv64",
+ ),
+ repr(align(32))
+)]
+// s390x has 256-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
+#[cfg_attr(target_arch = "s390x", repr(align(256)))]
+// x86 and wasm have 64-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
+//
+// All others are assumed to have 64-byte cache line size.
+#[cfg_attr(
+ not(any(
+ target_arch = "x86_64",
+ target_arch = "aarch64",
+ target_arch = "powerpc64",
+ target_arch = "arm",
+ target_arch = "mips",
+ target_arch = "mips64",
+ target_arch = "riscv64",
+ target_arch = "s390x",
+ )),
+ repr(align(64))
+)]
+pub struct CachePadded<T> {
+ value: T,
+}
+
+impl<T> CachePadded<T> {
+ /// Pads and aligns a value to the length of a cache line.
+ pub fn new(value: T) -> CachePadded<T> {
+ CachePadded::<T> { value }
+ }
+}
+
+impl<T> Deref for CachePadded<T> {
+ type Target = T;
+
+ fn deref(&self) -> &T {
+ &self.value
+ }
+}
+
+impl<T> DerefMut for CachePadded<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ &mut self.value
+ }
+}
+
+const SPIN_LIMIT: u32 = 6;
+
+/// Performs quadratic backoff in spin loops.
+pub struct Backoff {
+ step: Cell<u32>,
+}
+
+impl Backoff {
+ /// Creates a new `Backoff`.
+ pub fn new() -> Self {
+ Backoff { step: Cell::new(0) }
+ }
+
+ /// Backs off using lightweight spinning.
+ ///
+ /// This method should be used for:
+ /// - Retrying an operation because another thread made progress. i.e. on CAS failure.
+ /// - Waiting for an operation to complete by spinning optimistically for a few iterations
+ /// before falling back to parking the thread (see `Backoff::is_completed`).
+ #[inline]
+ pub fn spin_light(&self) {
+ let step = self.step.get().min(SPIN_LIMIT);
+ for _ in 0..step.pow(2) {
+ crate::hint::spin_loop();
+ }
+
+ self.step.set(self.step.get() + 1);
+ }
+
+ /// Backs off using heavyweight spinning.
+ ///
+ /// This method should be used in blocking loops where parking the thread is not an option.
+ #[inline]
+ pub fn spin_heavy(&self) {
+ if self.step.get() <= SPIN_LIMIT {
+ for _ in 0..self.step.get().pow(2) {
+ crate::hint::spin_loop()
+ }
+ } else {
+ crate::thread::yield_now();
+ }
+
+ self.step.set(self.step.get() + 1);
+ }
+
+ /// Returns `true` if quadratic backoff has completed and parking the thread is advised.
+ #[inline]
+ pub fn is_completed(&self) -> bool {
+ self.step.get() > SPIN_LIMIT
+ }
+}
diff --git a/library/std/src/sync/mpmc/waker.rs b/library/std/src/sync/mpmc/waker.rs
new file mode 100644
index 000000000..4912ca4f8
--- /dev/null
+++ b/library/std/src/sync/mpmc/waker.rs
@@ -0,0 +1,204 @@
+//! Waking mechanism for threads blocked on channel operations.
+
+use super::context::Context;
+use super::select::{Operation, Selected};
+
+use crate::ptr;
+use crate::sync::atomic::{AtomicBool, Ordering};
+use crate::sync::Mutex;
+
+/// Represents a thread blocked on a specific channel operation.
+pub(crate) struct Entry {
+ /// The operation.
+ pub(crate) oper: Operation,
+
+ /// Optional packet.
+ pub(crate) packet: *mut (),
+
+ /// Context associated with the thread owning this operation.
+ pub(crate) cx: Context,
+}
+
+/// A queue of threads blocked on channel operations.
+///
+/// This data structure is used by threads to register blocking operations and get woken up once
+/// an operation becomes ready.
+pub(crate) struct Waker {
+ /// A list of select operations.
+ selectors: Vec<Entry>,
+
+ /// A list of operations waiting to be ready.
+ observers: Vec<Entry>,
+}
+
+impl Waker {
+ /// Creates a new `Waker`.
+ #[inline]
+ pub(crate) fn new() -> Self {
+ Waker { selectors: Vec::new(), observers: Vec::new() }
+ }
+
+ /// Registers a select operation.
+ #[inline]
+ pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
+ self.register_with_packet(oper, ptr::null_mut(), cx);
+ }
+
+ /// Registers a select operation and a packet.
+ #[inline]
+ pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
+ self.selectors.push(Entry { oper, packet, cx: cx.clone() });
+ }
+
+ /// Unregisters a select operation.
+ #[inline]
+ pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
+ if let Some((i, _)) =
+ self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper)
+ {
+ let entry = self.selectors.remove(i);
+ Some(entry)
+ } else {
+ None
+ }
+ }
+
+ /// Attempts to find another thread's entry, select the operation, and wake it up.
+ #[inline]
+ pub(crate) fn try_select(&mut self) -> Option<Entry> {
+ self.selectors
+ .iter()
+ .position(|selector| {
+ // Does the entry belong to a different thread?
+ selector.cx.thread_id() != current_thread_id()
+ && selector // Try selecting this operation.
+ .cx
+ .try_select(Selected::Operation(selector.oper))
+ .is_ok()
+ && {
+ // Provide the packet.
+ selector.cx.store_packet(selector.packet);
+ // Wake the thread up.
+ selector.cx.unpark();
+ true
+ }
+ })
+ // Remove the entry from the queue to keep it clean and improve
+ // performance.
+ .map(|pos| self.selectors.remove(pos))
+ }
+
+ /// Notifies all operations waiting to be ready.
+ #[inline]
+ pub(crate) fn notify(&mut self) {
+ for entry in self.observers.drain(..) {
+ if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
+ entry.cx.unpark();
+ }
+ }
+ }
+
+ /// Notifies all registered operations that the channel is disconnected.
+ #[inline]
+ pub(crate) fn disconnect(&mut self) {
+ for entry in self.selectors.iter() {
+ if entry.cx.try_select(Selected::Disconnected).is_ok() {
+ // Wake the thread up.
+ //
+ // Here we don't remove the entry from the queue. Registered threads must
+ // unregister from the waker by themselves. They might also want to recover the
+ // packet value and destroy it, if necessary.
+ entry.cx.unpark();
+ }
+ }
+
+ self.notify();
+ }
+}
+
+impl Drop for Waker {
+ #[inline]
+ fn drop(&mut self) {
+ debug_assert_eq!(self.selectors.len(), 0);
+ debug_assert_eq!(self.observers.len(), 0);
+ }
+}
+
+/// A waker that can be shared among threads without locking.
+///
+/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
+pub(crate) struct SyncWaker {
+ /// The inner `Waker`.
+ inner: Mutex<Waker>,
+
+ /// `true` if the waker is empty.
+ is_empty: AtomicBool,
+}
+
+impl SyncWaker {
+ /// Creates a new `SyncWaker`.
+ #[inline]
+ pub(crate) fn new() -> Self {
+ SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) }
+ }
+
+ /// Registers the current thread with an operation.
+ #[inline]
+ pub(crate) fn register(&self, oper: Operation, cx: &Context) {
+ let mut inner = self.inner.lock().unwrap();
+ inner.register(oper, cx);
+ self.is_empty
+ .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
+ }
+
+ /// Unregisters an operation previously registered by the current thread.
+ #[inline]
+ pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
+ let mut inner = self.inner.lock().unwrap();
+ let entry = inner.unregister(oper);
+ self.is_empty
+ .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
+ entry
+ }
+
+ /// Attempts to find one thread (not the current one), select its operation, and wake it up.
+ #[inline]
+ pub(crate) fn notify(&self) {
+ if !self.is_empty.load(Ordering::SeqCst) {
+ let mut inner = self.inner.lock().unwrap();
+ if !self.is_empty.load(Ordering::SeqCst) {
+ inner.try_select();
+ inner.notify();
+ self.is_empty.store(
+ inner.selectors.is_empty() && inner.observers.is_empty(),
+ Ordering::SeqCst,
+ );
+ }
+ }
+ }
+
+ /// Notifies all threads that the channel is disconnected.
+ #[inline]
+ pub(crate) fn disconnect(&self) {
+ let mut inner = self.inner.lock().unwrap();
+ inner.disconnect();
+ self.is_empty
+ .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst);
+ }
+}
+
+impl Drop for SyncWaker {
+ #[inline]
+ fn drop(&mut self) {
+ debug_assert!(self.is_empty.load(Ordering::SeqCst));
+ }
+}
+
+/// Returns a unique id for the current thread.
+#[inline]
+pub fn current_thread_id() -> usize {
+ // `u8` is not drop so this variable will be available during thread destruction,
+ // whereas `thread::current()` would not be
+ thread_local! { static DUMMY: u8 = 0 }
+ DUMMY.with(|x| (x as *const u8).addr())
+}
diff --git a/library/std/src/sync/mpmc/zero.rs b/library/std/src/sync/mpmc/zero.rs
new file mode 100644
index 000000000..33f768dcb
--- /dev/null
+++ b/library/std/src/sync/mpmc/zero.rs
@@ -0,0 +1,318 @@
+//! Zero-capacity channel.
+//!
+//! This kind of channel is also known as *rendezvous* channel.
+
+use super::context::Context;
+use super::error::*;
+use super::select::{Operation, Selected, Token};
+use super::utils::Backoff;
+use super::waker::Waker;
+
+use crate::cell::UnsafeCell;
+use crate::marker::PhantomData;
+use crate::sync::atomic::{AtomicBool, Ordering};
+use crate::sync::Mutex;
+use crate::time::Instant;
+use crate::{fmt, ptr};
+
+/// A pointer to a packet.
+pub(crate) struct ZeroToken(*mut ());
+
+impl Default for ZeroToken {
+ fn default() -> Self {
+ Self(ptr::null_mut())
+ }
+}
+
+impl fmt::Debug for ZeroToken {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt::Debug::fmt(&(self.0 as usize), f)
+ }
+}
+
+/// A slot for passing one message from a sender to a receiver.
+struct Packet<T> {
+ /// Equals `true` if the packet is allocated on the stack.
+ on_stack: bool,
+
+ /// Equals `true` once the packet is ready for reading or writing.
+ ready: AtomicBool,
+
+ /// The message.
+ msg: UnsafeCell<Option<T>>,
+}
+
+impl<T> Packet<T> {
+ /// Creates an empty packet on the stack.
+ fn empty_on_stack() -> Packet<T> {
+ Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(None) }
+ }
+
+ /// Creates a packet on the stack, containing a message.
+ fn message_on_stack(msg: T) -> Packet<T> {
+ Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(Some(msg)) }
+ }
+
+ /// Waits until the packet becomes ready for reading or writing.
+ fn wait_ready(&self) {
+ let backoff = Backoff::new();
+ while !self.ready.load(Ordering::Acquire) {
+ backoff.spin_heavy();
+ }
+ }
+}
+
+/// Inner representation of a zero-capacity channel.
+struct Inner {
+ /// Senders waiting to pair up with a receive operation.
+ senders: Waker,
+
+ /// Receivers waiting to pair up with a send operation.
+ receivers: Waker,
+
+ /// Equals `true` when the channel is disconnected.
+ is_disconnected: bool,
+}
+
+/// Zero-capacity channel.
+pub(crate) struct Channel<T> {
+ /// Inner representation of the channel.
+ inner: Mutex<Inner>,
+
+ /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
+ _marker: PhantomData<T>,
+}
+
+impl<T> Channel<T> {
+ /// Constructs a new zero-capacity channel.
+ pub(crate) fn new() -> Self {
+ Channel {
+ inner: Mutex::new(Inner {
+ senders: Waker::new(),
+ receivers: Waker::new(),
+ is_disconnected: false,
+ }),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Writes a message into the packet.
+ pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
+ // If there is no packet, the channel is disconnected.
+ if token.zero.0.is_null() {
+ return Err(msg);
+ }
+
+ let packet = &*(token.zero.0 as *const Packet<T>);
+ packet.msg.get().write(Some(msg));
+ packet.ready.store(true, Ordering::Release);
+ Ok(())
+ }
+
+ /// Reads a message from the packet.
+ pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
+ // If there is no packet, the channel is disconnected.
+ if token.zero.0.is_null() {
+ return Err(());
+ }
+
+ let packet = &*(token.zero.0 as *const Packet<T>);
+
+ if packet.on_stack {
+ // The message has been in the packet from the beginning, so there is no need to wait
+ // for it. However, after reading the message, we need to set `ready` to `true` in
+ // order to signal that the packet can be destroyed.
+ let msg = packet.msg.get().replace(None).unwrap();
+ packet.ready.store(true, Ordering::Release);
+ Ok(msg)
+ } else {
+ // Wait until the message becomes available, then read it and destroy the
+ // heap-allocated packet.
+ packet.wait_ready();
+ let msg = packet.msg.get().replace(None).unwrap();
+ drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
+ Ok(msg)
+ }
+ }
+
+ /// Attempts to send a message into the channel.
+ pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe {
+ self.write(token, msg).ok().unwrap();
+ }
+ Ok(())
+ } else if inner.is_disconnected {
+ Err(TrySendError::Disconnected(msg))
+ } else {
+ Err(TrySendError::Full(msg))
+ }
+ }
+
+ /// Sends a message into the channel.
+ pub(crate) fn send(
+ &self,
+ msg: T,
+ deadline: Option<Instant>,
+ ) -> Result<(), SendTimeoutError<T>> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting receiver, pair up with it.
+ if let Some(operation) = inner.receivers.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe {
+ self.write(token, msg).ok().unwrap();
+ }
+ return Ok(());
+ }
+
+ if inner.is_disconnected {
+ return Err(SendTimeoutError::Disconnected(msg));
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a receiver wakes us up.
+ let oper = Operation::hook(token);
+ let mut packet = Packet::<T>::message_on_stack(msg);
+ inner.senders.register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
+ inner.receivers.notify();
+ drop(inner);
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
+ let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+ Err(SendTimeoutError::Timeout(msg))
+ }
+ Selected::Disconnected => {
+ self.inner.lock().unwrap().senders.unregister(oper).unwrap();
+ let msg = unsafe { packet.msg.get().replace(None).unwrap() };
+ Err(SendTimeoutError::Disconnected(msg))
+ }
+ Selected::Operation(_) => {
+ // Wait until the message is read, then drop the packet.
+ packet.wait_ready();
+ Ok(())
+ }
+ }
+ })
+ }
+
+ /// Attempts to receive a message without blocking.
+ pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
+ } else if inner.is_disconnected {
+ Err(TryRecvError::Disconnected)
+ } else {
+ Err(TryRecvError::Empty)
+ }
+ }
+
+ /// Receives a message from the channel.
+ pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
+ let token = &mut Token::default();
+ let mut inner = self.inner.lock().unwrap();
+
+ // If there's a waiting sender, pair up with it.
+ if let Some(operation) = inner.senders.try_select() {
+ token.zero.0 = operation.packet;
+ drop(inner);
+ unsafe {
+ return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
+ }
+ }
+
+ if inner.is_disconnected {
+ return Err(RecvTimeoutError::Disconnected);
+ }
+
+ Context::with(|cx| {
+ // Prepare for blocking until a sender wakes us up.
+ let oper = Operation::hook(token);
+ let mut packet = Packet::<T>::empty_on_stack();
+ inner.receivers.register_with_packet(
+ oper,
+ &mut packet as *mut Packet<T> as *mut (),
+ cx,
+ );
+ inner.senders.notify();
+ drop(inner);
+
+ // Block the current thread.
+ let sel = cx.wait_until(deadline);
+
+ match sel {
+ Selected::Waiting => unreachable!(),
+ Selected::Aborted => {
+ self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
+ Err(RecvTimeoutError::Timeout)
+ }
+ Selected::Disconnected => {
+ self.inner.lock().unwrap().receivers.unregister(oper).unwrap();
+ Err(RecvTimeoutError::Disconnected)
+ }
+ Selected::Operation(_) => {
+ // Wait until the message is provided, then read it.
+ packet.wait_ready();
+ unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
+ }
+ }
+ })
+ }
+
+ /// Disconnects the channel and wakes up all blocked senders and receivers.
+ ///
+ /// Returns `true` if this call disconnected the channel.
+ pub(crate) fn disconnect(&self) -> bool {
+ let mut inner = self.inner.lock().unwrap();
+
+ if !inner.is_disconnected {
+ inner.is_disconnected = true;
+ inner.senders.disconnect();
+ inner.receivers.disconnect();
+ true
+ } else {
+ false
+ }
+ }
+
+ /// Returns the current number of messages inside the channel.
+ pub(crate) fn len(&self) -> usize {
+ 0
+ }
+
+ /// Returns the capacity of the channel.
+ #[allow(clippy::unnecessary_wraps)] // This is intentional.
+ pub(crate) fn capacity(&self) -> Option<usize> {
+ Some(0)
+ }
+
+ /// Returns `true` if the channel is empty.
+ pub(crate) fn is_empty(&self) -> bool {
+ true
+ }
+
+ /// Returns `true` if the channel is full.
+ pub(crate) fn is_full(&self) -> bool {
+ true
+ }
+}