use crate::enter; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_task::{waker_ref, ArcWake}; use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; use futures_util::pin_mut; use futures_util::stream::FuturesUnordered; use futures_util::stream::StreamExt; use std::cell::RefCell; use std::ops::{Deref, DerefMut}; use std::rc::{Rc, Weak}; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; use std::thread::{self, Thread}; /// A single-threaded task pool for polling futures to completion. /// /// This executor allows you to multiplex any number of tasks onto a single /// thread. It's appropriate to poll strictly I/O-bound futures that do very /// little work in between I/O actions. /// /// To get a handle to the pool that implements /// [`Spawn`](futures_task::Spawn), use the /// [`spawner()`](LocalPool::spawner) method. Because the executor is /// single-threaded, it supports a special form of task spawning for non-`Send` /// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj). #[derive(Debug)] pub struct LocalPool { pool: FuturesUnordered>, incoming: Rc, } /// A handle to a [`LocalPool`](LocalPool) that implements /// [`Spawn`](futures_task::Spawn). #[derive(Clone, Debug)] pub struct LocalSpawner { incoming: Weak, } type Incoming = RefCell>>; pub(crate) struct ThreadNotify { /// The (single) executor thread. thread: Thread, /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten" /// before the next `park()`, which may otherwise happen if the code /// being executed as part of the future(s) being polled makes use of /// park / unpark calls of its own, i.e. we cannot assume that no other /// code uses park / unpark on the executing `thread`. unparked: AtomicBool, } thread_local! { static CURRENT_THREAD_NOTIFY: Arc = Arc::new(ThreadNotify { thread: thread::current(), unparked: AtomicBool::new(false), }); } impl ArcWake for ThreadNotify { fn wake_by_ref(arc_self: &Arc) { // Make sure the wakeup is remembered until the next `park()`. let unparked = arc_self.unparked.swap(true, Ordering::Release); if !unparked { // If the thread has not been unparked yet, it must be done // now. If it was actually parked, it will run again, // otherwise the token made available by `unpark` // may be consumed before reaching `park()`, but `unparked` // ensures it is not forgotten. arc_self.thread.unpark(); } } } // Set up and run a basic single-threaded spawner loop, invoking `f` on each // turn. fn run_executor) -> Poll>(mut f: F) -> T { let _enter = enter().expect( "cannot execute `LocalPool` executor from within \ another executor", ); CURRENT_THREAD_NOTIFY.with(|thread_notify| { let waker = waker_ref(thread_notify); let mut cx = Context::from_waker(&waker); loop { if let Poll::Ready(t) = f(&mut cx) { return t; } // Wait for a wakeup. while !thread_notify.unparked.swap(false, Ordering::Acquire) { // No wakeup occurred. It may occur now, right before parking, // but in that case the token made available by `unpark()` // is guaranteed to still be available and `park()` is a no-op. thread::park(); } } }) } /// Check for a wakeup, but don't consume it. fn woken() -> bool { CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire)) } impl LocalPool { /// Create a new, empty pool of tasks. pub fn new() -> Self { Self { pool: FuturesUnordered::new(), incoming: Default::default() } } /// Get a clonable handle to the pool as a [`Spawn`]. pub fn spawner(&self) -> LocalSpawner { LocalSpawner { incoming: Rc::downgrade(&self.incoming) } } /// Run all tasks in the pool to completion. /// /// ``` /// use futures::executor::LocalPool; /// /// let mut pool = LocalPool::new(); /// /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()` /// /// // run *all* tasks in the pool to completion, including any newly-spawned ones. /// pool.run(); /// ``` /// /// The function will block the calling thread until *all* tasks in the pool /// are complete, including any spawned while running existing tasks. pub fn run(&mut self) { run_executor(|cx| self.poll_pool(cx)) } /// Runs all the tasks in the pool until the given future completes. /// /// ``` /// use futures::executor::LocalPool; /// /// let mut pool = LocalPool::new(); /// # let my_app = async {}; /// /// // run tasks in the pool until `my_app` completes /// pool.run_until(my_app); /// ``` /// /// The function will block the calling thread *only* until the future `f` /// completes; there may still be incomplete tasks in the pool, which will /// be inert after the call completes, but can continue with further use of /// one of the pool's run or poll methods. While the function is running, /// however, all tasks in the pool will try to make progress. pub fn run_until(&mut self, future: F) -> F::Output { pin_mut!(future); run_executor(|cx| { { // if our main task is done, so are we let result = future.as_mut().poll(cx); if let Poll::Ready(output) = result { return Poll::Ready(output); } } let _ = self.poll_pool(cx); Poll::Pending }) } /// Runs all tasks and returns after completing one future or until no more progress /// can be made. Returns `true` if one future was completed, `false` otherwise. /// /// ``` /// use futures::executor::LocalPool; /// use futures::task::LocalSpawnExt; /// use futures::future::{ready, pending}; /// /// let mut pool = LocalPool::new(); /// let spawner = pool.spawner(); /// /// spawner.spawn_local(ready(())).unwrap(); /// spawner.spawn_local(ready(())).unwrap(); /// spawner.spawn_local(pending()).unwrap(); /// /// // Run the two ready tasks and return true for them. /// pool.try_run_one(); // returns true after completing one of the ready futures /// pool.try_run_one(); // returns true after completing the other ready future /// /// // the remaining task can not be completed /// assert!(!pool.try_run_one()); // returns false /// ``` /// /// This function will not block the calling thread and will return the moment /// that there are no tasks left for which progress can be made or after exactly one /// task was completed; Remaining incomplete tasks in the pool can continue with /// further use of one of the pool's run or poll methods. /// Though only one task will be completed, progress may be made on multiple tasks. pub fn try_run_one(&mut self) -> bool { run_executor(|cx| { loop { self.drain_incoming(); match self.pool.poll_next_unpin(cx) { // Success! Poll::Ready(Some(())) => return Poll::Ready(true), // The pool was empty. Poll::Ready(None) => return Poll::Ready(false), Poll::Pending => (), } if !self.incoming.borrow().is_empty() { // New tasks were spawned; try again. continue; } else if woken() { // The pool yielded to us, but there's more progress to be made. return Poll::Pending; } else { return Poll::Ready(false); } } }) } /// Runs all tasks in the pool and returns if no more progress can be made /// on any task. /// /// ``` /// use futures::executor::LocalPool; /// use futures::task::LocalSpawnExt; /// use futures::future::{ready, pending}; /// /// let mut pool = LocalPool::new(); /// let spawner = pool.spawner(); /// /// spawner.spawn_local(ready(())).unwrap(); /// spawner.spawn_local(ready(())).unwrap(); /// spawner.spawn_local(pending()).unwrap(); /// /// // Runs the two ready task and returns. /// // The empty task remains in the pool. /// pool.run_until_stalled(); /// ``` /// /// This function will not block the calling thread and will return the moment /// that there are no tasks left for which progress can be made; /// remaining incomplete tasks in the pool can continue with further use of one /// of the pool's run or poll methods. While the function is running, all tasks /// in the pool will try to make progress. pub fn run_until_stalled(&mut self) { run_executor(|cx| match self.poll_pool(cx) { // The pool is empty. Poll::Ready(()) => Poll::Ready(()), Poll::Pending => { if woken() { Poll::Pending } else { // We're stalled for now. Poll::Ready(()) } } }); } /// Poll `self.pool`, re-filling it with any newly-spawned tasks. /// Repeat until either the pool is empty, or it returns `Pending`. /// /// Returns `Ready` if the pool was empty, and `Pending` otherwise. /// /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily /// mean that the pool can't make progress. fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { loop { self.drain_incoming(); let pool_ret = self.pool.poll_next_unpin(cx); // We queued up some new tasks; add them and poll again. if !self.incoming.borrow().is_empty() { continue; } match pool_ret { Poll::Ready(Some(())) => continue, Poll::Ready(None) => return Poll::Ready(()), Poll::Pending => return Poll::Pending, } } } /// Empty the incoming queue of newly-spawned tasks. fn drain_incoming(&mut self) { let mut incoming = self.incoming.borrow_mut(); for task in incoming.drain(..) { self.pool.push(task) } } } impl Default for LocalPool { fn default() -> Self { Self::new() } } /// Run a future to completion on the current thread. /// /// This function will block the caller until the given future has completed. /// /// Use a [`LocalPool`](LocalPool) if you need finer-grained control over /// spawned tasks. pub fn block_on(f: F) -> F::Output { pin_mut!(f); run_executor(|cx| f.as_mut().poll(cx)) } /// Turn a stream into a blocking iterator. /// /// When `next` is called on the resulting `BlockingStream`, the caller /// will be blocked until the next element of the `Stream` becomes available. pub fn block_on_stream(stream: S) -> BlockingStream { BlockingStream { stream } } /// An iterator which blocks on values from a stream until they become available. #[derive(Debug)] pub struct BlockingStream { stream: S, } impl Deref for BlockingStream { type Target = S; fn deref(&self) -> &Self::Target { &self.stream } } impl DerefMut for BlockingStream { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.stream } } impl BlockingStream { /// Convert this `BlockingStream` into the inner `Stream` type. pub fn into_inner(self) -> S { self.stream } } impl Iterator for BlockingStream { type Item = S::Item; fn next(&mut self) -> Option { LocalPool::new().run_until(self.stream.next()) } fn size_hint(&self) -> (usize, Option) { self.stream.size_hint() } } impl Spawn for LocalSpawner { fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { incoming.borrow_mut().push(future.into()); Ok(()) } else { Err(SpawnError::shutdown()) } } fn status(&self) -> Result<(), SpawnError> { if self.incoming.upgrade().is_some() { Ok(()) } else { Err(SpawnError::shutdown()) } } } impl LocalSpawn for LocalSpawner { fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> { if let Some(incoming) = self.incoming.upgrade() { incoming.borrow_mut().push(future); Ok(()) } else { Err(SpawnError::shutdown()) } } fn status_local(&self) -> Result<(), SpawnError> { if self.incoming.upgrade().is_some() { Ok(()) } else { Err(SpawnError::shutdown()) } } }