summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-executor/src/local_pool.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-executor/src/local_pool.rs')
-rw-r--r--third_party/rust/futures-executor/src/local_pool.rs402
1 files changed, 402 insertions, 0 deletions
diff --git a/third_party/rust/futures-executor/src/local_pool.rs b/third_party/rust/futures-executor/src/local_pool.rs
new file mode 100644
index 0000000000..8a9bc2fc90
--- /dev/null
+++ b/third_party/rust/futures-executor/src/local_pool.rs
@@ -0,0 +1,402 @@
+use crate::enter;
+use futures_core::future::Future;
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use futures_task::{waker_ref, ArcWake};
+use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
+use futures_util::pin_mut;
+use futures_util::stream::FuturesUnordered;
+use futures_util::stream::StreamExt;
+use std::cell::RefCell;
+use std::ops::{Deref, DerefMut};
+use std::rc::{Rc, Weak};
+use std::sync::{
+ atomic::{AtomicBool, Ordering},
+ Arc,
+};
+use std::thread::{self, Thread};
+
+/// A single-threaded task pool for polling futures to completion.
+///
+/// This executor allows you to multiplex any number of tasks onto a single
+/// thread. It's appropriate to poll strictly I/O-bound futures that do very
+/// little work in between I/O actions.
+///
+/// To get a handle to the pool that implements
+/// [`Spawn`](futures_task::Spawn), use the
+/// [`spawner()`](LocalPool::spawner) method. Because the executor is
+/// single-threaded, it supports a special form of task spawning for non-`Send`
+/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
+#[derive(Debug)]
+pub struct LocalPool {
+ pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
+ incoming: Rc<Incoming>,
+}
+
+/// A handle to a [`LocalPool`](LocalPool) that implements
+/// [`Spawn`](futures_task::Spawn).
+#[derive(Clone, Debug)]
+pub struct LocalSpawner {
+ incoming: Weak<Incoming>,
+}
+
+type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
+
+pub(crate) struct ThreadNotify {
+ /// The (single) executor thread.
+ thread: Thread,
+ /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
+ /// before the next `park()`, which may otherwise happen if the code
+ /// being executed as part of the future(s) being polled makes use of
+ /// park / unpark calls of its own, i.e. we cannot assume that no other
+ /// code uses park / unpark on the executing `thread`.
+ unparked: AtomicBool,
+}
+
+thread_local! {
+ static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
+ thread: thread::current(),
+ unparked: AtomicBool::new(false),
+ });
+}
+
+impl ArcWake for ThreadNotify {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ // Make sure the wakeup is remembered until the next `park()`.
+ let unparked = arc_self.unparked.swap(true, Ordering::Release);
+ if !unparked {
+ // If the thread has not been unparked yet, it must be done
+ // now. If it was actually parked, it will run again,
+ // otherwise the token made available by `unpark`
+ // may be consumed before reaching `park()`, but `unparked`
+ // ensures it is not forgotten.
+ arc_self.thread.unpark();
+ }
+ }
+}
+
+// Set up and run a basic single-threaded spawner loop, invoking `f` on each
+// turn.
+fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
+ let _enter = enter().expect(
+ "cannot execute `LocalPool` executor from within \
+ another executor",
+ );
+
+ CURRENT_THREAD_NOTIFY.with(|thread_notify| {
+ let waker = waker_ref(thread_notify);
+ let mut cx = Context::from_waker(&waker);
+ loop {
+ if let Poll::Ready(t) = f(&mut cx) {
+ return t;
+ }
+
+ // Wait for a wakeup.
+ while !thread_notify.unparked.swap(false, Ordering::Acquire) {
+ // No wakeup occurred. It may occur now, right before parking,
+ // but in that case the token made available by `unpark()`
+ // is guaranteed to still be available and `park()` is a no-op.
+ thread::park();
+ }
+ }
+ })
+}
+
+/// Check for a wakeup, but don't consume it.
+fn woken() -> bool {
+ CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
+}
+
+impl LocalPool {
+ /// Create a new, empty pool of tasks.
+ pub fn new() -> Self {
+ Self { pool: FuturesUnordered::new(), incoming: Default::default() }
+ }
+
+ /// Get a clonable handle to the pool as a [`Spawn`].
+ pub fn spawner(&self) -> LocalSpawner {
+ LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
+ }
+
+ /// Run all tasks in the pool to completion.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ ///
+ /// let mut pool = LocalPool::new();
+ ///
+ /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
+ ///
+ /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
+ /// pool.run();
+ /// ```
+ ///
+ /// The function will block the calling thread until *all* tasks in the pool
+ /// are complete, including any spawned while running existing tasks.
+ pub fn run(&mut self) {
+ run_executor(|cx| self.poll_pool(cx))
+ }
+
+ /// Runs all the tasks in the pool until the given future completes.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ ///
+ /// let mut pool = LocalPool::new();
+ /// # let my_app = async {};
+ ///
+ /// // run tasks in the pool until `my_app` completes
+ /// pool.run_until(my_app);
+ /// ```
+ ///
+ /// The function will block the calling thread *only* until the future `f`
+ /// completes; there may still be incomplete tasks in the pool, which will
+ /// be inert after the call completes, but can continue with further use of
+ /// one of the pool's run or poll methods. While the function is running,
+ /// however, all tasks in the pool will try to make progress.
+ pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
+ pin_mut!(future);
+
+ run_executor(|cx| {
+ {
+ // if our main task is done, so are we
+ let result = future.as_mut().poll(cx);
+ if let Poll::Ready(output) = result {
+ return Poll::Ready(output);
+ }
+ }
+
+ let _ = self.poll_pool(cx);
+ Poll::Pending
+ })
+ }
+
+ /// Runs all tasks and returns after completing one future or until no more progress
+ /// can be made. Returns `true` if one future was completed, `false` otherwise.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ /// use futures::task::LocalSpawnExt;
+ /// use futures::future::{ready, pending};
+ ///
+ /// let mut pool = LocalPool::new();
+ /// let spawner = pool.spawner();
+ ///
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(pending()).unwrap();
+ ///
+ /// // Run the two ready tasks and return true for them.
+ /// pool.try_run_one(); // returns true after completing one of the ready futures
+ /// pool.try_run_one(); // returns true after completing the other ready future
+ ///
+ /// // the remaining task can not be completed
+ /// assert!(!pool.try_run_one()); // returns false
+ /// ```
+ ///
+ /// This function will not block the calling thread and will return the moment
+ /// that there are no tasks left for which progress can be made or after exactly one
+ /// task was completed; Remaining incomplete tasks in the pool can continue with
+ /// further use of one of the pool's run or poll methods.
+ /// Though only one task will be completed, progress may be made on multiple tasks.
+ pub fn try_run_one(&mut self) -> bool {
+ run_executor(|cx| {
+ loop {
+ self.drain_incoming();
+
+ match self.pool.poll_next_unpin(cx) {
+ // Success!
+ Poll::Ready(Some(())) => return Poll::Ready(true),
+ // The pool was empty.
+ Poll::Ready(None) => return Poll::Ready(false),
+ Poll::Pending => (),
+ }
+
+ if !self.incoming.borrow().is_empty() {
+ // New tasks were spawned; try again.
+ continue;
+ } else if woken() {
+ // The pool yielded to us, but there's more progress to be made.
+ return Poll::Pending;
+ } else {
+ return Poll::Ready(false);
+ }
+ }
+ })
+ }
+
+ /// Runs all tasks in the pool and returns if no more progress can be made
+ /// on any task.
+ ///
+ /// ```
+ /// use futures::executor::LocalPool;
+ /// use futures::task::LocalSpawnExt;
+ /// use futures::future::{ready, pending};
+ ///
+ /// let mut pool = LocalPool::new();
+ /// let spawner = pool.spawner();
+ ///
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(ready(())).unwrap();
+ /// spawner.spawn_local(pending()).unwrap();
+ ///
+ /// // Runs the two ready task and returns.
+ /// // The empty task remains in the pool.
+ /// pool.run_until_stalled();
+ /// ```
+ ///
+ /// This function will not block the calling thread and will return the moment
+ /// that there are no tasks left for which progress can be made;
+ /// remaining incomplete tasks in the pool can continue with further use of one
+ /// of the pool's run or poll methods. While the function is running, all tasks
+ /// in the pool will try to make progress.
+ pub fn run_until_stalled(&mut self) {
+ run_executor(|cx| match self.poll_pool(cx) {
+ // The pool is empty.
+ Poll::Ready(()) => Poll::Ready(()),
+ Poll::Pending => {
+ if woken() {
+ Poll::Pending
+ } else {
+ // We're stalled for now.
+ Poll::Ready(())
+ }
+ }
+ });
+ }
+
+ /// Poll `self.pool`, re-filling it with any newly-spawned tasks.
+ /// Repeat until either the pool is empty, or it returns `Pending`.
+ ///
+ /// Returns `Ready` if the pool was empty, and `Pending` otherwise.
+ ///
+ /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
+ /// mean that the pool can't make progress.
+ fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ loop {
+ self.drain_incoming();
+
+ let pool_ret = self.pool.poll_next_unpin(cx);
+
+ // We queued up some new tasks; add them and poll again.
+ if !self.incoming.borrow().is_empty() {
+ continue;
+ }
+
+ match pool_ret {
+ Poll::Ready(Some(())) => continue,
+ Poll::Ready(None) => return Poll::Ready(()),
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+
+ /// Empty the incoming queue of newly-spawned tasks.
+ fn drain_incoming(&mut self) {
+ let mut incoming = self.incoming.borrow_mut();
+ for task in incoming.drain(..) {
+ self.pool.push(task)
+ }
+ }
+}
+
+impl Default for LocalPool {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Run a future to completion on the current thread.
+///
+/// This function will block the caller until the given future has completed.
+///
+/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
+/// spawned tasks.
+pub fn block_on<F: Future>(f: F) -> F::Output {
+ pin_mut!(f);
+ run_executor(|cx| f.as_mut().poll(cx))
+}
+
+/// Turn a stream into a blocking iterator.
+///
+/// When `next` is called on the resulting `BlockingStream`, the caller
+/// will be blocked until the next element of the `Stream` becomes available.
+pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
+ BlockingStream { stream }
+}
+
+/// An iterator which blocks on values from a stream until they become available.
+#[derive(Debug)]
+pub struct BlockingStream<S: Stream + Unpin> {
+ stream: S,
+}
+
+impl<S: Stream + Unpin> Deref for BlockingStream<S> {
+ type Target = S;
+ fn deref(&self) -> &Self::Target {
+ &self.stream
+ }
+}
+
+impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.stream
+ }
+}
+
+impl<S: Stream + Unpin> BlockingStream<S> {
+ /// Convert this `BlockingStream` into the inner `Stream` type.
+ pub fn into_inner(self) -> S {
+ self.stream
+ }
+}
+
+impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
+ type Item = S::Item;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ LocalPool::new().run_until(self.stream.next())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
+
+impl Spawn for LocalSpawner {
+ fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
+ if let Some(incoming) = self.incoming.upgrade() {
+ incoming.borrow_mut().push(future.into());
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ if self.incoming.upgrade().is_some() {
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+}
+
+impl LocalSpawn for LocalSpawner {
+ fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
+ if let Some(incoming) = self.incoming.upgrade() {
+ incoming.borrow_mut().push(future);
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+
+ fn status_local(&self) -> Result<(), SpawnError> {
+ if self.incoming.upgrade().is_some() {
+ Ok(())
+ } else {
+ Err(SpawnError::shutdown())
+ }
+ }
+}