summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-executor/src
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-executor/src
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-executor/src')
-rw-r--r--third_party/rust/futures-executor/src/enter.rs80
-rw-r--r--third_party/rust/futures-executor/src/lib.rs76
-rw-r--r--third_party/rust/futures-executor/src/local_pool.rs402
-rw-r--r--third_party/rust/futures-executor/src/thread_pool.rs380
-rw-r--r--third_party/rust/futures-executor/src/unpark_mutex.rs137
5 files changed, 1075 insertions, 0 deletions
diff --git a/third_party/rust/futures-executor/src/enter.rs b/third_party/rust/futures-executor/src/enter.rs
new file mode 100644
index 0000000000..cb58c30bb7
--- /dev/null
+++ b/third_party/rust/futures-executor/src/enter.rs
@@ -0,0 +1,80 @@
+use std::cell::Cell;
+use std::fmt;
+
+thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
+
+/// Represents an executor context.
+///
+/// For more details, see [`enter` documentation](enter()).
+pub struct Enter {
+ _priv: (),
+}
+
+/// An error returned by `enter` if an execution scope has already been
+/// entered.
+pub struct EnterError {
+ _priv: (),
+}
+
+impl fmt::Debug for EnterError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("EnterError").finish()
+ }
+}
+
+impl fmt::Display for EnterError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "an execution scope has already been entered")
+ }
+}
+
+impl std::error::Error for EnterError {}
+
+/// Marks the current thread as being within the dynamic extent of an
+/// executor.
+///
+/// Executor implementations should call this function before beginning to
+/// execute a task, and drop the returned [`Enter`](Enter) value after
+/// completing task execution:
+///
+/// ```
+/// use futures::executor::enter;
+///
+/// let enter = enter().expect("...");
+/// /* run task */
+/// drop(enter);
+/// ```
+///
+/// Doing so ensures that executors aren't
+/// accidentally invoked in a nested fashion.
+///
+/// # Error
+///
+/// Returns an error if the current thread is already marked, in which case the
+/// caller should panic with a tailored error message.
+pub fn enter() -> Result<Enter, EnterError> {
+ ENTERED.with(|c| {
+ if c.get() {
+ Err(EnterError { _priv: () })
+ } else {
+ c.set(true);
+
+ Ok(Enter { _priv: () })
+ }
+ })
+}
+
+impl fmt::Debug for Enter {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Enter").finish()
+ }
+}
+
+impl Drop for Enter {
+ fn drop(&mut self) {
+ ENTERED.with(|c| {
+ assert!(c.get());
+ c.set(false);
+ });
+ }
+}
diff --git a/third_party/rust/futures-executor/src/lib.rs b/third_party/rust/futures-executor/src/lib.rs
new file mode 100644
index 0000000000..b1af87545f
--- /dev/null
+++ b/third_party/rust/futures-executor/src/lib.rs
@@ -0,0 +1,76 @@
+//! Built-in executors and related tools.
+//!
+//! All asynchronous computation occurs within an executor, which is
+//! capable of spawning futures as tasks. This module provides several
+//! built-in executors, as well as tools for building your own.
+//!
+//! All items are only available when the `std` feature of this
+//! library is activated, and it is activated by default.
+//!
+//! # Using a thread pool (M:N task scheduling)
+//!
+//! Most of the time tasks should be executed on a [thread pool](ThreadPool).
+//! A small set of worker threads can handle a very large set of spawned tasks
+//! (which are much lighter weight than threads). Tasks spawned onto the pool
+//! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on
+//! the created threads.
+//!
+//! # Spawning additional tasks
+//!
+//! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method
+//! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used
+//! instead.
+//!
+//! # Single-threaded execution
+//!
+//! In addition to thread pools, it's possible to run a task (and the tasks
+//! it spawns) entirely within a single thread via the [`LocalPool`] executor.
+//! Aside from cutting down on synchronization costs, this executor also makes
+//! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The
+//! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively
+//! little work between I/O operations.
+//!
+//! There is also a convenience function [`block_on`] for simply running a
+//! future to completion on the current thread.
+//!
+//! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj
+//! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj
+
+#![cfg_attr(not(feature = "std"), no_std)]
+#![warn(
+ missing_debug_implementations,
+ missing_docs,
+ rust_2018_idioms,
+ single_use_lifetimes,
+ unreachable_pub
+)]
+#![doc(test(
+ no_crate_inject,
+ attr(
+ deny(warnings, rust_2018_idioms, single_use_lifetimes),
+ allow(dead_code, unused_assignments, unused_variables)
+ )
+))]
+#![cfg_attr(docsrs, feature(doc_cfg))]
+
+#[cfg(feature = "std")]
+mod local_pool;
+#[cfg(feature = "std")]
+pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner};
+
+#[cfg(feature = "thread-pool")]
+#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+#[cfg(feature = "std")]
+mod thread_pool;
+#[cfg(feature = "thread-pool")]
+#[cfg(feature = "std")]
+mod unpark_mutex;
+#[cfg(feature = "thread-pool")]
+#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+#[cfg(feature = "std")]
+pub use crate::thread_pool::{ThreadPool, ThreadPoolBuilder};
+
+#[cfg(feature = "std")]
+mod enter;
+#[cfg(feature = "std")]
+pub use crate::enter::{enter, Enter, EnterError};
diff --git a/third_party/rust/futures-executor/src/local_pool.rs b/third_party/rust/futures-executor/src/local_pool.rs
new file mode 100644
index 0000000000..8a9bc2fc90
--- /dev/null
+++ b/third_party/rust/futures-executor/src/local_pool.rs
@@ -0,0 +1,402 @@
+use crate::enter;
+use futures_core::future::Future;
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use futures_task::{waker_ref, ArcWake};
+use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
+use futures_util::pin_mut;
+use futures_util::stream::FuturesUnordered;
+use futures_util::stream::StreamExt;
+use std::cell::RefCell;
+use std::ops::{Deref, DerefMut};
+use std::rc::{Rc, Weak};
+use std::sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+};
+use std::thread::{self, Thread};
+
+/// A single-threaded task pool for polling futures to completion.
+///
+/// This executor allows you to multiplex any number of tasks onto a single
+/// thread. It's appropriate to poll strictly I/O-bound futures that do very
+/// little work in between I/O actions.
+///
+/// To get a handle to the pool that implements
+/// [`Spawn`](futures_task::Spawn), use the
+/// [`spawner()`](LocalPool::spawner) method. Because the executor is
+/// single-threaded, it supports a special form of task spawning for non-`Send`
+/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
+#[derive(Debug)]
+pub struct LocalPool {
+ pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
+ incoming: Rc<Incoming>,
+}
+
+/// A handle to a [`LocalPool`](LocalPool) that implements
+/// [`Spawn`](futures_task::Spawn).
+#[derive(Clone, Debug)]
+pub struct LocalSpawner {
+ incoming: Weak<Incoming>,
+}
+
+type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
+
+pub(crate) struct ThreadNotify {
+ /// The (single) executor thread.
+ thread: Thread,
+ /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
+ /// before the next `park()`, which may otherwise happen if the code
+ /// being executed as part of the future(s) being polled makes use of
+ /// park / unpark calls of its own, i.e. we cannot assume that no other
+ /// code uses park / unpark on the executing `thread`.
+ unparked: AtomicBool,
+}
+
+thread_local! {
+ static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
+ thread: thread::current(),
+ unparked: AtomicBool::new(false),
+ });
+}
+
+impl ArcWake for ThreadNotify {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ // Make sure the wakeup is remembered until the next `park()`.
+ let unparked = arc_self.unparked.swap(true, Ordering::Release);
+ if !unparked {
+ // If the thread has not been unparked yet, it must be done
+ // now. If it was actually parked, it will run again,
+ // otherwise the token made available by `unpark`
+ // may be consumed before reaching `park()`, but `unparked`
+ // ensures it is not forgotten.
+ arc_self.thread.unpark();
+ }
+ }
+}
+
+// Set up and run a basic single-threaded spawner loop, invoking `f` on each
+// turn.
+fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
+ let _enter = enter().expect(
+ "cannot execute `LocalPool` executor from within \
+ another executor",
+ );
+
+ CURRENT_THREAD_NOTIFY.with(|thread_notify| {
+ let waker = waker_ref(thread_notify);
+ let mut cx = Context::from_waker(&waker);
+ loop {
+ if let Poll::Ready(t) = f(&mut cx) {
+ return t;
+ }
+
+ // Wait for a wakeup.
+ while !thread_notify.unparked.swap(false, Ordering::Acquire) {
+ // No wakeup occurred. It may occur now, right before parking,
+ // but in that case the token made available by `unpark()`
+ // is guaranteed to still be available and `park()` is a no-op.
+ thread::park();
+ }
+ }
+ })
+}
+
+/// Check for a wakeup, but don't consume it.
+fn woken() -> bool {
+ CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
+}
+
+impl LocalPool {
+ /// Create a new, empty pool of tasks.
+ pub fn new() -> Self {
+ Self { pool: FuturesUnordered::new(), incoming: Default::default() }
+ }
+
+ /// Get a clonable handle to the pool as a [`Spawn`].
+ pub fn spawner(&self) -> LocalSpawner {
+ LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
+ }
+
+ /// Run all tasks in the pool to completion.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ ///
+ /// let mut pool = LocalPool::new();
+ ///
+ /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
+ ///
+ /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
+ /// pool.run();
+ /// ```
+ ///
+ /// The function will block the calling thread until *all* tasks in the pool
+ /// are complete, including any spawned while running existing tasks.
+ pub fn run(&mut self) {
+ run_executor(|cx| self.poll_pool(cx))
+ }
+
+ /// Runs all the tasks in the pool until the given future completes.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ ///
+ /// let mut pool = LocalPool::new();
+ /// # let my_app = async {};
+ ///
+ /// // run tasks in the pool until `my_app` completes
+ /// pool.run_until(my_app);
+ /// ```
+ ///
+ /// The function will block the calling thread *only* until the future `f`
+ /// completes; there may still be incomplete tasks in the pool, which will
+ /// be inert after the call completes, but can continue with further use of
+ /// one of the pool's run or poll methods. While the function is running,
+ /// however, all tasks in the pool will try to make progress.
+ pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
+ pin_mut!(future);
+
+ run_executor(|cx| {
+ {
+ // if our main task is done, so are we
+ let result = future.as_mut().poll(cx);
+ if let Poll::Ready(output) = result {
+ return Poll::Ready(output);
+ }
+ }
+
+ let _ = self.poll_pool(cx);
+ Poll::Pending
+ })
+ }
+
+ /// Runs all tasks and returns after completing one future or until no more progress
+ /// can be made. Returns `true` if one future was completed, `false` otherwise.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ /// use futures::task::LocalSpawnExt;
+ /// use futures::future::{ready, pending};
+ ///
+ /// let mut pool = LocalPool::new();
+ /// let spawner = pool.spawner();
+ ///
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(pending()).unwrap();
+ ///
+ /// // Run the two ready tasks and return true for them.
+ /// pool.try_run_one(); // returns true after completing one of the ready futures
+ /// pool.try_run_one(); // returns true after completing the other ready future
+ ///
+ /// // the remaining task can not be completed
+ /// assert!(!pool.try_run_one()); // returns false
+ /// ```
+ ///
+ /// This function will not block the calling thread and will return the moment
+ /// that there are no tasks left for which progress can be made or after exactly one
+ /// task was completed; Remaining incomplete tasks in the pool can continue with
+ /// further use of one of the pool's run or poll methods.
+ /// Though only one task will be completed, progress may be made on multiple tasks.
+ pub fn try_run_one(&mut self) -> bool {
+ run_executor(|cx| {
+ loop {
+ self.drain_incoming();
+
+ match self.pool.poll_next_unpin(cx) {
+ // Success!
+ Poll::Ready(Some(())) => return Poll::Ready(true),
+ // The pool was empty.
+ Poll::Ready(None) => return Poll::Ready(false),
+ Poll::Pending => (),
+ }
+
+ if !self.incoming.borrow().is_empty() {
+ // New tasks were spawned; try again.
+ continue;
+ } else if woken() {
+ // The pool yielded to us, but there's more progress to be made.
+ return Poll::Pending;
+ } else {
+ return Poll::Ready(false);
+ }
+ }
+ })
+ }
+
+ /// Runs all tasks in the pool and returns if no more progress can be made
+ /// on any task.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ /// use futures::task::LocalSpawnExt;
+ /// use futures::future::{ready, pending};
+ ///
+ /// let mut pool = LocalPool::new();
+ /// let spawner = pool.spawner();
+ ///
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(pending()).unwrap();
+ ///
+ /// // Runs the two ready task and returns.
+ /// // The empty task remains in the pool.
+ /// pool.run_until_stalled();
+ /// ```
+ ///
+ /// This function will not block the calling thread and will return the moment
+ /// that there are no tasks left for which progress can be made;
+ /// remaining incomplete tasks in the pool can continue with further use of one
+ /// of the pool's run or poll methods. While the function is running, all tasks
+ /// in the pool will try to make progress.
+ pub fn run_until_stalled(&mut self) {
+ run_executor(|cx| match self.poll_pool(cx) {
+ // The pool is empty.
+ Poll::Ready(()) => Poll::Ready(()),
+ Poll::Pending => {
+ if woken() {
+ Poll::Pending
+ } else {
+ // We're stalled for now.
+ Poll::Ready(())
+ }
+ }
+ });
+ }
+
+ /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
+ /// Repeat until either the pool is empty, or it returns `Pending`.
+ ///
+ /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
+ ///
+ /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
+ /// mean that the pool can't make progress.
+ fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ loop {
+ self.drain_incoming();
+
+ let pool_ret = self.pool.poll_next_unpin(cx);
+
+ // We queued up some new tasks; add them and poll again.
+ if !self.incoming.borrow().is_empty() {
+ continue;
+ }
+
+ match pool_ret {
+ Poll::Ready(Some(())) => continue,
+ Poll::Ready(None) => return Poll::Ready(()),
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+
+ /// Empty the incoming queue of newly-spawned tasks.
+ fn drain_incoming(&mut self) {
+ let mut incoming = self.incoming.borrow_mut();
+ for task in incoming.drain(..) {
+ self.pool.push(task)
+ }
+ }
+}
+
+impl Default for LocalPool {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Run a future to completion on the current thread.
+///
+/// This function will block the caller until the given future has completed.
+///
+/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
+/// spawned tasks.
+pub fn block_on<F: Future>(f: F) -> F::Output {
+ pin_mut!(f);
+ run_executor(|cx| f.as_mut().poll(cx))
+}
+
+/// Turn a stream into a blocking iterator.
+///
+/// When `next` is called on the resulting `BlockingStream`, the caller
+/// will be blocked until the next element of the `Stream` becomes available.
+pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
+ BlockingStream { stream }
+}
+
+/// An iterator which blocks on values from a stream until they become available.
+#[derive(Debug)]
+pub struct BlockingStream<S: Stream + Unpin> {
+ stream: S,
+}
+
+impl<S: Stream + Unpin> Deref for BlockingStream<S> {
+ type Target = S;
+ fn deref(&self) -> &Self::Target {
+ &self.stream
+ }
+}
+
+impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.stream
+ }
+}
+
+impl<S: Stream + Unpin> BlockingStream<S> {
+ /// Convert this `BlockingStream` into the inner `Stream` type.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
+ type Item = S::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ LocalPool::new().run_until(self.stream.next())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
+
+impl Spawn for LocalSpawner {
+ fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
+ if let Some(incoming) = self.incoming.upgrade() {
+ incoming.borrow_mut().push(future.into());
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ if self.incoming.upgrade().is_some() {
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+}
+
+impl LocalSpawn for LocalSpawner {
+ fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
+ if let Some(incoming) = self.incoming.upgrade() {
+ incoming.borrow_mut().push(future);
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+
+ fn status_local(&self) -> Result<(), SpawnError> {
+ if self.incoming.upgrade().is_some() {
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+}
diff --git a/third_party/rust/futures-executor/src/thread_pool.rs b/third_party/rust/futures-executor/src/thread_pool.rs
new file mode 100644
index 0000000000..5371008953
--- /dev/null
+++ b/third_party/rust/futures-executor/src/thread_pool.rs
@@ -0,0 +1,380 @@
+use crate::enter;
+use crate::unpark_mutex::UnparkMutex;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_task::{waker_ref, ArcWake};
+use futures_task::{FutureObj, Spawn, SpawnError};
+use futures_util::future::FutureExt;
+use std::cmp;
+use std::fmt;
+use std::io;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::mpsc;
+use std::sync::{Arc, Mutex};
+use std::thread;
+
+/// A general-purpose thread pool for scheduling tasks that poll futures to
+/// completion.
+///
+/// The thread pool multiplexes any number of tasks onto a fixed number of
+/// worker threads.
+///
+/// This type is a clonable handle to the threadpool itself.
+/// Cloning it will only create a new reference, not a new threadpool.
+///
+/// This type is only available when the `thread-pool` feature of this
+/// library is activated.
+#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+pub struct ThreadPool {
+ state: Arc<PoolState>,
+}
+
+/// Thread pool configuration object.
+///
+/// This type is only available when the `thread-pool` feature of this
+/// library is activated.
+#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+pub struct ThreadPoolBuilder {
+ pool_size: usize,
+ stack_size: usize,
+ name_prefix: Option<String>,
+ after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+}
+
+trait AssertSendSync: Send + Sync {}
+impl AssertSendSync for ThreadPool {}
+
+struct PoolState {
+ tx: Mutex<mpsc::Sender<Message>>,
+ rx: Mutex<mpsc::Receiver<Message>>,
+ cnt: AtomicUsize,
+ size: usize,
+}
+
+impl fmt::Debug for ThreadPool {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ThreadPool").field("size", &self.state.size).finish()
+ }
+}
+
+impl fmt::Debug for ThreadPoolBuilder {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ThreadPoolBuilder")
+ .field("pool_size", &self.pool_size)
+ .field("name_prefix", &self.name_prefix)
+ .finish()
+ }
+}
+
+enum Message {
+ Run(Task),
+ Close,
+}
+
+impl ThreadPool {
+ /// Creates a new thread pool with the default configuration.
+ ///
+ /// See documentation for the methods in
+ /// [`ThreadPoolBuilder`](ThreadPoolBuilder) for details on the default
+ /// configuration.
+ pub fn new() -> Result<Self, io::Error> {
+ ThreadPoolBuilder::new().create()
+ }
+
+ /// Create a default thread pool configuration, which can then be customized.
+ ///
+ /// See documentation for the methods in
+ /// [`ThreadPoolBuilder`](ThreadPoolBuilder) for details on the default
+ /// configuration.
+ pub fn builder() -> ThreadPoolBuilder {
+ ThreadPoolBuilder::new()
+ }
+
+ /// Spawns a future that will be run to completion.
+ ///
+ /// > **Note**: This method is similar to `Spawn::spawn_obj`, except that
+ /// > it is guaranteed to always succeed.
+ pub fn spawn_obj_ok(&self, future: FutureObj<'static, ()>) {
+ let task = Task {
+ future,
+ wake_handle: Arc::new(WakeHandle { exec: self.clone(), mutex: UnparkMutex::new() }),
+ exec: self.clone(),
+ };
+ self.state.send(Message::Run(task));
+ }
+
+ /// Spawns a task that polls the given future with output `()` to
+ /// completion.
+ ///
+ /// ```
+ /// # {
+ /// use futures::executor::ThreadPool;
+ ///
+ /// let pool = ThreadPool::new().unwrap();
+ ///
+ /// let future = async { /* ... */ };
+ /// pool.spawn_ok(future);
+ /// # }
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+ /// ```
+ ///
+ /// > **Note**: This method is similar to `SpawnExt::spawn`, except that
+ /// > it is guaranteed to always succeed.
+ pub fn spawn_ok<Fut>(&self, future: Fut)
+ where
+ Fut: Future<Output = ()> + Send + 'static,
+ {
+ self.spawn_obj_ok(FutureObj::new(Box::new(future)))
+ }
+}
+
+impl Spawn for ThreadPool {
+ fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
+ self.spawn_obj_ok(future);
+ Ok(())
+ }
+}
+
+impl PoolState {
+ fn send(&self, msg: Message) {
+ self.tx.lock().unwrap().send(msg).unwrap();
+ }
+
+ fn work(
+ &self,
+ idx: usize,
+ after_start: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ before_stop: Option<Arc<dyn Fn(usize) + Send + Sync>>,
+ ) {
+ let _scope = enter().unwrap();
+ if let Some(after_start) = after_start {
+ after_start(idx);
+ }
+ loop {
+ let msg = self.rx.lock().unwrap().recv().unwrap();
+ match msg {
+ Message::Run(task) => task.run(),
+ Message::Close => break,
+ }
+ }
+ if let Some(before_stop) = before_stop {
+ before_stop(idx);
+ }
+ }
+}
+
+impl Clone for ThreadPool {
+ fn clone(&self) -> Self {
+ self.state.cnt.fetch_add(1, Ordering::Relaxed);
+ Self { state: self.state.clone() }
+ }
+}
+
+impl Drop for ThreadPool {
+ fn drop(&mut self) {
+ if self.state.cnt.fetch_sub(1, Ordering::Relaxed) == 1 {
+ for _ in 0..self.state.size {
+ self.state.send(Message::Close);
+ }
+ }
+ }
+}
+
+impl ThreadPoolBuilder {
+ /// Create a default thread pool configuration.
+ ///
+ /// See the other methods on this type for details on the defaults.
+ pub fn new() -> Self {
+ Self {
+ pool_size: cmp::max(1, num_cpus::get()),
+ stack_size: 0,
+ name_prefix: None,
+ after_start: None,
+ before_stop: None,
+ }
+ }
+
+ /// Set size of a future ThreadPool
+ ///
+ /// The size of a thread pool is the number of worker threads spawned. By
+ /// default, this is equal to the number of CPU cores.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `pool_size == 0`.
+ pub fn pool_size(&mut self, size: usize) -> &mut Self {
+ assert!(size > 0);
+ self.pool_size = size;
+ self
+ }
+
+ /// Set stack size of threads in the pool, in bytes.
+ ///
+ /// By default, worker threads use Rust's standard stack size.
+ pub fn stack_size(&mut self, stack_size: usize) -> &mut Self {
+ self.stack_size = stack_size;
+ self
+ }
+
+ /// Set thread name prefix of a future ThreadPool.
+ ///
+ /// Thread name prefix is used for generating thread names. For example, if prefix is
+ /// `my-pool-`, then threads in the pool will get names like `my-pool-1` etc.
+ ///
+ /// By default, worker threads are assigned Rust's standard thread name.
+ pub fn name_prefix<S: Into<String>>(&mut self, name_prefix: S) -> &mut Self {
+ self.name_prefix = Some(name_prefix.into());
+ self
+ }
+
+ /// Execute the closure `f` immediately after each worker thread is started,
+ /// but before running any tasks on it.
+ ///
+ /// This hook is intended for bookkeeping and monitoring.
+ /// The closure `f` will be dropped after the `builder` is dropped
+ /// and all worker threads in the pool have executed it.
+ ///
+ /// The closure provided will receive an index corresponding to the worker
+ /// thread it's running on.
+ pub fn after_start<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(usize) + Send + Sync + 'static,
+ {
+ self.after_start = Some(Arc::new(f));
+ self
+ }
+
+ /// Execute closure `f` just prior to shutting down each worker thread.
+ ///
+ /// This hook is intended for bookkeeping and monitoring.
+ /// The closure `f` will be dropped after the `builder` is dropped
+ /// and all threads in the pool have executed it.
+ ///
+ /// The closure provided will receive an index corresponding to the worker
+ /// thread it's running on.
+ pub fn before_stop<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn(usize) + Send + Sync + 'static,
+ {
+ self.before_stop = Some(Arc::new(f));
+ self
+ }
+
+ /// Create a [`ThreadPool`](ThreadPool) with the given configuration.
+ pub fn create(&mut self) -> Result<ThreadPool, io::Error> {
+ let (tx, rx) = mpsc::channel();
+ let pool = ThreadPool {
+ state: Arc::new(PoolState {
+ tx: Mutex::new(tx),
+ rx: Mutex::new(rx),
+ cnt: AtomicUsize::new(1),
+ size: self.pool_size,
+ }),
+ };
+
+ for counter in 0..self.pool_size {
+ let state = pool.state.clone();
+ let after_start = self.after_start.clone();
+ let before_stop = self.before_stop.clone();
+ let mut thread_builder = thread::Builder::new();
+ if let Some(ref name_prefix) = self.name_prefix {
+ thread_builder = thread_builder.name(format!("{}{}", name_prefix, counter));
+ }
+ if self.stack_size > 0 {
+ thread_builder = thread_builder.stack_size(self.stack_size);
+ }
+ thread_builder.spawn(move || state.work(counter, after_start, before_stop))?;
+ }
+ Ok(pool)
+ }
+}
+
+impl Default for ThreadPoolBuilder {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// A task responsible for polling a future to completion.
+struct Task {
+ future: FutureObj<'static, ()>,
+ exec: ThreadPool,
+ wake_handle: Arc<WakeHandle>,
+}
+
+struct WakeHandle {
+ mutex: UnparkMutex<Task>,
+ exec: ThreadPool,
+}
+
+impl Task {
+ /// Actually run the task (invoking `poll` on the future) on the current
+ /// thread.
+ fn run(self) {
+ let Self { mut future, wake_handle, mut exec } = self;
+ let waker = waker_ref(&wake_handle);
+ let mut cx = Context::from_waker(&waker);
+
+ // Safety: The ownership of this `Task` object is evidence that
+ // we are in the `POLLING`/`REPOLL` state for the mutex.
+ unsafe {
+ wake_handle.mutex.start_poll();
+
+ loop {
+ let res = future.poll_unpin(&mut cx);
+ match res {
+ Poll::Pending => {}
+ Poll::Ready(()) => return wake_handle.mutex.complete(),
+ }
+ let task = Self { future, wake_handle: wake_handle.clone(), exec };
+ match wake_handle.mutex.wait(task) {
+ Ok(()) => return, // we've waited
+ Err(task) => {
+ // someone's notified us
+ future = task.future;
+ exec = task.exec;
+ }
+ }
+ }
+ }
+ }
+}
+
+impl fmt::Debug for Task {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Task").field("contents", &"...").finish()
+ }
+}
+
+impl ArcWake for WakeHandle {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ if let Ok(task) = arc_self.mutex.notify() {
+ arc_self.exec.state.send(Message::Run(task))
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::sync::mpsc;
+
+ #[test]
+ fn test_drop_after_start() {
+ {
+ let (tx, rx) = mpsc::sync_channel(2);
+ let _cpu_pool = ThreadPoolBuilder::new()
+ .pool_size(2)
+ .after_start(move |_| tx.send(1).unwrap())
+ .create()
+ .unwrap();
+
+ // After ThreadPoolBuilder is deconstructed, the tx should be dropped
+ // so that we can use rx as an iterator.
+ let count = rx.into_iter().count();
+ assert_eq!(count, 2);
+ }
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
+ }
+}
diff --git a/third_party/rust/futures-executor/src/unpark_mutex.rs b/third_party/rust/futures-executor/src/unpark_mutex.rs
new file mode 100644
index 0000000000..ac5112cfa2
--- /dev/null
+++ b/third_party/rust/futures-executor/src/unpark_mutex.rs
@@ -0,0 +1,137 @@
+use std::cell::UnsafeCell;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+
+/// A "lock" around data `D`, which employs a *helping* strategy.
+///
+/// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being
+/// invoked on only a single thread at a time (2) `poll` being invoked at least
+/// once after each `unpark` (unless the future has completed).
+pub(crate) struct UnparkMutex<D> {
+ // The state of task execution (state machine described below)
+ status: AtomicUsize,
+
+ // The actual task data, accessible only in the POLLING state
+ inner: UnsafeCell<Option<D>>,
+}
+
+// `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on
+// acquisition failure, the current lock holder performs the desired work --
+// re-polling.
+//
+// As such, these impls mirror those for `Mutex<D>`. In particular, a reference
+// to `UnparkMutex` can be used to gain `&mut` access to the inner data, which
+// must therefore be `Send`.
+unsafe impl<D: Send> Send for UnparkMutex<D> {}
+unsafe impl<D: Send> Sync for UnparkMutex<D> {}
+
+// There are four possible task states, listed below with their possible
+// transitions:
+
+// The task is blocked, waiting on an event
+const WAITING: usize = 0; // --> POLLING
+
+// The task is actively being polled by a thread; arrival of additional events
+// of interest should move it to the REPOLL state
+const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE
+
+// The task is actively being polled, but will need to be re-polled upon
+// completion to ensure that all events were observed.
+const REPOLL: usize = 2; // --> POLLING
+
+// The task has finished executing (either successfully or with an error/panic)
+const COMPLETE: usize = 3; // No transitions out
+
+impl<D> UnparkMutex<D> {
+ pub(crate) fn new() -> Self {
+ Self { status: AtomicUsize::new(WAITING), inner: UnsafeCell::new(None) }
+ }
+
+ /// Attempt to "notify" the mutex that a poll should occur.
+ ///
+ /// An `Ok` result indicates that the `POLLING` state has been entered, and
+ /// the caller can proceed to poll the future. An `Err` result indicates
+ /// that polling is not necessary (because the task is finished or the
+ /// polling has been delegated).
+ pub(crate) fn notify(&self) -> Result<D, ()> {
+ let mut status = self.status.load(SeqCst);
+ loop {
+ match status {
+ // The task is idle, so try to run it immediately.
+ WAITING => {
+ match self.status.compare_exchange(WAITING, POLLING, SeqCst, SeqCst) {
+ Ok(_) => {
+ let data = unsafe {
+ // SAFETY: we've ensured mutual exclusion via
+ // the status protocol; we are the only thread
+ // that has transitioned to the POLLING state,
+ // and we won't transition back to QUEUED until
+ // the lock is "released" by this thread. See
+ // the protocol diagram above.
+ (*self.inner.get()).take().unwrap()
+ };
+ return Ok(data);
+ }
+ Err(cur) => status = cur,
+ }
+ }
+
+ // The task is being polled, so we need to record that it should
+ // be *repolled* when complete.
+ POLLING => match self.status.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst) {
+ Ok(_) => return Err(()),
+ Err(cur) => status = cur,
+ },
+
+ // The task is already scheduled for polling, or is complete, so
+ // we've got nothing to do.
+ _ => return Err(()),
+ }
+ }
+ }
+
+ /// Alert the mutex that polling is about to begin, clearing any accumulated
+ /// re-poll requests.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub(crate) unsafe fn start_poll(&self) {
+ self.status.store(POLLING, SeqCst);
+ }
+
+ /// Alert the mutex that polling completed with `Pending`.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub(crate) unsafe fn wait(&self, data: D) -> Result<(), D> {
+ *self.inner.get() = Some(data);
+
+ match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) {
+ // no unparks came in while we were running
+ Ok(_) => Ok(()),
+
+ // guaranteed to be in REPOLL state; just clobber the
+ // state and run again.
+ Err(status) => {
+ assert_eq!(status, REPOLL);
+ self.status.store(POLLING, SeqCst);
+ Err((*self.inner.get()).take().unwrap())
+ }
+ }
+ }
+
+ /// Alert the mutex that the task has completed execution and should not be
+ /// notified again.
+ ///
+ /// # Safety
+ ///
+ /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
+ /// successful calls to `notify` and `wait`/`complete`.
+ pub(crate) unsafe fn complete(&self) {
+ self.status.store(COMPLETE, SeqCst);
+ }
+}