diff options
Diffstat (limited to 'third_party/rust/futures-executor/src')
-rw-r--r-- | third_party/rust/futures-executor/src/enter.rs | 80 | ||||
-rw-r--r-- | third_party/rust/futures-executor/src/lib.rs | 76 | ||||
-rw-r--r-- | third_party/rust/futures-executor/src/local_pool.rs | 402 | ||||
-rw-r--r-- | third_party/rust/futures-executor/src/thread_pool.rs | 380 | ||||
-rw-r--r-- | third_party/rust/futures-executor/src/unpark_mutex.rs | 137 |
5 files changed, 1075 insertions, 0 deletions
diff --git a/third_party/rust/futures-executor/src/enter.rs b/third_party/rust/futures-executor/src/enter.rs new file mode 100644 index 0000000000..cb58c30bb7 --- /dev/null +++ b/third_party/rust/futures-executor/src/enter.rs @@ -0,0 +1,80 @@ +use std::cell::Cell; +use std::fmt; + +thread_local!(static ENTERED: Cell<bool> = Cell::new(false)); + +/// Represents an executor context. +/// +/// For more details, see [`enter` documentation](enter()). +pub struct Enter { + _priv: (), +} + +/// An error returned by `enter` if an execution scope has already been +/// entered. +pub struct EnterError { + _priv: (), +} + +impl fmt::Debug for EnterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EnterError").finish() + } +} + +impl fmt::Display for EnterError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "an execution scope has already been entered") + } +} + +impl std::error::Error for EnterError {} + +/// Marks the current thread as being within the dynamic extent of an +/// executor. +/// +/// Executor implementations should call this function before beginning to +/// execute a task, and drop the returned [`Enter`](Enter) value after +/// completing task execution: +/// +/// ``` +/// use futures::executor::enter; +/// +/// let enter = enter().expect("..."); +/// /* run task */ +/// drop(enter); +/// ``` +/// +/// Doing so ensures that executors aren't +/// accidentally invoked in a nested fashion. +/// +/// # Error +/// +/// Returns an error if the current thread is already marked, in which case the +/// caller should panic with a tailored error message. +pub fn enter() -> Result<Enter, EnterError> { + ENTERED.with(|c| { + if c.get() { + Err(EnterError { _priv: () }) + } else { + c.set(true); + + Ok(Enter { _priv: () }) + } + }) +} + +impl fmt::Debug for Enter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Enter").finish() + } +} + +impl Drop for Enter { + fn drop(&mut self) { + ENTERED.with(|c| { + assert!(c.get()); + c.set(false); + }); + } +} diff --git a/third_party/rust/futures-executor/src/lib.rs b/third_party/rust/futures-executor/src/lib.rs new file mode 100644 index 0000000000..b1af87545f --- /dev/null +++ b/third_party/rust/futures-executor/src/lib.rs @@ -0,0 +1,76 @@ +//! Built-in executors and related tools. +//! +//! All asynchronous computation occurs within an executor, which is +//! capable of spawning futures as tasks. This module provides several +//! built-in executors, as well as tools for building your own. +//! +//! All items are only available when the `std` feature of this +//! library is activated, and it is activated by default. +//! +//! # Using a thread pool (M:N task scheduling) +//! +//! Most of the time tasks should be executed on a [thread pool](ThreadPool). +//! A small set of worker threads can handle a very large set of spawned tasks +//! (which are much lighter weight than threads). Tasks spawned onto the pool +//! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on +//! the created threads. +//! +//! # Spawning additional tasks +//! +//! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method +//! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used +//! instead. +//! +//! # Single-threaded execution +//! +//! In addition to thread pools, it's possible to run a task (and the tasks +//! it spawns) entirely within a single thread via the [`LocalPool`] executor. +//! Aside from cutting down on synchronization costs, this executor also makes +//! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The +//! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively +//! little work between I/O operations. +//! +//! There is also a convenience function [`block_on`] for simply running a +//! future to completion on the current thread. +//! +//! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj +//! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj + +#![cfg_attr(not(feature = "std"), no_std)] +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + single_use_lifetimes, + unreachable_pub +)] +#![doc(test( + no_crate_inject, + attr( + deny(warnings, rust_2018_idioms, single_use_lifetimes), + allow(dead_code, unused_assignments, unused_variables) + ) +))] +#![cfg_attr(docsrs, feature(doc_cfg))] + +#[cfg(feature = "std")] +mod local_pool; +#[cfg(feature = "std")] +pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner}; + +#[cfg(feature = "thread-pool")] +#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] +#[cfg(feature = "std")] +mod thread_pool; +#[cfg(feature = "thread-pool")] +#[cfg(feature = "std")] +mod unpark_mutex; +#[cfg(feature = "thread-pool")] +#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] +#[cfg(feature = "std")] +pub use crate::thread_pool::{ThreadPool, ThreadPoolBuilder}; + +#[cfg(feature = "std")] +mod enter; +#[cfg(feature = "std")] +pub use crate::enter::{enter, Enter, EnterError}; diff --git a/third_party/rust/futures-executor/src/local_pool.rs b/third_party/rust/futures-executor/src/local_pool.rs new file mode 100644 index 0000000000..8a9bc2fc90 --- /dev/null +++ b/third_party/rust/futures-executor/src/local_pool.rs @@ -0,0 +1,402 @@ +use crate::enter; +use futures_core::future::Future; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use futures_task::{waker_ref, ArcWake}; +use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; +use futures_util::pin_mut; +use futures_util::stream::FuturesUnordered; +use futures_util::stream::StreamExt; +use std::cell::RefCell; +use std::ops::{Deref, DerefMut}; +use std::rc::{Rc, Weak}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; +use std::thread::{self, Thread}; + +/// A single-threaded task pool for polling futures to completion. +/// +/// This executor allows you to multiplex any number of tasks onto a single +/// thread. It's appropriate to poll strictly I/O-bound futures that do very +/// little work in between I/O actions. +/// +/// To get a handle to the pool that implements +/// [`Spawn`](futures_task::Spawn), use the +/// [`spawner()`](LocalPool::spawner) method. Because the executor is +/// single-threaded, it supports a special form of task spawning for non-`Send` +/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj). +#[derive(Debug)] +pub struct LocalPool { + pool: FuturesUnordered<LocalFutureObj<'static, ()>>, + incoming: Rc<Incoming>, +} + +/// A handle to a [`LocalPool`](LocalPool) that implements +/// [`Spawn`](futures_task::Spawn). +#[derive(Clone, Debug)] +pub struct LocalSpawner { + incoming: Weak<Incoming>, +} + +type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>; + +pub(crate) struct ThreadNotify { + /// The (single) executor thread. + thread: Thread, + /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten" + /// before the next `park()`, which may otherwise happen if the code + /// being executed as part of the future(s) being polled makes use of + /// park / unpark calls of its own, i.e. we cannot assume that no other + /// code uses park / unpark on the executing `thread`. + unparked: AtomicBool, +} + +thread_local! { + static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify { + thread: thread::current(), + unparked: AtomicBool::new(false), + }); +} + +impl ArcWake for ThreadNotify { + fn wake_by_ref(arc_self: &Arc<Self>) { + // Make sure the wakeup is remembered until the next `park()`. + let unparked = arc_self.unparked.swap(true, Ordering::Release); + if !unparked { + // If the thread has not been unparked yet, it must be done + // now. If it was actually parked, it will run again, + // otherwise the token made available by `unpark` + // may be consumed before reaching `park()`, but `unparked` + // ensures it is not forgotten. + arc_self.thread.unpark(); + } + } +} + +// Set up and run a basic single-threaded spawner loop, invoking `f` on each +// turn. +fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T { + let _enter = enter().expect( + "cannot execute `LocalPool` executor from within \ + another executor", + ); + + CURRENT_THREAD_NOTIFY.with(|thread_notify| { + let waker = waker_ref(thread_notify); + let mut cx = Context::from_waker(&waker); + loop { + if let Poll::Ready(t) = f(&mut cx) { + return t; + } + + // Wait for a wakeup. + while !thread_notify.unparked.swap(false, Ordering::Acquire) { + // No wakeup occurred. It may occur now, right before parking, + // but in that case the token made available by `unpark()` + // is guaranteed to still be available and `park()` is a no-op. + thread::park(); + } + } + }) +} + +/// Check for a wakeup, but don't consume it. +fn woken() -> bool { + CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire)) +} + +impl LocalPool { + /// Create a new, empty pool of tasks. + pub fn new() -> Self { + Self { pool: FuturesUnordered::new(), incoming: Default::default() } + } + + /// Get a clonable handle to the pool as a [`Spawn`]. + pub fn spawner(&self) -> LocalSpawner { + LocalSpawner { incoming: Rc::downgrade(&self.incoming) } + } + + /// Run all tasks in the pool to completion. + /// + /// ``` + /// use futures::executor::LocalPool; + /// + /// let mut pool = LocalPool::new(); + /// + /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()` + /// + /// // run *all* tasks in the pool to completion, including any newly-spawned ones. + /// pool.run(); + /// ``` + /// + /// The function will block the calling thread until *all* tasks in the pool + /// are complete, including any spawned while running existing tasks. + pub fn run(&mut self) { + run_executor(|cx| self.poll_pool(cx)) + } + + /// Runs all the tasks in the pool until the given future completes. + /// + /// ``` + /// use futures::executor::LocalPool; + /// + /// let mut pool = LocalPool::new(); + /// # let my_app = async {}; + /// + /// // run tasks in the pool until `my_app` completes + /// pool.run_until(my_app); + /// ``` + /// + /// The function will block the calling thread *only* until the future `f` + /// completes; there may still be incomplete tasks in the pool, which will + /// be inert after the call completes, but can continue with further use of + /// one of the pool's run or poll methods. While the function is running, + /// however, all tasks in the pool will try to make progress. + pub fn run_until<F: Future>(&mut self, future: F) -> F::Output { + pin_mut!(future); + + run_executor(|cx| { + { + // if our main task is done, so are we + let result = future.as_mut().poll(cx); + if let Poll::Ready(output) = result { + return Poll::Ready(output); + } + } + + let _ = self.poll_pool(cx); + Poll::Pending + }) + } + + /// Runs all tasks and returns after completing one future or until no more progress + /// can be made. Returns `true` if one future was completed, `false` otherwise. + /// + /// ``` + /// use futures::executor::LocalPool; + /// use futures::task::LocalSpawnExt; + /// use futures::future::{ready, pending}; + /// + /// let mut pool = LocalPool::new(); + /// let spawner = pool.spawner(); + /// + /// spawner.spawn_local(ready(())).unwrap(); + /// spawner.spawn_local(ready(())).unwrap(); + /// spawner.spawn_local(pending()).unwrap(); + /// + /// // Run the two ready tasks and return true for them. + /// pool.try_run_one(); // returns true after completing one of the ready futures + /// pool.try_run_one(); // returns true after completing the other ready future + /// + /// // the remaining task can not be completed + /// assert!(!pool.try_run_one()); // returns false + /// ``` + /// + /// This function will not block the calling thread and will return the moment + /// that there are no tasks left for which progress can be made or after exactly one + /// task was completed; Remaining incomplete tasks in the pool can continue with + /// further use of one of the pool's run or poll methods. + /// Though only one task will be completed, progress may be made on multiple tasks. + pub fn try_run_one(&mut self) -> bool { + run_executor(|cx| { + loop { + self.drain_incoming(); + + match self.pool.poll_next_unpin(cx) { + // Success! + Poll::Ready(Some(())) => return Poll::Ready(true), + // The pool was empty. + Poll::Ready(None) => return Poll::Ready(false), + Poll::Pending => (), + } + + if !self.incoming.borrow().is_empty() { + // New tasks were spawned; try again. + continue; + } else if woken() { + // The pool yielded to us, but there's more progress to be made. + return Poll::Pending; + } else { + return Poll::Ready(false); + } + } + }) + } + + /// Runs all tasks in the pool and returns if no more progress can be made + /// on any task. + /// + /// ``` + /// use futures::executor::LocalPool; + /// use futures::task::LocalSpawnExt; + /// use futures::future::{ready, pending}; + /// + /// let mut pool = LocalPool::new(); + /// let spawner = pool.spawner(); + /// + /// spawner.spawn_local(ready(())).unwrap(); + /// spawner.spawn_local(ready(())).unwrap(); + /// spawner.spawn_local(pending()).unwrap(); + /// + /// // Runs the two ready task and returns. + /// // The empty task remains in the pool. + /// pool.run_until_stalled(); + /// ``` + /// + /// This function will not block the calling thread and will return the moment + /// that there are no tasks left for which progress can be made; + /// remaining incomplete tasks in the pool can continue with further use of one + /// of the pool's run or poll methods. While the function is running, all tasks + /// in the pool will try to make progress. + pub fn run_until_stalled(&mut self) { + run_executor(|cx| match self.poll_pool(cx) { + // The pool is empty. + Poll::Ready(()) => Poll::Ready(()), + Poll::Pending => { + if woken() { + Poll::Pending + } else { + // We're stalled for now. + Poll::Ready(()) + } + } + }); + } + + /// Poll `self.pool`, re-filling it with any newly-spawned tasks. + /// Repeat until either the pool is empty, or it returns `Pending`. + /// + /// Returns `Ready` if the pool was empty, and `Pending` otherwise. + /// + /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily + /// mean that the pool can't make progress. + fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { + loop { + self.drain_incoming(); + + let pool_ret = self.pool.poll_next_unpin(cx); + + // We queued up some new tasks; add them and poll again. + if !self.incoming.borrow().is_empty() { + continue; + } + + match pool_ret { + Poll::Ready(Some(())) => continue, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + } + } + } + + /// Empty the incoming queue of newly-spawned tasks. + fn drain_incoming(&mut self) { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) + } + } +} + +impl Default for LocalPool { + fn default() -> Self { + Self::new() + } +} + +/// Run a future to completion on the current thread. +/// +/// This function will block the caller until the given future has completed. +/// +/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over +/// spawned tasks. +pub fn block_on<F: Future>(f: F) -> F::Output { + pin_mut!(f); + run_executor(|cx| f.as_mut().poll(cx)) +} + +/// Turn a stream into a blocking iterator. +/// +/// When `next` is called on the resulting `BlockingStream`, the caller +/// will be blocked until the next element of the `Stream` becomes available. +pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> { + BlockingStream { stream } +} + +/// An iterator which blocks on values from a stream until they become available. +#[derive(Debug)] +pub struct BlockingStream<S: Stream + Unpin> { + stream: S, +} + +impl<S: Stream + Unpin> Deref for BlockingStream<S> { + type Target = S; + fn deref(&self) -> &Self::Target { + &self.stream + } +} + +impl<S: Stream + Unpin> DerefMut for BlockingStream<S> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.stream + } +} + +impl<S: Stream + Unpin> BlockingStream<S> { + /// Convert this `BlockingStream` into the inner `Stream` type. + pub fn into_inner(self) -> S { + self.stream + } +} + +impl<S: Stream + Unpin> Iterator for BlockingStream<S> { + type Item = S::Item; + + fn next(&mut self) -> Option<Self::Item> { + LocalPool::new().run_until(self.stream.next()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.stream.size_hint() + } +} + +impl Spawn for LocalSpawner { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(future.into()); + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } + + fn status(&self) -> Result<(), SpawnError> { + if self.incoming.upgrade().is_some() { + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } +} + +impl LocalSpawn for LocalSpawner { + fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { + if let Some(incoming) = self.incoming.upgrade() { + incoming.borrow_mut().push(future); + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } + + fn status_local(&self) -> Result<(), SpawnError> { + if self.incoming.upgrade().is_some() { + Ok(()) + } else { + Err(SpawnError::shutdown()) + } + } +} diff --git a/third_party/rust/futures-executor/src/thread_pool.rs b/third_party/rust/futures-executor/src/thread_pool.rs new file mode 100644 index 0000000000..5371008953 --- /dev/null +++ b/third_party/rust/futures-executor/src/thread_pool.rs @@ -0,0 +1,380 @@ +use crate::enter; +use crate::unpark_mutex::UnparkMutex; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_task::{waker_ref, ArcWake}; +use futures_task::{FutureObj, Spawn, SpawnError}; +use futures_util::future::FutureExt; +use std::cmp; +use std::fmt; +use std::io; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::thread; + +/// A general-purpose thread pool for scheduling tasks that poll futures to +/// completion. +/// +/// The thread pool multiplexes any number of tasks onto a fixed number of +/// worker threads. +/// +/// This type is a clonable handle to the threadpool itself. +/// Cloning it will only create a new reference, not a new threadpool. +/// +/// This type is only available when the `thread-pool` feature of this +/// library is activated. +#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] +pub struct ThreadPool { + state: Arc<PoolState>, +} + +/// Thread pool configuration object. +/// +/// This type is only available when the `thread-pool` feature of this +/// library is activated. +#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] +pub struct ThreadPoolBuilder { + pool_size: usize, + stack_size: usize, + name_prefix: Option<String>, + after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>, + before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>, +} + +trait AssertSendSync: Send + Sync {} +impl AssertSendSync for ThreadPool {} + +struct PoolState { + tx: Mutex<mpsc::Sender<Message>>, + rx: Mutex<mpsc::Receiver<Message>>, + cnt: AtomicUsize, + size: usize, +} + +impl fmt::Debug for ThreadPool { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadPool").field("size", &self.state.size).finish() + } +} + +impl fmt::Debug for ThreadPoolBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadPoolBuilder") + .field("pool_size", &self.pool_size) + .field("name_prefix", &self.name_prefix) + .finish() + } +} + +enum Message { + Run(Task), + Close, +} + +impl ThreadPool { + /// Creates a new thread pool with the default configuration. + /// + /// See documentation for the methods in + /// [`ThreadPoolBuilder`](ThreadPoolBuilder) for details on the default + /// configuration. + pub fn new() -> Result<Self, io::Error> { + ThreadPoolBuilder::new().create() + } + + /// Create a default thread pool configuration, which can then be customized. + /// + /// See documentation for the methods in + /// [`ThreadPoolBuilder`](ThreadPoolBuilder) for details on the default + /// configuration. + pub fn builder() -> ThreadPoolBuilder { + ThreadPoolBuilder::new() + } + + /// Spawns a future that will be run to completion. + /// + /// > **Note**: This method is similar to `Spawn::spawn_obj`, except that + /// > it is guaranteed to always succeed. + pub fn spawn_obj_ok(&self, future: FutureObj<'static, ()>) { + let task = Task { + future, + wake_handle: Arc::new(WakeHandle { exec: self.clone(), mutex: UnparkMutex::new() }), + exec: self.clone(), + }; + self.state.send(Message::Run(task)); + } + + /// Spawns a task that polls the given future with output `()` to + /// completion. + /// + /// ``` + /// # { + /// use futures::executor::ThreadPool; + /// + /// let pool = ThreadPool::new().unwrap(); + /// + /// let future = async { /* ... */ }; + /// pool.spawn_ok(future); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + /// ``` + /// + /// > **Note**: This method is similar to `SpawnExt::spawn`, except that + /// > it is guaranteed to always succeed. + pub fn spawn_ok<Fut>(&self, future: Fut) + where + Fut: Future<Output = ()> + Send + 'static, + { + self.spawn_obj_ok(FutureObj::new(Box::new(future))) + } +} + +impl Spawn for ThreadPool { + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + self.spawn_obj_ok(future); + Ok(()) + } +} + +impl PoolState { + fn send(&self, msg: Message) { + self.tx.lock().unwrap().send(msg).unwrap(); + } + + fn work( + &self, + idx: usize, + after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>, + before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>, + ) { + let _scope = enter().unwrap(); + if let Some(after_start) = after_start { + after_start(idx); + } + loop { + let msg = self.rx.lock().unwrap().recv().unwrap(); + match msg { + Message::Run(task) => task.run(), + Message::Close => break, + } + } + if let Some(before_stop) = before_stop { + before_stop(idx); + } + } +} + +impl Clone for ThreadPool { + fn clone(&self) -> Self { + self.state.cnt.fetch_add(1, Ordering::Relaxed); + Self { state: self.state.clone() } + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + if self.state.cnt.fetch_sub(1, Ordering::Relaxed) == 1 { + for _ in 0..self.state.size { + self.state.send(Message::Close); + } + } + } +} + +impl ThreadPoolBuilder { + /// Create a default thread pool configuration. + /// + /// See the other methods on this type for details on the defaults. + pub fn new() -> Self { + Self { + pool_size: cmp::max(1, num_cpus::get()), + stack_size: 0, + name_prefix: None, + after_start: None, + before_stop: None, + } + } + + /// Set size of a future ThreadPool + /// + /// The size of a thread pool is the number of worker threads spawned. By + /// default, this is equal to the number of CPU cores. + /// + /// # Panics + /// + /// Panics if `pool_size == 0`. + pub fn pool_size(&mut self, size: usize) -> &mut Self { + assert!(size > 0); + self.pool_size = size; + self + } + + /// Set stack size of threads in the pool, in bytes. + /// + /// By default, worker threads use Rust's standard stack size. + pub fn stack_size(&mut self, stack_size: usize) -> &mut Self { + self.stack_size = stack_size; + self + } + + /// Set thread name prefix of a future ThreadPool. + /// + /// Thread name prefix is used for generating thread names. For example, if prefix is + /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc. + /// + /// By default, worker threads are assigned Rust's standard thread name. + pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self { + self.name_prefix = Some(name_prefix.into()); + self + } + + /// Execute the closure `f` immediately after each worker thread is started, + /// but before running any tasks on it. + /// + /// This hook is intended for bookkeeping and monitoring. + /// The closure `f` will be dropped after the `builder` is dropped + /// and all worker threads in the pool have executed it. + /// + /// The closure provided will receive an index corresponding to the worker + /// thread it's running on. + pub fn after_start<F>(&mut self, f: F) -> &mut Self + where + F: Fn(usize) + Send + Sync + 'static, + { + self.after_start = Some(Arc::new(f)); + self + } + + /// Execute closure `f` just prior to shutting down each worker thread. + /// + /// This hook is intended for bookkeeping and monitoring. + /// The closure `f` will be dropped after the `builder` is dropped + /// and all threads in the pool have executed it. + /// + /// The closure provided will receive an index corresponding to the worker + /// thread it's running on. + pub fn before_stop<F>(&mut self, f: F) -> &mut Self + where + F: Fn(usize) + Send + Sync + 'static, + { + self.before_stop = Some(Arc::new(f)); + self + } + + /// Create a [`ThreadPool`](ThreadPool) with the given configuration. + pub fn create(&mut self) -> Result<ThreadPool, io::Error> { + let (tx, rx) = mpsc::channel(); + let pool = ThreadPool { + state: Arc::new(PoolState { + tx: Mutex::new(tx), + rx: Mutex::new(rx), + cnt: AtomicUsize::new(1), + size: self.pool_size, + }), + }; + + for counter in 0..self.pool_size { + let state = pool.state.clone(); + let after_start = self.after_start.clone(); + let before_stop = self.before_stop.clone(); + let mut thread_builder = thread::Builder::new(); + if let Some(ref name_prefix) = self.name_prefix { + thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter)); + } + if self.stack_size > 0 { + thread_builder = thread_builder.stack_size(self.stack_size); + } + thread_builder.spawn(move || state.work(counter, after_start, before_stop))?; + } + Ok(pool) + } +} + +impl Default for ThreadPoolBuilder { + fn default() -> Self { + Self::new() + } +} + +/// A task responsible for polling a future to completion. +struct Task { + future: FutureObj<'static, ()>, + exec: ThreadPool, + wake_handle: Arc<WakeHandle>, +} + +struct WakeHandle { + mutex: UnparkMutex<Task>, + exec: ThreadPool, +} + +impl Task { + /// Actually run the task (invoking `poll` on the future) on the current + /// thread. + fn run(self) { + let Self { mut future, wake_handle, mut exec } = self; + let waker = waker_ref(&wake_handle); + let mut cx = Context::from_waker(&waker); + + // Safety: The ownership of this `Task` object is evidence that + // we are in the `POLLING`/`REPOLL` state for the mutex. + unsafe { + wake_handle.mutex.start_poll(); + + loop { + let res = future.poll_unpin(&mut cx); + match res { + Poll::Pending => {} + Poll::Ready(()) => return wake_handle.mutex.complete(), + } + let task = Self { future, wake_handle: wake_handle.clone(), exec }; + match wake_handle.mutex.wait(task) { + Ok(()) => return, // we've waited + Err(task) => { + // someone's notified us + future = task.future; + exec = task.exec; + } + } + } + } + } +} + +impl fmt::Debug for Task { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Task").field("contents", &"...").finish() + } +} + +impl ArcWake for WakeHandle { + fn wake_by_ref(arc_self: &Arc<Self>) { + if let Ok(task) = arc_self.mutex.notify() { + arc_self.exec.state.send(Message::Run(task)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::mpsc; + + #[test] + fn test_drop_after_start() { + { + let (tx, rx) = mpsc::sync_channel(2); + let _cpu_pool = ThreadPoolBuilder::new() + .pool_size(2) + .after_start(move |_| tx.send(1).unwrap()) + .create() + .unwrap(); + + // After ThreadPoolBuilder is deconstructed, the tx should be dropped + // so that we can use rx as an iterator. + let count = rx.into_iter().count(); + assert_eq!(count, 2); + } + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 + } +} diff --git a/third_party/rust/futures-executor/src/unpark_mutex.rs b/third_party/rust/futures-executor/src/unpark_mutex.rs new file mode 100644 index 0000000000..ac5112cfa2 --- /dev/null +++ b/third_party/rust/futures-executor/src/unpark_mutex.rs @@ -0,0 +1,137 @@ +use std::cell::UnsafeCell; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +/// A "lock" around data `D`, which employs a *helping* strategy. +/// +/// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being +/// invoked on only a single thread at a time (2) `poll` being invoked at least +/// once after each `unpark` (unless the future has completed). +pub(crate) struct UnparkMutex<D> { + // The state of task execution (state machine described below) + status: AtomicUsize, + + // The actual task data, accessible only in the POLLING state + inner: UnsafeCell<Option<D>>, +} + +// `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on +// acquisition failure, the current lock holder performs the desired work -- +// re-polling. +// +// As such, these impls mirror those for `Mutex<D>`. In particular, a reference +// to `UnparkMutex` can be used to gain `&mut` access to the inner data, which +// must therefore be `Send`. +unsafe impl<D: Send> Send for UnparkMutex<D> {} +unsafe impl<D: Send> Sync for UnparkMutex<D> {} + +// There are four possible task states, listed below with their possible +// transitions: + +// The task is blocked, waiting on an event +const WAITING: usize = 0; // --> POLLING + +// The task is actively being polled by a thread; arrival of additional events +// of interest should move it to the REPOLL state +const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE + +// The task is actively being polled, but will need to be re-polled upon +// completion to ensure that all events were observed. +const REPOLL: usize = 2; // --> POLLING + +// The task has finished executing (either successfully or with an error/panic) +const COMPLETE: usize = 3; // No transitions out + +impl<D> UnparkMutex<D> { + pub(crate) fn new() -> Self { + Self { status: AtomicUsize::new(WAITING), inner: UnsafeCell::new(None) } + } + + /// Attempt to "notify" the mutex that a poll should occur. + /// + /// An `Ok` result indicates that the `POLLING` state has been entered, and + /// the caller can proceed to poll the future. An `Err` result indicates + /// that polling is not necessary (because the task is finished or the + /// polling has been delegated). + pub(crate) fn notify(&self) -> Result<D, ()> { + let mut status = self.status.load(SeqCst); + loop { + match status { + // The task is idle, so try to run it immediately. + WAITING => { + match self.status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst) { + Ok(_) => { + let data = unsafe { + // SAFETY: we've ensured mutual exclusion via + // the status protocol; we are the only thread + // that has transitioned to the POLLING state, + // and we won't transition back to QUEUED until + // the lock is "released" by this thread. See + // the protocol diagram above. + (*self.inner.get()).take().unwrap() + }; + return Ok(data); + } + Err(cur) => status = cur, + } + } + + // The task is being polled, so we need to record that it should + // be *repolled* when complete. + POLLING => match self.status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst) { + Ok(_) => return Err(()), + Err(cur) => status = cur, + }, + + // The task is already scheduled for polling, or is complete, so + // we've got nothing to do. + _ => return Err(()), + } + } + } + + /// Alert the mutex that polling is about to begin, clearing any accumulated + /// re-poll requests. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub(crate) unsafe fn start_poll(&self) { + self.status.store(POLLING, SeqCst); + } + + /// Alert the mutex that polling completed with `Pending`. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub(crate) unsafe fn wait(&self, data: D) -> Result<(), D> { + *self.inner.get() = Some(data); + + match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) { + // no unparks came in while we were running + Ok(_) => Ok(()), + + // guaranteed to be in REPOLL state; just clobber the + // state and run again. + Err(status) => { + assert_eq!(status, REPOLL); + self.status.store(POLLING, SeqCst); + Err((*self.inner.get()).take().unwrap()) + } + } + } + + /// Alert the mutex that the task has completed execution and should not be + /// notified again. + /// + /// # Safety + /// + /// Callable only from the `POLLING`/`REPOLL` states, i.e. between + /// successful calls to `notify` and `wait`/`complete`. + pub(crate) unsafe fn complete(&self) { + self.status.store(COMPLETE, SeqCst); + } +} |