402 lines
13 KiB
Rust
402 lines
13 KiB
Rust
use crate::enter;
|
|
use futures_core::future::Future;
|
|
use futures_core::stream::Stream;
|
|
use futures_core::task::{Context, Poll};
|
|
use futures_task::{waker_ref, ArcWake};
|
|
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
|
|
use futures_util::pin_mut;
|
|
use futures_util::stream::FuturesUnordered;
|
|
use futures_util::stream::StreamExt;
|
|
use std::cell::RefCell;
|
|
use std::ops::{Deref, DerefMut};
|
|
use std::rc::{Rc, Weak};
|
|
use std::sync::{
|
|
atomic::{AtomicBool, Ordering},
|
|
Arc,
|
|
};
|
|
use std::thread::{self, Thread};
|
|
|
|
/// A single-threaded task pool for polling futures to completion.
|
|
///
|
|
/// This executor allows you to multiplex any number of tasks onto a single
|
|
/// thread. It's appropriate to poll strictly I/O-bound futures that do very
|
|
/// little work in between I/O actions.
|
|
///
|
|
/// To get a handle to the pool that implements
|
|
/// [`Spawn`](futures_task::Spawn), use the
|
|
/// [`spawner()`](LocalPool::spawner) method. Because the executor is
|
|
/// single-threaded, it supports a special form of task spawning for non-`Send`
|
|
/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
|
|
#[derive(Debug)]
|
|
pub struct LocalPool {
|
|
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
|
|
incoming: Rc<Incoming>,
|
|
}
|
|
|
|
/// A handle to a [`LocalPool`](LocalPool) that implements
|
|
/// [`Spawn`](futures_task::Spawn).
|
|
#[derive(Clone, Debug)]
|
|
pub struct LocalSpawner {
|
|
incoming: Weak<Incoming>,
|
|
}
|
|
|
|
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
|
|
|
|
pub(crate) struct ThreadNotify {
|
|
/// The (single) executor thread.
|
|
thread: Thread,
|
|
/// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
|
|
/// before the next `park()`, which may otherwise happen if the code
|
|
/// being executed as part of the future(s) being polled makes use of
|
|
/// park / unpark calls of its own, i.e. we cannot assume that no other
|
|
/// code uses park / unpark on the executing `thread`.
|
|
unparked: AtomicBool,
|
|
}
|
|
|
|
thread_local! {
|
|
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
|
|
thread: thread::current(),
|
|
unparked: AtomicBool::new(false),
|
|
});
|
|
}
|
|
|
|
impl ArcWake for ThreadNotify {
|
|
fn wake_by_ref(arc_self: &Arc<Self>) {
|
|
// Make sure the wakeup is remembered until the next `park()`.
|
|
let unparked = arc_self.unparked.swap(true, Ordering::Release);
|
|
if !unparked {
|
|
// If the thread has not been unparked yet, it must be done
|
|
// now. If it was actually parked, it will run again,
|
|
// otherwise the token made available by `unpark`
|
|
// may be consumed before reaching `park()`, but `unparked`
|
|
// ensures it is not forgotten.
|
|
arc_self.thread.unpark();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
|
|
// turn.
|
|
fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
|
|
let _enter = enter().expect(
|
|
"cannot execute `LocalPool` executor from within \
|
|
another executor",
|
|
);
|
|
|
|
CURRENT_THREAD_NOTIFY.with(|thread_notify| {
|
|
let waker = waker_ref(thread_notify);
|
|
let mut cx = Context::from_waker(&waker);
|
|
loop {
|
|
if let Poll::Ready(t) = f(&mut cx) {
|
|
return t;
|
|
}
|
|
|
|
// Wait for a wakeup.
|
|
while !thread_notify.unparked.swap(false, Ordering::Acquire) {
|
|
// No wakeup occurred. It may occur now, right before parking,
|
|
// but in that case the token made available by `unpark()`
|
|
// is guaranteed to still be available and `park()` is a no-op.
|
|
thread::park();
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
/// Check for a wakeup, but don't consume it.
|
|
fn woken() -> bool {
|
|
CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
|
|
}
|
|
|
|
impl LocalPool {
|
|
/// Create a new, empty pool of tasks.
|
|
pub fn new() -> Self {
|
|
Self { pool: FuturesUnordered::new(), incoming: Default::default() }
|
|
}
|
|
|
|
/// Get a clonable handle to the pool as a [`Spawn`].
|
|
pub fn spawner(&self) -> LocalSpawner {
|
|
LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
|
|
}
|
|
|
|
/// Run all tasks in the pool to completion.
|
|
///
|
|
/// ```
|
|
/// use futures::executor::LocalPool;
|
|
///
|
|
/// let mut pool = LocalPool::new();
|
|
///
|
|
/// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
|
|
///
|
|
/// // run *all* tasks in the pool to completion, including any newly-spawned ones.
|
|
/// pool.run();
|
|
/// ```
|
|
///
|
|
/// The function will block the calling thread until *all* tasks in the pool
|
|
/// are complete, including any spawned while running existing tasks.
|
|
pub fn run(&mut self) {
|
|
run_executor(|cx| self.poll_pool(cx))
|
|
}
|
|
|
|
/// Runs all the tasks in the pool until the given future completes.
|
|
///
|
|
/// ```
|
|
/// use futures::executor::LocalPool;
|
|
///
|
|
/// let mut pool = LocalPool::new();
|
|
/// # let my_app = async {};
|
|
///
|
|
/// // run tasks in the pool until `my_app` completes
|
|
/// pool.run_until(my_app);
|
|
/// ```
|
|
///
|
|
/// The function will block the calling thread *only* until the future `f`
|
|
/// completes; there may still be incomplete tasks in the pool, which will
|
|
/// be inert after the call completes, but can continue with further use of
|
|
/// one of the pool's run or poll methods. While the function is running,
|
|
/// however, all tasks in the pool will try to make progress.
|
|
pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
|
|
pin_mut!(future);
|
|
|
|
run_executor(|cx| {
|
|
{
|
|
// if our main task is done, so are we
|
|
let result = future.as_mut().poll(cx);
|
|
if let Poll::Ready(output) = result {
|
|
return Poll::Ready(output);
|
|
}
|
|
}
|
|
|
|
let _ = self.poll_pool(cx);
|
|
Poll::Pending
|
|
})
|
|
}
|
|
|
|
/// Runs all tasks and returns after completing one future or until no more progress
|
|
/// can be made. Returns `true` if one future was completed, `false` otherwise.
|
|
///
|
|
/// ```
|
|
/// use futures::executor::LocalPool;
|
|
/// use futures::task::LocalSpawnExt;
|
|
/// use futures::future::{ready, pending};
|
|
///
|
|
/// let mut pool = LocalPool::new();
|
|
/// let spawner = pool.spawner();
|
|
///
|
|
/// spawner.spawn_local(ready(())).unwrap();
|
|
/// spawner.spawn_local(ready(())).unwrap();
|
|
/// spawner.spawn_local(pending()).unwrap();
|
|
///
|
|
/// // Run the two ready tasks and return true for them.
|
|
/// pool.try_run_one(); // returns true after completing one of the ready futures
|
|
/// pool.try_run_one(); // returns true after completing the other ready future
|
|
///
|
|
/// // the remaining task can not be completed
|
|
/// assert!(!pool.try_run_one()); // returns false
|
|
/// ```
|
|
///
|
|
/// This function will not block the calling thread and will return the moment
|
|
/// that there are no tasks left for which progress can be made or after exactly one
|
|
/// task was completed; Remaining incomplete tasks in the pool can continue with
|
|
/// further use of one of the pool's run or poll methods.
|
|
/// Though only one task will be completed, progress may be made on multiple tasks.
|
|
pub fn try_run_one(&mut self) -> bool {
|
|
run_executor(|cx| {
|
|
loop {
|
|
self.drain_incoming();
|
|
|
|
match self.pool.poll_next_unpin(cx) {
|
|
// Success!
|
|
Poll::Ready(Some(())) => return Poll::Ready(true),
|
|
// The pool was empty.
|
|
Poll::Ready(None) => return Poll::Ready(false),
|
|
Poll::Pending => (),
|
|
}
|
|
|
|
if !self.incoming.borrow().is_empty() {
|
|
// New tasks were spawned; try again.
|
|
continue;
|
|
} else if woken() {
|
|
// The pool yielded to us, but there's more progress to be made.
|
|
return Poll::Pending;
|
|
} else {
|
|
return Poll::Ready(false);
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
/// Runs all tasks in the pool and returns if no more progress can be made
|
|
/// on any task.
|
|
///
|
|
/// ```
|
|
/// use futures::executor::LocalPool;
|
|
/// use futures::task::LocalSpawnExt;
|
|
/// use futures::future::{ready, pending};
|
|
///
|
|
/// let mut pool = LocalPool::new();
|
|
/// let spawner = pool.spawner();
|
|
///
|
|
/// spawner.spawn_local(ready(())).unwrap();
|
|
/// spawner.spawn_local(ready(())).unwrap();
|
|
/// spawner.spawn_local(pending()).unwrap();
|
|
///
|
|
/// // Runs the two ready task and returns.
|
|
/// // The empty task remains in the pool.
|
|
/// pool.run_until_stalled();
|
|
/// ```
|
|
///
|
|
/// This function will not block the calling thread and will return the moment
|
|
/// that there are no tasks left for which progress can be made;
|
|
/// remaining incomplete tasks in the pool can continue with further use of one
|
|
/// of the pool's run or poll methods. While the function is running, all tasks
|
|
/// in the pool will try to make progress.
|
|
pub fn run_until_stalled(&mut self) {
|
|
run_executor(|cx| match self.poll_pool(cx) {
|
|
// The pool is empty.
|
|
Poll::Ready(()) => Poll::Ready(()),
|
|
Poll::Pending => {
|
|
if woken() {
|
|
Poll::Pending
|
|
} else {
|
|
// We're stalled for now.
|
|
Poll::Ready(())
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Poll `self.pool`, re-filling it with any newly-spawned tasks.
|
|
/// Repeat until either the pool is empty, or it returns `Pending`.
|
|
///
|
|
/// Returns `Ready` if the pool was empty, and `Pending` otherwise.
|
|
///
|
|
/// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
|
|
/// mean that the pool can't make progress.
|
|
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
|
|
loop {
|
|
self.drain_incoming();
|
|
|
|
let pool_ret = self.pool.poll_next_unpin(cx);
|
|
|
|
// We queued up some new tasks; add them and poll again.
|
|
if !self.incoming.borrow().is_empty() {
|
|
continue;
|
|
}
|
|
|
|
match pool_ret {
|
|
Poll::Ready(Some(())) => continue,
|
|
Poll::Ready(None) => return Poll::Ready(()),
|
|
Poll::Pending => return Poll::Pending,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Empty the incoming queue of newly-spawned tasks.
|
|
fn drain_incoming(&mut self) {
|
|
let mut incoming = self.incoming.borrow_mut();
|
|
for task in incoming.drain(..) {
|
|
self.pool.push(task)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Default for LocalPool {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
/// Run a future to completion on the current thread.
|
|
///
|
|
/// This function will block the caller until the given future has completed.
|
|
///
|
|
/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
|
|
/// spawned tasks.
|
|
pub fn block_on<F: Future>(f: F) -> F::Output {
|
|
pin_mut!(f);
|
|
run_executor(|cx| f.as_mut().poll(cx))
|
|
}
|
|
|
|
/// Turn a stream into a blocking iterator.
|
|
///
|
|
/// When `next` is called on the resulting `BlockingStream`, the caller
|
|
/// will be blocked until the next element of the `Stream` becomes available.
|
|
pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
|
|
BlockingStream { stream }
|
|
}
|
|
|
|
/// An iterator which blocks on values from a stream until they become available.
|
|
#[derive(Debug)]
|
|
pub struct BlockingStream<S: Stream + Unpin> {
|
|
stream: S,
|
|
}
|
|
|
|
impl<S: Stream + Unpin> Deref for BlockingStream<S> {
|
|
type Target = S;
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.stream
|
|
}
|
|
}
|
|
|
|
impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
|
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
&mut self.stream
|
|
}
|
|
}
|
|
|
|
impl<S: Stream + Unpin> BlockingStream<S> {
|
|
/// Convert this `BlockingStream` into the inner `Stream` type.
|
|
pub fn into_inner(self) -> S {
|
|
self.stream
|
|
}
|
|
}
|
|
|
|
impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
|
|
type Item = S::Item;
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
LocalPool::new().run_until(self.stream.next())
|
|
}
|
|
|
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
|
self.stream.size_hint()
|
|
}
|
|
}
|
|
|
|
impl Spawn for LocalSpawner {
|
|
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
|
|
if let Some(incoming) = self.incoming.upgrade() {
|
|
incoming.borrow_mut().push(future.into());
|
|
Ok(())
|
|
} else {
|
|
Err(SpawnError::shutdown())
|
|
}
|
|
}
|
|
|
|
fn status(&self) -> Result<(), SpawnError> {
|
|
if self.incoming.upgrade().is_some() {
|
|
Ok(())
|
|
} else {
|
|
Err(SpawnError::shutdown())
|
|
}
|
|
}
|
|
}
|
|
|
|
impl LocalSpawn for LocalSpawner {
|
|
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
|
|
if let Some(incoming) = self.incoming.upgrade() {
|
|
incoming.borrow_mut().push(future);
|
|
Ok(())
|
|
} else {
|
|
Err(SpawnError::shutdown())
|
|
}
|
|
}
|
|
|
|
fn status_local(&self) -> Result<(), SpawnError> {
|
|
if self.incoming.upgrade().is_some() {
|
|
Ok(())
|
|
} else {
|
|
Err(SpawnError::shutdown())
|
|
}
|
|
}
|
|
}
|