diff options
Diffstat (limited to 'third_party/rust/tokio-executor/src')
-rw-r--r-- | third_party/rust/tokio-executor/src/enter.rs | 164 | ||||
-rw-r--r-- | third_party/rust/tokio-executor/src/error.rs | 50 | ||||
-rw-r--r-- | third_party/rust/tokio-executor/src/executor.rs | 151 | ||||
-rw-r--r-- | third_party/rust/tokio-executor/src/global.rs | 290 | ||||
-rw-r--r-- | third_party/rust/tokio-executor/src/lib.rs | 68 | ||||
-rw-r--r-- | third_party/rust/tokio-executor/src/park.rs | 226 | ||||
-rw-r--r-- | third_party/rust/tokio-executor/src/typed.rs | 181 |
7 files changed, 1130 insertions, 0 deletions
diff --git a/third_party/rust/tokio-executor/src/enter.rs b/third_party/rust/tokio-executor/src/enter.rs new file mode 100644 index 0000000000..1f8ef1a701 --- /dev/null +++ b/third_party/rust/tokio-executor/src/enter.rs @@ -0,0 +1,164 @@ +use std::cell::Cell; +use std::error::Error; +use std::fmt; +use std::prelude::v1::*; + +use futures::{self, Future}; + +thread_local!(static ENTERED: Cell<bool> = Cell::new(false)); + +/// Represents an executor context. +/// +/// For more details, see [`enter` documentation](fn.enter.html) +pub struct Enter { + on_exit: Vec<Box<dyn Callback>>, + permanent: bool, +} + +/// An error returned by `enter` if an execution scope has already been +/// entered. +pub struct EnterError { + _a: (), +} + +impl fmt::Debug for EnterError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("EnterError") + .field("reason", &self.description()) + .finish() + } +} + +impl fmt::Display for EnterError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl Error for EnterError { + fn description(&self) -> &str { + "attempted to run an executor while another executor is already running" + } +} + +/// Marks the current thread as being within the dynamic extent of an +/// executor. +/// +/// Executor implementations should call this function before blocking the +/// thread. If `None` is returned, the executor should fail by panicking or +/// taking some other action without blocking the current thread. This prevents +/// deadlocks due to multiple executors competing for the same thread. +/// +/// # Error +/// +/// Returns an error if the current thread is already marked +pub fn enter() -> Result<Enter, EnterError> { + ENTERED.with(|c| { + if c.get() { + Err(EnterError { _a: () }) + } else { + c.set(true); + + Ok(Enter { + on_exit: Vec::new(), + permanent: false, + }) + } + }) +} + +// Forces the current "entered" state to be cleared while the closure +// is executed. +// +// # Warning +// +// This is hidden for a reason. Do not use without fully understanding +// executors. Misuing can easily cause your program to deadlock. +#[doc(hidden)] +pub fn exit<F: FnOnce() -> R, R>(f: F) -> R { + // Reset in case the closure panics + struct Reset; + impl Drop for Reset { + fn drop(&mut self) { + ENTERED.with(|c| { + c.set(true); + }); + } + } + + ENTERED.with(|c| { + debug_assert!(c.get()); + c.set(false); + }); + + let reset = Reset; + let ret = f(); + ::std::mem::forget(reset); + + ENTERED.with(|c| { + assert!(!c.get(), "closure claimed permanent executor"); + c.set(true); + }); + + ret +} + +impl Enter { + /// Register a callback to be invoked if and when the thread + /// ceased to act as an executor. + pub fn on_exit<F>(&mut self, f: F) + where + F: FnOnce() + 'static, + { + self.on_exit.push(Box::new(f)); + } + + /// Treat the remainder of execution on this thread as part of an + /// executor; used mostly for thread pool worker threads. + /// + /// All registered `on_exit` callbacks are *dropped* without being + /// invoked. + pub fn make_permanent(mut self) { + self.permanent = true; + } + + /// Blocks the thread on the specified future, returning the value with + /// which that future completes. + pub fn block_on<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> { + futures::executor::spawn(f).wait_future() + } +} + +impl fmt::Debug for Enter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Enter").finish() + } +} + +impl Drop for Enter { + fn drop(&mut self) { + ENTERED.with(|c| { + assert!(c.get()); + + if self.permanent { + return; + } + + for callback in self.on_exit.drain(..) { + callback.call(); + } + + c.set(false); + }); + } +} + +trait Callback: 'static { + fn call(self: Box<Self>); +} + +impl<F: FnOnce() + 'static> Callback for F { + fn call(self: Box<Self>) { + (*self)() + } +} diff --git a/third_party/rust/tokio-executor/src/error.rs b/third_party/rust/tokio-executor/src/error.rs new file mode 100644 index 0000000000..579239f089 --- /dev/null +++ b/third_party/rust/tokio-executor/src/error.rs @@ -0,0 +1,50 @@ +use std::error::Error; +use std::fmt; + +/// Errors returned by `Executor::spawn`. +/// +/// Spawn errors should represent relatively rare scenarios. Currently, the two +/// scenarios represented by `SpawnError` are: +/// +/// * An executor being at capacity or full. As such, the executor is not able +/// to accept a new future. This error state is expected to be transient. +/// * An executor has been shutdown and can no longer accept new futures. This +/// error state is expected to be permanent. +#[derive(Debug)] +pub struct SpawnError { + is_shutdown: bool, +} + +impl SpawnError { + /// Return a new `SpawnError` reflecting a shutdown executor failure. + pub fn shutdown() -> Self { + SpawnError { is_shutdown: true } + } + + /// Return a new `SpawnError` reflecting an executor at capacity failure. + pub fn at_capacity() -> Self { + SpawnError { is_shutdown: false } + } + + /// Returns `true` if the error reflects a shutdown executor failure. + pub fn is_shutdown(&self) -> bool { + self.is_shutdown + } + + /// Returns `true` if the error reflects an executor at capacity failure. + pub fn is_at_capacity(&self) -> bool { + !self.is_shutdown + } +} + +impl fmt::Display for SpawnError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} + +impl Error for SpawnError { + fn description(&self) -> &str { + "attempted to spawn task while the executor is at capacity or shut down" + } +} diff --git a/third_party/rust/tokio-executor/src/executor.rs b/third_party/rust/tokio-executor/src/executor.rs new file mode 100644 index 0000000000..2a8abc6824 --- /dev/null +++ b/third_party/rust/tokio-executor/src/executor.rs @@ -0,0 +1,151 @@ +use futures::Future; +use SpawnError; + +/// A value that executes futures. +/// +/// The [`spawn`] function is used to submit a future to an executor. Once +/// submitted, the executor takes ownership of the future and becomes +/// responsible for driving the future to completion. +/// +/// The strategy employed by the executor to handle the future is less defined +/// and is left up to the `Executor` implementation. The `Executor` instance is +/// expected to call [`poll`] on the future once it has been notified, however +/// the "when" and "how" can vary greatly. +/// +/// For example, the executor might be a thread pool, in which case a set of +/// threads have already been spawned up and the future is inserted into a +/// queue. A thread will acquire the future and poll it. +/// +/// The `Executor` trait is only for futures that **are** `Send`. These are most +/// common. There currently is no trait that describes executors that operate +/// entirely on the current thread (i.e., are able to spawn futures that are not +/// `Send`). Note that single threaded executors can still implement `Executor`, +/// but only futures that are `Send` can be spawned via the trait. +/// +/// This trait is primarily intended to implemented by executors and used to +/// back `tokio::spawn`. Libraries and applications **may** use this trait to +/// bound generics, but doing so will limit usage to futures that implement +/// `Send`. Instead, libraries and applications are recommended to use +/// [`TypedExecutor`] as a bound. +/// +/// # Errors +/// +/// The [`spawn`] function returns `Result` with an error type of `SpawnError`. +/// This error type represents the reason that the executor was unable to spawn +/// the future. The two current represented scenarios are: +/// +/// * An executor being at capacity or full. As such, the executor is not able +/// to accept a new future. This error state is expected to be transient. +/// * An executor has been shutdown and can no longer accept new futures. This +/// error state is expected to be permanent. +/// +/// If a caller encounters an at capacity error, the caller should try to shed +/// load. This can be as simple as dropping the future that was spawned. +/// +/// If the caller encounters a shutdown error, the caller should attempt to +/// gracefully shutdown. +/// +/// # Examples +/// +/// ```rust +/// # extern crate futures; +/// # extern crate tokio_executor; +/// # use tokio_executor::Executor; +/// # fn docs(my_executor: &mut Executor) { +/// use futures::future::lazy; +/// my_executor.spawn(Box::new(lazy(|| { +/// println!("running on the executor"); +/// Ok(()) +/// }))).unwrap(); +/// # } +/// # fn main() {} +/// ``` +/// +/// [`spawn`]: #tymethod.spawn +/// [`poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll +/// [`TypedExecutor`]: ../trait.TypedExecutor.html +pub trait Executor { + /// Spawns a future object to run on this executor. + /// + /// `future` is passed to the executor, which will begin running it. The + /// future may run on the current thread or another thread at the discretion + /// of the `Executor` implementation. + /// + /// # Panics + /// + /// Implementations are encouraged to avoid panics. However, panics are + /// permitted and the caller should check the implementation specific + /// documentation for more details on possible panics. + /// + /// # Examples + /// + /// ```rust + /// # extern crate futures; + /// # extern crate tokio_executor; + /// # use tokio_executor::Executor; + /// # fn docs(my_executor: &mut Executor) { + /// use futures::future::lazy; + /// my_executor.spawn(Box::new(lazy(|| { + /// println!("running on the executor"); + /// Ok(()) + /// }))).unwrap(); + /// # } + /// # fn main() {} + /// ``` + fn spawn( + &mut self, + future: Box<dyn Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError>; + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + /// + /// # Panics + /// + /// This function must not panic. Implementers must ensure that panics do + /// not happen. + /// + /// # Examples + /// + /// ```rust + /// # extern crate futures; + /// # extern crate tokio_executor; + /// # use tokio_executor::Executor; + /// # fn docs(my_executor: &mut Executor) { + /// use futures::future::lazy; + /// + /// if my_executor.status().is_ok() { + /// my_executor.spawn(Box::new(lazy(|| { + /// println!("running on the executor"); + /// Ok(()) + /// }))).unwrap(); + /// } else { + /// println!("the executor is not in a good state"); + /// } + /// # } + /// # fn main() {} + /// ``` + fn status(&self) -> Result<(), SpawnError> { + Ok(()) + } +} + +impl<E: Executor + ?Sized> Executor for Box<E> { + fn spawn( + &mut self, + future: Box<dyn Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + (**self).spawn(future) + } + + fn status(&self) -> Result<(), SpawnError> { + (**self).status() + } +} diff --git a/third_party/rust/tokio-executor/src/global.rs b/third_party/rust/tokio-executor/src/global.rs new file mode 100644 index 0000000000..5012276088 --- /dev/null +++ b/third_party/rust/tokio-executor/src/global.rs @@ -0,0 +1,290 @@ +use super::{Enter, Executor, SpawnError}; + +use futures::{future, Future}; + +use std::cell::Cell; + +/// Executes futures on the default executor for the current execution context. +/// +/// `DefaultExecutor` implements `Executor` and can be used to spawn futures +/// without referencing a specific executor. +/// +/// When an executor starts, it sets the `DefaultExecutor` handle to point to an +/// executor (usually itself) that is used to spawn new tasks. +/// +/// The current `DefaultExecutor` reference is tracked using a thread-local +/// variable and is set using `tokio_executor::with_default` +#[derive(Debug, Clone)] +pub struct DefaultExecutor { + _dummy: (), +} + +/// Ensures that the executor is removed from the thread-local context +/// when leaving the scope. This handles cases that involve panicking. +#[derive(Debug)] +pub struct DefaultGuard { + _p: (), +} + +impl DefaultExecutor { + /// Returns a handle to the default executor for the current context. + /// + /// Futures may be spawned onto the default executor using this handle. + /// + /// The returned handle will reference whichever executor is configured as + /// the default **at the time `spawn` is called**. This enables + /// `DefaultExecutor::current()` to be called before an execution context is + /// setup, then passed **into** an execution context before it is used. + /// + /// This is also true for sending the handle across threads, so calling + /// `DefaultExecutor::current()` on thread A and then sending the result to + /// thread B will _not_ reference the default executor that was set on thread A. + pub fn current() -> DefaultExecutor { + DefaultExecutor { _dummy: () } + } + + #[inline] + fn with_current<F: FnOnce(&mut dyn Executor) -> R, R>(f: F) -> Option<R> { + EXECUTOR.with( + |current_executor| match current_executor.replace(State::Active) { + State::Ready(executor_ptr) => { + let executor = unsafe { &mut *executor_ptr }; + let result = f(executor); + current_executor.set(State::Ready(executor_ptr)); + Some(result) + } + State::Empty | State::Active => None, + }, + ) + } +} + +#[derive(Clone, Copy)] +enum State { + // default executor not defined + Empty, + // default executor is defined and ready to be used + Ready(*mut dyn Executor), + // default executor is currently active (used to detect recursive calls) + Active, +} + +thread_local! { + /// Thread-local tracking the current executor + static EXECUTOR: Cell<State> = Cell::new(State::Empty) +} + +// ===== impl DefaultExecutor ===== + +impl super::Executor for DefaultExecutor { + fn spawn( + &mut self, + future: Box<dyn Future<Item = (), Error = ()> + Send>, + ) -> Result<(), SpawnError> { + DefaultExecutor::with_current(|executor| executor.spawn(future)) + .unwrap_or_else(|| Err(SpawnError::shutdown())) + } + + fn status(&self) -> Result<(), SpawnError> { + DefaultExecutor::with_current(|executor| executor.status()) + .unwrap_or_else(|| Err(SpawnError::shutdown())) + } +} + +impl<T> super::TypedExecutor<T> for DefaultExecutor +where + T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn spawn(&mut self, future: T) -> Result<(), SpawnError> { + super::Executor::spawn(self, Box::new(future)) + } + + fn status(&self) -> Result<(), SpawnError> { + super::Executor::status(self) + } +} + +impl<T> future::Executor<T> for DefaultExecutor +where + T: Future<Item = (), Error = ()> + Send + 'static, +{ + fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> { + if let Err(e) = super::Executor::status(self) { + let kind = if e.is_at_capacity() { + future::ExecuteErrorKind::NoCapacity + } else { + future::ExecuteErrorKind::Shutdown + }; + + return Err(future::ExecuteError::new(kind, future)); + } + + let _ = DefaultExecutor::with_current(|executor| executor.spawn(Box::new(future))); + Ok(()) + } +} + +// ===== global spawn fns ===== + +/// Submits a future for execution on the default executor -- usually a +/// threadpool. +/// +/// Futures are lazy constructs. When they are defined, no work happens. In +/// order for the logic defined by the future to be run, the future must be +/// spawned on an executor. This function is the easiest way to do so. +/// +/// This function must be called from an execution context, i.e. from a future +/// that has been already spawned onto an executor. +/// +/// Once spawned, the future will execute. The details of how that happens is +/// left up to the executor instance. If the executor is a thread pool, the +/// future will be pushed onto a queue that a worker thread polls from. If the +/// executor is a "current thread" executor, the future might be polled +/// immediately from within the call to `spawn` or it might be pushed onto an +/// internal queue. +/// +/// # Panics +/// +/// This function will panic if the default executor is not set or if spawning +/// onto the default executor returns an error. To avoid the panic, use the +/// `DefaultExecutor` handle directly. +/// +/// # Examples +/// +/// ```rust +/// # extern crate futures; +/// # extern crate tokio_executor; +/// # use tokio_executor::spawn; +/// # pub fn dox() { +/// use futures::future::lazy; +/// +/// spawn(lazy(|| { +/// println!("running on the default executor"); +/// Ok(()) +/// })); +/// # } +/// # pub fn main() {} +/// ``` +pub fn spawn<T>(future: T) +where + T: Future<Item = (), Error = ()> + Send + 'static, +{ + DefaultExecutor::current().spawn(Box::new(future)).unwrap() +} + +/// Set the default executor for the duration of the closure +/// +/// # Panics +/// +/// This function panics if there already is a default executor set. +pub fn with_default<T, F, R>(executor: &mut T, enter: &mut Enter, f: F) -> R +where + T: Executor, + F: FnOnce(&mut Enter) -> R, +{ + unsafe fn hide_lt<'a>(p: *mut (dyn Executor + 'a)) -> *mut (dyn Executor + 'static) { + use std::mem; + mem::transmute(p) + } + + EXECUTOR.with(|cell| { + match cell.get() { + State::Ready(_) | State::Active => { + panic!("default executor already set for execution context") + } + _ => {} + } + + // Ensure that the executor is removed from the thread-local context + // when leaving the scope. This handles cases that involve panicking. + struct Reset<'a>(&'a Cell<State>); + + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + self.0.set(State::Empty); + } + } + + let _reset = Reset(cell); + + // While scary, this is safe. The function takes a + // `&mut Executor`, which guarantees that the reference lives for the + // duration of `with_default`. + // + // Because we are always clearing the TLS value at the end of the + // function, we can cast the reference to 'static which thread-local + // cells require. + let executor = unsafe { hide_lt(executor as &mut _ as *mut _) }; + + cell.set(State::Ready(executor)); + + f(enter) + }) +} + +/// Sets `executor` as the default executor, returning a guard that unsets it when +/// dropped. +/// +/// # Panics +/// +/// This function panics if there already is a default executor set. +pub fn set_default<T>(executor: T) -> DefaultGuard +where + T: Executor + 'static, +{ + EXECUTOR.with(|cell| { + match cell.get() { + State::Ready(_) | State::Active => { + panic!("default executor already set for execution context") + } + _ => {} + } + + // Ensure that the executor will outlive the call to set_default, even + // if the drop guard is never dropped due to calls to `mem::forget` or + // similar. + let executor = Box::new(executor); + + cell.set(State::Ready(Box::into_raw(executor))); + }); + + DefaultGuard { _p: () } +} + +impl Drop for DefaultGuard { + fn drop(&mut self) { + let _ = EXECUTOR.try_with(|cell| { + if let State::Ready(prev) = cell.replace(State::Empty) { + // drop the previous executor. + unsafe { + let prev = Box::from_raw(prev); + drop(prev); + }; + } + }); + } +} + +#[cfg(test)] +mod tests { + use super::{with_default, DefaultExecutor, Executor}; + + #[test] + fn default_executor_is_send_and_sync() { + fn assert_send_sync<T: Send + Sync>() {} + + assert_send_sync::<DefaultExecutor>(); + } + + #[test] + fn nested_default_executor_status() { + let mut enter = super::super::enter().unwrap(); + let mut executor = DefaultExecutor::current(); + + let result = with_default(&mut executor, &mut enter, |_| { + DefaultExecutor::current().status() + }); + + assert!(result.err().unwrap().is_shutdown()) + } +} diff --git a/third_party/rust/tokio-executor/src/lib.rs b/third_party/rust/tokio-executor/src/lib.rs new file mode 100644 index 0000000000..67aa4895fc --- /dev/null +++ b/third_party/rust/tokio-executor/src/lib.rs @@ -0,0 +1,68 @@ +#![deny(missing_docs, missing_debug_implementations)] +#![doc(html_root_url = "https://docs.rs/tokio-executor/0.1.9")] + +//! Task execution related traits and utilities. +//! +//! In the Tokio execution model, futures are lazy. When a future is created, no +//! work is performed. In order for the work defined by the future to happen, +//! the future must be submitted to an executor. A future that is submitted to +//! an executor is called a "task". +//! +//! The executor is responsible for ensuring that [`Future::poll`] is called +//! whenever the task is notified. Notification happens when the internal +//! state of a task transitions from *not ready* to *ready*. For example, a +//! socket might have received data and a call to `read` will now be able to +//! succeed. +//! +//! This crate provides traits and utilities that are necessary for building an +//! executor, including: +//! +//! * The [`Executor`] trait spawns future object onto an executor. +//! +//! * The [`TypedExecutor`] trait spawns futures of a specific type onto an +//! executor. This is used to be generic over executors that spawn futures +//! that are either `Send` or `!Send` or implement executors that apply to +//! specific futures. +//! +//! * [`enter`] marks that the current thread is entering an execution +//! context. This prevents a second executor from accidentally starting from +//! within the context of one that is already running. +//! +//! * [`DefaultExecutor`] spawns tasks onto the default executor for the current +//! context. +//! +//! * [`Park`] abstracts over blocking and unblocking the current thread. +//! +//! # Implementing an executor +//! +//! Executors should always implement `TypedExecutor`. This usually is the bound +//! that applications and libraries will use when generic over an executor. See +//! the [trait documentation][`TypedExecutor`] for more details. +//! +//! If the executor is able to spawn all futures that are `Send`, then the +//! executor should also implement the `Executor` trait. This trait is rarely +//! used directly by applications and libraries. Instead, `tokio::spawn` is +//! configured to dispatch to type that implements `Executor`. +//! +//! [`Executor`]: trait.Executor.html +//! [`TypedExecutor`]: trait.TypedExecutor.html +//! [`enter`]: fn.enter.html +//! [`DefaultExecutor`]: struct.DefaultExecutor.html +//! [`Park`]: park/index.html +//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll + +extern crate crossbeam_utils; +extern crate futures; + +mod enter; +mod error; +mod executor; +mod global; +pub mod park; +mod typed; + +pub use enter::{enter, exit, Enter, EnterError}; +pub use error::SpawnError; +pub use executor::Executor; +pub use global::{set_default, spawn, with_default, DefaultExecutor, DefaultGuard}; +pub use typed::TypedExecutor; diff --git a/third_party/rust/tokio-executor/src/park.rs b/third_party/rust/tokio-executor/src/park.rs new file mode 100644 index 0000000000..8626f089a1 --- /dev/null +++ b/third_party/rust/tokio-executor/src/park.rs @@ -0,0 +1,226 @@ +//! Abstraction over blocking and unblocking the current thread. +//! +//! Provides an abstraction over blocking the current thread. This is similar to +//! the park / unpark constructs provided by [`std`] but made generic. This +//! allows embedding custom functionality to perform when the thread is blocked. +//! +//! A blocked [`Park`][p] instance is unblocked by calling [`unpark`] on its +//! [`Unpark`][up] handle. +//! +//! The [`ParkThread`] struct implements [`Park`][p] using +//! [`thread::park`][`std`] to put the thread to sleep. The Tokio reactor also +//! implements park, but uses [`mio::Poll`][mio] to block the thread instead. +//! +//! The [`Park`][p] trait is composable. A timer implementation might decorate a +//! [`Park`][p] implementation by checking if any timeouts have elapsed after +//! the inner [`Park`][p] implementation unblocks. +//! +//! # Model +//! +//! Conceptually, each [`Park`][p] instance has an associated token, which is +//! initially not present: +//! +//! * The [`park`] method blocks the current thread unless or until the token +//! is available, at which point it atomically consumes the token. +//! * The [`unpark`] method atomically makes the token available if it wasn't +//! already. +//! +//! Some things to note: +//! +//! * If [`unpark`] is called before [`park`], the next call to [`park`] will +//! **not** block the thread. +//! * **Spurious** wakeups are permitted, i.e., the [`park`] method may unblock +//! even if [`unpark`] was not called. +//! * [`park_timeout`] does the same as [`park`] but allows specifying a maximum +//! time to block the thread for. +//! +//! [`std`]: https://doc.rust-lang.org/std/thread/fn.park.html +//! [`thread::park`]: https://doc.rust-lang.org/std/thread/fn.park.html +//! [`ParkThread`]: struct.ParkThread.html +//! [p]: trait.Park.html +//! [`park`]: trait.Park.html#tymethod.park +//! [`park_timeout`]: trait.Park.html#tymethod.park_timeout +//! [`unpark`]: trait.Unpark.html#tymethod.unpark +//! [up]: trait.Unpark.html +//! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html + +use std::marker::PhantomData; +use std::rc::Rc; +use std::sync::Arc; +use std::time::Duration; + +use crossbeam_utils::sync::{Parker, Unparker}; + +/// Block the current thread. +/// +/// See [module documentation][mod] for more details. +/// +/// [mod]: ../index.html +pub trait Park { + /// Unpark handle type for the `Park` implementation. + type Unpark: Unpark; + + /// Error returned by `park` + type Error; + + /// Get a new `Unpark` handle associated with this `Park` instance. + fn unpark(&self) -> Self::Unpark; + + /// Block the current thread unless or until the token is available. + /// + /// A call to `park` does not guarantee that the thread will remain blocked + /// forever, and callers should be prepared for this possibility. This + /// function may wakeup spuriously for any reason. + /// + /// See [module documentation][mod] for more details. + /// + /// # Panics + /// + /// This function **should** not panic, but ultimately, panics are left as + /// an implementation detail. Refer to the documentation for the specific + /// `Park` implementation + /// + /// [mod]: ../index.html + fn park(&mut self) -> Result<(), Self::Error>; + + /// Park the current thread for at most `duration`. + /// + /// This function is the same as `park` but allows specifying a maximum time + /// to block the thread for. + /// + /// Same as `park`, there is no guarantee that the thread will remain + /// blocked for any amount of time. Spurious wakeups are permitted for any + /// reason. + /// + /// See [module documentation][mod] for more details. + /// + /// # Panics + /// + /// This function **should** not panic, but ultimately, panics are left as + /// an implementation detail. Refer to the documentation for the specific + /// `Park` implementation + /// + /// [mod]: ../index.html + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>; +} + +/// Unblock a thread blocked by the associated [`Park`] instance. +/// +/// See [module documentation][mod] for more details. +/// +/// [mod]: ../index.html +/// [`Park`]: trait.Park.html +pub trait Unpark: Sync + Send + 'static { + /// Unblock a thread that is blocked by the associated `Park` handle. + /// + /// Calling `unpark` atomically makes available the unpark token, if it is + /// not already available. + /// + /// See [module documentation][mod] for more details. + /// + /// # Panics + /// + /// This function **should** not panic, but ultimately, panics are left as + /// an implementation detail. Refer to the documentation for the specific + /// `Unpark` implementation + /// + /// [mod]: ../index.html + fn unpark(&self); +} + +impl Unpark for Box<dyn Unpark> { + fn unpark(&self) { + (**self).unpark() + } +} + +impl Unpark for Arc<dyn Unpark> { + fn unpark(&self) { + (**self).unpark() + } +} + +/// Blocks the current thread using a condition variable. +/// +/// Implements the [`Park`] functionality by using a condition variable. An +/// atomic variable is also used to avoid using the condition variable if +/// possible. +/// +/// The condition variable is cached in a thread-local variable and is shared +/// across all `ParkThread` instances created on the same thread. This also +/// means that an instance of `ParkThread` might be unblocked by a handle +/// associated with a different `ParkThread` instance. +#[derive(Debug)] +pub struct ParkThread { + _anchor: PhantomData<Rc<()>>, +} + +/// Error returned by [`ParkThread`] +/// +/// This currently is never returned, but might at some point in the future. +/// +/// [`ParkThread`]: struct.ParkThread.html +#[derive(Debug)] +pub struct ParkError { + _p: (), +} + +/// Unblocks a thread that was blocked by `ParkThread`. +#[derive(Clone, Debug)] +pub struct UnparkThread { + inner: Unparker, +} + +thread_local! { + static CURRENT_PARKER: Parker = Parker::new(); +} + +// ===== impl ParkThread ===== + +impl ParkThread { + /// Create a new `ParkThread` handle for the current thread. + /// + /// This type cannot be moved to other threads, so it should be created on + /// the thread that the caller intends to park. + pub fn new() -> ParkThread { + ParkThread { + _anchor: PhantomData, + } + } + + /// Get a reference to the `ParkThread` handle for this thread. + fn with_current<F, R>(&self, f: F) -> R + where + F: FnOnce(&Parker) -> R, + { + CURRENT_PARKER.with(|inner| f(inner)) + } +} + +impl Park for ParkThread { + type Unpark = UnparkThread; + type Error = ParkError; + + fn unpark(&self) -> Self::Unpark { + let inner = self.with_current(|inner| inner.unparker().clone()); + UnparkThread { inner } + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.with_current(|inner| inner.park()); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.with_current(|inner| inner.park_timeout(duration)); + Ok(()) + } +} + +// ===== impl UnparkThread ===== + +impl Unpark for UnparkThread { + fn unpark(&self) { + self.inner.unpark(); + } +} diff --git a/third_party/rust/tokio-executor/src/typed.rs b/third_party/rust/tokio-executor/src/typed.rs new file mode 100644 index 0000000000..22edf29d12 --- /dev/null +++ b/third_party/rust/tokio-executor/src/typed.rs @@ -0,0 +1,181 @@ +use SpawnError; + +/// A value that spawns futures of a specific type. +/// +/// The trait is generic over `T`: the type of future that can be spawened. This +/// is useful for implementing an executor that is only able to spawn a specific +/// type of future. +/// +/// The [`spawn`] function is used to submit the future to the executor. Once +/// submitted, the executor takes ownership of the future and becomes +/// responsible for driving the future to completion. +/// +/// This trait is useful as a bound for applications and libraries in order to +/// be generic over futures that are `Send` vs. `!Send`. +/// +/// # Examples +/// +/// Consider a function that provides an API for draining a `Stream` in the +/// background. To do this, a task must be spawned to perform the draining. As +/// such, the function takes a stream and an executor on which the background +/// task is spawned. +/// +/// ```rust +/// #[macro_use] +/// extern crate futures; +/// extern crate tokio; +/// +/// use futures::{Future, Stream, Poll}; +/// use tokio::executor::TypedExecutor; +/// use tokio::sync::oneshot; +/// +/// pub fn drain<T, E>(stream: T, executor: &mut E) +/// -> impl Future<Item = (), Error = ()> +/// where +/// T: Stream, +/// E: TypedExecutor<Drain<T>> +/// { +/// let (tx, rx) = oneshot::channel(); +/// +/// executor.spawn(Drain { +/// stream, +/// tx: Some(tx), +/// }).unwrap(); +/// +/// rx.map_err(|_| ()) +/// } +/// +/// // The background task +/// pub struct Drain<T: Stream> { +/// stream: T, +/// tx: Option<oneshot::Sender<()>>, +/// } +/// +/// impl<T: Stream> Future for Drain<T> { +/// type Item = (); +/// type Error = (); +/// +/// fn poll(&mut self) -> Poll<Self::Item, Self::Error> { +/// loop { +/// let item = try_ready!( +/// self.stream.poll() +/// .map_err(|_| ()) +/// ); +/// +/// if item.is_none() { break; } +/// } +/// +/// self.tx.take().unwrap().send(()).map_err(|_| ()); +/// Ok(().into()) +/// } +/// } +/// # pub fn main() {} +/// ``` +/// +/// By doing this, the `drain` fn can accept a stream that is `!Send` as long as +/// the supplied executor is able to spawn `!Send` types. +pub trait TypedExecutor<T> { + /// Spawns a future to run on this executor. + /// + /// `future` is passed to the executor, which will begin running it. The + /// executor takes ownership of the future and becomes responsible for + /// driving the future to completion. + /// + /// # Panics + /// + /// Implementations are encouraged to avoid panics. However, panics are + /// permitted and the caller should check the implementation specific + /// documentation for more details on possible panics. + /// + /// # Examples + /// + /// ```rust + /// # extern crate futures; + /// # extern crate tokio_executor; + /// # use tokio_executor::TypedExecutor; + /// # use futures::{Future, Poll}; + /// fn example<T>(my_executor: &mut T) + /// where + /// T: TypedExecutor<MyFuture>, + /// { + /// my_executor.spawn(MyFuture).unwrap(); + /// } + /// + /// struct MyFuture; + /// + /// impl Future for MyFuture { + /// type Item = (); + /// type Error = (); + /// + /// fn poll(&mut self) -> Poll<(), ()> { + /// println!("running on the executor"); + /// Ok(().into()) + /// } + /// } + /// # fn main() {} + /// ``` + fn spawn(&mut self, future: T) -> Result<(), SpawnError>; + + /// Provides a best effort **hint** to whether or not `spawn` will succeed. + /// + /// This function may return both false positives **and** false negatives. + /// If `status` returns `Ok`, then a call to `spawn` will *probably* + /// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will + /// *probably* fail, but may succeed. + /// + /// This allows a caller to avoid creating the task if the call to `spawn` + /// has a high likelihood of failing. + /// + /// # Panics + /// + /// This function must not panic. Implementers must ensure that panics do + /// not happen. + /// + /// # Examples + /// + /// ```rust + /// # extern crate futures; + /// # extern crate tokio_executor; + /// # use tokio_executor::TypedExecutor; + /// # use futures::{Future, Poll}; + /// fn example<T>(my_executor: &mut T) + /// where + /// T: TypedExecutor<MyFuture>, + /// { + /// if my_executor.status().is_ok() { + /// my_executor.spawn(MyFuture).unwrap(); + /// } else { + /// println!("the executor is not in a good state"); + /// } + /// } + /// + /// struct MyFuture; + /// + /// impl Future for MyFuture { + /// type Item = (); + /// type Error = (); + /// + /// fn poll(&mut self) -> Poll<(), ()> { + /// println!("running on the executor"); + /// Ok(().into()) + /// } + /// } + /// # fn main() {} + /// ``` + fn status(&self) -> Result<(), SpawnError> { + Ok(()) + } +} + +impl<E, T> TypedExecutor<T> for Box<E> +where + E: TypedExecutor<T>, +{ + fn spawn(&mut self, future: T) -> Result<(), SpawnError> { + (**self).spawn(future) + } + + fn status(&self) -> Result<(), SpawnError> { + (**self).status() + } +} |