From 4547b622d8d29df964fa2914213088b148c498fc Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 17 Apr 2024 14:18:32 +0200 Subject: Merging upstream version 1.67.1+dfsg1. Signed-off-by: Daniel Baumann --- library/std/src/sync/mpmc/array.rs | 513 ++++++++++++++++++++++++++++ library/std/src/sync/mpmc/context.rs | 155 +++++++++ library/std/src/sync/mpmc/counter.rs | 137 ++++++++ library/std/src/sync/mpmc/error.rs | 46 +++ library/std/src/sync/mpmc/list.rs | 638 +++++++++++++++++++++++++++++++++++ library/std/src/sync/mpmc/mod.rs | 430 +++++++++++++++++++++++ library/std/src/sync/mpmc/select.rs | 71 ++++ library/std/src/sync/mpmc/utils.rs | 143 ++++++++ library/std/src/sync/mpmc/waker.rs | 204 +++++++++++ library/std/src/sync/mpmc/zero.rs | 318 +++++++++++++++++ 10 files changed, 2655 insertions(+) create mode 100644 library/std/src/sync/mpmc/array.rs create mode 100644 library/std/src/sync/mpmc/context.rs create mode 100644 library/std/src/sync/mpmc/counter.rs create mode 100644 library/std/src/sync/mpmc/error.rs create mode 100644 library/std/src/sync/mpmc/list.rs create mode 100644 library/std/src/sync/mpmc/mod.rs create mode 100644 library/std/src/sync/mpmc/select.rs create mode 100644 library/std/src/sync/mpmc/utils.rs create mode 100644 library/std/src/sync/mpmc/waker.rs create mode 100644 library/std/src/sync/mpmc/zero.rs (limited to 'library/std/src/sync/mpmc') diff --git a/library/std/src/sync/mpmc/array.rs b/library/std/src/sync/mpmc/array.rs new file mode 100644 index 000000000..c1e3e48b0 --- /dev/null +++ b/library/std/src/sync/mpmc/array.rs @@ -0,0 +1,513 @@ +//! Bounded channel based on a preallocated array. +//! +//! This flavor has a fixed, positive capacity. +//! +//! The implementation is based on Dmitry Vyukov's bounded MPMC queue. +//! +//! Source: +//! - +//! - + +use super::context::Context; +use super::error::*; +use super::select::{Operation, Selected, Token}; +use super::utils::{Backoff, CachePadded}; +use super::waker::SyncWaker; + +use crate::cell::UnsafeCell; +use crate::mem::MaybeUninit; +use crate::ptr; +use crate::sync::atomic::{self, AtomicUsize, Ordering}; +use crate::time::Instant; + +/// A slot in a channel. +struct Slot { + /// The current stamp. + stamp: AtomicUsize, + + /// The message in this slot. + msg: UnsafeCell>, +} + +/// The token type for the array flavor. +#[derive(Debug)] +pub(crate) struct ArrayToken { + /// Slot to read from or write to. + slot: *const u8, + + /// Stamp to store into the slot after reading or writing. + stamp: usize, +} + +impl Default for ArrayToken { + #[inline] + fn default() -> Self { + ArrayToken { slot: ptr::null(), stamp: 0 } + } +} + +/// Bounded channel based on a preallocated array. +pub(crate) struct Channel { + /// The head of the channel. + /// + /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but + /// packed into a single `usize`. The lower bits represent the index, while the upper bits + /// represent the lap. The mark bit in the head is always zero. + /// + /// Messages are popped from the head of the channel. + head: CachePadded, + + /// The tail of the channel. + /// + /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but + /// packed into a single `usize`. The lower bits represent the index, while the upper bits + /// represent the lap. The mark bit indicates that the channel is disconnected. + /// + /// Messages are pushed into the tail of the channel. + tail: CachePadded, + + /// The buffer holding slots. + buffer: Box<[Slot]>, + + /// The channel capacity. + cap: usize, + + /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`. + one_lap: usize, + + /// If this bit is set in the tail, that means the channel is disconnected. + mark_bit: usize, + + /// Senders waiting while the channel is full. + senders: SyncWaker, + + /// Receivers waiting while the channel is empty and not disconnected. + receivers: SyncWaker, +} + +impl Channel { + /// Creates a bounded channel of capacity `cap`. + pub(crate) fn with_capacity(cap: usize) -> Self { + assert!(cap > 0, "capacity must be positive"); + + // Compute constants `mark_bit` and `one_lap`. + let mark_bit = (cap + 1).next_power_of_two(); + let one_lap = mark_bit * 2; + + // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`. + let head = 0; + // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`. + let tail = 0; + + // Allocate a buffer of `cap` slots initialized + // with stamps. + let buffer: Box<[Slot]> = (0..cap) + .map(|i| { + // Set the stamp to `{ lap: 0, mark: 0, index: i }`. + Slot { stamp: AtomicUsize::new(i), msg: UnsafeCell::new(MaybeUninit::uninit()) } + }) + .collect(); + + Channel { + buffer, + cap, + one_lap, + mark_bit, + head: CachePadded::new(AtomicUsize::new(head)), + tail: CachePadded::new(AtomicUsize::new(tail)), + senders: SyncWaker::new(), + receivers: SyncWaker::new(), + } + } + + /// Attempts to reserve a slot for sending a message. + fn start_send(&self, token: &mut Token) -> bool { + let backoff = Backoff::new(); + let mut tail = self.tail.load(Ordering::Relaxed); + + loop { + // Check if the channel is disconnected. + if tail & self.mark_bit != 0 { + token.array.slot = ptr::null(); + token.array.stamp = 0; + return true; + } + + // Deconstruct the tail. + let index = tail & (self.mark_bit - 1); + let lap = tail & !(self.one_lap - 1); + + // Inspect the corresponding slot. + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the tail and the stamp match, we may attempt to push. + if tail == stamp { + let new_tail = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + tail + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + // Try moving the tail. + match self.tail.compare_exchange_weak( + tail, + new_tail, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => { + // Prepare the token for the follow-up call to `write`. + token.array.slot = slot as *const Slot as *const u8; + token.array.stamp = tail + 1; + return true; + } + Err(_) => { + backoff.spin_light(); + tail = self.tail.load(Ordering::Relaxed); + } + } + } else if stamp.wrapping_add(self.one_lap) == tail + 1 { + atomic::fence(Ordering::SeqCst); + let head = self.head.load(Ordering::Relaxed); + + // If the head lags one lap behind the tail as well... + if head.wrapping_add(self.one_lap) == tail { + // ...then the channel is full. + return false; + } + + backoff.spin_light(); + tail = self.tail.load(Ordering::Relaxed); + } else { + // Snooze because we need to wait for the stamp to get updated. + backoff.spin_heavy(); + tail = self.tail.load(Ordering::Relaxed); + } + } + } + + /// Writes a message into the channel. + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + // If there is no slot, the channel is disconnected. + if token.array.slot.is_null() { + return Err(msg); + } + + let slot: &Slot = &*(token.array.slot as *const Slot); + + // Write the message into the slot and update the stamp. + slot.msg.get().write(MaybeUninit::new(msg)); + slot.stamp.store(token.array.stamp, Ordering::Release); + + // Wake a sleeping receiver. + self.receivers.notify(); + Ok(()) + } + + /// Attempts to reserve a slot for receiving a message. + fn start_recv(&self, token: &mut Token) -> bool { + let backoff = Backoff::new(); + let mut head = self.head.load(Ordering::Relaxed); + + loop { + // Deconstruct the head. + let index = head & (self.mark_bit - 1); + let lap = head & !(self.one_lap - 1); + + // Inspect the corresponding slot. + debug_assert!(index < self.buffer.len()); + let slot = unsafe { self.buffer.get_unchecked(index) }; + let stamp = slot.stamp.load(Ordering::Acquire); + + // If the stamp is ahead of the head by 1, we may attempt to pop. + if head + 1 == stamp { + let new = if index + 1 < self.cap { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + head + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + + // Try moving the head. + match self.head.compare_exchange_weak( + head, + new, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => { + // Prepare the token for the follow-up call to `read`. + token.array.slot = slot as *const Slot as *const u8; + token.array.stamp = head.wrapping_add(self.one_lap); + return true; + } + Err(_) => { + backoff.spin_light(); + head = self.head.load(Ordering::Relaxed); + } + } + } else if stamp == head { + atomic::fence(Ordering::SeqCst); + let tail = self.tail.load(Ordering::Relaxed); + + // If the tail equals the head, that means the channel is empty. + if (tail & !self.mark_bit) == head { + // If the channel is disconnected... + if tail & self.mark_bit != 0 { + // ...then receive an error. + token.array.slot = ptr::null(); + token.array.stamp = 0; + return true; + } else { + // Otherwise, the receive operation is not ready. + return false; + } + } + + backoff.spin_light(); + head = self.head.load(Ordering::Relaxed); + } else { + // Snooze because we need to wait for the stamp to get updated. + backoff.spin_heavy(); + head = self.head.load(Ordering::Relaxed); + } + } + } + + /// Reads a message from the channel. + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result { + if token.array.slot.is_null() { + // The channel is disconnected. + return Err(()); + } + + let slot: &Slot = &*(token.array.slot as *const Slot); + + // Read the message from the slot and update the stamp. + let msg = slot.msg.get().read().assume_init(); + slot.stamp.store(token.array.stamp, Ordering::Release); + + // Wake a sleeping sender. + self.senders.notify(); + Ok(msg) + } + + /// Attempts to send a message into the channel. + pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError> { + let token = &mut Token::default(); + if self.start_send(token) { + unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) } + } else { + Err(TrySendError::Full(msg)) + } + } + + /// Sends a message into the channel. + pub(crate) fn send( + &self, + msg: T, + deadline: Option, + ) -> Result<(), SendTimeoutError> { + let token = &mut Token::default(); + loop { + // Try sending a message several times. + let backoff = Backoff::new(); + loop { + if self.start_send(token) { + let res = unsafe { self.write(token, msg) }; + return res.map_err(SendTimeoutError::Disconnected); + } + + if backoff.is_completed() { + break; + } else { + backoff.spin_light(); + } + } + + if let Some(d) = deadline { + if Instant::now() >= d { + return Err(SendTimeoutError::Timeout(msg)); + } + } + + Context::with(|cx| { + // Prepare for blocking until a receiver wakes us up. + let oper = Operation::hook(token); + self.senders.register(oper, cx); + + // Has the channel become ready just now? + if !self.is_full() || self.is_disconnected() { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + self.senders.unregister(oper).unwrap(); + } + Selected::Operation(_) => {} + } + }); + } + } + + /// Attempts to receive a message without blocking. + pub(crate) fn try_recv(&self) -> Result { + let token = &mut Token::default(); + + if self.start_recv(token) { + unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } + } else { + Err(TryRecvError::Empty) + } + } + + /// Receives a message from the channel. + pub(crate) fn recv(&self, deadline: Option) -> Result { + let token = &mut Token::default(); + loop { + if self.start_recv(token) { + let res = unsafe { self.read(token) }; + return res.map_err(|_| RecvTimeoutError::Disconnected); + } + + if let Some(d) = deadline { + if Instant::now() >= d { + return Err(RecvTimeoutError::Timeout); + } + } + + Context::with(|cx| { + // Prepare for blocking until a sender wakes us up. + let oper = Operation::hook(token); + self.receivers.register(oper, cx); + + // Has the channel become ready just now? + if !self.is_empty() || self.is_disconnected() { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + self.receivers.unregister(oper).unwrap(); + // If the channel was disconnected, we still have to check for remaining + // messages. + } + Selected::Operation(_) => {} + } + }); + } + } + + /// Returns the current number of messages inside the channel. + pub(crate) fn len(&self) -> usize { + loop { + // Load the tail, then load the head. + let tail = self.tail.load(Ordering::SeqCst); + let head = self.head.load(Ordering::SeqCst); + + // If the tail didn't change, we've got consistent values to work with. + if self.tail.load(Ordering::SeqCst) == tail { + let hix = head & (self.mark_bit - 1); + let tix = tail & (self.mark_bit - 1); + + return if hix < tix { + tix - hix + } else if hix > tix { + self.cap - hix + tix + } else if (tail & !self.mark_bit) == head { + 0 + } else { + self.cap + }; + } + } + } + + /// Returns the capacity of the channel. + #[allow(clippy::unnecessary_wraps)] // This is intentional. + pub(crate) fn capacity(&self) -> Option { + Some(self.cap) + } + + /// Disconnects the channel and wakes up all blocked senders and receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub(crate) fn disconnect(&self) -> bool { + let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst); + + if tail & self.mark_bit == 0 { + self.senders.disconnect(); + self.receivers.disconnect(); + true + } else { + false + } + } + + /// Returns `true` if the channel is disconnected. + pub(crate) fn is_disconnected(&self) -> bool { + self.tail.load(Ordering::SeqCst) & self.mark_bit != 0 + } + + /// Returns `true` if the channel is empty. + pub(crate) fn is_empty(&self) -> bool { + let head = self.head.load(Ordering::SeqCst); + let tail = self.tail.load(Ordering::SeqCst); + + // Is the tail equal to the head? + // + // Note: If the head changes just before we load the tail, that means there was a moment + // when the channel was not empty, so it is safe to just return `false`. + (tail & !self.mark_bit) == head + } + + /// Returns `true` if the channel is full. + pub(crate) fn is_full(&self) -> bool { + let tail = self.tail.load(Ordering::SeqCst); + let head = self.head.load(Ordering::SeqCst); + + // Is the head lagging one lap behind tail? + // + // Note: If the tail changes just before we load the head, that means there was a moment + // when the channel was not full, so it is safe to just return `false`. + head.wrapping_add(self.one_lap) == tail & !self.mark_bit + } +} + +impl Drop for Channel { + fn drop(&mut self) { + // Get the index of the head. + let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1); + + // Loop over all slots that hold a message and drop them. + for i in 0..self.len() { + // Compute the index of the next slot holding a message. + let index = if hix + i < self.cap { hix + i } else { hix + i - self.cap }; + + unsafe { + debug_assert!(index < self.buffer.len()); + let slot = self.buffer.get_unchecked_mut(index); + let msg = &mut *slot.msg.get(); + msg.as_mut_ptr().drop_in_place(); + } + } + } +} diff --git a/library/std/src/sync/mpmc/context.rs b/library/std/src/sync/mpmc/context.rs new file mode 100644 index 000000000..bbfc6ce00 --- /dev/null +++ b/library/std/src/sync/mpmc/context.rs @@ -0,0 +1,155 @@ +//! Thread-local channel context. + +use super::select::Selected; +use super::waker::current_thread_id; + +use crate::cell::Cell; +use crate::ptr; +use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use crate::sync::Arc; +use crate::thread::{self, Thread}; +use crate::time::Instant; + +/// Thread-local context. +#[derive(Debug, Clone)] +pub struct Context { + inner: Arc, +} + +/// Inner representation of `Context`. +#[derive(Debug)] +struct Inner { + /// Selected operation. + select: AtomicUsize, + + /// A slot into which another thread may store a pointer to its `Packet`. + packet: AtomicPtr<()>, + + /// Thread handle. + thread: Thread, + + /// Thread id. + thread_id: usize, +} + +impl Context { + /// Creates a new context for the duration of the closure. + #[inline] + pub fn with(f: F) -> R + where + F: FnOnce(&Context) -> R, + { + thread_local! { + /// Cached thread-local context. + static CONTEXT: Cell> = Cell::new(Some(Context::new())); + } + + let mut f = Some(f); + let mut f = |cx: &Context| -> R { + let f = f.take().unwrap(); + f(cx) + }; + + CONTEXT + .try_with(|cell| match cell.take() { + None => f(&Context::new()), + Some(cx) => { + cx.reset(); + let res = f(&cx); + cell.set(Some(cx)); + res + } + }) + .unwrap_or_else(|_| f(&Context::new())) + } + + /// Creates a new `Context`. + #[cold] + fn new() -> Context { + Context { + inner: Arc::new(Inner { + select: AtomicUsize::new(Selected::Waiting.into()), + packet: AtomicPtr::new(ptr::null_mut()), + thread: thread::current(), + thread_id: current_thread_id(), + }), + } + } + + /// Resets `select` and `packet`. + #[inline] + fn reset(&self) { + self.inner.select.store(Selected::Waiting.into(), Ordering::Release); + self.inner.packet.store(ptr::null_mut(), Ordering::Release); + } + + /// Attempts to select an operation. + /// + /// On failure, the previously selected operation is returned. + #[inline] + pub fn try_select(&self, select: Selected) -> Result<(), Selected> { + self.inner + .select + .compare_exchange( + Selected::Waiting.into(), + select.into(), + Ordering::AcqRel, + Ordering::Acquire, + ) + .map(|_| ()) + .map_err(|e| e.into()) + } + + /// Stores a packet. + /// + /// This method must be called after `try_select` succeeds and there is a packet to provide. + #[inline] + pub fn store_packet(&self, packet: *mut ()) { + if !packet.is_null() { + self.inner.packet.store(packet, Ordering::Release); + } + } + + /// Waits until an operation is selected and returns it. + /// + /// If the deadline is reached, `Selected::Aborted` will be selected. + #[inline] + pub fn wait_until(&self, deadline: Option) -> Selected { + loop { + // Check whether an operation has been selected. + let sel = Selected::from(self.inner.select.load(Ordering::Acquire)); + if sel != Selected::Waiting { + return sel; + } + + // If there's a deadline, park the current thread until the deadline is reached. + if let Some(end) = deadline { + let now = Instant::now(); + + if now < end { + thread::park_timeout(end - now); + } else { + // The deadline has been reached. Try aborting select. + return match self.try_select(Selected::Aborted) { + Ok(()) => Selected::Aborted, + Err(s) => s, + }; + } + } else { + thread::park(); + } + } + } + + /// Unparks the thread this context belongs to. + #[inline] + pub fn unpark(&self) { + self.inner.thread.unpark(); + } + + /// Returns the id of the thread this context belongs to. + #[inline] + pub fn thread_id(&self) -> usize { + self.inner.thread_id + } +} diff --git a/library/std/src/sync/mpmc/counter.rs b/library/std/src/sync/mpmc/counter.rs new file mode 100644 index 000000000..a5a6bdc67 --- /dev/null +++ b/library/std/src/sync/mpmc/counter.rs @@ -0,0 +1,137 @@ +use crate::ops; +use crate::process; +use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +/// Reference counter internals. +struct Counter { + /// The number of senders associated with the channel. + senders: AtomicUsize, + + /// The number of receivers associated with the channel. + receivers: AtomicUsize, + + /// Set to `true` if the last sender or the last receiver reference deallocates the channel. + destroy: AtomicBool, + + /// The internal channel. + chan: C, +} + +/// Wraps a channel into the reference counter. +pub(crate) fn new(chan: C) -> (Sender, Receiver) { + let counter = Box::into_raw(Box::new(Counter { + senders: AtomicUsize::new(1), + receivers: AtomicUsize::new(1), + destroy: AtomicBool::new(false), + chan, + })); + let s = Sender { counter }; + let r = Receiver { counter }; + (s, r) +} + +/// The sending side. +pub(crate) struct Sender { + counter: *mut Counter, +} + +impl Sender { + /// Returns the internal `Counter`. + fn counter(&self) -> &Counter { + unsafe { &*self.counter } + } + + /// Acquires another sender reference. + pub(crate) fn acquire(&self) -> Sender { + let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); + + // Cloning senders and calling `mem::forget` on the clones could potentially overflow the + // counter. It's very difficult to recover sensibly from such degenerate scenarios so we + // just abort when the count becomes very large. + if count > isize::MAX as usize { + process::abort(); + } + + Sender { counter: self.counter } + } + + /// Releases the sender reference. + /// + /// Function `disconnect` will be called if this is the last sender reference. + pub(crate) unsafe fn release bool>(&self, disconnect: F) { + if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { + disconnect(&self.counter().chan); + + if self.counter().destroy.swap(true, Ordering::AcqRel) { + drop(Box::from_raw(self.counter)); + } + } + } +} + +impl ops::Deref for Sender { + type Target = C; + + fn deref(&self) -> &C { + &self.counter().chan + } +} + +impl PartialEq for Sender { + fn eq(&self, other: &Sender) -> bool { + self.counter == other.counter + } +} + +/// The receiving side. +pub(crate) struct Receiver { + counter: *mut Counter, +} + +impl Receiver { + /// Returns the internal `Counter`. + fn counter(&self) -> &Counter { + unsafe { &*self.counter } + } + + /// Acquires another receiver reference. + pub(crate) fn acquire(&self) -> Receiver { + let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); + + // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the + // counter. It's very difficult to recover sensibly from such degenerate scenarios so we + // just abort when the count becomes very large. + if count > isize::MAX as usize { + process::abort(); + } + + Receiver { counter: self.counter } + } + + /// Releases the receiver reference. + /// + /// Function `disconnect` will be called if this is the last receiver reference. + pub(crate) unsafe fn release bool>(&self, disconnect: F) { + if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { + disconnect(&self.counter().chan); + + if self.counter().destroy.swap(true, Ordering::AcqRel) { + drop(Box::from_raw(self.counter)); + } + } + } +} + +impl ops::Deref for Receiver { + type Target = C; + + fn deref(&self) -> &C { + &self.counter().chan + } +} + +impl PartialEq for Receiver { + fn eq(&self, other: &Receiver) -> bool { + self.counter == other.counter + } +} diff --git a/library/std/src/sync/mpmc/error.rs b/library/std/src/sync/mpmc/error.rs new file mode 100644 index 000000000..1b8a1f387 --- /dev/null +++ b/library/std/src/sync/mpmc/error.rs @@ -0,0 +1,46 @@ +use crate::error; +use crate::fmt; + +pub use crate::sync::mpsc::{RecvError, RecvTimeoutError, SendError, TryRecvError, TrySendError}; + +/// An error returned from the [`send_timeout`] method. +/// +/// The error contains the message being sent so it can be recovered. +/// +/// [`send_timeout`]: super::Sender::send_timeout +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum SendTimeoutError { + /// The message could not be sent because the channel is full and the operation timed out. + /// + /// If this is a zero-capacity channel, then the error indicates that there was no receiver + /// available to receive the message and the operation timed out. + Timeout(T), + + /// The message could not be sent because the channel is disconnected. + Disconnected(T), +} + +impl fmt::Debug for SendTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "SendTimeoutError(..)".fmt(f) + } +} + +impl fmt::Display for SendTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SendTimeoutError::Timeout(..) => "timed out waiting on send operation".fmt(f), + SendTimeoutError::Disconnected(..) => "sending on a disconnected channel".fmt(f), + } + } +} + +impl error::Error for SendTimeoutError {} + +impl From> for SendTimeoutError { + fn from(err: SendError) -> SendTimeoutError { + match err { + SendError(e) => SendTimeoutError::Disconnected(e), + } + } +} diff --git a/library/std/src/sync/mpmc/list.rs b/library/std/src/sync/mpmc/list.rs new file mode 100644 index 000000000..ec6c0726a --- /dev/null +++ b/library/std/src/sync/mpmc/list.rs @@ -0,0 +1,638 @@ +//! Unbounded channel implemented as a linked list. + +use super::context::Context; +use super::error::*; +use super::select::{Operation, Selected, Token}; +use super::utils::{Backoff, CachePadded}; +use super::waker::SyncWaker; + +use crate::cell::UnsafeCell; +use crate::marker::PhantomData; +use crate::mem::MaybeUninit; +use crate::ptr; +use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; +use crate::time::Instant; + +// Bits indicating the state of a slot: +// * If a message has been written into the slot, `WRITE` is set. +// * If a message has been read from the slot, `READ` is set. +// * If the block is being destroyed, `DESTROY` is set. +const WRITE: usize = 1; +const READ: usize = 2; +const DESTROY: usize = 4; + +// Each block covers one "lap" of indices. +const LAP: usize = 32; +// The maximum number of messages a block can hold. +const BLOCK_CAP: usize = LAP - 1; +// How many lower bits are reserved for metadata. +const SHIFT: usize = 1; +// Has two different purposes: +// * If set in head, indicates that the block is not the last one. +// * If set in tail, indicates that the channel is disconnected. +const MARK_BIT: usize = 1; + +/// A slot in a block. +struct Slot { + /// The message. + msg: UnsafeCell>, + + /// The state of the slot. + state: AtomicUsize, +} + +impl Slot { + /// Waits until a message is written into the slot. + fn wait_write(&self) { + let backoff = Backoff::new(); + while self.state.load(Ordering::Acquire) & WRITE == 0 { + backoff.spin_heavy(); + } + } +} + +/// A block in a linked list. +/// +/// Each block in the list can hold up to `BLOCK_CAP` messages. +struct Block { + /// The next block in the linked list. + next: AtomicPtr>, + + /// Slots for messages. + slots: [Slot; BLOCK_CAP], +} + +impl Block { + /// Creates an empty block. + fn new() -> Block { + // SAFETY: This is safe because: + // [1] `Block::next` (AtomicPtr) may be safely zero initialized. + // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4]. + // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it + // holds a MaybeUninit. + // [4] `Slot::state` (AtomicUsize) may be safely zero initialized. + unsafe { MaybeUninit::zeroed().assume_init() } + } + + /// Waits until the next pointer is set. + fn wait_next(&self) -> *mut Block { + let backoff = Backoff::new(); + loop { + let next = self.next.load(Ordering::Acquire); + if !next.is_null() { + return next; + } + backoff.spin_heavy(); + } + } + + /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block. + unsafe fn destroy(this: *mut Block, start: usize) { + // It is not necessary to set the `DESTROY` bit in the last slot because that slot has + // begun destruction of the block. + for i in start..BLOCK_CAP - 1 { + let slot = (*this).slots.get_unchecked(i); + + // Mark the `DESTROY` bit if a thread is still using the slot. + if slot.state.load(Ordering::Acquire) & READ == 0 + && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0 + { + // If a thread is still using the slot, it will continue destruction of the block. + return; + } + } + + // No thread is using the block, now it is safe to destroy it. + drop(Box::from_raw(this)); + } +} + +/// A position in a channel. +#[derive(Debug)] +struct Position { + /// The index in the channel. + index: AtomicUsize, + + /// The block in the linked list. + block: AtomicPtr>, +} + +/// The token type for the list flavor. +#[derive(Debug)] +pub(crate) struct ListToken { + /// The block of slots. + block: *const u8, + + /// The offset into the block. + offset: usize, +} + +impl Default for ListToken { + #[inline] + fn default() -> Self { + ListToken { block: ptr::null(), offset: 0 } + } +} + +/// Unbounded channel implemented as a linked list. +/// +/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are +/// represented as numbers of type `usize` and wrap on overflow. +/// +/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and +/// improve cache efficiency. +pub(crate) struct Channel { + /// The head of the channel. + head: CachePadded>, + + /// The tail of the channel. + tail: CachePadded>, + + /// Receivers waiting while the channel is empty and not disconnected. + receivers: SyncWaker, + + /// Indicates that dropping a `Channel` may drop messages of type `T`. + _marker: PhantomData, +} + +impl Channel { + /// Creates a new unbounded channel. + pub(crate) fn new() -> Self { + Channel { + head: CachePadded::new(Position { + block: AtomicPtr::new(ptr::null_mut()), + index: AtomicUsize::new(0), + }), + tail: CachePadded::new(Position { + block: AtomicPtr::new(ptr::null_mut()), + index: AtomicUsize::new(0), + }), + receivers: SyncWaker::new(), + _marker: PhantomData, + } + } + + /// Attempts to reserve a slot for sending a message. + fn start_send(&self, token: &mut Token) -> bool { + let backoff = Backoff::new(); + let mut tail = self.tail.index.load(Ordering::Acquire); + let mut block = self.tail.block.load(Ordering::Acquire); + let mut next_block = None; + + loop { + // Check if the channel is disconnected. + if tail & MARK_BIT != 0 { + token.list.block = ptr::null(); + return true; + } + + // Calculate the offset of the index into the block. + let offset = (tail >> SHIFT) % LAP; + + // If we reached the end of the block, wait until the next one is installed. + if offset == BLOCK_CAP { + backoff.spin_heavy(); + tail = self.tail.index.load(Ordering::Acquire); + block = self.tail.block.load(Ordering::Acquire); + continue; + } + + // If we're going to have to install the next block, allocate it in advance in order to + // make the wait for other threads as short as possible. + if offset + 1 == BLOCK_CAP && next_block.is_none() { + next_block = Some(Box::new(Block::::new())); + } + + // If this is the first message to be sent into the channel, we need to allocate the + // first block and install it. + if block.is_null() { + let new = Box::into_raw(Box::new(Block::::new())); + + if self + .tail + .block + .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed) + .is_ok() + { + self.head.block.store(new, Ordering::Release); + block = new; + } else { + next_block = unsafe { Some(Box::from_raw(new)) }; + tail = self.tail.index.load(Ordering::Acquire); + block = self.tail.block.load(Ordering::Acquire); + continue; + } + } + + let new_tail = tail + (1 << SHIFT); + + // Try advancing the tail forward. + match self.tail.index.compare_exchange_weak( + tail, + new_tail, + Ordering::SeqCst, + Ordering::Acquire, + ) { + Ok(_) => unsafe { + // If we've reached the end of the block, install the next one. + if offset + 1 == BLOCK_CAP { + let next_block = Box::into_raw(next_block.unwrap()); + self.tail.block.store(next_block, Ordering::Release); + self.tail.index.fetch_add(1 << SHIFT, Ordering::Release); + (*block).next.store(next_block, Ordering::Release); + } + + token.list.block = block as *const u8; + token.list.offset = offset; + return true; + }, + Err(_) => { + backoff.spin_light(); + tail = self.tail.index.load(Ordering::Acquire); + block = self.tail.block.load(Ordering::Acquire); + } + } + } + } + + /// Writes a message into the channel. + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + // If there is no slot, the channel is disconnected. + if token.list.block.is_null() { + return Err(msg); + } + + // Write the message into the slot. + let block = token.list.block as *mut Block; + let offset = token.list.offset; + let slot = (*block).slots.get_unchecked(offset); + slot.msg.get().write(MaybeUninit::new(msg)); + slot.state.fetch_or(WRITE, Ordering::Release); + + // Wake a sleeping receiver. + self.receivers.notify(); + Ok(()) + } + + /// Attempts to reserve a slot for receiving a message. + fn start_recv(&self, token: &mut Token) -> bool { + let backoff = Backoff::new(); + let mut head = self.head.index.load(Ordering::Acquire); + let mut block = self.head.block.load(Ordering::Acquire); + + loop { + // Calculate the offset of the index into the block. + let offset = (head >> SHIFT) % LAP; + + // If we reached the end of the block, wait until the next one is installed. + if offset == BLOCK_CAP { + backoff.spin_heavy(); + head = self.head.index.load(Ordering::Acquire); + block = self.head.block.load(Ordering::Acquire); + continue; + } + + let mut new_head = head + (1 << SHIFT); + + if new_head & MARK_BIT == 0 { + atomic::fence(Ordering::SeqCst); + let tail = self.tail.index.load(Ordering::Relaxed); + + // If the tail equals the head, that means the channel is empty. + if head >> SHIFT == tail >> SHIFT { + // If the channel is disconnected... + if tail & MARK_BIT != 0 { + // ...then receive an error. + token.list.block = ptr::null(); + return true; + } else { + // Otherwise, the receive operation is not ready. + return false; + } + } + + // If head and tail are not in the same block, set `MARK_BIT` in head. + if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { + new_head |= MARK_BIT; + } + } + + // The block can be null here only if the first message is being sent into the channel. + // In that case, just wait until it gets initialized. + if block.is_null() { + backoff.spin_heavy(); + head = self.head.index.load(Ordering::Acquire); + block = self.head.block.load(Ordering::Acquire); + continue; + } + + // Try moving the head index forward. + match self.head.index.compare_exchange_weak( + head, + new_head, + Ordering::SeqCst, + Ordering::Acquire, + ) { + Ok(_) => unsafe { + // If we've reached the end of the block, move to the next one. + if offset + 1 == BLOCK_CAP { + let next = (*block).wait_next(); + let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT); + if !(*next).next.load(Ordering::Relaxed).is_null() { + next_index |= MARK_BIT; + } + + self.head.block.store(next, Ordering::Release); + self.head.index.store(next_index, Ordering::Release); + } + + token.list.block = block as *const u8; + token.list.offset = offset; + return true; + }, + Err(_) => { + backoff.spin_light(); + head = self.head.index.load(Ordering::Acquire); + block = self.head.block.load(Ordering::Acquire); + } + } + } + } + + /// Reads a message from the channel. + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result { + if token.list.block.is_null() { + // The channel is disconnected. + return Err(()); + } + + // Read the message. + let block = token.list.block as *mut Block; + let offset = token.list.offset; + let slot = (*block).slots.get_unchecked(offset); + slot.wait_write(); + let msg = slot.msg.get().read().assume_init(); + + // Destroy the block if we've reached the end, or if another thread wanted to destroy but + // couldn't because we were busy reading from the slot. + if offset + 1 == BLOCK_CAP { + Block::destroy(block, 0); + } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { + Block::destroy(block, offset + 1); + } + + Ok(msg) + } + + /// Attempts to send a message into the channel. + pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError> { + self.send(msg, None).map_err(|err| match err { + SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), + SendTimeoutError::Timeout(_) => unreachable!(), + }) + } + + /// Sends a message into the channel. + pub(crate) fn send( + &self, + msg: T, + _deadline: Option, + ) -> Result<(), SendTimeoutError> { + let token = &mut Token::default(); + assert!(self.start_send(token)); + unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) } + } + + /// Attempts to receive a message without blocking. + pub(crate) fn try_recv(&self) -> Result { + let token = &mut Token::default(); + + if self.start_recv(token) { + unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } + } else { + Err(TryRecvError::Empty) + } + } + + /// Receives a message from the channel. + pub(crate) fn recv(&self, deadline: Option) -> Result { + let token = &mut Token::default(); + loop { + if self.start_recv(token) { + unsafe { + return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); + } + } + + if let Some(d) = deadline { + if Instant::now() >= d { + return Err(RecvTimeoutError::Timeout); + } + } + + // Prepare for blocking until a sender wakes us up. + Context::with(|cx| { + let oper = Operation::hook(token); + self.receivers.register(oper, cx); + + // Has the channel become ready just now? + if !self.is_empty() || self.is_disconnected() { + let _ = cx.try_select(Selected::Aborted); + } + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted | Selected::Disconnected => { + self.receivers.unregister(oper).unwrap(); + // If the channel was disconnected, we still have to check for remaining + // messages. + } + Selected::Operation(_) => {} + } + }); + } + } + + /// Returns the current number of messages inside the channel. + pub(crate) fn len(&self) -> usize { + loop { + // Load the tail index, then load the head index. + let mut tail = self.tail.index.load(Ordering::SeqCst); + let mut head = self.head.index.load(Ordering::SeqCst); + + // If the tail index didn't change, we've got consistent indices to work with. + if self.tail.index.load(Ordering::SeqCst) == tail { + // Erase the lower bits. + tail &= !((1 << SHIFT) - 1); + head &= !((1 << SHIFT) - 1); + + // Fix up indices if they fall onto block ends. + if (tail >> SHIFT) & (LAP - 1) == LAP - 1 { + tail = tail.wrapping_add(1 << SHIFT); + } + if (head >> SHIFT) & (LAP - 1) == LAP - 1 { + head = head.wrapping_add(1 << SHIFT); + } + + // Rotate indices so that head falls into the first block. + let lap = (head >> SHIFT) / LAP; + tail = tail.wrapping_sub((lap * LAP) << SHIFT); + head = head.wrapping_sub((lap * LAP) << SHIFT); + + // Remove the lower bits. + tail >>= SHIFT; + head >>= SHIFT; + + // Return the difference minus the number of blocks between tail and head. + return tail - head - tail / LAP; + } + } + } + + /// Returns the capacity of the channel. + pub(crate) fn capacity(&self) -> Option { + None + } + + /// Disconnects senders and wakes up all blocked receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub(crate) fn disconnect_senders(&self) -> bool { + let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); + + if tail & MARK_BIT == 0 { + self.receivers.disconnect(); + true + } else { + false + } + } + + /// Disconnects receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub(crate) fn disconnect_receivers(&self) -> bool { + let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst); + + if tail & MARK_BIT == 0 { + // If receivers are dropped first, discard all messages to free + // memory eagerly. + self.discard_all_messages(); + true + } else { + false + } + } + + /// Discards all messages. + /// + /// This method should only be called when all receivers are dropped. + fn discard_all_messages(&self) { + let backoff = Backoff::new(); + let mut tail = self.tail.index.load(Ordering::Acquire); + loop { + let offset = (tail >> SHIFT) % LAP; + if offset != BLOCK_CAP { + break; + } + + // New updates to tail will be rejected by MARK_BIT and aborted unless it's + // at boundary. We need to wait for the updates take affect otherwise there + // can be memory leaks. + backoff.spin_heavy(); + tail = self.tail.index.load(Ordering::Acquire); + } + + let mut head = self.head.index.load(Ordering::Acquire); + let mut block = self.head.block.load(Ordering::Acquire); + + unsafe { + // Drop all messages between head and tail and deallocate the heap-allocated blocks. + while head >> SHIFT != tail >> SHIFT { + let offset = (head >> SHIFT) % LAP; + + if offset < BLOCK_CAP { + // Drop the message in the slot. + let slot = (*block).slots.get_unchecked(offset); + slot.wait_write(); + let p = &mut *slot.msg.get(); + p.as_mut_ptr().drop_in_place(); + } else { + (*block).wait_next(); + // Deallocate the block and move to the next one. + let next = (*block).next.load(Ordering::Acquire); + drop(Box::from_raw(block)); + block = next; + } + + head = head.wrapping_add(1 << SHIFT); + } + + // Deallocate the last remaining block. + if !block.is_null() { + drop(Box::from_raw(block)); + } + } + head &= !MARK_BIT; + self.head.block.store(ptr::null_mut(), Ordering::Release); + self.head.index.store(head, Ordering::Release); + } + + /// Returns `true` if the channel is disconnected. + pub(crate) fn is_disconnected(&self) -> bool { + self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0 + } + + /// Returns `true` if the channel is empty. + pub(crate) fn is_empty(&self) -> bool { + let head = self.head.index.load(Ordering::SeqCst); + let tail = self.tail.index.load(Ordering::SeqCst); + head >> SHIFT == tail >> SHIFT + } + + /// Returns `true` if the channel is full. + pub(crate) fn is_full(&self) -> bool { + false + } +} + +impl Drop for Channel { + fn drop(&mut self) { + let mut head = self.head.index.load(Ordering::Relaxed); + let mut tail = self.tail.index.load(Ordering::Relaxed); + let mut block = self.head.block.load(Ordering::Relaxed); + + // Erase the lower bits. + head &= !((1 << SHIFT) - 1); + tail &= !((1 << SHIFT) - 1); + + unsafe { + // Drop all messages between head and tail and deallocate the heap-allocated blocks. + while head != tail { + let offset = (head >> SHIFT) % LAP; + + if offset < BLOCK_CAP { + // Drop the message in the slot. + let slot = (*block).slots.get_unchecked(offset); + let p = &mut *slot.msg.get(); + p.as_mut_ptr().drop_in_place(); + } else { + // Deallocate the block and move to the next one. + let next = (*block).next.load(Ordering::Relaxed); + drop(Box::from_raw(block)); + block = next; + } + + head = head.wrapping_add(1 << SHIFT); + } + + // Deallocate the last remaining block. + if !block.is_null() { + drop(Box::from_raw(block)); + } + } + } +} diff --git a/library/std/src/sync/mpmc/mod.rs b/library/std/src/sync/mpmc/mod.rs new file mode 100644 index 000000000..7a602cecd --- /dev/null +++ b/library/std/src/sync/mpmc/mod.rs @@ -0,0 +1,430 @@ +//! Multi-producer multi-consumer channels. + +// This module is not currently exposed publicly, but is used +// as the implementation for the channels in `sync::mpsc`. The +// implementation comes from the crossbeam-channel crate: +// +// Copyright (c) 2019 The Crossbeam Project Developers +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod array; +mod context; +mod counter; +mod error; +mod list; +mod select; +mod utils; +mod waker; +mod zero; + +use crate::fmt; +use crate::panic::{RefUnwindSafe, UnwindSafe}; +use crate::time::{Duration, Instant}; +pub use error::*; + +/// Creates a channel of unbounded capacity. +/// +/// This channel has a growable buffer that can hold any number of messages at a time. +pub fn channel() -> (Sender, Receiver) { + let (s, r) = counter::new(list::Channel::new()); + let s = Sender { flavor: SenderFlavor::List(s) }; + let r = Receiver { flavor: ReceiverFlavor::List(r) }; + (s, r) +} + +/// Creates a channel of bounded capacity. +/// +/// This channel has a buffer that can hold at most `cap` messages at a time. +/// +/// A special case is zero-capacity channel, which cannot hold any messages. Instead, send and +/// receive operations must appear at the same time in order to pair up and pass the message over. +pub fn sync_channel(cap: usize) -> (Sender, Receiver) { + if cap == 0 { + let (s, r) = counter::new(zero::Channel::new()); + let s = Sender { flavor: SenderFlavor::Zero(s) }; + let r = Receiver { flavor: ReceiverFlavor::Zero(r) }; + (s, r) + } else { + let (s, r) = counter::new(array::Channel::with_capacity(cap)); + let s = Sender { flavor: SenderFlavor::Array(s) }; + let r = Receiver { flavor: ReceiverFlavor::Array(r) }; + (s, r) + } +} + +/// The sending side of a channel. +pub struct Sender { + flavor: SenderFlavor, +} + +/// Sender flavors. +enum SenderFlavor { + /// Bounded channel based on a preallocated array. + Array(counter::Sender>), + + /// Unbounded channel implemented as a linked list. + List(counter::Sender>), + + /// Zero-capacity channel. + Zero(counter::Sender>), +} + +unsafe impl Send for Sender {} +unsafe impl Sync for Sender {} + +impl UnwindSafe for Sender {} +impl RefUnwindSafe for Sender {} + +impl Sender { + /// Attempts to send a message into the channel without blocking. + /// + /// This method will either send a message into the channel immediately or return an error if + /// the channel is full or disconnected. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will send the message only if there + /// happens to be a receive operation on the other side of the channel at the same time. + pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.try_send(msg), + SenderFlavor::List(chan) => chan.try_send(msg), + SenderFlavor::Zero(chan) => chan.try_send(msg), + } + } + + /// Blocks the current thread until a message is sent or the channel is disconnected. + /// + /// If the channel is full and not disconnected, this call will block until the send operation + /// can proceed. If the channel becomes disconnected, this call will wake up and return an + /// error. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will wait for a receive operation to + /// appear on the other side of the channel. + pub fn send(&self, msg: T) -> Result<(), SendError> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.send(msg, None), + SenderFlavor::List(chan) => chan.send(msg, None), + SenderFlavor::Zero(chan) => chan.send(msg, None), + } + .map_err(|err| match err { + SendTimeoutError::Disconnected(msg) => SendError(msg), + SendTimeoutError::Timeout(_) => unreachable!(), + }) + } +} + +// The methods below are not used by `sync::mpsc`, but +// are useful and we'll likely want to expose them +// eventually +#[allow(unused)] +impl Sender { + /// Waits for a message to be sent into the channel, but only for a limited time. + /// + /// If the channel is full and not disconnected, this call will block until the send operation + /// can proceed or the operation times out. If the channel becomes disconnected, this call will + /// wake up and return an error. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will wait for a receive operation to + /// appear on the other side of the channel. + pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError> { + match Instant::now().checked_add(timeout) { + Some(deadline) => self.send_deadline(msg, deadline), + // So far in the future that it's practically the same as waiting indefinitely. + None => self.send(msg).map_err(SendTimeoutError::from), + } + } + + /// Waits for a message to be sent into the channel, but only until a given deadline. + /// + /// If the channel is full and not disconnected, this call will block until the send operation + /// can proceed or the operation times out. If the channel becomes disconnected, this call will + /// wake up and return an error. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will wait for a receive operation to + /// appear on the other side of the channel. + pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)), + SenderFlavor::List(chan) => chan.send(msg, Some(deadline)), + SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)), + } + } + + /// Returns `true` if the channel is empty. + /// + /// Note: Zero-capacity channels are always empty. + pub fn is_empty(&self) -> bool { + match &self.flavor { + SenderFlavor::Array(chan) => chan.is_empty(), + SenderFlavor::List(chan) => chan.is_empty(), + SenderFlavor::Zero(chan) => chan.is_empty(), + } + } + + /// Returns `true` if the channel is full. + /// + /// Note: Zero-capacity channels are always full. + pub fn is_full(&self) -> bool { + match &self.flavor { + SenderFlavor::Array(chan) => chan.is_full(), + SenderFlavor::List(chan) => chan.is_full(), + SenderFlavor::Zero(chan) => chan.is_full(), + } + } + + /// Returns the number of messages in the channel. + pub fn len(&self) -> usize { + match &self.flavor { + SenderFlavor::Array(chan) => chan.len(), + SenderFlavor::List(chan) => chan.len(), + SenderFlavor::Zero(chan) => chan.len(), + } + } + + /// If the channel is bounded, returns its capacity. + pub fn capacity(&self) -> Option { + match &self.flavor { + SenderFlavor::Array(chan) => chan.capacity(), + SenderFlavor::List(chan) => chan.capacity(), + SenderFlavor::Zero(chan) => chan.capacity(), + } + } + + /// Returns `true` if senders belong to the same channel. + pub fn same_channel(&self, other: &Sender) -> bool { + match (&self.flavor, &other.flavor) { + (SenderFlavor::Array(ref a), SenderFlavor::Array(ref b)) => a == b, + (SenderFlavor::List(ref a), SenderFlavor::List(ref b)) => a == b, + (SenderFlavor::Zero(ref a), SenderFlavor::Zero(ref b)) => a == b, + _ => false, + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + unsafe { + match &self.flavor { + SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()), + SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()), + SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()), + } + } + } +} + +impl Clone for Sender { + fn clone(&self) -> Self { + let flavor = match &self.flavor { + SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()), + SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()), + SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()), + }; + + Sender { flavor } + } +} + +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Sender { .. }") + } +} + +/// The receiving side of a channel. +pub struct Receiver { + flavor: ReceiverFlavor, +} + +/// Receiver flavors. +enum ReceiverFlavor { + /// Bounded channel based on a preallocated array. + Array(counter::Receiver>), + + /// Unbounded channel implemented as a linked list. + List(counter::Receiver>), + + /// Zero-capacity channel. + Zero(counter::Receiver>), +} + +unsafe impl Send for Receiver {} +unsafe impl Sync for Receiver {} + +impl UnwindSafe for Receiver {} +impl RefUnwindSafe for Receiver {} + +impl Receiver { + /// Attempts to receive a message from the channel without blocking. + /// + /// This method will either receive a message from the channel immediately or return an error + /// if the channel is empty. + /// + /// If called on a zero-capacity channel, this method will receive a message only if there + /// happens to be a send operation on the other side of the channel at the same time. + pub fn try_recv(&self) -> Result { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.try_recv(), + ReceiverFlavor::List(chan) => chan.try_recv(), + ReceiverFlavor::Zero(chan) => chan.try_recv(), + } + } + + /// Blocks the current thread until a message is received or the channel is empty and + /// disconnected. + /// + /// If the channel is empty and not disconnected, this call will block until the receive + /// operation can proceed. If the channel is empty and becomes disconnected, this call will + /// wake up and return an error. + /// + /// If called on a zero-capacity channel, this method will wait for a send operation to appear + /// on the other side of the channel. + pub fn recv(&self) -> Result { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.recv(None), + ReceiverFlavor::List(chan) => chan.recv(None), + ReceiverFlavor::Zero(chan) => chan.recv(None), + } + .map_err(|_| RecvError) + } + + /// Waits for a message to be received from the channel, but only for a limited time. + /// + /// If the channel is empty and not disconnected, this call will block until the receive + /// operation can proceed or the operation times out. If the channel is empty and becomes + /// disconnected, this call will wake up and return an error. + /// + /// If called on a zero-capacity channel, this method will wait for a send operation to appear + /// on the other side of the channel. + pub fn recv_timeout(&self, timeout: Duration) -> Result { + match Instant::now().checked_add(timeout) { + Some(deadline) => self.recv_deadline(deadline), + // So far in the future that it's practically the same as waiting indefinitely. + None => self.recv().map_err(RecvTimeoutError::from), + } + } + + /// Waits for a message to be received from the channel, but only for a limited time. + /// + /// If the channel is empty and not disconnected, this call will block until the receive + /// operation can proceed or the operation times out. If the channel is empty and becomes + /// disconnected, this call will wake up and return an error. + /// + /// If called on a zero-capacity channel, this method will wait for a send operation to appear + /// on the other side of the channel. + pub fn recv_deadline(&self, deadline: Instant) -> Result { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)), + ReceiverFlavor::List(chan) => chan.recv(Some(deadline)), + ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)), + } + } +} + +// The methods below are not used by `sync::mpsc`, but +// are useful and we'll likely want to expose them +// eventually +#[allow(unused)] +impl Receiver { + /// Returns `true` if the channel is empty. + /// + /// Note: Zero-capacity channels are always empty. + pub fn is_empty(&self) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.is_empty(), + ReceiverFlavor::List(chan) => chan.is_empty(), + ReceiverFlavor::Zero(chan) => chan.is_empty(), + } + } + + /// Returns `true` if the channel is full. + /// + /// Note: Zero-capacity channels are always full. + pub fn is_full(&self) -> bool { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.is_full(), + ReceiverFlavor::List(chan) => chan.is_full(), + ReceiverFlavor::Zero(chan) => chan.is_full(), + } + } + + /// Returns the number of messages in the channel. + pub fn len(&self) -> usize { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.len(), + ReceiverFlavor::List(chan) => chan.len(), + ReceiverFlavor::Zero(chan) => chan.len(), + } + } + + /// If the channel is bounded, returns its capacity. + pub fn capacity(&self) -> Option { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.capacity(), + ReceiverFlavor::List(chan) => chan.capacity(), + ReceiverFlavor::Zero(chan) => chan.capacity(), + } + } + + /// Returns `true` if receivers belong to the same channel. + pub fn same_channel(&self, other: &Receiver) -> bool { + match (&self.flavor, &other.flavor) { + (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b, + (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b, + (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b, + _ => false, + } + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + unsafe { + match &self.flavor { + ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()), + ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()), + ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()), + } + } + } +} + +impl Clone for Receiver { + fn clone(&self) -> Self { + let flavor = match &self.flavor { + ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()), + ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()), + ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()), + }; + + Receiver { flavor } + } +} + +impl fmt::Debug for Receiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Receiver { .. }") + } +} diff --git a/library/std/src/sync/mpmc/select.rs b/library/std/src/sync/mpmc/select.rs new file mode 100644 index 000000000..56a83fee2 --- /dev/null +++ b/library/std/src/sync/mpmc/select.rs @@ -0,0 +1,71 @@ +/// Temporary data that gets initialized during a blocking operation, and is consumed by +/// `read` or `write`. +/// +/// Each field contains data associated with a specific channel flavor. +#[derive(Debug, Default)] +pub struct Token { + pub(crate) array: super::array::ArrayToken, + pub(crate) list: super::list::ListToken, + #[allow(dead_code)] + pub(crate) zero: super::zero::ZeroToken, +} + +/// Identifier associated with an operation by a specific thread on a specific channel. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct Operation(usize); + +impl Operation { + /// Creates an operation identifier from a mutable reference. + /// + /// This function essentially just turns the address of the reference into a number. The + /// reference should point to a variable that is specific to the thread and the operation, + /// and is alive for the entire duration of a blocking operation. + #[inline] + pub fn hook(r: &mut T) -> Operation { + let val = r as *mut T as usize; + // Make sure that the pointer address doesn't equal the numerical representation of + // `Selected::{Waiting, Aborted, Disconnected}`. + assert!(val > 2); + Operation(val) + } +} + +/// Current state of a blocking operation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Selected { + /// Still waiting for an operation. + Waiting, + + /// The attempt to block the current thread has been aborted. + Aborted, + + /// An operation became ready because a channel is disconnected. + Disconnected, + + /// An operation became ready because a message can be sent or received. + Operation(Operation), +} + +impl From for Selected { + #[inline] + fn from(val: usize) -> Selected { + match val { + 0 => Selected::Waiting, + 1 => Selected::Aborted, + 2 => Selected::Disconnected, + oper => Selected::Operation(Operation(oper)), + } + } +} + +impl Into for Selected { + #[inline] + fn into(self) -> usize { + match self { + Selected::Waiting => 0, + Selected::Aborted => 1, + Selected::Disconnected => 2, + Selected::Operation(Operation(val)) => val, + } + } +} diff --git a/library/std/src/sync/mpmc/utils.rs b/library/std/src/sync/mpmc/utils.rs new file mode 100644 index 000000000..cfe42750d --- /dev/null +++ b/library/std/src/sync/mpmc/utils.rs @@ -0,0 +1,143 @@ +use crate::cell::Cell; +use crate::ops::{Deref, DerefMut}; + +/// Pads and aligns a value to the length of a cache line. +#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)] +// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache +// lines at a time, so we have to align to 128 bytes rather than 64. +// +// Sources: +// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf +// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 +// +// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. +// +// Sources: +// - https://www.mono-project.com/news/2016/09/12/arm64-icache/ +// +// powerpc64 has 128-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 +#[cfg_attr( + any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64",), + repr(align(128)) +)] +// arm, mips, mips64, and riscv64 have 32-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7 +#[cfg_attr( + any( + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + ), + repr(align(32)) +)] +// s390x has 256-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 +#[cfg_attr(target_arch = "s390x", repr(align(256)))] +// x86 and wasm have 64-byte cache line size. +// +// Sources: +// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 +// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 +// +// All others are assumed to have 64-byte cache line size. +#[cfg_attr( + not(any( + target_arch = "x86_64", + target_arch = "aarch64", + target_arch = "powerpc64", + target_arch = "arm", + target_arch = "mips", + target_arch = "mips64", + target_arch = "riscv64", + target_arch = "s390x", + )), + repr(align(64)) +)] +pub struct CachePadded { + value: T, +} + +impl CachePadded { + /// Pads and aligns a value to the length of a cache line. + pub fn new(value: T) -> CachePadded { + CachePadded:: { value } + } +} + +impl Deref for CachePadded { + type Target = T; + + fn deref(&self) -> &T { + &self.value + } +} + +impl DerefMut for CachePadded { + fn deref_mut(&mut self) -> &mut T { + &mut self.value + } +} + +const SPIN_LIMIT: u32 = 6; + +/// Performs quadratic backoff in spin loops. +pub struct Backoff { + step: Cell, +} + +impl Backoff { + /// Creates a new `Backoff`. + pub fn new() -> Self { + Backoff { step: Cell::new(0) } + } + + /// Backs off using lightweight spinning. + /// + /// This method should be used for: + /// - Retrying an operation because another thread made progress. i.e. on CAS failure. + /// - Waiting for an operation to complete by spinning optimistically for a few iterations + /// before falling back to parking the thread (see `Backoff::is_completed`). + #[inline] + pub fn spin_light(&self) { + let step = self.step.get().min(SPIN_LIMIT); + for _ in 0..step.pow(2) { + crate::hint::spin_loop(); + } + + self.step.set(self.step.get() + 1); + } + + /// Backs off using heavyweight spinning. + /// + /// This method should be used in blocking loops where parking the thread is not an option. + #[inline] + pub fn spin_heavy(&self) { + if self.step.get() <= SPIN_LIMIT { + for _ in 0..self.step.get().pow(2) { + crate::hint::spin_loop() + } + } else { + crate::thread::yield_now(); + } + + self.step.set(self.step.get() + 1); + } + + /// Returns `true` if quadratic backoff has completed and parking the thread is advised. + #[inline] + pub fn is_completed(&self) -> bool { + self.step.get() > SPIN_LIMIT + } +} diff --git a/library/std/src/sync/mpmc/waker.rs b/library/std/src/sync/mpmc/waker.rs new file mode 100644 index 000000000..4912ca4f8 --- /dev/null +++ b/library/std/src/sync/mpmc/waker.rs @@ -0,0 +1,204 @@ +//! Waking mechanism for threads blocked on channel operations. + +use super::context::Context; +use super::select::{Operation, Selected}; + +use crate::ptr; +use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sync::Mutex; + +/// Represents a thread blocked on a specific channel operation. +pub(crate) struct Entry { + /// The operation. + pub(crate) oper: Operation, + + /// Optional packet. + pub(crate) packet: *mut (), + + /// Context associated with the thread owning this operation. + pub(crate) cx: Context, +} + +/// A queue of threads blocked on channel operations. +/// +/// This data structure is used by threads to register blocking operations and get woken up once +/// an operation becomes ready. +pub(crate) struct Waker { + /// A list of select operations. + selectors: Vec, + + /// A list of operations waiting to be ready. + observers: Vec, +} + +impl Waker { + /// Creates a new `Waker`. + #[inline] + pub(crate) fn new() -> Self { + Waker { selectors: Vec::new(), observers: Vec::new() } + } + + /// Registers a select operation. + #[inline] + pub(crate) fn register(&mut self, oper: Operation, cx: &Context) { + self.register_with_packet(oper, ptr::null_mut(), cx); + } + + /// Registers a select operation and a packet. + #[inline] + pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) { + self.selectors.push(Entry { oper, packet, cx: cx.clone() }); + } + + /// Unregisters a select operation. + #[inline] + pub(crate) fn unregister(&mut self, oper: Operation) -> Option { + if let Some((i, _)) = + self.selectors.iter().enumerate().find(|&(_, entry)| entry.oper == oper) + { + let entry = self.selectors.remove(i); + Some(entry) + } else { + None + } + } + + /// Attempts to find another thread's entry, select the operation, and wake it up. + #[inline] + pub(crate) fn try_select(&mut self) -> Option { + self.selectors + .iter() + .position(|selector| { + // Does the entry belong to a different thread? + selector.cx.thread_id() != current_thread_id() + && selector // Try selecting this operation. + .cx + .try_select(Selected::Operation(selector.oper)) + .is_ok() + && { + // Provide the packet. + selector.cx.store_packet(selector.packet); + // Wake the thread up. + selector.cx.unpark(); + true + } + }) + // Remove the entry from the queue to keep it clean and improve + // performance. + .map(|pos| self.selectors.remove(pos)) + } + + /// Notifies all operations waiting to be ready. + #[inline] + pub(crate) fn notify(&mut self) { + for entry in self.observers.drain(..) { + if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() { + entry.cx.unpark(); + } + } + } + + /// Notifies all registered operations that the channel is disconnected. + #[inline] + pub(crate) fn disconnect(&mut self) { + for entry in self.selectors.iter() { + if entry.cx.try_select(Selected::Disconnected).is_ok() { + // Wake the thread up. + // + // Here we don't remove the entry from the queue. Registered threads must + // unregister from the waker by themselves. They might also want to recover the + // packet value and destroy it, if necessary. + entry.cx.unpark(); + } + } + + self.notify(); + } +} + +impl Drop for Waker { + #[inline] + fn drop(&mut self) { + debug_assert_eq!(self.selectors.len(), 0); + debug_assert_eq!(self.observers.len(), 0); + } +} + +/// A waker that can be shared among threads without locking. +/// +/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization. +pub(crate) struct SyncWaker { + /// The inner `Waker`. + inner: Mutex, + + /// `true` if the waker is empty. + is_empty: AtomicBool, +} + +impl SyncWaker { + /// Creates a new `SyncWaker`. + #[inline] + pub(crate) fn new() -> Self { + SyncWaker { inner: Mutex::new(Waker::new()), is_empty: AtomicBool::new(true) } + } + + /// Registers the current thread with an operation. + #[inline] + pub(crate) fn register(&self, oper: Operation, cx: &Context) { + let mut inner = self.inner.lock().unwrap(); + inner.register(oper, cx); + self.is_empty + .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); + } + + /// Unregisters an operation previously registered by the current thread. + #[inline] + pub(crate) fn unregister(&self, oper: Operation) -> Option { + let mut inner = self.inner.lock().unwrap(); + let entry = inner.unregister(oper); + self.is_empty + .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); + entry + } + + /// Attempts to find one thread (not the current one), select its operation, and wake it up. + #[inline] + pub(crate) fn notify(&self) { + if !self.is_empty.load(Ordering::SeqCst) { + let mut inner = self.inner.lock().unwrap(); + if !self.is_empty.load(Ordering::SeqCst) { + inner.try_select(); + inner.notify(); + self.is_empty.store( + inner.selectors.is_empty() && inner.observers.is_empty(), + Ordering::SeqCst, + ); + } + } + } + + /// Notifies all threads that the channel is disconnected. + #[inline] + pub(crate) fn disconnect(&self) { + let mut inner = self.inner.lock().unwrap(); + inner.disconnect(); + self.is_empty + .store(inner.selectors.is_empty() && inner.observers.is_empty(), Ordering::SeqCst); + } +} + +impl Drop for SyncWaker { + #[inline] + fn drop(&mut self) { + debug_assert!(self.is_empty.load(Ordering::SeqCst)); + } +} + +/// Returns a unique id for the current thread. +#[inline] +pub fn current_thread_id() -> usize { + // `u8` is not drop so this variable will be available during thread destruction, + // whereas `thread::current()` would not be + thread_local! { static DUMMY: u8 = 0 } + DUMMY.with(|x| (x as *const u8).addr()) +} diff --git a/library/std/src/sync/mpmc/zero.rs b/library/std/src/sync/mpmc/zero.rs new file mode 100644 index 000000000..33f768dcb --- /dev/null +++ b/library/std/src/sync/mpmc/zero.rs @@ -0,0 +1,318 @@ +//! Zero-capacity channel. +//! +//! This kind of channel is also known as *rendezvous* channel. + +use super::context::Context; +use super::error::*; +use super::select::{Operation, Selected, Token}; +use super::utils::Backoff; +use super::waker::Waker; + +use crate::cell::UnsafeCell; +use crate::marker::PhantomData; +use crate::sync::atomic::{AtomicBool, Ordering}; +use crate::sync::Mutex; +use crate::time::Instant; +use crate::{fmt, ptr}; + +/// A pointer to a packet. +pub(crate) struct ZeroToken(*mut ()); + +impl Default for ZeroToken { + fn default() -> Self { + Self(ptr::null_mut()) + } +} + +impl fmt::Debug for ZeroToken { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&(self.0 as usize), f) + } +} + +/// A slot for passing one message from a sender to a receiver. +struct Packet { + /// Equals `true` if the packet is allocated on the stack. + on_stack: bool, + + /// Equals `true` once the packet is ready for reading or writing. + ready: AtomicBool, + + /// The message. + msg: UnsafeCell>, +} + +impl Packet { + /// Creates an empty packet on the stack. + fn empty_on_stack() -> Packet { + Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(None) } + } + + /// Creates a packet on the stack, containing a message. + fn message_on_stack(msg: T) -> Packet { + Packet { on_stack: true, ready: AtomicBool::new(false), msg: UnsafeCell::new(Some(msg)) } + } + + /// Waits until the packet becomes ready for reading or writing. + fn wait_ready(&self) { + let backoff = Backoff::new(); + while !self.ready.load(Ordering::Acquire) { + backoff.spin_heavy(); + } + } +} + +/// Inner representation of a zero-capacity channel. +struct Inner { + /// Senders waiting to pair up with a receive operation. + senders: Waker, + + /// Receivers waiting to pair up with a send operation. + receivers: Waker, + + /// Equals `true` when the channel is disconnected. + is_disconnected: bool, +} + +/// Zero-capacity channel. +pub(crate) struct Channel { + /// Inner representation of the channel. + inner: Mutex, + + /// Indicates that dropping a `Channel` may drop values of type `T`. + _marker: PhantomData, +} + +impl Channel { + /// Constructs a new zero-capacity channel. + pub(crate) fn new() -> Self { + Channel { + inner: Mutex::new(Inner { + senders: Waker::new(), + receivers: Waker::new(), + is_disconnected: false, + }), + _marker: PhantomData, + } + } + + /// Writes a message into the packet. + pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> { + // If there is no packet, the channel is disconnected. + if token.zero.0.is_null() { + return Err(msg); + } + + let packet = &*(token.zero.0 as *const Packet); + packet.msg.get().write(Some(msg)); + packet.ready.store(true, Ordering::Release); + Ok(()) + } + + /// Reads a message from the packet. + pub(crate) unsafe fn read(&self, token: &mut Token) -> Result { + // If there is no packet, the channel is disconnected. + if token.zero.0.is_null() { + return Err(()); + } + + let packet = &*(token.zero.0 as *const Packet); + + if packet.on_stack { + // The message has been in the packet from the beginning, so there is no need to wait + // for it. However, after reading the message, we need to set `ready` to `true` in + // order to signal that the packet can be destroyed. + let msg = packet.msg.get().replace(None).unwrap(); + packet.ready.store(true, Ordering::Release); + Ok(msg) + } else { + // Wait until the message becomes available, then read it and destroy the + // heap-allocated packet. + packet.wait_ready(); + let msg = packet.msg.get().replace(None).unwrap(); + drop(Box::from_raw(token.zero.0 as *mut Packet)); + Ok(msg) + } + } + + /// Attempts to send a message into the channel. + pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError> { + let token = &mut Token::default(); + let mut inner = self.inner.lock().unwrap(); + + // If there's a waiting receiver, pair up with it. + if let Some(operation) = inner.receivers.try_select() { + token.zero.0 = operation.packet; + drop(inner); + unsafe { + self.write(token, msg).ok().unwrap(); + } + Ok(()) + } else if inner.is_disconnected { + Err(TrySendError::Disconnected(msg)) + } else { + Err(TrySendError::Full(msg)) + } + } + + /// Sends a message into the channel. + pub(crate) fn send( + &self, + msg: T, + deadline: Option, + ) -> Result<(), SendTimeoutError> { + let token = &mut Token::default(); + let mut inner = self.inner.lock().unwrap(); + + // If there's a waiting receiver, pair up with it. + if let Some(operation) = inner.receivers.try_select() { + token.zero.0 = operation.packet; + drop(inner); + unsafe { + self.write(token, msg).ok().unwrap(); + } + return Ok(()); + } + + if inner.is_disconnected { + return Err(SendTimeoutError::Disconnected(msg)); + } + + Context::with(|cx| { + // Prepare for blocking until a receiver wakes us up. + let oper = Operation::hook(token); + let mut packet = Packet::::message_on_stack(msg); + inner.senders.register_with_packet(oper, &mut packet as *mut Packet as *mut (), cx); + inner.receivers.notify(); + drop(inner); + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted => { + self.inner.lock().unwrap().senders.unregister(oper).unwrap(); + let msg = unsafe { packet.msg.get().replace(None).unwrap() }; + Err(SendTimeoutError::Timeout(msg)) + } + Selected::Disconnected => { + self.inner.lock().unwrap().senders.unregister(oper).unwrap(); + let msg = unsafe { packet.msg.get().replace(None).unwrap() }; + Err(SendTimeoutError::Disconnected(msg)) + } + Selected::Operation(_) => { + // Wait until the message is read, then drop the packet. + packet.wait_ready(); + Ok(()) + } + } + }) + } + + /// Attempts to receive a message without blocking. + pub(crate) fn try_recv(&self) -> Result { + let token = &mut Token::default(); + let mut inner = self.inner.lock().unwrap(); + + // If there's a waiting sender, pair up with it. + if let Some(operation) = inner.senders.try_select() { + token.zero.0 = operation.packet; + drop(inner); + unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) } + } else if inner.is_disconnected { + Err(TryRecvError::Disconnected) + } else { + Err(TryRecvError::Empty) + } + } + + /// Receives a message from the channel. + pub(crate) fn recv(&self, deadline: Option) -> Result { + let token = &mut Token::default(); + let mut inner = self.inner.lock().unwrap(); + + // If there's a waiting sender, pair up with it. + if let Some(operation) = inner.senders.try_select() { + token.zero.0 = operation.packet; + drop(inner); + unsafe { + return self.read(token).map_err(|_| RecvTimeoutError::Disconnected); + } + } + + if inner.is_disconnected { + return Err(RecvTimeoutError::Disconnected); + } + + Context::with(|cx| { + // Prepare for blocking until a sender wakes us up. + let oper = Operation::hook(token); + let mut packet = Packet::::empty_on_stack(); + inner.receivers.register_with_packet( + oper, + &mut packet as *mut Packet as *mut (), + cx, + ); + inner.senders.notify(); + drop(inner); + + // Block the current thread. + let sel = cx.wait_until(deadline); + + match sel { + Selected::Waiting => unreachable!(), + Selected::Aborted => { + self.inner.lock().unwrap().receivers.unregister(oper).unwrap(); + Err(RecvTimeoutError::Timeout) + } + Selected::Disconnected => { + self.inner.lock().unwrap().receivers.unregister(oper).unwrap(); + Err(RecvTimeoutError::Disconnected) + } + Selected::Operation(_) => { + // Wait until the message is provided, then read it. + packet.wait_ready(); + unsafe { Ok(packet.msg.get().replace(None).unwrap()) } + } + } + }) + } + + /// Disconnects the channel and wakes up all blocked senders and receivers. + /// + /// Returns `true` if this call disconnected the channel. + pub(crate) fn disconnect(&self) -> bool { + let mut inner = self.inner.lock().unwrap(); + + if !inner.is_disconnected { + inner.is_disconnected = true; + inner.senders.disconnect(); + inner.receivers.disconnect(); + true + } else { + false + } + } + + /// Returns the current number of messages inside the channel. + pub(crate) fn len(&self) -> usize { + 0 + } + + /// Returns the capacity of the channel. + #[allow(clippy::unnecessary_wraps)] // This is intentional. + pub(crate) fn capacity(&self) -> Option { + Some(0) + } + + /// Returns `true` if the channel is empty. + pub(crate) fn is_empty(&self) -> bool { + true + } + + /// Returns `true` if the channel is full. + pub(crate) fn is_full(&self) -> bool { + true + } +} -- cgit v1.2.3