summaryrefslogtreecommitdiffstats
path: root/library/std/src/sync/mpsc/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'library/std/src/sync/mpsc/mod.rs')
-rw-r--r--library/std/src/sync/mpsc/mod.rs473
1 files changed, 33 insertions, 440 deletions
diff --git a/library/std/src/sync/mpsc/mod.rs b/library/std/src/sync/mpsc/mod.rs
index e85a87239..6e3c28f10 100644
--- a/library/std/src/sync/mpsc/mod.rs
+++ b/library/std/src/sync/mpsc/mod.rs
@@ -143,175 +143,16 @@ mod tests;
#[cfg(all(test, not(target_os = "emscripten")))]
mod sync_tests;
-// A description of how Rust's channel implementation works
-//
-// Channels are supposed to be the basic building block for all other
-// concurrent primitives that are used in Rust. As a result, the channel type
-// needs to be highly optimized, flexible, and broad enough for use everywhere.
-//
-// The choice of implementation of all channels is to be built on lock-free data
-// structures. The channels themselves are then consequently also lock-free data
-// structures. As always with lock-free code, this is a very "here be dragons"
-// territory, especially because I'm unaware of any academic papers that have
-// gone into great length about channels of these flavors.
-//
-// ## Flavors of channels
-//
-// From the perspective of a consumer of this library, there is only one flavor
-// of channel. This channel can be used as a stream and cloned to allow multiple
-// senders. Under the hood, however, there are actually three flavors of
-// channels in play.
-//
-// * Flavor::Oneshots - these channels are highly optimized for the one-send use
-// case. They contain as few atomics as possible and
-// involve one and exactly one allocation.
-// * Streams - these channels are optimized for the non-shared use case. They
-// use a different concurrent queue that is more tailored for this
-// use case. The initial allocation of this flavor of channel is not
-// optimized.
-// * Shared - this is the most general form of channel that this module offers,
-// a channel with multiple senders. This type is as optimized as it
-// can be, but the previous two types mentioned are much faster for
-// their use-cases.
-//
-// ## Concurrent queues
-//
-// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
-// but recv() obviously blocks. This means that under the hood there must be
-// some shared and concurrent queue holding all of the actual data.
-//
-// With two flavors of channels, two flavors of queues are also used. We have
-// chosen to use queues from a well-known author that are abbreviated as SPSC
-// and MPSC (single producer, single consumer and multiple producer, single
-// consumer). SPSC queues are used for streams while MPSC queues are used for
-// shared channels.
-//
-// ### SPSC optimizations
-//
-// The SPSC queue found online is essentially a linked list of nodes where one
-// half of the nodes are the "queue of data" and the other half of nodes are a
-// cache of unused nodes. The unused nodes are used such that an allocation is
-// not required on every push() and a free doesn't need to happen on every
-// pop().
-//
-// As found online, however, the cache of nodes is of an infinite size. This
-// means that if a channel at one point in its life had 50k items in the queue,
-// then the queue will always have the capacity for 50k items. I believed that
-// this was an unnecessary limitation of the implementation, so I have altered
-// the queue to optionally have a bound on the cache size.
-//
-// By default, streams will have an unbounded SPSC queue with a small-ish cache
-// size. The hope is that the cache is still large enough to have very fast
-// send() operations while not too large such that millions of channels can
-// coexist at once.
-//
-// ### MPSC optimizations
-//
-// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
-// a linked list under the hood to earn its unboundedness, but I have not put
-// forth much effort into having a cache of nodes similar to the SPSC queue.
-//
-// For now, I believe that this is "ok" because shared channels are not the most
-// common type, but soon we may wish to revisit this queue choice and determine
-// another candidate for backend storage of shared channels.
-//
-// ## Overview of the Implementation
-//
-// Now that there's a little background on the concurrent queues used, it's
-// worth going into much more detail about the channels themselves. The basic
-// pseudocode for a send/recv are:
-//
-//
-// send(t) recv()
-// queue.push(t) return if queue.pop()
-// if increment() == -1 deschedule {
-// wakeup() if decrement() > 0
-// cancel_deschedule()
-// }
-// queue.pop()
-//
-// As mentioned before, there are no locks in this implementation, only atomic
-// instructions are used.
-//
-// ### The internal atomic counter
-//
-// Every channel has a shared counter with each half to keep track of the size
-// of the queue. This counter is used to abort descheduling by the receiver and
-// to know when to wake up on the sending side.
-//
-// As seen in the pseudocode, senders will increment this count and receivers
-// will decrement the count. The theory behind this is that if a sender sees a
-// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
-// then it doesn't need to block.
-//
-// The recv() method has a beginning call to pop(), and if successful, it needs
-// to decrement the count. It is a crucial implementation detail that this
-// decrement does *not* happen to the shared counter. If this were the case,
-// then it would be possible for the counter to be very negative when there were
-// no receivers waiting, in which case the senders would have to determine when
-// it was actually appropriate to wake up a receiver.
-//
-// Instead, the "steal count" is kept track of separately (not atomically
-// because it's only used by receivers), and then the decrement() call when
-// descheduling will lump in all of the recent steals into one large decrement.
-//
-// The implication of this is that if a sender sees a -1 count, then there's
-// guaranteed to be a waiter waiting!
-//
-// ## Native Implementation
-//
-// A major goal of these channels is to work seamlessly on and off the runtime.
-// All of the previous race conditions have been worded in terms of
-// scheduler-isms (which is obviously not available without the runtime).
-//
-// For now, native usage of channels (off the runtime) will fall back onto
-// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
-// is still entirely lock-free, the "deschedule" blocks above are surrounded by
-// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
-// condition variable.
-//
-// ## Select
-//
-// Being able to support selection over channels has greatly influenced this
-// design, and not only does selection need to work inside the runtime, but also
-// outside the runtime.
-//
-// The implementation is fairly straightforward. The goal of select() is not to
-// return some data, but only to return which channel can receive data without
-// blocking. The implementation is essentially the entire blocking procedure
-// followed by an increment as soon as its woken up. The cancellation procedure
-// involves an increment and swapping out of to_wake to acquire ownership of the
-// thread to unblock.
-//
-// Sadly this current implementation requires multiple allocations, so I have
-// seen the throughput of select() be much worse than it should be. I do not
-// believe that there is anything fundamental that needs to change about these
-// channels, however, in order to support a more efficient select().
-//
-// FIXME: Select is now removed, so these factors are ready to be cleaned up!
-//
-// # Conclusion
-//
-// And now that you've seen all the races that I found and attempted to fix,
-// here's the code for you to find some more!
-
-use crate::cell::UnsafeCell;
+// MPSC channels are built as a wrapper around MPMC channels, which
+// were ported from the `crossbeam-channel` crate. MPMC channels are
+// not exposed publicly, but if you are curious about the implementation,
+// that's where everything is.
+
use crate::error;
use crate::fmt;
-use crate::mem;
-use crate::sync::Arc;
+use crate::sync::mpmc;
use crate::time::{Duration, Instant};
-mod blocking;
-mod mpsc_queue;
-mod oneshot;
-mod shared;
-mod spsc_queue;
-mod stream;
-mod sync;
-
-mod cache_aligned;
-
/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
/// This half can only be owned by one thread.
///
@@ -341,7 +182,7 @@ mod cache_aligned;
#[stable(feature = "rust1", since = "1.0.0")]
#[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
pub struct Receiver<T> {
- inner: UnsafeCell<Flavor<T>>,
+ inner: mpmc::Receiver<T>,
}
// The receiver port can be sent from place to place, so long as it
@@ -498,7 +339,7 @@ pub struct IntoIter<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Sender<T> {
- inner: UnsafeCell<Flavor<T>>,
+ inner: mpmc::Sender<T>,
}
// The send port can be sent from place to place, so long as it
@@ -557,7 +398,7 @@ impl<T> !Sync for Sender<T> {}
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
- inner: Arc<sync::Packet<T>>,
+ inner: mpmc::Sender<T>,
}
#[stable(feature = "rust1", since = "1.0.0")]
@@ -643,34 +484,6 @@ pub enum TrySendError<T> {
Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
}
-enum Flavor<T> {
- Oneshot(Arc<oneshot::Packet<T>>),
- Stream(Arc<stream::Packet<T>>),
- Shared(Arc<shared::Packet<T>>),
- Sync(Arc<sync::Packet<T>>),
-}
-
-#[doc(hidden)]
-trait UnsafeFlavor<T> {
- fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
- unsafe fn inner_mut(&self) -> &mut Flavor<T> {
- &mut *self.inner_unsafe().get()
- }
- unsafe fn inner(&self) -> &Flavor<T> {
- &*self.inner_unsafe().get()
- }
-}
-impl<T> UnsafeFlavor<T> for Sender<T> {
- fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
- &self.inner
- }
-}
-impl<T> UnsafeFlavor<T> for Receiver<T> {
- fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
- &self.inner
- }
-}
-
/// Creates a new asynchronous channel, returning the sender/receiver halves.
/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
/// the same order as it was sent, and no [`send`] will block the calling thread
@@ -711,8 +524,8 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
- let a = Arc::new(oneshot::Packet::new());
- (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
+ let (tx, rx) = mpmc::channel();
+ (Sender { inner: tx }, Receiver { inner: rx })
}
/// Creates a new synchronous, bounded channel.
@@ -760,8 +573,8 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
- let a = Arc::new(sync::Packet::new(bound));
- (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
+ let (tx, rx) = mpmc::sync_channel(bound);
+ (SyncSender { inner: tx }, Receiver { inner: rx })
}
////////////////////////////////////////////////////////////////////////////////
@@ -769,10 +582,6 @@ pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
////////////////////////////////////////////////////////////////////////////////
impl<T> Sender<T> {
- fn new(inner: Flavor<T>) -> Sender<T> {
- Sender { inner: UnsafeCell::new(inner) }
- }
-
/// Attempts to send a value on this channel, returning it back if it could
/// not be sent.
///
@@ -802,40 +611,7 @@ impl<T> Sender<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
- let (new_inner, ret) = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => {
- if !p.sent() {
- return p.send(t).map_err(SendError);
- } else {
- let a = Arc::new(stream::Packet::new());
- let rx = Receiver::new(Flavor::Stream(a.clone()));
- match p.upgrade(rx) {
- oneshot::UpSuccess => {
- let ret = a.send(t);
- (a, ret)
- }
- oneshot::UpDisconnected => (a, Err(t)),
- oneshot::UpWoke(token) => {
- // This send cannot panic because the thread is
- // asleep (we're looking at it), so the receiver
- // can't go away.
- a.send(t).ok().unwrap();
- token.signal();
- (a, Ok(()))
- }
- }
- }
- }
- Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
- Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
- Flavor::Sync(..) => unreachable!(),
- };
-
- unsafe {
- let tmp = Sender::new(Flavor::Stream(new_inner));
- mem::swap(self.inner_mut(), tmp.inner_mut());
- }
- ret.map_err(SendError)
+ self.inner.send(t)
}
}
@@ -847,58 +623,13 @@ impl<T> Clone for Sender<T> {
/// (including the original) need to be dropped in order for
/// [`Receiver::recv`] to stop blocking.
fn clone(&self) -> Sender<T> {
- let packet = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => {
- let a = Arc::new(shared::Packet::new());
- {
- let guard = a.postinit_lock();
- let rx = Receiver::new(Flavor::Shared(a.clone()));
- let sleeper = match p.upgrade(rx) {
- oneshot::UpSuccess | oneshot::UpDisconnected => None,
- oneshot::UpWoke(task) => Some(task),
- };
- a.inherit_blocker(sleeper, guard);
- }
- a
- }
- Flavor::Stream(ref p) => {
- let a = Arc::new(shared::Packet::new());
- {
- let guard = a.postinit_lock();
- let rx = Receiver::new(Flavor::Shared(a.clone()));
- let sleeper = match p.upgrade(rx) {
- stream::UpSuccess | stream::UpDisconnected => None,
- stream::UpWoke(task) => Some(task),
- };
- a.inherit_blocker(sleeper, guard);
- }
- a
- }
- Flavor::Shared(ref p) => {
- p.clone_chan();
- return Sender::new(Flavor::Shared(p.clone()));
- }
- Flavor::Sync(..) => unreachable!(),
- };
-
- unsafe {
- let tmp = Sender::new(Flavor::Shared(packet.clone()));
- mem::swap(self.inner_mut(), tmp.inner_mut());
- }
- Sender::new(Flavor::Shared(packet))
+ Sender { inner: self.inner.clone() }
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Sender<T> {
- fn drop(&mut self) {
- match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => p.drop_chan(),
- Flavor::Stream(ref p) => p.drop_chan(),
- Flavor::Shared(ref p) => p.drop_chan(),
- Flavor::Sync(..) => unreachable!(),
- }
- }
+ fn drop(&mut self) {}
}
#[stable(feature = "mpsc_debug", since = "1.8.0")]
@@ -913,10 +644,6 @@ impl<T> fmt::Debug for Sender<T> {
////////////////////////////////////////////////////////////////////////////////
impl<T> SyncSender<T> {
- fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
- SyncSender { inner }
- }
-
/// Sends a value on this synchronous channel.
///
/// This function will *block* until space in the internal buffer becomes
@@ -955,7 +682,7 @@ impl<T> SyncSender<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
- self.inner.send(t).map_err(SendError)
+ self.inner.send(t)
}
/// Attempts to send a value on this channel without blocking.
@@ -1011,21 +738,27 @@ impl<T> SyncSender<T> {
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.inner.try_send(t)
}
+
+ // Attempts to send for a value on this receiver, returning an error if the
+ // corresponding channel has hung up, or if it waits more than `timeout`.
+ //
+ // This method is currently private and only used for tests.
+ #[allow(unused)]
+ fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), mpmc::SendTimeoutError<T>> {
+ self.inner.send_timeout(t, timeout)
+ }
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Clone for SyncSender<T> {
fn clone(&self) -> SyncSender<T> {
- self.inner.clone_chan();
- SyncSender::new(self.inner.clone())
+ SyncSender { inner: self.inner.clone() }
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for SyncSender<T> {
- fn drop(&mut self) {
- self.inner.drop_chan();
- }
+ fn drop(&mut self) {}
}
#[stable(feature = "mpsc_debug", since = "1.8.0")]
@@ -1040,10 +773,6 @@ impl<T> fmt::Debug for SyncSender<T> {
////////////////////////////////////////////////////////////////////////////////
impl<T> Receiver<T> {
- fn new(inner: Flavor<T>) -> Receiver<T> {
- Receiver { inner: UnsafeCell::new(inner) }
- }
-
/// Attempts to return a pending value on this receiver without blocking.
///
/// This method will never block the caller in order to wait for data to
@@ -1069,35 +798,7 @@ impl<T> Receiver<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_recv(&self) -> Result<T, TryRecvError> {
- loop {
- let new_port = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(oneshot::Empty) => return Err(TryRecvError::Empty),
- Err(oneshot::Disconnected) => return Err(TryRecvError::Disconnected),
- Err(oneshot::Upgraded(rx)) => rx,
- },
- Flavor::Stream(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(stream::Empty) => return Err(TryRecvError::Empty),
- Err(stream::Disconnected) => return Err(TryRecvError::Disconnected),
- Err(stream::Upgraded(rx)) => rx,
- },
- Flavor::Shared(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(shared::Empty) => return Err(TryRecvError::Empty),
- Err(shared::Disconnected) => return Err(TryRecvError::Disconnected),
- },
- Flavor::Sync(ref p) => match p.try_recv() {
- Ok(t) => return Ok(t),
- Err(sync::Empty) => return Err(TryRecvError::Empty),
- Err(sync::Disconnected) => return Err(TryRecvError::Disconnected),
- },
- };
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- }
- }
+ self.inner.try_recv()
}
/// Attempts to wait for a value on this receiver, returning an error if the
@@ -1156,31 +857,7 @@ impl<T> Receiver<T> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn recv(&self) -> Result<T, RecvError> {
- loop {
- let new_port = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(oneshot::Disconnected) => return Err(RecvError),
- Err(oneshot::Upgraded(rx)) => rx,
- Err(oneshot::Empty) => unreachable!(),
- },
- Flavor::Stream(ref p) => match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(stream::Disconnected) => return Err(RecvError),
- Err(stream::Upgraded(rx)) => rx,
- Err(stream::Empty) => unreachable!(),
- },
- Flavor::Shared(ref p) => match p.recv(None) {
- Ok(t) => return Ok(t),
- Err(shared::Disconnected) => return Err(RecvError),
- Err(shared::Empty) => unreachable!(),
- },
- Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
- };
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- }
- }
+ self.inner.recv()
}
/// Attempts to wait for a value on this receiver, returning an error if the
@@ -1198,34 +875,6 @@ impl<T> Receiver<T> {
/// However, since channels are buffered, messages sent before the disconnect
/// will still be properly received.
///
- /// # Known Issues
- ///
- /// There is currently a known issue (see [`#39364`]) that causes `recv_timeout`
- /// to panic unexpectedly with the following example:
- ///
- /// ```no_run
- /// use std::sync::mpsc::channel;
- /// use std::thread;
- /// use std::time::Duration;
- ///
- /// let (tx, rx) = channel::<String>();
- ///
- /// thread::spawn(move || {
- /// let d = Duration::from_millis(10);
- /// loop {
- /// println!("recv");
- /// let _r = rx.recv_timeout(d);
- /// }
- /// });
- ///
- /// thread::sleep(Duration::from_millis(100));
- /// let _c1 = tx.clone();
- ///
- /// thread::sleep(Duration::from_secs(1));
- /// ```
- ///
- /// [`#39364`]: https://github.com/rust-lang/rust/issues/39364
- ///
/// # Examples
///
/// Successfully receiving value before encountering timeout:
@@ -1268,17 +917,7 @@ impl<T> Receiver<T> {
/// ```
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
- // Do an optimistic try_recv to avoid the performance impact of
- // Instant::now() in the full-channel case.
- match self.try_recv() {
- Ok(result) => Ok(result),
- Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected),
- Err(TryRecvError::Empty) => match Instant::now().checked_add(timeout) {
- Some(deadline) => self.recv_deadline(deadline),
- // So far in the future that it's practically the same as waiting indefinitely.
- None => self.recv().map_err(RecvTimeoutError::from),
- },
- }
+ self.inner.recv_timeout(timeout)
}
/// Attempts to wait for a value on this receiver, returning an error if the
@@ -1339,46 +978,7 @@ impl<T> Receiver<T> {
/// ```
#[unstable(feature = "deadline_api", issue = "46316")]
pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
- use self::RecvTimeoutError::*;
-
- loop {
- let port_or_empty = match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(oneshot::Disconnected) => return Err(Disconnected),
- Err(oneshot::Upgraded(rx)) => Some(rx),
- Err(oneshot::Empty) => None,
- },
- Flavor::Stream(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(stream::Disconnected) => return Err(Disconnected),
- Err(stream::Upgraded(rx)) => Some(rx),
- Err(stream::Empty) => None,
- },
- Flavor::Shared(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(shared::Disconnected) => return Err(Disconnected),
- Err(shared::Empty) => None,
- },
- Flavor::Sync(ref p) => match p.recv(Some(deadline)) {
- Ok(t) => return Ok(t),
- Err(sync::Disconnected) => return Err(Disconnected),
- Err(sync::Empty) => None,
- },
- };
-
- if let Some(new_port) = port_or_empty {
- unsafe {
- mem::swap(self.inner_mut(), new_port.inner_mut());
- }
- }
-
- // If we're already passed the deadline, and we're here without
- // data, return a timeout, else try again.
- if Instant::now() >= deadline {
- return Err(Timeout);
- }
- }
+ self.inner.recv_deadline(deadline)
}
/// Returns an iterator that will block waiting for messages, but never
@@ -1499,14 +1099,7 @@ impl<T> IntoIterator for Receiver<T> {
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> Drop for Receiver<T> {
- fn drop(&mut self) {
- match *unsafe { self.inner() } {
- Flavor::Oneshot(ref p) => p.drop_port(),
- Flavor::Stream(ref p) => p.drop_port(),
- Flavor::Shared(ref p) => p.drop_port(),
- Flavor::Sync(ref p) => p.drop_port(),
- }
- }
+ fn drop(&mut self) {}
}
#[stable(feature = "mpsc_debug", since = "1.8.0")]