summaryrefslogtreecommitdiffstats
path: root/third_party/rust/async-task/src/runnable.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/async-task/src/runnable.rs')
-rw-r--r--third_party/rust/async-task/src/runnable.rs398
1 files changed, 398 insertions, 0 deletions
diff --git a/third_party/rust/async-task/src/runnable.rs b/third_party/rust/async-task/src/runnable.rs
new file mode 100644
index 0000000000..cb70ef31b4
--- /dev/null
+++ b/third_party/rust/async-task/src/runnable.rs
@@ -0,0 +1,398 @@
+use core::fmt;
+use core::future::Future;
+use core::marker::PhantomData;
+use core::mem;
+use core::ptr::NonNull;
+use core::sync::atomic::Ordering;
+use core::task::Waker;
+
+use crate::header::Header;
+use crate::raw::RawTask;
+use crate::state::*;
+use crate::Task;
+
+/// Creates a new task.
+///
+/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
+/// output.
+///
+/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
+/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
+/// again.
+///
+/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
+/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
+/// should push it into a task queue so that it can be processed later.
+///
+/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
+/// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
+///
+/// # Examples
+///
+/// ```
+/// // The future inside the task.
+/// let future = async {
+/// println!("Hello, world!");
+/// };
+///
+/// // A function that schedules the task when it gets woken up.
+/// let (s, r) = flume::unbounded();
+/// let schedule = move |runnable| s.send(runnable).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (runnable, task) = async_task::spawn(future, schedule);
+/// ```
+pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
+where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ S: Fn(Runnable) + Send + Sync + 'static,
+{
+ unsafe { spawn_unchecked(future, schedule) }
+}
+
+/// Creates a new thread-local task.
+///
+/// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the
+/// [`Runnable`] is used or dropped on another thread, a panic will occur.
+///
+/// This function is only available when the `std` feature for this crate is enabled.
+///
+/// # Examples
+///
+/// ```
+/// use async_task::Runnable;
+/// use flume::{Receiver, Sender};
+/// use std::rc::Rc;
+///
+/// thread_local! {
+/// // A queue that holds scheduled tasks.
+/// static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
+/// }
+///
+/// // Make a non-Send future.
+/// let msg: Rc<str> = "Hello, world!".into();
+/// let future = async move {
+/// println!("{}", msg);
+/// };
+///
+/// // A function that schedules the task when it gets woken up.
+/// let s = QUEUE.with(|(s, _)| s.clone());
+/// let schedule = move |runnable| s.send(runnable).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (runnable, task) = async_task::spawn_local(future, schedule);
+/// ```
+#[cfg(feature = "std")]
+pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
+where
+ F: Future + 'static,
+ F::Output: 'static,
+ S: Fn(Runnable) + Send + Sync + 'static,
+{
+ use std::mem::ManuallyDrop;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
+ use std::thread::{self, ThreadId};
+
+ #[inline]
+ fn thread_id() -> ThreadId {
+ thread_local! {
+ static ID: ThreadId = thread::current().id();
+ }
+ ID.try_with(|id| *id)
+ .unwrap_or_else(|_| thread::current().id())
+ }
+
+ struct Checked<F> {
+ id: ThreadId,
+ inner: ManuallyDrop<F>,
+ }
+
+ impl<F> Drop for Checked<F> {
+ fn drop(&mut self) {
+ assert!(
+ self.id == thread_id(),
+ "local task dropped by a thread that didn't spawn it"
+ );
+ unsafe {
+ ManuallyDrop::drop(&mut self.inner);
+ }
+ }
+ }
+
+ impl<F: Future> Future for Checked<F> {
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ assert!(
+ self.id == thread_id(),
+ "local task polled by a thread that didn't spawn it"
+ );
+ unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
+ }
+ }
+
+ // Wrap the future into one that checks which thread it's on.
+ let future = Checked {
+ id: thread_id(),
+ inner: ManuallyDrop::new(future),
+ };
+
+ unsafe { spawn_unchecked(future, schedule) }
+}
+
+/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
+///
+/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and
+/// `'static` on `future` and `schedule`.
+///
+/// # Safety
+///
+/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original
+/// thread.
+/// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`].
+/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on
+/// the original thread.
+/// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`].
+///
+/// # Examples
+///
+/// ```
+/// // The future inside the task.
+/// let future = async {
+/// println!("Hello, world!");
+/// };
+///
+/// // If the task gets woken up, it will be sent into this channel.
+/// let (s, r) = flume::unbounded();
+/// let schedule = move |runnable| s.send(runnable).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
+/// ```
+pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
+where
+ F: Future,
+ S: Fn(Runnable),
+{
+ // Allocate large futures on the heap.
+ let ptr = if mem::size_of::<F>() >= 2048 {
+ let future = alloc::boxed::Box::pin(future);
+ RawTask::<_, F::Output, S>::allocate(future, schedule)
+ } else {
+ RawTask::<F, F::Output, S>::allocate(future, schedule)
+ };
+
+ let runnable = Runnable { ptr };
+ let task = Task {
+ ptr,
+ _marker: PhantomData,
+ };
+ (runnable, task)
+}
+
+/// A handle to a runnable task.
+///
+/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is
+/// scheduled for running.
+///
+/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
+/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
+/// again.
+///
+/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and
+/// awaiting the [`Task`] after that will result in a panic.
+///
+/// # Examples
+///
+/// ```
+/// use async_task::Runnable;
+/// use once_cell::sync::Lazy;
+/// use std::{panic, thread};
+///
+/// // A simple executor.
+/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
+/// let (sender, receiver) = flume::unbounded::<Runnable>();
+/// thread::spawn(|| {
+/// for runnable in receiver {
+/// let _ignore_panic = panic::catch_unwind(|| runnable.run());
+/// }
+/// });
+/// sender
+/// });
+///
+/// // Create a task with a simple future.
+/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
+/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
+///
+/// // Schedule the task and await its output.
+/// runnable.schedule();
+/// assert_eq!(smol::future::block_on(task), 3);
+/// ```
+pub struct Runnable {
+ /// A pointer to the heap-allocated task.
+ pub(crate) ptr: NonNull<()>,
+}
+
+unsafe impl Send for Runnable {}
+unsafe impl Sync for Runnable {}
+
+#[cfg(feature = "std")]
+impl std::panic::UnwindSafe for Runnable {}
+#[cfg(feature = "std")]
+impl std::panic::RefUnwindSafe for Runnable {}
+
+impl Runnable {
+ /// Schedules the task.
+ ///
+ /// This is a convenience method that passes the [`Runnable`] to the schedule function.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// // A function that schedules the task when it gets woken up.
+ /// let (s, r) = flume::unbounded();
+ /// let schedule = move |runnable| s.send(runnable).unwrap();
+ ///
+ /// // Create a task with a simple future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(async {}, schedule);
+ ///
+ /// // Schedule the task.
+ /// assert_eq!(r.len(), 0);
+ /// runnable.schedule();
+ /// assert_eq!(r.len(), 1);
+ /// ```
+ pub fn schedule(self) {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+ mem::forget(self);
+
+ unsafe {
+ ((*header).vtable.schedule)(ptr);
+ }
+ }
+
+ /// Runs the task by polling its future.
+ ///
+ /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets
+ /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the
+ /// [`Runnable`] vanishes until the task is woken.
+ /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e.
+ /// it woke itself and then gave the control back to the executor.
+ ///
+ /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then
+ /// this method simply destroys the task.
+ ///
+ /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`]
+ /// after that will also result in a panic.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// // A function that schedules the task when it gets woken up.
+ /// let (s, r) = flume::unbounded();
+ /// let schedule = move |runnable| s.send(runnable).unwrap();
+ ///
+ /// // Create a task with a simple future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
+ ///
+ /// // Run the task and check its output.
+ /// runnable.run();
+ /// assert_eq!(smol::future::block_on(task), 3);
+ /// ```
+ pub fn run(self) -> bool {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+ mem::forget(self);
+
+ unsafe { ((*header).vtable.run)(ptr) }
+ }
+
+ /// Returns a waker associated with this task.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use smol::future;
+ ///
+ /// // A function that schedules the task when it gets woken up.
+ /// let (s, r) = flume::unbounded();
+ /// let schedule = move |runnable| s.send(runnable).unwrap();
+ ///
+ /// // Create a task with a simple future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule);
+ ///
+ /// // Take a waker and run the task.
+ /// let waker = runnable.waker();
+ /// runnable.run();
+ ///
+ /// // Reschedule the task by waking it.
+ /// assert_eq!(r.len(), 0);
+ /// waker.wake();
+ /// assert_eq!(r.len(), 1);
+ /// ```
+ pub fn waker(&self) -> Waker {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ let raw_waker = ((*header).vtable.clone_waker)(ptr);
+ Waker::from_raw(raw_waker)
+ }
+ }
+}
+
+impl Drop for Runnable {
+ fn drop(&mut self) {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ let mut state = (*header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task has been completed or closed, it can't be canceled.
+ if state & (COMPLETED | CLOSED) != 0 {
+ break;
+ }
+
+ // Mark the task as closed.
+ match (*header).state.compare_exchange_weak(
+ state,
+ state | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(s) => state = s,
+ }
+ }
+
+ // Drop the future.
+ ((*header).vtable.drop_future)(ptr);
+
+ // Mark the task as unscheduled.
+ let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+ // Notify the awaiter that the future has been dropped.
+ if state & AWAITER != 0 {
+ (*header).notify(None);
+ }
+
+ // Drop the task reference.
+ ((*header).vtable.drop_ref)(ptr);
+ }
+ }
+}
+
+impl fmt::Debug for Runnable {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ f.debug_struct("Runnable")
+ .field("header", unsafe { &(*header) })
+ .finish()
+ }
+}