diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/src/sync | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/sync')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/sync/bilock.rs | 298 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/src/sync/mod.rs | 17 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/src/sync/mpsc/mod.rs | 1187 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/src/sync/mpsc/queue.rs | 151 | ||||
-rw-r--r-- | third_party/rust/futures-0.1.31/src/sync/oneshot.rs | 611 |
5 files changed, 2264 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/sync/bilock.rs b/third_party/rust/futures-0.1.31/src/sync/bilock.rs new file mode 100644 index 0000000000..af9e1eeb2c --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/sync/bilock.rs @@ -0,0 +1,298 @@ +use std::any::Any; +use std::boxed::Box; +use std::cell::UnsafeCell; +use std::error::Error; +use std::fmt; +use std::mem; +use std::ops::{Deref, DerefMut}; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +use {Async, Future, Poll}; +use task::{self, Task}; + +/// A type of futures-powered synchronization primitive which is a mutex between +/// two possible owners. +/// +/// This primitive is not as generic as a full-blown mutex but is sufficient for +/// many use cases where there are only two possible owners of a resource. The +/// implementation of `BiLock` can be more optimized for just the two possible +/// owners. +/// +/// Note that it's possible to use this lock through a poll-style interface with +/// the `poll_lock` method but you can also use it as a future with the `lock` +/// method that consumes a `BiLock` and returns a future that will resolve when +/// it's locked. +/// +/// A `BiLock` is typically used for "split" operations where data which serves +/// two purposes wants to be split into two to be worked with separately. For +/// example a TCP stream could be both a reader and a writer or a framing layer +/// could be both a stream and a sink for messages. A `BiLock` enables splitting +/// these two and then using each independently in a futures-powered fashion. +#[derive(Debug)] +pub struct BiLock<T> { + inner: Arc<Inner<T>>, +} + +#[derive(Debug)] +struct Inner<T> { + state: AtomicUsize, + inner: Option<UnsafeCell<T>>, +} + +unsafe impl<T: Send> Send for Inner<T> {} +unsafe impl<T: Send> Sync for Inner<T> {} + +impl<T> BiLock<T> { + /// Creates a new `BiLock` protecting the provided data. + /// + /// Two handles to the lock are returned, and these are the only two handles + /// that will ever be available to the lock. These can then be sent to separate + /// tasks to be managed there. + pub fn new(t: T) -> (BiLock<T>, BiLock<T>) { + let inner = Arc::new(Inner { + state: AtomicUsize::new(0), + inner: Some(UnsafeCell::new(t)), + }); + + (BiLock { inner: inner.clone() }, BiLock { inner: inner }) + } + + /// Attempt to acquire this lock, returning `NotReady` if it can't be + /// acquired. + /// + /// This function will acquire the lock in a nonblocking fashion, returning + /// immediately if the lock is already held. If the lock is successfully + /// acquired then `Async::Ready` is returned with a value that represents + /// the locked value (and can be used to access the protected data). The + /// lock is unlocked when the returned `BiLockGuard` is dropped. + /// + /// If the lock is already held then this function will return + /// `Async::NotReady`. In this case the current task will also be scheduled + /// to receive a notification when the lock would otherwise become + /// available. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_lock(&self) -> Async<BiLockGuard<T>> { + loop { + match self.inner.state.swap(1, SeqCst) { + // Woohoo, we grabbed the lock! + 0 => return Async::Ready(BiLockGuard { inner: self }), + + // Oops, someone else has locked the lock + 1 => {} + + // A task was previously blocked on this lock, likely our task, + // so we need to update that task. + n => unsafe { + drop(Box::from_raw(n as *mut Task)); + } + } + + let me = Box::new(task::current()); + let me = Box::into_raw(me) as usize; + + match self.inner.state.compare_exchange(1, me, SeqCst, SeqCst) { + // The lock is still locked, but we've now parked ourselves, so + // just report that we're scheduled to receive a notification. + Ok(_) => return Async::NotReady, + + // Oops, looks like the lock was unlocked after our swap above + // and before the compare_exchange. Deallocate what we just + // allocated and go through the loop again. + Err(0) => unsafe { + drop(Box::from_raw(me as *mut Task)); + }, + + // The top of this loop set the previous state to 1, so if we + // failed the CAS above then it's because the previous value was + // *not* zero or one. This indicates that a task was blocked, + // but we're trying to acquire the lock and there's only one + // other reference of the lock, so it should be impossible for + // that task to ever block itself. + Err(n) => panic!("invalid state: {}", n), + } + } + } + + /// Perform a "blocking lock" of this lock, consuming this lock handle and + /// returning a future to the acquired lock. + /// + /// This function consumes the `BiLock<T>` and returns a sentinel future, + /// `BiLockAcquire<T>`. The returned future will resolve to + /// `BiLockAcquired<T>` which represents a locked lock similarly to + /// `BiLockGuard<T>`. + /// + /// Note that the returned future will never resolve to an error. + pub fn lock(self) -> BiLockAcquire<T> { + BiLockAcquire { + inner: Some(self), + } + } + + /// Attempts to put the two "halves" of a `BiLock<T>` back together and + /// recover the original value. Succeeds only if the two `BiLock<T>`s + /// originated from the same call to `BiLock::new`. + pub fn reunite(self, other: Self) -> Result<T, ReuniteError<T>> { + if &*self.inner as *const _ == &*other.inner as *const _ { + drop(other); + let inner = Arc::try_unwrap(self.inner) + .ok() + .expect("futures: try_unwrap failed in BiLock<T>::reunite"); + Ok(unsafe { inner.into_inner() }) + } else { + Err(ReuniteError(self, other)) + } + } + + fn unlock(&self) { + match self.inner.state.swap(0, SeqCst) { + // we've locked the lock, shouldn't be possible for us to see an + // unlocked lock. + 0 => panic!("invalid unlocked state"), + + // Ok, no one else tried to get the lock, we're done. + 1 => {} + + // Another task has parked themselves on this lock, let's wake them + // up as its now their turn. + n => unsafe { + Box::from_raw(n as *mut Task).notify(); + } + } + } +} + +impl<T> Inner<T> { + unsafe fn into_inner(mut self) -> T { + mem::replace(&mut self.inner, None).unwrap().into_inner() + } +} + +impl<T> Drop for Inner<T> { + fn drop(&mut self) { + assert_eq!(self.state.load(SeqCst), 0); + } +} + +/// Error indicating two `BiLock<T>`s were not two halves of a whole, and +/// thus could not be `reunite`d. +pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>); + +impl<T> fmt::Debug for ReuniteError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("ReuniteError") + .field(&"...") + .finish() + } +} + +impl<T> fmt::Display for ReuniteError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "tried to reunite two BiLocks that don't form a pair") + } +} + +impl<T: Any> Error for ReuniteError<T> { + fn description(&self) -> &str { + "tried to reunite two BiLocks that don't form a pair" + } +} + +/// Returned RAII guard from the `poll_lock` method. +/// +/// This structure acts as a sentinel to the data in the `BiLock<T>` itself, +/// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be +/// unlocked. +#[derive(Debug)] +pub struct BiLockGuard<'a, T: 'a> { + inner: &'a BiLock<T>, +} + +impl<'a, T> Deref for BiLockGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.inner.inner.inner.as_ref().unwrap().get() } + } +} + +impl<'a, T> DerefMut for BiLockGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.inner.inner.inner.as_ref().unwrap().get() } + } +} + +impl<'a, T> Drop for BiLockGuard<'a, T> { + fn drop(&mut self) { + self.inner.unlock(); + } +} + +/// Future returned by `BiLock::lock` which will resolve when the lock is +/// acquired. +#[derive(Debug)] +pub struct BiLockAcquire<T> { + inner: Option<BiLock<T>>, +} + +impl<T> Future for BiLockAcquire<T> { + type Item = BiLockAcquired<T>; + type Error = (); + + fn poll(&mut self) -> Poll<BiLockAcquired<T>, ()> { + match self.inner.as_ref().expect("cannot poll after Ready").poll_lock() { + Async::Ready(r) => { + mem::forget(r); + } + Async::NotReady => return Ok(Async::NotReady), + } + Ok(Async::Ready(BiLockAcquired { inner: self.inner.take() })) + } +} + +/// Resolved value of the `BiLockAcquire<T>` future. +/// +/// This value, like `BiLockGuard<T>`, is a sentinel to the value `T` through +/// implementations of `Deref` and `DerefMut`. When dropped will unlock the +/// lock, and the original unlocked `BiLock<T>` can be recovered through the +/// `unlock` method. +#[derive(Debug)] +pub struct BiLockAcquired<T> { + inner: Option<BiLock<T>>, +} + +impl<T> BiLockAcquired<T> { + /// Recovers the original `BiLock<T>`, unlocking this lock. + pub fn unlock(mut self) -> BiLock<T> { + let bi_lock = self.inner.take().unwrap(); + + bi_lock.unlock(); + + bi_lock + } +} + +impl<T> Deref for BiLockAcquired<T> { + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.inner.as_ref().unwrap().inner.inner.as_ref().unwrap().get() } + } +} + +impl<T> DerefMut for BiLockAcquired<T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.inner.as_mut().unwrap().inner.inner.as_ref().unwrap().get() } + } +} + +impl<T> Drop for BiLockAcquired<T> { + fn drop(&mut self) { + if let Some(ref bi_lock) = self.inner { + bi_lock.unlock(); + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/sync/mod.rs b/third_party/rust/futures-0.1.31/src/sync/mod.rs new file mode 100644 index 0000000000..0a46e9afbe --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/sync/mod.rs @@ -0,0 +1,17 @@ +//! Future-aware synchronization +//! +//! This module, which is modeled after `std::sync`, contains user-space +//! synchronization tools that work with futures, streams and sinks. In +//! particular, these synchronizers do *not* block physical OS threads, but +//! instead work at the task level. +//! +//! More information and examples of how to use these synchronization primitives +//! can be found [online at tokio.rs]. +//! +//! [online at tokio.rs]: https://tokio.rs/docs/going-deeper-futures/synchronization/ + +pub mod oneshot; +pub mod mpsc; +mod bilock; + +pub use self::bilock::{BiLock, BiLockGuard, BiLockAcquire, BiLockAcquired}; diff --git a/third_party/rust/futures-0.1.31/src/sync/mpsc/mod.rs b/third_party/rust/futures-0.1.31/src/sync/mpsc/mod.rs new file mode 100644 index 0000000000..31d2320ab6 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/sync/mpsc/mod.rs @@ -0,0 +1,1187 @@ +//! A multi-producer, single-consumer, futures-aware, FIFO queue with back pressure. +//! +//! A channel can be used as a communication primitive between tasks running on +//! `futures-rs` executors. Channel creation provides `Receiver` and `Sender` +//! handles. `Receiver` implements `Stream` and allows a task to read values +//! out of the channel. If there is no message to read from the channel, the +//! current task will be notified when a new value is sent. `Sender` implements +//! the `Sink` trait and allows a task to send messages into the channel. If +//! the channel is at capacity, then send will be rejected and the task will be +//! notified when additional capacity is available. +//! +//! # Disconnection +//! +//! When all `Sender` handles have been dropped, it is no longer possible to +//! send values into the channel. This is considered the termination event of +//! the stream. As such, `Sender::poll` will return `Ok(Ready(None))`. +//! +//! If the receiver handle is dropped, then messages can no longer be read out +//! of the channel. In this case, a `send` will result in an error. +//! +//! # Clean Shutdown +//! +//! If the `Receiver` is simply dropped, then it is possible for there to be +//! messages still in the channel that will not be processed. As such, it is +//! usually desirable to perform a "clean" shutdown. To do this, the receiver +//! will first call `close`, which will prevent any further messages to be sent +//! into the channel. Then, the receiver consumes the channel to completion, at +//! which point the receiver can be dropped. + +// At the core, the channel uses an atomic FIFO queue for message passing. This +// queue is used as the primary coordination primitive. In order to enforce +// capacity limits and handle back pressure, a secondary FIFO queue is used to +// send parked task handles. +// +// The general idea is that the channel is created with a `buffer` size of `n`. +// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed" +// slot to hold a message. This allows `Sender` to know for a fact that a send +// will succeed *before* starting to do the actual work of sending the value. +// Since most of this work is lock-free, once the work starts, it is impossible +// to safely revert. +// +// If the sender is unable to process a send operation, then the current +// task is parked and the handle is sent on the parked task queue. +// +// Note that the implementation guarantees that the channel capacity will never +// exceed the configured limit, however there is no *strict* guarantee that the +// receiver will wake up a parked task *immediately* when a slot becomes +// available. However, it will almost always unpark a task when a slot becomes +// available and it is *guaranteed* that a sender will be unparked when the +// message that caused the sender to become parked is read out of the channel. +// +// The steps for sending a message are roughly: +// +// 1) Increment the channel message count +// 2) If the channel is at capacity, push the task handle onto the wait queue +// 3) Push the message onto the message queue. +// +// The steps for receiving a message are roughly: +// +// 1) Pop a message from the message queue +// 2) Pop a task handle from the wait queue +// 3) Decrement the channel message count. +// +// It's important for the order of operations on lock-free structures to happen +// in reverse order between the sender and receiver. This makes the message +// queue the primary coordination structure and establishes the necessary +// happens-before semantics required for the acquire / release semantics used +// by the queue structure. + +use std::fmt; +use std::error::Error; +use std::any::Any; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::usize; + +use sync::mpsc::queue::{Queue, PopResult}; +use sync::oneshot; +use task::{self, Task}; +use future::Executor; +use sink::SendAll; +use resultstream::{self, Results}; +use {Async, AsyncSink, Future, Poll, StartSend, Sink, Stream}; + +mod queue; + +/// The transmission end of a channel which is used to send values. +/// +/// This is created by the `channel` method. +#[derive(Debug)] +pub struct Sender<T> { + // Channel state shared between the sender and receiver. + inner: Arc<Inner<T>>, + + // Handle to the task that is blocked on this sender. This handle is sent + // to the receiver half in order to be notified when the sender becomes + // unblocked. + sender_task: Arc<Mutex<SenderTask>>, + + // True if the sender might be blocked. This is an optimization to avoid + // having to lock the mutex most of the time. + maybe_parked: bool, +} + +/// The transmission end of a channel which is used to send values. +/// +/// This is created by the `unbounded` method. +#[derive(Debug)] +pub struct UnboundedSender<T>(Sender<T>); + +trait AssertKinds: Send + Sync + Clone {} +impl AssertKinds for UnboundedSender<u32> {} + + +/// The receiving end of a channel which implements the `Stream` trait. +/// +/// This is a concrete implementation of a stream which can be used to represent +/// a stream of values being computed elsewhere. This is created by the +/// `channel` method. +#[derive(Debug)] +pub struct Receiver<T> { + inner: Arc<Inner<T>>, +} + +/// The receiving end of a channel which implements the `Stream` trait. +/// +/// This is a concrete implementation of a stream which can be used to represent +/// a stream of values being computed elsewhere. This is created by the +/// `unbounded` method. +#[derive(Debug)] +pub struct UnboundedReceiver<T>(Receiver<T>); + +/// Error type for sending, used when the receiving end of a channel is +/// dropped +#[derive(Clone, PartialEq, Eq)] +pub struct SendError<T>(T); + +/// Error type returned from `try_send` +#[derive(Clone, PartialEq, Eq)] +pub struct TrySendError<T> { + kind: TrySendErrorKind<T>, +} + +#[derive(Clone, PartialEq, Eq)] +enum TrySendErrorKind<T> { + Full(T), + Disconnected(T), +} + +impl<T> fmt::Debug for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("SendError") + .field(&"...") + .finish() + } +} + +impl<T> fmt::Display for SendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "send failed because receiver is gone") + } +} + +impl<T: Any> Error for SendError<T> +{ + fn description(&self) -> &str { + "send failed because receiver is gone" + } +} + +impl<T> SendError<T> { + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl<T> fmt::Debug for TrySendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("TrySendError") + .field(&"...") + .finish() + } +} + +impl<T> fmt::Display for TrySendError<T> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + if self.is_full() { + write!(fmt, "send failed because channel is full") + } else { + write!(fmt, "send failed because receiver is gone") + } + } +} + +impl<T: Any> Error for TrySendError<T> { + fn description(&self) -> &str { + if self.is_full() { + "send failed because channel is full" + } else { + "send failed because receiver is gone" + } + } +} + +impl<T> TrySendError<T> { + /// Returns true if this error is a result of the channel being full + pub fn is_full(&self) -> bool { + use self::TrySendErrorKind::*; + + match self.kind { + Full(_) => true, + _ => false, + } + } + + /// Returns true if this error is a result of the receiver being dropped + pub fn is_disconnected(&self) -> bool { + use self::TrySendErrorKind::*; + + match self.kind { + Disconnected(_) => true, + _ => false, + } + } + + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + use self::TrySendErrorKind::*; + + match self.kind { + Full(v) | Disconnected(v) => v, + } + } +} + +#[derive(Debug)] +struct Inner<T> { + // Max buffer size of the channel. If `None` then the channel is unbounded. + buffer: Option<usize>, + + // Internal channel state. Consists of the number of messages stored in the + // channel as well as a flag signalling that the channel is closed. + state: AtomicUsize, + + // Atomic, FIFO queue used to send messages to the receiver + message_queue: Queue<Option<T>>, + + // Atomic, FIFO queue used to send parked task handles to the receiver. + parked_queue: Queue<Arc<Mutex<SenderTask>>>, + + // Number of senders in existence + num_senders: AtomicUsize, + + // Handle to the receiver's task. + recv_task: Mutex<ReceiverTask>, +} + +// Struct representation of `Inner::state`. +#[derive(Debug, Clone, Copy)] +struct State { + // `true` when the channel is open + is_open: bool, + + // Number of messages in the channel + num_messages: usize, +} + +#[derive(Debug)] +struct ReceiverTask { + unparked: bool, + task: Option<Task>, +} + +// Returned from Receiver::try_park() +enum TryPark { + Parked, + Closed, + NotEmpty, +} + +// The `is_open` flag is stored in the left-most bit of `Inner::state` +const OPEN_MASK: usize = usize::MAX - (usize::MAX >> 1); + +// When a new channel is created, it is created in the open state with no +// pending messages. +const INIT_STATE: usize = OPEN_MASK; + +// The maximum number of messages that a channel can track is `usize::MAX >> 1` +const MAX_CAPACITY: usize = !(OPEN_MASK); + +// The maximum requested buffer size must be less than the maximum capacity of +// a channel. This is because each sender gets a guaranteed slot. +const MAX_BUFFER: usize = MAX_CAPACITY >> 1; + +// Sent to the consumer to wake up blocked producers +#[derive(Debug)] +struct SenderTask { + task: Option<Task>, + is_parked: bool, +} + +impl SenderTask { + fn new() -> Self { + SenderTask { + task: None, + is_parked: false, + } + } + + fn notify(&mut self) { + self.is_parked = false; + + if let Some(task) = self.task.take() { + task.notify(); + } + } +} + +/// Creates an in-memory channel implementation of the `Stream` trait with +/// bounded capacity. +/// +/// This method creates a concrete implementation of the `Stream` trait which +/// can be used to send values across threads in a streaming fashion. This +/// channel is unique in that it implements back pressure to ensure that the +/// sender never outpaces the receiver. The channel capacity is equal to +/// `buffer + num-senders`. In other words, each sender gets a guaranteed slot +/// in the channel capacity, and on top of that there are `buffer` "first come, +/// first serve" slots available to all senders. +/// +/// The `Receiver` returned implements the `Stream` trait and has access to any +/// number of the associated combinators for transforming the result. +pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) { + // Check that the requested buffer size does not exceed the maximum buffer + // size permitted by the system. + assert!(buffer < MAX_BUFFER, "requested buffer size too large"); + channel2(Some(buffer)) +} + +/// Creates an in-memory channel implementation of the `Stream` trait with +/// unbounded capacity. +/// +/// This method creates a concrete implementation of the `Stream` trait which +/// can be used to send values across threads in a streaming fashion. A `send` +/// on this channel will always succeed as long as the receive half has not +/// been closed. If the receiver falls behind, messages will be buffered +/// internally. +/// +/// **Note** that the amount of available system memory is an implicit bound to +/// the channel. Using an `unbounded` channel has the ability of causing the +/// process to run out of memory. In this case, the process will be aborted. +pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { + let (tx, rx) = channel2(None); + (UnboundedSender(tx), UnboundedReceiver(rx)) +} + +fn channel2<T>(buffer: Option<usize>) -> (Sender<T>, Receiver<T>) { + let inner = Arc::new(Inner { + buffer: buffer, + state: AtomicUsize::new(INIT_STATE), + message_queue: Queue::new(), + parked_queue: Queue::new(), + num_senders: AtomicUsize::new(1), + recv_task: Mutex::new(ReceiverTask { + unparked: false, + task: None, + }), + }); + + let tx = Sender { + inner: inner.clone(), + sender_task: Arc::new(Mutex::new(SenderTask::new())), + maybe_parked: false, + }; + + let rx = Receiver { + inner: inner, + }; + + (tx, rx) +} + +/* + * + * ===== impl Sender ===== + * + */ + +impl<T> Sender<T> { + /// Attempts to send a message on this `Sender<T>` without blocking. + /// + /// This function, unlike `start_send`, is safe to call whether it's being + /// called on a task or not. Note that this function, however, will *not* + /// attempt to block the current task if the message cannot be sent. + /// + /// It is not recommended to call this function from inside of a future, + /// only from an external thread where you've otherwise arranged to be + /// notified when the channel is no longer full. + pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> { + // If the sender is currently blocked, reject the message + if !self.poll_unparked(false).is_ready() { + return Err(TrySendError { + kind: TrySendErrorKind::Full(msg), + }); + } + + // The channel has capacity to accept the message, so send it + self.do_send(Some(msg), false) + .map_err(|SendError(v)| { + TrySendError { + kind: TrySendErrorKind::Disconnected(v), + } + }) + } + + // Do the send without failing + // None means close + fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> { + // First, increment the number of messages contained by the channel. + // This operation will also atomically determine if the sender task + // should be parked. + // + // None is returned in the case that the channel has been closed by the + // receiver. This happens when `Receiver::close` is called or the + // receiver is dropped. + let park_self = match self.inc_num_messages(msg.is_none()) { + Some(park_self) => park_self, + None => { + // The receiver has closed the channel. Only abort if actually + // sending a message. It is important that the stream + // termination (None) is always sent. This technically means + // that it is possible for the queue to contain the following + // number of messages: + // + // num-senders + buffer + 1 + // + if let Some(msg) = msg { + return Err(SendError(msg)); + } else { + return Ok(()); + } + } + }; + + // If the channel has reached capacity, then the sender task needs to + // be parked. This will send the task handle on the parked task queue. + // + // However, when `do_send` is called while dropping the `Sender`, + // `task::current()` can't be called safely. In this case, in order to + // maintain internal consistency, a blank message is pushed onto the + // parked task queue. + if park_self { + self.park(do_park); + } + + self.queue_push_and_signal(msg); + + Ok(()) + } + + // Do the send without parking current task. + // + // To be called from unbounded sender. + fn do_send_nb(&self, msg: T) -> Result<(), SendError<T>> { + match self.inc_num_messages(false) { + Some(park_self) => assert!(!park_self), + None => return Err(SendError(msg)), + }; + + self.queue_push_and_signal(Some(msg)); + + Ok(()) + } + + // Push message to the queue and signal to the receiver + fn queue_push_and_signal(&self, msg: Option<T>) { + // Push the message onto the message queue + self.inner.message_queue.push(msg); + + // Signal to the receiver that a message has been enqueued. If the + // receiver is parked, this will unpark the task. + self.signal(); + } + + // Increment the number of queued messages. Returns if the sender should + // block. + fn inc_num_messages(&self, close: bool) -> Option<bool> { + let mut curr = self.inner.state.load(SeqCst); + + loop { + let mut state = decode_state(curr); + + // The receiver end closed the channel. + if !state.is_open { + return None; + } + + // This probably is never hit? Odds are the process will run out of + // memory first. It may be worth to return something else in this + // case? + assert!(state.num_messages < MAX_CAPACITY, "buffer space exhausted; \ + sending this messages would overflow the state"); + + state.num_messages += 1; + + // The channel is closed by all sender handles being dropped. + if close { + state.is_open = false; + } + + let next = encode_state(&state); + match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => { + // Block if the current number of pending messages has exceeded + // the configured buffer size + let park_self = match self.inner.buffer { + Some(buffer) => state.num_messages > buffer, + None => false, + }; + + return Some(park_self) + } + Err(actual) => curr = actual, + } + } + } + + // Signal to the receiver task that a message has been enqueued + fn signal(&self) { + // TODO + // This logic can probably be improved by guarding the lock with an + // atomic. + // + // Do this step first so that the lock is dropped when + // `unpark` is called + let task = { + let mut recv_task = self.inner.recv_task.lock().unwrap(); + + // If the receiver has already been unparked, then there is nothing + // more to do + if recv_task.unparked { + return; + } + + // Setting this flag enables the receiving end to detect that + // an unpark event happened in order to avoid unnecessarily + // parking. + recv_task.unparked = true; + recv_task.task.take() + }; + + if let Some(task) = task { + task.notify(); + } + } + + fn park(&mut self, can_park: bool) { + // TODO: clean up internal state if the task::current will fail + + let task = if can_park { + Some(task::current()) + } else { + None + }; + + { + let mut sender = self.sender_task.lock().unwrap(); + sender.task = task; + sender.is_parked = true; + } + + // Send handle over queue + let t = self.sender_task.clone(); + self.inner.parked_queue.push(t); + + // Check to make sure we weren't closed after we sent our task on the + // queue + let state = decode_state(self.inner.state.load(SeqCst)); + self.maybe_parked = state.is_open; + } + + /// Polls the channel to determine if there is guaranteed to be capacity to send at least one + /// item without waiting. + /// + /// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns + /// `Ok(Async::NotReady)` if the channel is not guaranteed to have capacity. Returns + /// `Err(SendError(_))` if the receiver has been dropped. + /// + /// # Panics + /// + /// This method will panic if called from outside the context of a task or future. + pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> { + let state = decode_state(self.inner.state.load(SeqCst)); + if !state.is_open { + return Err(SendError(())); + } + + Ok(self.poll_unparked(true)) + } + + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + !decode_state(self.inner.state.load(SeqCst)).is_open + } + + fn poll_unparked(&mut self, do_park: bool) -> Async<()> { + // First check the `maybe_parked` variable. This avoids acquiring the + // lock in most cases + if self.maybe_parked { + // Get a lock on the task handle + let mut task = self.sender_task.lock().unwrap(); + + if !task.is_parked { + self.maybe_parked = false; + return Async::Ready(()) + } + + // At this point, an unpark request is pending, so there will be an + // unpark sometime in the future. We just need to make sure that + // the correct task will be notified. + // + // Update the task in case the `Sender` has been moved to another + // task + task.task = if do_park { + Some(task::current()) + } else { + None + }; + + Async::NotReady + } else { + Async::Ready(()) + } + } +} + +impl<T> Sink for Sender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + // If the sender is currently blocked, reject the message before doing + // any work. + if !self.poll_unparked(true).is_ready() { + return Ok(AsyncSink::NotReady(msg)); + } + + // The channel has capacity to accept the message, so send it. + self.do_send(Some(msg), true)?; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + self.poll_ready() + // At this point, the value cannot be returned and `SendError` + // cannot be created with a `T` without breaking backwards + // comptibility. This means we cannot return an error. + // + // That said, there is also no guarantee that a `poll_complete` + // returning `Ok` implies the receiver sees the message. + .or_else(|_| Ok(().into())) + } + + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<T> UnboundedSender<T> { + /// Returns whether this channel is closed without needing a context. + pub fn is_closed(&self) -> bool { + self.0.is_closed() + } + + /// Sends the provided message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + #[deprecated(note = "renamed to `unbounded_send`")] + #[doc(hidden)] + pub fn send(&self, msg: T) -> Result<(), SendError<T>> { + self.unbounded_send(msg) + } + + /// Sends the provided message along this channel. + /// + /// This is an unbounded sender, so this function differs from `Sink::send` + /// by ensuring the return type reflects that the channel is always ready to + /// receive messages. + pub fn unbounded_send(&self, msg: T) -> Result<(), SendError<T>> { + self.0.do_send_nb(msg) + } +} + +impl<T> Sink for UnboundedSender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + self.0.start_send(msg) + } + + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + self.0.poll_complete() + } + + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<'a, T> Sink for &'a UnboundedSender<T> { + type SinkItem = T; + type SinkError = SendError<T>; + + fn start_send(&mut self, msg: T) -> StartSend<T, SendError<T>> { + self.0.do_send_nb(msg)?; + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), SendError<T>> { + Ok(Async::Ready(())) + } +} + +impl<T> Clone for UnboundedSender<T> { + fn clone(&self) -> UnboundedSender<T> { + UnboundedSender(self.0.clone()) + } +} + + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + // Since this atomic op isn't actually guarding any memory and we don't + // care about any orderings besides the ordering on the single atomic + // variable, a relaxed ordering is acceptable. + let mut curr = self.inner.num_senders.load(SeqCst); + + loop { + // If the maximum number of senders has been reached, then fail + if curr == self.inner.max_senders() { + panic!("cannot clone `Sender` -- too many outstanding senders"); + } + + debug_assert!(curr < self.inner.max_senders()); + + let next = curr + 1; + let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst); + + // The ABA problem doesn't matter here. We only care that the + // number of senders never exceeds the maximum. + if actual == curr { + return Sender { + inner: self.inner.clone(), + sender_task: Arc::new(Mutex::new(SenderTask::new())), + maybe_parked: false, + }; + } + + curr = actual; + } + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + // Ordering between variables don't matter here + let prev = self.inner.num_senders.fetch_sub(1, SeqCst); + + if prev == 1 { + let _ = self.do_send(None, false); + } + } +} + +/* + * + * ===== impl Receiver ===== + * + */ + +impl<T> Receiver<T> { + /// Closes the receiving half + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + let mut curr = self.inner.state.load(SeqCst); + + loop { + let mut state = decode_state(curr); + + if !state.is_open { + break + } + + state.is_open = false; + + let next = encode_state(&state); + match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => break, + Err(actual) => curr = actual, + } + } + + // Wake up any threads waiting as they'll see that we've closed the + // channel and will continue on their merry way. + loop { + match unsafe { self.inner.parked_queue.pop() } { + PopResult::Data(task) => { + task.lock().unwrap().notify(); + } + PopResult::Empty => break, + PopResult::Inconsistent => thread::yield_now(), + } + } + } + + fn next_message(&mut self) -> Async<Option<T>> { + // Pop off a message + loop { + match unsafe { self.inner.message_queue.pop() } { + PopResult::Data(msg) => { + // If there are any parked task handles in the parked queue, + // pop one and unpark it. + self.unpark_one(); + // Decrement number of messages + self.dec_num_messages(); + + return Async::Ready(msg); + } + PopResult::Empty => { + // The queue is empty, return NotReady + return Async::NotReady; + } + PopResult::Inconsistent => { + // Inconsistent means that there will be a message to pop + // in a short time. This branch can only be reached if + // values are being produced from another thread, so there + // are a few ways that we can deal with this: + // + // 1) Spin + // 2) thread::yield_now() + // 3) task::current().unwrap() & return NotReady + // + // For now, thread::yield_now() is used, but it would + // probably be better to spin a few times then yield. + thread::yield_now(); + } + } + } + } + + // Unpark a single task handle if there is one pending in the parked queue + fn unpark_one(&mut self) { + loop { + match unsafe { self.inner.parked_queue.pop() } { + PopResult::Data(task) => { + task.lock().unwrap().notify(); + return; + } + PopResult::Empty => { + // Queue empty, no task to wake up. + return; + } + PopResult::Inconsistent => { + // Same as above + thread::yield_now(); + } + } + } + } + + // Try to park the receiver task + fn try_park(&self) -> TryPark { + let curr = self.inner.state.load(SeqCst); + let state = decode_state(curr); + + // If the channel is closed, then there is no need to park. + if state.is_closed() { + return TryPark::Closed; + } + + // First, track the task in the `recv_task` slot + let mut recv_task = self.inner.recv_task.lock().unwrap(); + + if recv_task.unparked { + // Consume the `unpark` signal without actually parking + recv_task.unparked = false; + return TryPark::NotEmpty; + } + + recv_task.task = Some(task::current()); + TryPark::Parked + } + + fn dec_num_messages(&self) { + let mut curr = self.inner.state.load(SeqCst); + + loop { + let mut state = decode_state(curr); + + state.num_messages -= 1; + + let next = encode_state(&state); + match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) { + Ok(_) => break, + Err(actual) => curr = actual, + } + } + } +} + +impl<T> Stream for Receiver<T> { + type Item = T; + type Error = (); + + fn poll(&mut self) -> Poll<Option<T>, ()> { + loop { + // Try to read a message off of the message queue. + match self.next_message() { + Async::Ready(msg) => return Ok(Async::Ready(msg)), + Async::NotReady => { + // There are no messages to read, in this case, attempt to + // park. The act of parking will verify that the channel is + // still empty after the park operation has completed. + match self.try_park() { + TryPark::Parked => { + // The task was parked, and the channel is still + // empty, return NotReady. + return Ok(Async::NotReady); + } + TryPark::Closed => { + // The channel is closed, there will be no further + // messages. + return Ok(Async::Ready(None)); + } + TryPark::NotEmpty => { + // A message has been sent while attempting to + // park. Loop again, the next iteration is + // guaranteed to get the message. + continue; + } + } + } + } + } + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + // Drain the channel of all pending messages + self.close(); + + loop { + match self.next_message() { + Async::Ready(_) => {} + Async::NotReady => { + let curr = self.inner.state.load(SeqCst); + let state = decode_state(curr); + + // If the channel is closed, then there is no need to park. + if state.is_closed() { + return; + } + + // TODO: Spinning isn't ideal, it might be worth + // investigating using a condvar or some other strategy + // here. That said, if this case is hit, then another thread + // is about to push the value into the queue and this isn't + // the only spinlock in the impl right now. + thread::yield_now(); + } + } + } + } +} + +impl<T> UnboundedReceiver<T> { + /// Closes the receiving half + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.0.close(); + } +} + +impl<T> Stream for UnboundedReceiver<T> { + type Item = T; + type Error = (); + + fn poll(&mut self) -> Poll<Option<T>, ()> { + self.0.poll() + } +} + +/// Handle returned from the `spawn` function. +/// +/// This handle is a stream that proxies a stream on a separate `Executor`. +/// Created through the `mpsc::spawn` function, this handle will produce +/// the same values as the proxied stream, as they are produced in the executor, +/// and uses a limited buffer to exert back-pressure on the remote stream. +/// +/// If this handle is dropped, then the stream will no longer be polled and is +/// scheduled to be dropped. +pub struct SpawnHandle<Item, Error> { + rx: Receiver<Result<Item, Error>>, + _cancel_tx: oneshot::Sender<()>, +} + +/// Type of future which `Executor` instances must be able to execute for `spawn`. +pub struct Execute<S: Stream> { + inner: SendAll<Sender<Result<S::Item, S::Error>>, Results<S, SendError<Result<S::Item, S::Error>>>>, + cancel_rx: oneshot::Receiver<()>, +} + +/// Spawns a `stream` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the remote stream. +/// +/// The `stream` will be canceled if the `SpawnHandle` is dropped. +/// +/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself. +/// When `stream` has additional items available, then the `SpawnHandle` +/// will have those same items available. +/// +/// At most `buffer + 1` elements will be buffered at a time. If the buffer +/// is full, then `stream` will stop progressing until more space is available. +/// This allows the `SpawnHandle` to exert backpressure on the `stream`. +/// +/// # Panics +/// +/// This function will panic if `executor` is unable spawn a `Future` containing +/// the entirety of the `stream`. +pub fn spawn<S, E>(stream: S, executor: &E, buffer: usize) -> SpawnHandle<S::Item, S::Error> + where S: Stream, + E: Executor<Execute<S>> +{ + let (cancel_tx, cancel_rx) = oneshot::channel(); + let (tx, rx) = channel(buffer); + executor.execute(Execute { + inner: tx.send_all(resultstream::new(stream)), + cancel_rx: cancel_rx, + }).expect("failed to spawn stream"); + SpawnHandle { + rx: rx, + _cancel_tx: cancel_tx, + } +} + +/// Spawns a `stream` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the remote stream, with unbounded buffering. +/// +/// The `stream` will be canceled if the `SpawnHandle` is dropped. +/// +/// The `SpawnHandle` returned is a stream that is a proxy for `stream` itself. +/// When `stream` has additional items available, then the `SpawnHandle` +/// will have those same items available. +/// +/// An unbounded buffer is used, which means that values will be buffered as +/// fast as `stream` can produce them, without any backpressure. Therefore, if +/// `stream` is an infinite stream, it can use an unbounded amount of memory, and +/// potentially hog CPU resources. +/// +/// # Panics +/// +/// This function will panic if `executor` is unable spawn a `Future` containing +/// the entirety of the `stream`. +pub fn spawn_unbounded<S, E>(stream: S, executor: &E) -> SpawnHandle<S::Item, S::Error> + where S: Stream, + E: Executor<Execute<S>> +{ + let (cancel_tx, cancel_rx) = oneshot::channel(); + let (tx, rx) = channel2(None); + executor.execute(Execute { + inner: tx.send_all(resultstream::new(stream)), + cancel_rx: cancel_rx, + }).expect("failed to spawn stream"); + SpawnHandle { + rx: rx, + _cancel_tx: cancel_tx, + } +} + +impl<I, E> Stream for SpawnHandle<I, E> { + type Item = I; + type Error = E; + + fn poll(&mut self) -> Poll<Option<I>, E> { + match self.rx.poll() { + Ok(Async::Ready(Some(Ok(t)))) => Ok(Async::Ready(Some(t.into()))), + Ok(Async::Ready(Some(Err(e)))) => Err(e), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => unreachable!("mpsc::Receiver should never return Err"), + } + } +} + +impl<I, E> fmt::Debug for SpawnHandle<I, E> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SpawnHandle") + .finish() + } +} + +impl<S: Stream> Future for Execute<S> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + match self.cancel_rx.poll() { + Ok(Async::NotReady) => (), + _ => return Ok(Async::Ready(())), + } + match self.inner.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + _ => Ok(Async::Ready(())) + } + } +} + +impl<S: Stream> fmt::Debug for Execute<S> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Execute") + .finish() + } +} + +/* + * + * ===== impl Inner ===== + * + */ + +impl<T> Inner<T> { + // The return value is such that the total number of messages that can be + // enqueued into the channel will never exceed MAX_CAPACITY + fn max_senders(&self) -> usize { + match self.buffer { + Some(buffer) => MAX_CAPACITY - buffer, + None => MAX_BUFFER, + } + } +} + +unsafe impl<T: Send> Send for Inner<T> {} +unsafe impl<T: Send> Sync for Inner<T> {} + +impl State { + fn is_closed(&self) -> bool { + !self.is_open && self.num_messages == 0 + } +} + +/* + * + * ===== Helpers ===== + * + */ + +fn decode_state(num: usize) -> State { + State { + is_open: num & OPEN_MASK == OPEN_MASK, + num_messages: num & MAX_CAPACITY, + } +} + +fn encode_state(state: &State) -> usize { + let mut num = state.num_messages; + + if state.is_open { + num |= OPEN_MASK; + } + + num +} diff --git a/third_party/rust/futures-0.1.31/src/sync/mpsc/queue.rs b/third_party/rust/futures-0.1.31/src/sync/mpsc/queue.rs new file mode 100644 index 0000000000..9ff6bcf873 --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/sync/mpsc/queue.rs @@ -0,0 +1,151 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module contains an implementation of a concurrent MPSC queue. This +//! queue can be used to share data between threads, and is also used as the +//! building block of channels in rust. +//! +//! Note that the current implementation of this queue has a caveat of the `pop` +//! method, and see the method for more information about it. Due to this +//! caveat, this queue may not be appropriate for all use-cases. + +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + +// NOTE: this implementation is lifted from the standard library and only +// slightly modified + +pub use self::PopResult::*; +use std::prelude::v1::*; + +use std::cell::UnsafeCell; +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; + +/// A result of the `pop` function. +pub enum PopResult<T> { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} + +#[derive(Debug)] +struct Node<T> { + next: AtomicPtr<Node<T>>, + value: Option<T>, +} + +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +#[derive(Debug)] +pub struct Queue<T> { + head: AtomicPtr<Node<T>>, + tail: UnsafeCell<*mut Node<T>>, +} + +unsafe impl<T: Send> Send for Queue<T> { } +unsafe impl<T: Send> Sync for Queue<T> { } + +impl<T> Node<T> { + unsafe fn new(v: Option<T>) -> *mut Node<T> { + Box::into_raw(Box::new(Node { + next: AtomicPtr::new(ptr::null_mut()), + value: v, + })) + } +} + +impl<T> Queue<T> { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue<T> { + let stub = unsafe { Node::new(None) }; + Queue { + head: AtomicPtr::new(stub), + tail: UnsafeCell::new(stub), + } + } + + /// Pushes a new value onto this queue. + pub fn push(&self, t: T) { + unsafe { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, Ordering::AcqRel); + (*prev).next.store(n, Ordering::Release); + } + } + + /// Pops some data from this queue. + /// + /// Note that the current implementation means that this function cannot + /// return `Option<T>`. It is possible for this queue to be in an + /// inconsistent state where many pushes have succeeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is preempted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. + /// + /// This function is unsafe because only one thread can call it at a time. + pub unsafe fn pop(&self) -> PopResult<T> { + let tail = *self.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + + if !next.is_null() { + *self.tail.get() = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take().unwrap(); + drop(Box::from_raw(tail)); + return Data(ret); + } + + if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} + } +} + +impl<T> Drop for Queue<T> { + fn drop(&mut self) { + unsafe { + let mut cur = *self.tail.get(); + while !cur.is_null() { + let next = (*cur).next.load(Ordering::Relaxed); + drop(Box::from_raw(cur)); + cur = next; + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/src/sync/oneshot.rs b/third_party/rust/futures-0.1.31/src/sync/oneshot.rs new file mode 100644 index 0000000000..3a9d8efdca --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/sync/oneshot.rs @@ -0,0 +1,611 @@ +//! A one-shot, futures-aware channel + +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::SeqCst; +use std::error::Error; +use std::fmt; + +use {Future, Poll, Async}; +use future::{lazy, Lazy, Executor, IntoFuture}; +use lock::Lock; +use task::{self, Task}; + +/// A future representing the completion of a computation happening elsewhere in +/// memory. +/// +/// This is created by the `oneshot::channel` function. +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct Receiver<T> { + inner: Arc<Inner<T>>, +} + +/// Represents the completion half of a oneshot through which the result of a +/// computation is signaled. +/// +/// This is created by the `oneshot::channel` function. +#[derive(Debug)] +pub struct Sender<T> { + inner: Arc<Inner<T>>, +} + +/// Internal state of the `Receiver`/`Sender` pair above. This is all used as +/// the internal synchronization between the two for send/recv operations. +#[derive(Debug)] +struct Inner<T> { + /// Indicates whether this oneshot is complete yet. This is filled in both + /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it + /// appropriately. + /// + /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is + /// unlocked and ready to be inspected. + /// + /// For `Sender` if this is `true` then the oneshot has gone away and it + /// can return ready from `poll_cancel`. + complete: AtomicBool, + + /// The actual data being transferred as part of this `Receiver`. This is + /// filled in by `Sender::complete` and read by `Receiver::poll`. + /// + /// Note that this is protected by `Lock`, but it is in theory safe to + /// replace with an `UnsafeCell` as it's actually protected by `complete` + /// above. I wouldn't recommend doing this, however, unless someone is + /// supremely confident in the various atomic orderings here and there. + data: Lock<Option<T>>, + + /// Field to store the task which is blocked in `Receiver::poll`. + /// + /// This is filled in when a oneshot is polled but not ready yet. Note that + /// the `Lock` here, unlike in `data` above, is important to resolve races. + /// Both the `Receiver` and the `Sender` halves understand that if they + /// can't acquire the lock then some important interference is happening. + rx_task: Lock<Option<Task>>, + + /// Like `rx_task` above, except for the task blocked in + /// `Sender::poll_cancel`. Additionally, `Lock` cannot be `UnsafeCell`. + tx_task: Lock<Option<Task>>, +} + +/// Creates a new futures-aware, one-shot channel. +/// +/// This function is similar to Rust's channels found in the standard library. +/// Two halves are returned, the first of which is a `Sender` handle, used to +/// signal the end of a computation and provide its value. The second half is a +/// `Receiver` which implements the `Future` trait, resolving to the value that +/// was given to the `Sender` handle. +/// +/// Each half can be separately owned and sent across threads/tasks. +/// +/// # Examples +/// +/// ``` +/// use std::thread; +/// use futures::sync::oneshot; +/// use futures::*; +/// +/// let (p, c) = oneshot::channel::<i32>(); +/// +/// thread::spawn(|| { +/// c.map(|i| { +/// println!("got: {}", i); +/// }).wait(); +/// }); +/// +/// p.send(3).unwrap(); +/// ``` +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let inner = Arc::new(Inner::new()); + let receiver = Receiver { + inner: inner.clone(), + }; + let sender = Sender { + inner: inner, + }; + (sender, receiver) +} + +impl<T> Inner<T> { + fn new() -> Inner<T> { + Inner { + complete: AtomicBool::new(false), + data: Lock::new(None), + rx_task: Lock::new(None), + tx_task: Lock::new(None), + } + } + + fn send(&self, t: T) -> Result<(), T> { + if self.complete.load(SeqCst) { + return Err(t) + } + + // Note that this lock acquisition may fail if the receiver + // is closed and sets the `complete` flag to true, whereupon + // the receiver may call `poll()`. + if let Some(mut slot) = self.data.try_lock() { + assert!(slot.is_none()); + *slot = Some(t); + drop(slot); + + // If the receiver called `close()` between the check at the + // start of the function, and the lock being released, then + // the receiver may not be around to receive it, so try to + // pull it back out. + if self.complete.load(SeqCst) { + // If lock acquisition fails, then receiver is actually + // receiving it, so we're good. + if let Some(mut slot) = self.data.try_lock() { + if let Some(t) = slot.take() { + return Err(t); + } + } + } + Ok(()) + } else { + // Must have been closed + Err(t) + } + } + + fn poll_cancel(&self) -> Poll<(), ()> { + // Fast path up first, just read the flag and see if our other half is + // gone. This flag is set both in our destructor and the oneshot + // destructor, but our destructor hasn't run yet so if it's set then the + // oneshot is gone. + if self.complete.load(SeqCst) { + return Ok(Async::Ready(())) + } + + // If our other half is not gone then we need to park our current task + // and move it into the `notify_cancel` slot to get notified when it's + // actually gone. + // + // If `try_lock` fails, then the `Receiver` is in the process of using + // it, so we can deduce that it's now in the process of going away and + // hence we're canceled. If it succeeds then we just store our handle. + // + // Crucially we then check `oneshot_gone` *again* before we return. + // While we were storing our handle inside `notify_cancel` the `Receiver` + // may have been dropped. The first thing it does is set the flag, and + // if it fails to acquire the lock it assumes that we'll see the flag + // later on. So... we then try to see the flag later on! + let handle = task::current(); + match self.tx_task.try_lock() { + Some(mut p) => *p = Some(handle), + None => return Ok(Async::Ready(())), + } + if self.complete.load(SeqCst) { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } + + fn is_canceled(&self) -> bool { + self.complete.load(SeqCst) + } + + fn drop_tx(&self) { + // Flag that we're a completed `Sender` and try to wake up a receiver. + // Whether or not we actually stored any data will get picked up and + // translated to either an item or cancellation. + // + // Note that if we fail to acquire the `rx_task` lock then that means + // we're in one of two situations: + // + // 1. The receiver is trying to block in `poll` + // 2. The receiver is being dropped + // + // In the first case it'll check the `complete` flag after it's done + // blocking to see if it succeeded. In the latter case we don't need to + // wake up anyone anyway. So in both cases it's ok to ignore the `None` + // case of `try_lock` and bail out. + // + // The first case crucially depends on `Lock` using `SeqCst` ordering + // under the hood. If it instead used `Release` / `Acquire` ordering, + // then it would not necessarily synchronize with `inner.complete` + // and deadlock might be possible, as was observed in + // https://github.com/rust-lang-nursery/futures-rs/pull/219. + self.complete.store(true, SeqCst); + if let Some(mut slot) = self.rx_task.try_lock() { + if let Some(task) = slot.take() { + drop(slot); + task.notify(); + } + } + } + + fn close_rx(&self) { + // Flag our completion and then attempt to wake up the sender if it's + // blocked. See comments in `drop` below for more info + self.complete.store(true, SeqCst); + if let Some(mut handle) = self.tx_task.try_lock() { + if let Some(task) = handle.take() { + drop(handle); + task.notify() + } + } + } + + fn try_recv(&self) -> Result<Option<T>, Canceled> { + // If we're complete, either `::close_rx` or `::drop_tx` was called. + // We can assume a successful send if data is present. + if self.complete.load(SeqCst) { + if let Some(mut slot) = self.data.try_lock() { + if let Some(data) = slot.take() { + return Ok(Some(data.into())); + } + } + // Should there be a different error value or a panic in the case + // where `self.data.try_lock() == None`? + Err(Canceled) + } else { + Ok(None) + } + } + + fn recv(&self) -> Poll<T, Canceled> { + let mut done = false; + + // Check to see if some data has arrived. If it hasn't then we need to + // block our task. + // + // Note that the acquisition of the `rx_task` lock might fail below, but + // the only situation where this can happen is during `Sender::drop` + // when we are indeed completed already. If that's happening then we + // know we're completed so keep going. + if self.complete.load(SeqCst) { + done = true; + } else { + let task = task::current(); + match self.rx_task.try_lock() { + Some(mut slot) => *slot = Some(task), + None => done = true, + } + } + + // If we're `done` via one of the paths above, then look at the data and + // figure out what the answer is. If, however, we stored `rx_task` + // successfully above we need to check again if we're completed in case + // a message was sent while `rx_task` was locked and couldn't notify us + // otherwise. + // + // If we're not done, and we're not complete, though, then we've + // successfully blocked our task and we return `NotReady`. + if done || self.complete.load(SeqCst) { + // If taking the lock fails, the sender will realise that the we're + // `done` when it checks the `complete` flag on the way out, and will + // treat the send as a failure. + if let Some(mut slot) = self.data.try_lock() { + if let Some(data) = slot.take() { + return Ok(data.into()); + } + } + Err(Canceled) + } else { + Ok(Async::NotReady) + } + } + + fn drop_rx(&self) { + // Indicate to the `Sender` that we're done, so any future calls to + // `poll_cancel` are weeded out. + self.complete.store(true, SeqCst); + + // If we've blocked a task then there's no need for it to stick around, + // so we need to drop it. If this lock acquisition fails, though, then + // it's just because our `Sender` is trying to take the task, so we + // let them take care of that. + if let Some(mut slot) = self.rx_task.try_lock() { + let task = slot.take(); + drop(slot); + drop(task); + } + + // Finally, if our `Sender` wants to get notified of us going away, it + // would have stored something in `tx_task`. Here we try to peel that + // out and unpark it. + // + // Note that the `try_lock` here may fail, but only if the `Sender` is + // in the process of filling in the task. If that happens then we + // already flagged `complete` and they'll pick that up above. + if let Some(mut handle) = self.tx_task.try_lock() { + if let Some(task) = handle.take() { + drop(handle); + task.notify() + } + } + } +} + +impl<T> Sender<T> { + #[deprecated(note = "renamed to `send`", since = "0.1.11")] + #[doc(hidden)] + #[cfg(feature = "with-deprecated")] + pub fn complete(self, t: T) { + drop(self.send(t)); + } + + /// Completes this oneshot with a successful result. + /// + /// This function will consume `self` and indicate to the other end, the + /// `Receiver`, that the value provided is the result of the computation this + /// represents. + /// + /// If the value is successfully enqueued for the remote end to receive, + /// then `Ok(())` is returned. If the receiving end was deallocated before + /// this function was called, however, then `Err` is returned with the value + /// provided. + pub fn send(self, t: T) -> Result<(), T> { + self.inner.send(t) + } + + /// Polls this `Sender` half to detect whether the `Receiver` this has + /// paired with has gone away. + /// + /// This function can be used to learn about when the `Receiver` (consumer) + /// half has gone away and nothing will be able to receive a message sent + /// from `send`. + /// + /// If `Ready` is returned then it means that the `Receiver` has disappeared + /// and the result this `Sender` would otherwise produce should no longer + /// be produced. + /// + /// If `NotReady` is returned then the `Receiver` is still alive and may be + /// able to receive a message if sent. The current task, however, is + /// scheduled to receive a notification if the corresponding `Receiver` goes + /// away. + /// + /// # Panics + /// + /// Like `Future::poll`, this function will panic if it's not called from + /// within the context of a task. In other words, this should only ever be + /// called from inside another future. + /// + /// If `Ok(Ready)` is returned then the associated `Receiver` has been + /// dropped, which means any work required for sending should be canceled. + /// + /// If you're calling this function from a context that does not have a + /// task, then you can use the `is_canceled` API instead. + pub fn poll_cancel(&mut self) -> Poll<(), ()> { + self.inner.poll_cancel() + } + + /// Tests to see whether this `Sender`'s corresponding `Receiver` + /// has gone away. + /// + /// This function can be used to learn about when the `Receiver` (consumer) + /// half has gone away and nothing will be able to receive a message sent + /// from `send`. + /// + /// Note that this function is intended to *not* be used in the context of a + /// future. If you're implementing a future you probably want to call the + /// `poll_cancel` function which will block the current task if the + /// cancellation hasn't happened yet. This can be useful when working on a + /// non-futures related thread, though, which would otherwise panic if + /// `poll_cancel` were called. + pub fn is_canceled(&self) -> bool { + self.inner.is_canceled() + } +} + +impl<T> Drop for Sender<T> { + fn drop(&mut self) { + self.inner.drop_tx() + } +} + +/// Error returned from a `Receiver<T>` whenever the corresponding `Sender<T>` +/// is dropped. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct Canceled; + +impl fmt::Display for Canceled { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "oneshot canceled") + } +} + +impl Error for Canceled { + fn description(&self) -> &str { + "oneshot canceled" + } +} + +impl<T> Receiver<T> { + /// Gracefully close this receiver, preventing sending any future messages. + /// + /// Any `send` operation which happens after this method returns is + /// guaranteed to fail. Once this method is called the normal `poll` method + /// can be used to determine whether a message was actually sent or not. If + /// `Canceled` is returned from `poll` then no message was sent. + pub fn close(&mut self) { + self.inner.close_rx() + } + + /// Attempts to receive a message outside of the context of a task. + /// + /// Useful when a [`Context`](Context) is not available such as within a + /// `Drop` impl. + /// + /// Does not schedule a task wakeup or have any other side effects. + /// + /// A return value of `None` must be considered immediately stale (out of + /// date) unless [`::close`](Receiver::close) has been called first. + /// + /// Returns an error if the sender was dropped. + pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> { + self.inner.try_recv() + } +} + +impl<T> Future for Receiver<T> { + type Item = T; + type Error = Canceled; + + fn poll(&mut self) -> Poll<T, Canceled> { + self.inner.recv() + } +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + self.inner.drop_rx() + } +} + +/// Handle returned from the `spawn` function. +/// +/// This handle is a future representing the completion of a different future on +/// a separate executor. Created through the `oneshot::spawn` function this +/// handle will resolve when the future provided to `spawn` resolves on the +/// `Executor` instance provided to that function. +/// +/// If this handle is dropped then the future will automatically no longer be +/// polled and is scheduled to be dropped. This can be canceled with the +/// `forget` function, however. +pub struct SpawnHandle<T, E> { + rx: Arc<ExecuteInner<Result<T, E>>>, +} + +struct ExecuteInner<T> { + inner: Inner<T>, + keep_running: AtomicBool, +} + +/// Type of future which `Execute` instances below must be able to spawn. +pub struct Execute<F: Future> { + future: F, + tx: Arc<ExecuteInner<Result<F::Item, F::Error>>>, +} + +/// Spawns a `future` onto the instance of `Executor` provided, `executor`, +/// returning a handle representing the completion of the future. +/// +/// The `SpawnHandle` returned is a future that is a proxy for `future` itself. +/// When `future` completes on `executor` then the `SpawnHandle` will itself be +/// resolved. Internally `SpawnHandle` contains a `oneshot` channel and is +/// thus safe to send across threads. +/// +/// The `future` will be canceled if the `SpawnHandle` is dropped. If this is +/// not desired then the `SpawnHandle::forget` function can be used to continue +/// running the future to completion. +/// +/// # Panics +/// +/// This function will panic if the instance of `Spawn` provided is unable to +/// spawn the `future` provided. +/// +/// If the provided instance of `Spawn` does not actually run `future` to +/// completion, then the returned handle may panic when polled. Typically this +/// is not a problem, though, as most instances of `Spawn` will run futures to +/// completion. +/// +/// Note that the returned future will likely panic if the `futures` provided +/// panics. If a future running on an executor panics that typically means that +/// the executor drops the future, which falls into the above case of not +/// running the future to completion essentially. +pub fn spawn<F, E>(future: F, executor: &E) -> SpawnHandle<F::Item, F::Error> + where F: Future, + E: Executor<Execute<F>>, +{ + let data = Arc::new(ExecuteInner { + inner: Inner::new(), + keep_running: AtomicBool::new(false), + }); + executor.execute(Execute { + future: future, + tx: data.clone(), + }).expect("failed to spawn future"); + SpawnHandle { rx: data } +} + +/// Spawns a function `f` onto the `Spawn` instance provided `s`. +/// +/// For more information see the `spawn` function in this module. This function +/// is just a thin wrapper around `spawn` which will execute the closure on the +/// executor provided and then complete the future that the closure returns. +pub fn spawn_fn<F, R, E>(f: F, executor: &E) -> SpawnHandle<R::Item, R::Error> + where F: FnOnce() -> R, + R: IntoFuture, + E: Executor<Execute<Lazy<F, R>>>, +{ + spawn(lazy(f), executor) +} + +impl<T, E> SpawnHandle<T, E> { + /// Drop this future without canceling the underlying future. + /// + /// When `SpawnHandle` is dropped, the spawned future will be canceled as + /// well if the future hasn't already resolved. This function can be used + /// when to drop this future but keep executing the underlying future. + pub fn forget(self) { + self.rx.keep_running.store(true, SeqCst); + } +} + +impl<T, E> Future for SpawnHandle<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<T, E> { + match self.rx.inner.recv() { + Ok(Async::Ready(Ok(t))) => Ok(t.into()), + Ok(Async::Ready(Err(e))) => Err(e), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => panic!("future was canceled before completion"), + } + } +} + +impl<T: fmt::Debug, E: fmt::Debug> fmt::Debug for SpawnHandle<T, E> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SpawnHandle") + .finish() + } +} + +impl<T, E> Drop for SpawnHandle<T, E> { + fn drop(&mut self) { + self.rx.inner.drop_rx(); + } +} + +impl<F: Future> Future for Execute<F> { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + // If we're canceled then we may want to bail out early. + // + // If the `forget` function was called, though, then we keep going. + if self.tx.inner.poll_cancel().unwrap().is_ready() { + if !self.tx.keep_running.load(SeqCst) { + return Ok(().into()) + } + } + + let result = match self.future.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(t)) => Ok(t), + Err(e) => Err(e), + }; + drop(self.tx.inner.send(result)); + Ok(().into()) + } +} + +impl<F: Future + fmt::Debug> fmt::Debug for Execute<F> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Execute") + .field("future", &self.future) + .finish() + } +} + +impl<F: Future> Drop for Execute<F> { + fn drop(&mut self) { + self.tx.inner.drop_tx(); + } +} |