diff options
Diffstat (limited to 'third_party/rust/async-task/src/runnable.rs')
-rw-r--r-- | third_party/rust/async-task/src/runnable.rs | 398 |
1 files changed, 398 insertions, 0 deletions
diff --git a/third_party/rust/async-task/src/runnable.rs b/third_party/rust/async-task/src/runnable.rs new file mode 100644 index 0000000000..cb70ef31b4 --- /dev/null +++ b/third_party/rust/async-task/src/runnable.rs @@ -0,0 +1,398 @@ +use core::fmt; +use core::future::Future; +use core::marker::PhantomData; +use core::mem; +use core::ptr::NonNull; +use core::sync::atomic::Ordering; +use core::task::Waker; + +use crate::header::Header; +use crate::raw::RawTask; +use crate::state::*; +use crate::Task; + +/// Creates a new task. +/// +/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its +/// output. +/// +/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] +/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run +/// again. +/// +/// When the task is woken, its [`Runnable`] is passed to the `schedule` function. +/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it +/// should push it into a task queue so that it can be processed later. +/// +/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider +/// using [`spawn_local()`] or [`spawn_unchecked()`] instead. +/// +/// # Examples +/// +/// ``` +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // A function that schedules the task when it gets woken up. +/// let (s, r) = flume::unbounded(); +/// let schedule = move |runnable| s.send(runnable).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = async_task::spawn(future, schedule); +/// ``` +pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) +where + F: Future + Send + 'static, + F::Output: Send + 'static, + S: Fn(Runnable) + Send + Sync + 'static, +{ + unsafe { spawn_unchecked(future, schedule) } +} + +/// Creates a new thread-local task. +/// +/// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the +/// [`Runnable`] is used or dropped on another thread, a panic will occur. +/// +/// This function is only available when the `std` feature for this crate is enabled. +/// +/// # Examples +/// +/// ``` +/// use async_task::Runnable; +/// use flume::{Receiver, Sender}; +/// use std::rc::Rc; +/// +/// thread_local! { +/// // A queue that holds scheduled tasks. +/// static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded(); +/// } +/// +/// // Make a non-Send future. +/// let msg: Rc<str> = "Hello, world!".into(); +/// let future = async move { +/// println!("{}", msg); +/// }; +/// +/// // A function that schedules the task when it gets woken up. +/// let s = QUEUE.with(|(s, _)| s.clone()); +/// let schedule = move |runnable| s.send(runnable).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = async_task::spawn_local(future, schedule); +/// ``` +#[cfg(feature = "std")] +pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) +where + F: Future + 'static, + F::Output: 'static, + S: Fn(Runnable) + Send + Sync + 'static, +{ + use std::mem::ManuallyDrop; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::thread::{self, ThreadId}; + + #[inline] + fn thread_id() -> ThreadId { + thread_local! { + static ID: ThreadId = thread::current().id(); + } + ID.try_with(|id| *id) + .unwrap_or_else(|_| thread::current().id()) + } + + struct Checked<F> { + id: ThreadId, + inner: ManuallyDrop<F>, + } + + impl<F> Drop for Checked<F> { + fn drop(&mut self) { + assert!( + self.id == thread_id(), + "local task dropped by a thread that didn't spawn it" + ); + unsafe { + ManuallyDrop::drop(&mut self.inner); + } + } + } + + impl<F: Future> Future for Checked<F> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + assert!( + self.id == thread_id(), + "local task polled by a thread that didn't spawn it" + ); + unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } + } + } + + // Wrap the future into one that checks which thread it's on. + let future = Checked { + id: thread_id(), + inner: ManuallyDrop::new(future), + }; + + unsafe { spawn_unchecked(future, schedule) } +} + +/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. +/// +/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and +/// `'static` on `future` and `schedule`. +/// +/// # Safety +/// +/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original +/// thread. +/// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`]. +/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on +/// the original thread. +/// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. +/// +/// # Examples +/// +/// ``` +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken up, it will be sent into this channel. +/// let (s, r) = flume::unbounded(); +/// let schedule = move |runnable| s.send(runnable).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; +/// ``` +pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) +where + F: Future, + S: Fn(Runnable), +{ + // Allocate large futures on the heap. + let ptr = if mem::size_of::<F>() >= 2048 { + let future = alloc::boxed::Box::pin(future); + RawTask::<_, F::Output, S>::allocate(future, schedule) + } else { + RawTask::<F, F::Output, S>::allocate(future, schedule) + }; + + let runnable = Runnable { ptr }; + let task = Task { + ptr, + _marker: PhantomData, + }; + (runnable, task) +} + +/// A handle to a runnable task. +/// +/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is +/// scheduled for running. +/// +/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] +/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run +/// again. +/// +/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and +/// awaiting the [`Task`] after that will result in a panic. +/// +/// # Examples +/// +/// ``` +/// use async_task::Runnable; +/// use once_cell::sync::Lazy; +/// use std::{panic, thread}; +/// +/// // A simple executor. +/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| { +/// let (sender, receiver) = flume::unbounded::<Runnable>(); +/// thread::spawn(|| { +/// for runnable in receiver { +/// let _ignore_panic = panic::catch_unwind(|| runnable.run()); +/// } +/// }); +/// sender +/// }); +/// +/// // Create a task with a simple future. +/// let schedule = |runnable| QUEUE.send(runnable).unwrap(); +/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule); +/// +/// // Schedule the task and await its output. +/// runnable.schedule(); +/// assert_eq!(smol::future::block_on(task), 3); +/// ``` +pub struct Runnable { + /// A pointer to the heap-allocated task. + pub(crate) ptr: NonNull<()>, +} + +unsafe impl Send for Runnable {} +unsafe impl Sync for Runnable {} + +#[cfg(feature = "std")] +impl std::panic::UnwindSafe for Runnable {} +#[cfg(feature = "std")] +impl std::panic::RefUnwindSafe for Runnable {} + +impl Runnable { + /// Schedules the task. + /// + /// This is a convenience method that passes the [`Runnable`] to the schedule function. + /// + /// # Examples + /// + /// ``` + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with a simple future and the schedule function. + /// let (runnable, task) = async_task::spawn(async {}, schedule); + /// + /// // Schedule the task. + /// assert_eq!(r.len(), 0); + /// runnable.schedule(); + /// assert_eq!(r.len(), 1); + /// ``` + pub fn schedule(self) { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + mem::forget(self); + + unsafe { + ((*header).vtable.schedule)(ptr); + } + } + + /// Runs the task by polling its future. + /// + /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets + /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the + /// [`Runnable`] vanishes until the task is woken. + /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e. + /// it woke itself and then gave the control back to the executor. + /// + /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then + /// this method simply destroys the task. + /// + /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`] + /// after that will also result in a panic. + /// + /// # Examples + /// + /// ``` + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with a simple future and the schedule function. + /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule); + /// + /// // Run the task and check its output. + /// runnable.run(); + /// assert_eq!(smol::future::block_on(task), 3); + /// ``` + pub fn run(self) -> bool { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + mem::forget(self); + + unsafe { ((*header).vtable.run)(ptr) } + } + + /// Returns a waker associated with this task. + /// + /// # Examples + /// + /// ``` + /// use smol::future; + /// + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with a simple future and the schedule function. + /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule); + /// + /// // Take a waker and run the task. + /// let waker = runnable.waker(); + /// runnable.run(); + /// + /// // Reschedule the task by waking it. + /// assert_eq!(r.len(), 0); + /// waker.wake(); + /// assert_eq!(r.len(), 1); + /// ``` + pub fn waker(&self) -> Waker { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + unsafe { + let raw_waker = ((*header).vtable.clone_waker)(ptr); + Waker::from_raw(raw_waker) + } + } +} + +impl Drop for Runnable { + fn drop(&mut self) { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task has been completed or closed, it can't be canceled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // Mark the task as closed. + match (*header).state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } + + // Drop the future. + ((*header).vtable.drop_future)(ptr); + + // Mark the task as unscheduled. + let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // Notify the awaiter that the future has been dropped. + if state & AWAITER != 0 { + (*header).notify(None); + } + + // Drop the task reference. + ((*header).vtable.drop_ref)(ptr); + } + } +} + +impl fmt::Debug for Runnable { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + f.debug_struct("Runnable") + .field("header", unsafe { &(*header) }) + .finish() + } +} |