summaryrefslogtreecommitdiffstats
path: root/third_party/rust/async-task/src
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/async-task/src
parentInitial commit. (diff)
downloadfirefox-esr-upstream.tar.xz
firefox-esr-upstream.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--third_party/rust/async-task/src/header.rs162
-rw-r--r--third_party/rust/async-task/src/lib.rs99
-rw-r--r--third_party/rust/async-task/src/raw.rs707
-rw-r--r--third_party/rust/async-task/src/runnable.rs398
-rw-r--r--third_party/rust/async-task/src/state.rs69
-rw-r--r--third_party/rust/async-task/src/task.rs532
-rw-r--r--third_party/rust/async-task/src/utils.rs127
7 files changed, 2094 insertions, 0 deletions
diff --git a/third_party/rust/async-task/src/header.rs b/third_party/rust/async-task/src/header.rs
new file mode 100644
index 0000000000..8a3a0b9189
--- /dev/null
+++ b/third_party/rust/async-task/src/header.rs
@@ -0,0 +1,162 @@
+use core::cell::UnsafeCell;
+use core::fmt;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::task::Waker;
+
+use crate::raw::TaskVTable;
+use crate::state::*;
+use crate::utils::abort_on_panic;
+
+/// The header of a task.
+///
+/// This header is stored in memory at the beginning of the heap-allocated task.
+pub(crate) struct Header {
+ /// Current state of the task.
+ ///
+ /// Contains flags representing the current state and the reference count.
+ pub(crate) state: AtomicUsize,
+
+ /// The task that is blocked on the `Task` handle.
+ ///
+ /// This waker needs to be woken up once the task completes or is closed.
+ pub(crate) awaiter: UnsafeCell<Option<Waker>>,
+
+ /// The virtual table.
+ ///
+ /// In addition to the actual waker virtual table, it also contains pointers to several other
+ /// methods necessary for bookkeeping the heap-allocated task.
+ pub(crate) vtable: &'static TaskVTable,
+}
+
+impl Header {
+ /// Notifies the awaiter blocked on this task.
+ ///
+ /// If the awaiter is the same as the current waker, it will not be notified.
+ #[inline]
+ pub(crate) fn notify(&self, current: Option<&Waker>) {
+ if let Some(w) = self.take(current) {
+ abort_on_panic(|| w.wake());
+ }
+ }
+
+ /// Takes the awaiter blocked on this task.
+ ///
+ /// If there is no awaiter or if it is the same as the current waker, returns `None`.
+ #[inline]
+ pub(crate) fn take(&self, current: Option<&Waker>) -> Option<Waker> {
+ // Set the bit indicating that the task is notifying its awaiter.
+ let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
+
+ // If the task was not notifying or registering an awaiter...
+ if state & (NOTIFYING | REGISTERING) == 0 {
+ // Take the waker out.
+ let waker = unsafe { (*self.awaiter.get()).take() };
+
+ // Unset the bit indicating that the task is notifying its awaiter.
+ self.state
+ .fetch_and(!NOTIFYING & !AWAITER, Ordering::Release);
+
+ // Finally, notify the waker if it's different from the current waker.
+ if let Some(w) = waker {
+ match current {
+ None => return Some(w),
+ Some(c) if !w.will_wake(c) => return Some(w),
+ Some(_) => abort_on_panic(|| drop(w)),
+ }
+ }
+ }
+
+ None
+ }
+
+ /// Registers a new awaiter blocked on this task.
+ ///
+ /// This method is called when `Task` is polled and it has not yet completed.
+ #[inline]
+ pub(crate) fn register(&self, waker: &Waker) {
+ // Load the state and synchronize with it.
+ let mut state = self.state.fetch_or(0, Ordering::Acquire);
+
+ loop {
+ // There can't be two concurrent registrations because `Task` can only be polled
+ // by a unique pinned reference.
+ debug_assert!(state & REGISTERING == 0);
+
+ // If we're in the notifying state at this moment, just wake and return without
+ // registering.
+ if state & NOTIFYING != 0 {
+ abort_on_panic(|| waker.wake_by_ref());
+ return;
+ }
+
+ // Mark the state to let other threads know we're registering a new awaiter.
+ match self.state.compare_exchange_weak(
+ state,
+ state | REGISTERING,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ state |= REGISTERING;
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+
+ // Put the waker into the awaiter field.
+ unsafe {
+ abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone()));
+ }
+
+ // This variable will contain the newly registered waker if a notification comes in before
+ // we complete registration.
+ let mut waker = None;
+
+ loop {
+ // If there was a notification, take the waker out of the awaiter field.
+ if state & NOTIFYING != 0 {
+ if let Some(w) = unsafe { (*self.awaiter.get()).take() } {
+ abort_on_panic(|| waker = Some(w));
+ }
+ }
+
+ // The new state is not being notified nor registered, but there might or might not be
+ // an awaiter depending on whether there was a concurrent notification.
+ let new = if waker.is_none() {
+ (state & !NOTIFYING & !REGISTERING) | AWAITER
+ } else {
+ state & !NOTIFYING & !REGISTERING & !AWAITER
+ };
+
+ match self
+ .state
+ .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire)
+ {
+ Ok(_) => break,
+ Err(s) => state = s,
+ }
+ }
+
+ // If there was a notification during registration, wake the awaiter now.
+ if let Some(w) = waker {
+ abort_on_panic(|| w.wake());
+ }
+ }
+}
+
+impl fmt::Debug for Header {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let state = self.state.load(Ordering::SeqCst);
+
+ f.debug_struct("Header")
+ .field("scheduled", &(state & SCHEDULED != 0))
+ .field("running", &(state & RUNNING != 0))
+ .field("completed", &(state & COMPLETED != 0))
+ .field("closed", &(state & CLOSED != 0))
+ .field("awaiter", &(state & AWAITER != 0))
+ .field("task", &(state & TASK != 0))
+ .field("ref_count", &(state / REFERENCE))
+ .finish()
+ }
+}
diff --git a/third_party/rust/async-task/src/lib.rs b/third_party/rust/async-task/src/lib.rs
new file mode 100644
index 0000000000..dd689ecd26
--- /dev/null
+++ b/third_party/rust/async-task/src/lib.rs
@@ -0,0 +1,99 @@
+//! Task abstraction for building executors.
+//!
+//! To spawn a future onto an executor, we first need to allocate it on the heap and keep some
+//! state attached to it. The state indicates whether the future is ready for polling, waiting to
+//! be woken up, or completed. Such a stateful future is called a *task*.
+//!
+//! All executors have a queue that holds scheduled tasks:
+//!
+//! ```
+//! let (sender, receiver) = flume::unbounded();
+//! #
+//! # // A future that will get spawned.
+//! # let future = async { 1 + 2 };
+//! #
+//! # // A function that schedules the task when it gets woken up.
+//! # let schedule = move |runnable| sender.send(runnable).unwrap();
+//! #
+//! # // Create a task.
+//! # let (runnable, task) = async_task::spawn(future, schedule);
+//! ```
+//!
+//! A task is created using either [`spawn()`], [`spawn_local()`], or [`spawn_unchecked()`] which
+//! return a [`Runnable`] and a [`Task`]:
+//!
+//! ```
+//! # let (sender, receiver) = flume::unbounded();
+//! #
+//! // A future that will be spawned.
+//! let future = async { 1 + 2 };
+//!
+//! // A function that schedules the task when it gets woken up.
+//! let schedule = move |runnable| sender.send(runnable).unwrap();
+//!
+//! // Construct a task.
+//! let (runnable, task) = async_task::spawn(future, schedule);
+//!
+//! // Push the task into the queue by invoking its schedule function.
+//! runnable.schedule();
+//! ```
+//!
+//! The [`Runnable`] is used to poll the task's future, and the [`Task`] is used to await its
+//! output.
+//!
+//! Finally, we need a loop that takes scheduled tasks from the queue and runs them:
+//!
+//! ```no_run
+//! # let (sender, receiver) = flume::unbounded();
+//! #
+//! # // A future that will get spawned.
+//! # let future = async { 1 + 2 };
+//! #
+//! # // A function that schedules the task when it gets woken up.
+//! # let schedule = move |runnable| sender.send(runnable).unwrap();
+//! #
+//! # // Create a task.
+//! # let (runnable, task) = async_task::spawn(future, schedule);
+//! #
+//! # // Push the task into the queue by invoking its schedule function.
+//! # runnable.schedule();
+//! #
+//! for runnable in receiver {
+//! runnable.run();
+//! }
+//! ```
+//!
+//! Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
+//! vanishes and only reappears when its [`Waker`][`core::task::Waker`] wakes the task, thus
+//! scheduling it to be run again.
+
+#![cfg_attr(not(feature = "std"), no_std)]
+#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
+#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
+#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
+
+extern crate alloc;
+
+/// We can't use `?` in const contexts yet, so this macro acts
+/// as a workaround.
+macro_rules! leap {
+ ($x: expr) => {{
+ match ($x) {
+ Some(val) => val,
+ None => return None,
+ }
+ }};
+}
+
+mod header;
+mod raw;
+mod runnable;
+mod state;
+mod task;
+mod utils;
+
+pub use crate::runnable::{spawn, spawn_unchecked, Runnable};
+pub use crate::task::{FallibleTask, Task};
+
+#[cfg(feature = "std")]
+pub use crate::runnable::spawn_local;
diff --git a/third_party/rust/async-task/src/raw.rs b/third_party/rust/async-task/src/raw.rs
new file mode 100644
index 0000000000..bb031da3e3
--- /dev/null
+++ b/third_party/rust/async-task/src/raw.rs
@@ -0,0 +1,707 @@
+use alloc::alloc::Layout as StdLayout;
+use core::cell::UnsafeCell;
+use core::future::Future;
+use core::mem::{self, ManuallyDrop};
+use core::pin::Pin;
+use core::ptr::NonNull;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+
+use crate::header::Header;
+use crate::state::*;
+use crate::utils::{abort, abort_on_panic, max, Layout};
+use crate::Runnable;
+
+/// The vtable for a task.
+pub(crate) struct TaskVTable {
+ /// Schedules the task.
+ pub(crate) schedule: unsafe fn(*const ()),
+
+ /// Drops the future inside the task.
+ pub(crate) drop_future: unsafe fn(*const ()),
+
+ /// Returns a pointer to the output stored after completion.
+ pub(crate) get_output: unsafe fn(*const ()) -> *const (),
+
+ /// Drops the task reference (`Runnable` or `Waker`).
+ pub(crate) drop_ref: unsafe fn(ptr: *const ()),
+
+ /// Destroys the task.
+ pub(crate) destroy: unsafe fn(*const ()),
+
+ /// Runs the task.
+ pub(crate) run: unsafe fn(*const ()) -> bool,
+
+ /// Creates a new waker associated with the task.
+ pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,
+
+ /// The memory layout of the task. This information enables
+ /// debuggers to decode raw task memory blobs. Do not remove
+ /// the field, even if it appears to be unused.
+ #[allow(unused)]
+ pub(crate) layout_info: &'static Option<TaskLayout>,
+}
+
+/// Memory layout of a task.
+///
+/// This struct contains the following information:
+///
+/// 1. How to allocate and deallocate the task.
+/// 2. How to access the fields inside the task.
+#[derive(Clone, Copy)]
+pub(crate) struct TaskLayout {
+ /// Memory layout of the whole task.
+ pub(crate) layout: StdLayout,
+
+ /// Offset into the task at which the schedule function is stored.
+ pub(crate) offset_s: usize,
+
+ /// Offset into the task at which the future is stored.
+ pub(crate) offset_f: usize,
+
+ /// Offset into the task at which the output is stored.
+ pub(crate) offset_r: usize,
+}
+
+/// Raw pointers to the fields inside a task.
+pub(crate) struct RawTask<F, T, S> {
+ /// The task header.
+ pub(crate) header: *const Header,
+
+ /// The schedule function.
+ pub(crate) schedule: *const S,
+
+ /// The future.
+ pub(crate) future: *mut F,
+
+ /// The output of the future.
+ pub(crate) output: *mut T,
+}
+
+impl<F, T, S> Copy for RawTask<F, T, S> {}
+
+impl<F, T, S> Clone for RawTask<F, T, S> {
+ fn clone(&self) -> Self {
+ *self
+ }
+}
+
+impl<F, T, S> RawTask<F, T, S> {
+ const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout();
+
+ /// Computes the memory layout for a task.
+ #[inline]
+ const fn eval_task_layout() -> Option<TaskLayout> {
+ // Compute the layouts for `Header`, `S`, `F`, and `T`.
+ let layout_header = Layout::new::<Header>();
+ let layout_s = Layout::new::<S>();
+ let layout_f = Layout::new::<F>();
+ let layout_r = Layout::new::<T>();
+
+ // Compute the layout for `union { F, T }`.
+ let size_union = max(layout_f.size(), layout_r.size());
+ let align_union = max(layout_f.align(), layout_r.align());
+ let layout_union = Layout::from_size_align(size_union, align_union);
+
+ // Compute the layout for `Header` followed `S` and `union { F, T }`.
+ let layout = layout_header;
+ let (layout, offset_s) = leap!(layout.extend(layout_s));
+ let (layout, offset_union) = leap!(layout.extend(layout_union));
+ let offset_f = offset_union;
+ let offset_r = offset_union;
+
+ Some(TaskLayout {
+ layout: unsafe { layout.into_std() },
+ offset_s,
+ offset_f,
+ offset_r,
+ })
+ }
+}
+
+impl<F, T, S> RawTask<F, T, S>
+where
+ F: Future<Output = T>,
+ S: Fn(Runnable),
+{
+ const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ Self::clone_waker,
+ Self::wake,
+ Self::wake_by_ref,
+ Self::drop_waker,
+ );
+
+ /// Allocates a task with the given `future` and `schedule` function.
+ ///
+ /// It is assumed that initially only the `Runnable` and the `Task` exist.
+ pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
+ // Compute the layout of the task for allocation. Abort if the computation fails.
+ //
+ // n.b. notgull: task_layout now automatically aborts instead of panicking
+ let task_layout = Self::task_layout();
+
+ unsafe {
+ // Allocate enough space for the entire task.
+ let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
+ None => abort(),
+ Some(p) => p,
+ };
+
+ let raw = Self::from_ptr(ptr.as_ptr());
+
+ // Write the header as the first field of the task.
+ (raw.header as *mut Header).write(Header {
+ state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
+ awaiter: UnsafeCell::new(None),
+ vtable: &TaskVTable {
+ schedule: Self::schedule,
+ drop_future: Self::drop_future,
+ get_output: Self::get_output,
+ drop_ref: Self::drop_ref,
+ destroy: Self::destroy,
+ run: Self::run,
+ clone_waker: Self::clone_waker,
+ layout_info: &Self::TASK_LAYOUT,
+ },
+ });
+
+ // Write the schedule function as the third field of the task.
+ (raw.schedule as *mut S).write(schedule);
+
+ // Write the future as the fourth field of the task.
+ raw.future.write(future);
+
+ ptr
+ }
+ }
+
+ /// Creates a `RawTask` from a raw task pointer.
+ #[inline]
+ pub(crate) fn from_ptr(ptr: *const ()) -> Self {
+ let task_layout = Self::task_layout();
+ let p = ptr as *const u8;
+
+ unsafe {
+ Self {
+ header: p as *const Header,
+ schedule: p.add(task_layout.offset_s) as *const S,
+ future: p.add(task_layout.offset_f) as *mut F,
+ output: p.add(task_layout.offset_r) as *mut T,
+ }
+ }
+ }
+
+ /// Returns the layout of the task.
+ #[inline]
+ fn task_layout() -> TaskLayout {
+ match Self::TASK_LAYOUT {
+ Some(tl) => tl,
+ None => abort(),
+ }
+ }
+
+ /// Wakes a waker.
+ unsafe fn wake(ptr: *const ()) {
+ // This is just an optimization. If the schedule function has captured variables, then
+ // we'll do less reference counting if we wake the waker by reference and then drop it.
+ if mem::size_of::<S>() > 0 {
+ Self::wake_by_ref(ptr);
+ Self::drop_waker(ptr);
+ return;
+ }
+
+ let raw = Self::from_ptr(ptr);
+
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task is completed or closed, it can't be woken up.
+ if state & (COMPLETED | CLOSED) != 0 {
+ // Drop the waker.
+ Self::drop_waker(ptr);
+ break;
+ }
+
+ // If the task is already scheduled, we just need to synchronize with the thread that
+ // will run the task by "publishing" our current view of the memory.
+ if state & SCHEDULED != 0 {
+ // Update the state without actually modifying it.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ state,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Drop the waker.
+ Self::drop_waker(ptr);
+ break;
+ }
+ Err(s) => state = s,
+ }
+ } else {
+ // Mark the task as scheduled.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ state | SCHEDULED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the task is not yet scheduled and isn't currently running, now is the
+ // time to schedule it.
+ if state & RUNNING == 0 {
+ // Schedule the task.
+ Self::schedule(ptr);
+ } else {
+ // Drop the waker.
+ Self::drop_waker(ptr);
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ /// Wakes a waker by reference.
+ unsafe fn wake_by_ref(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task is completed or closed, it can't be woken up.
+ if state & (COMPLETED | CLOSED) != 0 {
+ break;
+ }
+
+ // If the task is already scheduled, we just need to synchronize with the thread that
+ // will run the task by "publishing" our current view of the memory.
+ if state & SCHEDULED != 0 {
+ // Update the state without actually modifying it.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ state,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(s) => state = s,
+ }
+ } else {
+ // If the task is not running, we can schedule right away.
+ let new = if state & RUNNING == 0 {
+ (state | SCHEDULED) + REFERENCE
+ } else {
+ state | SCHEDULED
+ };
+
+ // Mark the task as scheduled.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the task is not running, now is the time to schedule.
+ if state & RUNNING == 0 {
+ // If the reference count overflowed, abort.
+ if state > isize::max_value() as usize {
+ abort();
+ }
+
+ // Schedule the task. There is no need to call `Self::schedule(ptr)`
+ // because the schedule function cannot be destroyed while the waker is
+ // still alive.
+ let task = Runnable {
+ ptr: NonNull::new_unchecked(ptr as *mut ()),
+ };
+ (*raw.schedule)(task);
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ /// Clones a waker.
+ unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
+ let raw = Self::from_ptr(ptr);
+
+ // Increment the reference count. With any kind of reference-counted data structure,
+ // relaxed ordering is appropriate when incrementing the counter.
+ let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
+
+ // If the reference count overflowed, abort.
+ if state > isize::max_value() as usize {
+ abort();
+ }
+
+ RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
+ }
+
+ /// Drops a waker.
+ ///
+ /// This function will decrement the reference count. If it drops down to zero, the associated
+ /// `Task` has been dropped too, and the task has not been completed, then it will get
+ /// scheduled one more time so that its future gets dropped by the executor.
+ #[inline]
+ unsafe fn drop_waker(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // Decrement the reference count.
+ let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
+
+ // If this was the last reference to the task and the `Task` has been dropped too,
+ // then we need to decide how to destroy the task.
+ if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
+ if new & (COMPLETED | CLOSED) == 0 {
+ // If the task was not completed nor closed, close it and schedule one more time so
+ // that its future gets dropped by the executor.
+ (*raw.header)
+ .state
+ .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
+ Self::schedule(ptr);
+ } else {
+ // Otherwise, destroy the task right away.
+ Self::destroy(ptr);
+ }
+ }
+ }
+
+ /// Drops a task reference (`Runnable` or `Waker`).
+ ///
+ /// This function will decrement the reference count. If it drops down to zero and the
+ /// associated `Task` handle has been dropped too, then the task gets destroyed.
+ #[inline]
+ unsafe fn drop_ref(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // Decrement the reference count.
+ let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
+
+ // If this was the last reference to the task and the `Task` has been dropped too,
+ // then destroy the task.
+ if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
+ Self::destroy(ptr);
+ }
+ }
+
+ /// Schedules a task for running.
+ ///
+ /// This function doesn't modify the state of the task. It only passes the task reference to
+ /// its schedule function.
+ unsafe fn schedule(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // If the schedule function has captured variables, create a temporary waker that prevents
+ // the task from getting deallocated while the function is being invoked.
+ let _waker;
+ if mem::size_of::<S>() > 0 {
+ _waker = Waker::from_raw(Self::clone_waker(ptr));
+ }
+
+ let task = Runnable {
+ ptr: NonNull::new_unchecked(ptr as *mut ()),
+ };
+ (*raw.schedule)(task);
+ }
+
+ /// Drops the future inside a task.
+ #[inline]
+ unsafe fn drop_future(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // We need a safeguard against panics because the destructor can panic.
+ abort_on_panic(|| {
+ raw.future.drop_in_place();
+ })
+ }
+
+ /// Returns a pointer to the output inside a task.
+ unsafe fn get_output(ptr: *const ()) -> *const () {
+ let raw = Self::from_ptr(ptr);
+ raw.output as *const ()
+ }
+
+ /// Cleans up task's resources and deallocates it.
+ ///
+ /// The schedule function will be dropped, and the task will then get deallocated.
+ /// The task must be closed before this function is called.
+ #[inline]
+ unsafe fn destroy(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+ let task_layout = Self::task_layout();
+
+ // We need a safeguard against panics because destructors can panic.
+ abort_on_panic(|| {
+ // Drop the schedule function.
+ (raw.schedule as *mut S).drop_in_place();
+ });
+
+ // Finally, deallocate the memory reserved by the task.
+ alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
+ }
+
+ /// Runs a task.
+ ///
+ /// If polling its future panics, the task will be closed and the panic will be propagated into
+ /// the caller.
+ unsafe fn run(ptr: *const ()) -> bool {
+ let raw = Self::from_ptr(ptr);
+
+ // Create a context from the raw task pointer and the vtable inside the its header.
+ let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
+ let cx = &mut Context::from_waker(&waker);
+
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ // Update the task's state before polling its future.
+ loop {
+ // If the task has already been closed, drop the task reference and return.
+ if state & CLOSED != 0 {
+ // Drop the future.
+ Self::drop_future(ptr);
+
+ // Mark the task as unscheduled.
+ let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ Self::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ return false;
+ }
+
+ // Mark the task as unscheduled and running.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ (state & !SCHEDULED) | RUNNING,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Update the state because we're continuing with polling the future.
+ state = (state & !SCHEDULED) | RUNNING;
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+
+ // Poll the inner future, but surround it with a guard that closes the task in case polling
+ // panics.
+ let guard = Guard(raw);
+ let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
+ mem::forget(guard);
+
+ match poll {
+ Poll::Ready(out) => {
+ // Replace the future with its output.
+ Self::drop_future(ptr);
+ raw.output.write(out);
+
+ // The task is now completed.
+ loop {
+ // If the `Task` is dropped, we'll need to close it and drop the output.
+ let new = if state & TASK == 0 {
+ (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
+ } else {
+ (state & !RUNNING & !SCHEDULED) | COMPLETED
+ };
+
+ // Mark the task as not running and completed.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the `Task` is dropped or if the task was closed while running,
+ // now it's time to drop the output.
+ if state & TASK == 0 || state & CLOSED != 0 {
+ // Drop the output.
+ abort_on_panic(|| raw.output.drop_in_place());
+ }
+
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ Self::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ Poll::Pending => {
+ let mut future_dropped = false;
+
+ // The task is still not completed.
+ loop {
+ // If the task was closed while running, we'll need to unschedule in case it
+ // was woken up and then destroy it.
+ let new = if state & CLOSED != 0 {
+ state & !RUNNING & !SCHEDULED
+ } else {
+ state & !RUNNING
+ };
+
+ if state & CLOSED != 0 && !future_dropped {
+ // The thread that closed the task didn't drop the future because it was
+ // running so now it's our responsibility to do so.
+ Self::drop_future(ptr);
+ future_dropped = true;
+ }
+
+ // Mark the task as not running.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(state) => {
+ // If the task was closed while running, we need to notify the awaiter.
+ // If the task was woken up while running, we need to schedule it.
+ // Otherwise, we just drop the task reference.
+ if state & CLOSED != 0 {
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ Self::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ } else if state & SCHEDULED != 0 {
+ // The thread that woke the task up didn't reschedule it because
+ // it was running so now it's our responsibility to do so.
+ Self::schedule(ptr);
+ return true;
+ } else {
+ // Drop the task reference.
+ Self::drop_ref(ptr);
+ }
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ return false;
+
+ /// A guard that closes the task if polling its future panics.
+ struct Guard<F, T, S>(RawTask<F, T, S>)
+ where
+ F: Future<Output = T>,
+ S: Fn(Runnable);
+
+ impl<F, T, S> Drop for Guard<F, T, S>
+ where
+ F: Future<Output = T>,
+ S: Fn(Runnable),
+ {
+ fn drop(&mut self) {
+ let raw = self.0;
+ let ptr = raw.header as *const ();
+
+ unsafe {
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task was closed while running, then unschedule it, drop its
+ // future, and drop the task reference.
+ if state & CLOSED != 0 {
+ // The thread that closed the task didn't drop the future because it
+ // was running so now it's our responsibility to do so.
+ RawTask::<F, T, S>::drop_future(ptr);
+
+ // Mark the task as not running and not scheduled.
+ (*raw.header)
+ .state
+ .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);
+
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ RawTask::<F, T, S>::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ break;
+ }
+
+ // Mark the task as not running, not scheduled, and closed.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ (state & !RUNNING & !SCHEDULED) | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(state) => {
+ // Drop the future because the task is now closed.
+ RawTask::<F, T, S>::drop_future(ptr);
+
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ RawTask::<F, T, S>::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/third_party/rust/async-task/src/runnable.rs b/third_party/rust/async-task/src/runnable.rs
new file mode 100644
index 0000000000..cb70ef31b4
--- /dev/null
+++ b/third_party/rust/async-task/src/runnable.rs
@@ -0,0 +1,398 @@
+use core::fmt;
+use core::future::Future;
+use core::marker::PhantomData;
+use core::mem;
+use core::ptr::NonNull;
+use core::sync::atomic::Ordering;
+use core::task::Waker;
+
+use crate::header::Header;
+use crate::raw::RawTask;
+use crate::state::*;
+use crate::Task;
+
+/// Creates a new task.
+///
+/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
+/// output.
+///
+/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
+/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
+/// again.
+///
+/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
+/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
+/// should push it into a task queue so that it can be processed later.
+///
+/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
+/// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
+///
+/// # Examples
+///
+/// ```
+/// // The future inside the task.
+/// let future = async {
+/// println!("Hello, world!");
+/// };
+///
+/// // A function that schedules the task when it gets woken up.
+/// let (s, r) = flume::unbounded();
+/// let schedule = move |runnable| s.send(runnable).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (runnable, task) = async_task::spawn(future, schedule);
+/// ```
+pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
+where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ S: Fn(Runnable) + Send + Sync + 'static,
+{
+ unsafe { spawn_unchecked(future, schedule) }
+}
+
+/// Creates a new thread-local task.
+///
+/// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the
+/// [`Runnable`] is used or dropped on another thread, a panic will occur.
+///
+/// This function is only available when the `std` feature for this crate is enabled.
+///
+/// # Examples
+///
+/// ```
+/// use async_task::Runnable;
+/// use flume::{Receiver, Sender};
+/// use std::rc::Rc;
+///
+/// thread_local! {
+/// // A queue that holds scheduled tasks.
+/// static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
+/// }
+///
+/// // Make a non-Send future.
+/// let msg: Rc<str> = "Hello, world!".into();
+/// let future = async move {
+/// println!("{}", msg);
+/// };
+///
+/// // A function that schedules the task when it gets woken up.
+/// let s = QUEUE.with(|(s, _)| s.clone());
+/// let schedule = move |runnable| s.send(runnable).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (runnable, task) = async_task::spawn_local(future, schedule);
+/// ```
+#[cfg(feature = "std")]
+pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
+where
+ F: Future + 'static,
+ F::Output: 'static,
+ S: Fn(Runnable) + Send + Sync + 'static,
+{
+ use std::mem::ManuallyDrop;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
+ use std::thread::{self, ThreadId};
+
+ #[inline]
+ fn thread_id() -> ThreadId {
+ thread_local! {
+ static ID: ThreadId = thread::current().id();
+ }
+ ID.try_with(|id| *id)
+ .unwrap_or_else(|_| thread::current().id())
+ }
+
+ struct Checked<F> {
+ id: ThreadId,
+ inner: ManuallyDrop<F>,
+ }
+
+ impl<F> Drop for Checked<F> {
+ fn drop(&mut self) {
+ assert!(
+ self.id == thread_id(),
+ "local task dropped by a thread that didn't spawn it"
+ );
+ unsafe {
+ ManuallyDrop::drop(&mut self.inner);
+ }
+ }
+ }
+
+ impl<F: Future> Future for Checked<F> {
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ assert!(
+ self.id == thread_id(),
+ "local task polled by a thread that didn't spawn it"
+ );
+ unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
+ }
+ }
+
+ // Wrap the future into one that checks which thread it's on.
+ let future = Checked {
+ id: thread_id(),
+ inner: ManuallyDrop::new(future),
+ };
+
+ unsafe { spawn_unchecked(future, schedule) }
+}
+
+/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
+///
+/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and
+/// `'static` on `future` and `schedule`.
+///
+/// # Safety
+///
+/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original
+/// thread.
+/// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`].
+/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on
+/// the original thread.
+/// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`].
+///
+/// # Examples
+///
+/// ```
+/// // The future inside the task.
+/// let future = async {
+/// println!("Hello, world!");
+/// };
+///
+/// // If the task gets woken up, it will be sent into this channel.
+/// let (s, r) = flume::unbounded();
+/// let schedule = move |runnable| s.send(runnable).unwrap();
+///
+/// // Create a task with the future and the schedule function.
+/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
+/// ```
+pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
+where
+ F: Future,
+ S: Fn(Runnable),
+{
+ // Allocate large futures on the heap.
+ let ptr = if mem::size_of::<F>() >= 2048 {
+ let future = alloc::boxed::Box::pin(future);
+ RawTask::<_, F::Output, S>::allocate(future, schedule)
+ } else {
+ RawTask::<F, F::Output, S>::allocate(future, schedule)
+ };
+
+ let runnable = Runnable { ptr };
+ let task = Task {
+ ptr,
+ _marker: PhantomData,
+ };
+ (runnable, task)
+}
+
+/// A handle to a runnable task.
+///
+/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is
+/// scheduled for running.
+///
+/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
+/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
+/// again.
+///
+/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and
+/// awaiting the [`Task`] after that will result in a panic.
+///
+/// # Examples
+///
+/// ```
+/// use async_task::Runnable;
+/// use once_cell::sync::Lazy;
+/// use std::{panic, thread};
+///
+/// // A simple executor.
+/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
+/// let (sender, receiver) = flume::unbounded::<Runnable>();
+/// thread::spawn(|| {
+/// for runnable in receiver {
+/// let _ignore_panic = panic::catch_unwind(|| runnable.run());
+/// }
+/// });
+/// sender
+/// });
+///
+/// // Create a task with a simple future.
+/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
+/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
+///
+/// // Schedule the task and await its output.
+/// runnable.schedule();
+/// assert_eq!(smol::future::block_on(task), 3);
+/// ```
+pub struct Runnable {
+ /// A pointer to the heap-allocated task.
+ pub(crate) ptr: NonNull<()>,
+}
+
+unsafe impl Send for Runnable {}
+unsafe impl Sync for Runnable {}
+
+#[cfg(feature = "std")]
+impl std::panic::UnwindSafe for Runnable {}
+#[cfg(feature = "std")]
+impl std::panic::RefUnwindSafe for Runnable {}
+
+impl Runnable {
+ /// Schedules the task.
+ ///
+ /// This is a convenience method that passes the [`Runnable`] to the schedule function.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// // A function that schedules the task when it gets woken up.
+ /// let (s, r) = flume::unbounded();
+ /// let schedule = move |runnable| s.send(runnable).unwrap();
+ ///
+ /// // Create a task with a simple future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(async {}, schedule);
+ ///
+ /// // Schedule the task.
+ /// assert_eq!(r.len(), 0);
+ /// runnable.schedule();
+ /// assert_eq!(r.len(), 1);
+ /// ```
+ pub fn schedule(self) {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+ mem::forget(self);
+
+ unsafe {
+ ((*header).vtable.schedule)(ptr);
+ }
+ }
+
+ /// Runs the task by polling its future.
+ ///
+ /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets
+ /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the
+ /// [`Runnable`] vanishes until the task is woken.
+ /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e.
+ /// it woke itself and then gave the control back to the executor.
+ ///
+ /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then
+ /// this method simply destroys the task.
+ ///
+ /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`]
+ /// after that will also result in a panic.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// // A function that schedules the task when it gets woken up.
+ /// let (s, r) = flume::unbounded();
+ /// let schedule = move |runnable| s.send(runnable).unwrap();
+ ///
+ /// // Create a task with a simple future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
+ ///
+ /// // Run the task and check its output.
+ /// runnable.run();
+ /// assert_eq!(smol::future::block_on(task), 3);
+ /// ```
+ pub fn run(self) -> bool {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+ mem::forget(self);
+
+ unsafe { ((*header).vtable.run)(ptr) }
+ }
+
+ /// Returns a waker associated with this task.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use smol::future;
+ ///
+ /// // A function that schedules the task when it gets woken up.
+ /// let (s, r) = flume::unbounded();
+ /// let schedule = move |runnable| s.send(runnable).unwrap();
+ ///
+ /// // Create a task with a simple future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule);
+ ///
+ /// // Take a waker and run the task.
+ /// let waker = runnable.waker();
+ /// runnable.run();
+ ///
+ /// // Reschedule the task by waking it.
+ /// assert_eq!(r.len(), 0);
+ /// waker.wake();
+ /// assert_eq!(r.len(), 1);
+ /// ```
+ pub fn waker(&self) -> Waker {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ let raw_waker = ((*header).vtable.clone_waker)(ptr);
+ Waker::from_raw(raw_waker)
+ }
+ }
+}
+
+impl Drop for Runnable {
+ fn drop(&mut self) {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ let mut state = (*header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task has been completed or closed, it can't be canceled.
+ if state & (COMPLETED | CLOSED) != 0 {
+ break;
+ }
+
+ // Mark the task as closed.
+ match (*header).state.compare_exchange_weak(
+ state,
+ state | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(s) => state = s,
+ }
+ }
+
+ // Drop the future.
+ ((*header).vtable.drop_future)(ptr);
+
+ // Mark the task as unscheduled.
+ let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+ // Notify the awaiter that the future has been dropped.
+ if state & AWAITER != 0 {
+ (*header).notify(None);
+ }
+
+ // Drop the task reference.
+ ((*header).vtable.drop_ref)(ptr);
+ }
+ }
+}
+
+impl fmt::Debug for Runnable {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ f.debug_struct("Runnable")
+ .field("header", unsafe { &(*header) })
+ .finish()
+ }
+}
diff --git a/third_party/rust/async-task/src/state.rs b/third_party/rust/async-task/src/state.rs
new file mode 100644
index 0000000000..2fc6cf3711
--- /dev/null
+++ b/third_party/rust/async-task/src/state.rs
@@ -0,0 +1,69 @@
+/// Set if the task is scheduled for running.
+///
+/// A task is considered to be scheduled whenever its `Runnable` exists.
+///
+/// This flag can't be set when the task is completed. However, it can be set while the task is
+/// running, in which case it will be rescheduled as soon as polling finishes.
+pub(crate) const SCHEDULED: usize = 1 << 0;
+
+/// Set if the task is running.
+///
+/// A task is in running state while its future is being polled.
+///
+/// This flag can't be set when the task is completed. However, it can be in scheduled state while
+/// it is running, in which case it will be rescheduled as soon as polling finishes.
+pub(crate) const RUNNING: usize = 1 << 1;
+
+/// Set if the task has been completed.
+///
+/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored
+/// inside the task until it becomes closed. In fact, `Task` picks up the output by marking
+/// the task as closed.
+///
+/// This flag can't be set when the task is scheduled or running.
+pub(crate) const COMPLETED: usize = 1 << 2;
+
+/// Set if the task is closed.
+///
+/// If a task is closed, that means it's either canceled or its output has been consumed by the
+/// `Task`. A task becomes closed in the following cases:
+///
+/// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
+/// 2. Its output gets awaited by the `Task`.
+/// 3. It panics while polling the future.
+/// 4. It is completed and the `Task` gets dropped.
+pub(crate) const CLOSED: usize = 1 << 3;
+
+/// Set if the `Task` still exists.
+///
+/// The `Task` is a special case in that it is only tracked by this flag, while all other
+/// task references (`Runnable` and `Waker`s) are tracked by the reference count.
+pub(crate) const TASK: usize = 1 << 4;
+
+/// Set if the `Task` is awaiting the output.
+///
+/// This flag is set while there is a registered awaiter of type `Waker` inside the task. When the
+/// task gets closed or completed, we need to wake the awaiter. This flag can be used as a fast
+/// check that tells us if we need to wake anyone.
+pub(crate) const AWAITER: usize = 1 << 5;
+
+/// Set if an awaiter is being registered.
+///
+/// This flag is set when `Task` is polled and we are registering a new awaiter.
+pub(crate) const REGISTERING: usize = 1 << 6;
+
+/// Set if the awaiter is being notified.
+///
+/// This flag is set when notifying the awaiter. If an awaiter is concurrently registered and
+/// notified, whichever side came first will take over the reposibility of resolving the race.
+pub(crate) const NOTIFYING: usize = 1 << 7;
+
+/// A single reference.
+///
+/// The lower bits in the state contain various flags representing the task state, while the upper
+/// bits contain the reference count. The value of `REFERENCE` represents a single reference in the
+/// total reference count.
+///
+/// Note that the reference counter only tracks the `Runnable` and `Waker`s. The `Task` is
+/// tracked separately by the `TASK` flag.
+pub(crate) const REFERENCE: usize = 1 << 8;
diff --git a/third_party/rust/async-task/src/task.rs b/third_party/rust/async-task/src/task.rs
new file mode 100644
index 0000000000..8ecd746c13
--- /dev/null
+++ b/third_party/rust/async-task/src/task.rs
@@ -0,0 +1,532 @@
+use core::fmt;
+use core::future::Future;
+use core::marker::{PhantomData, Unpin};
+use core::mem;
+use core::pin::Pin;
+use core::ptr::NonNull;
+use core::sync::atomic::Ordering;
+use core::task::{Context, Poll};
+
+use crate::header::Header;
+use crate::state::*;
+
+/// A spawned task.
+///
+/// A [`Task`] can be awaited to retrieve the output of its future.
+///
+/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
+/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
+/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
+/// method.
+///
+/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
+/// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking
+/// [`run()`][`super::Runnable::run()`].
+///
+/// # Examples
+///
+/// ```
+/// use smol::{future, Executor};
+/// use std::thread;
+///
+/// let ex = Executor::new();
+///
+/// // Spawn a future onto the executor.
+/// let task = ex.spawn(async {
+/// println!("Hello from a task!");
+/// 1 + 2
+/// });
+///
+/// // Run an executor thread.
+/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
+///
+/// // Wait for the task's output.
+/// assert_eq!(future::block_on(task), 3);
+/// ```
+#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
+pub struct Task<T> {
+ /// A raw task pointer.
+ pub(crate) ptr: NonNull<()>,
+
+ /// A marker capturing generic type `T`.
+ pub(crate) _marker: PhantomData<T>,
+}
+
+unsafe impl<T: Send> Send for Task<T> {}
+unsafe impl<T> Sync for Task<T> {}
+
+impl<T> Unpin for Task<T> {}
+
+#[cfg(feature = "std")]
+impl<T> std::panic::UnwindSafe for Task<T> {}
+#[cfg(feature = "std")]
+impl<T> std::panic::RefUnwindSafe for Task<T> {}
+
+impl<T> Task<T> {
+ /// Detaches the task to let it keep running in the background.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use smol::{Executor, Timer};
+ /// use std::time::Duration;
+ ///
+ /// let ex = Executor::new();
+ ///
+ /// // Spawn a deamon future.
+ /// ex.spawn(async {
+ /// loop {
+ /// println!("I'm a daemon task looping forever.");
+ /// Timer::after(Duration::from_secs(1)).await;
+ /// }
+ /// })
+ /// .detach();
+ /// ```
+ pub fn detach(self) {
+ let mut this = self;
+ let _out = this.set_detached();
+ mem::forget(this);
+ }
+
+ /// Cancels the task and waits for it to stop running.
+ ///
+ /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
+ /// it didn't complete.
+ ///
+ /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
+ /// canceling because it also waits for the task to stop running.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
+ /// use smol::{future, Executor, Timer};
+ /// use std::thread;
+ /// use std::time::Duration;
+ ///
+ /// let ex = Executor::new();
+ ///
+ /// // Spawn a deamon future.
+ /// let task = ex.spawn(async {
+ /// loop {
+ /// println!("Even though I'm in an infinite loop, you can still cancel me!");
+ /// Timer::after(Duration::from_secs(1)).await;
+ /// }
+ /// });
+ ///
+ /// // Run an executor thread.
+ /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
+ ///
+ /// future::block_on(async {
+ /// Timer::after(Duration::from_secs(3)).await;
+ /// task.cancel().await;
+ /// });
+ /// ```
+ pub async fn cancel(self) -> Option<T> {
+ let mut this = self;
+ this.set_canceled();
+ this.fallible().await
+ }
+
+ /// Converts this task into a [`FallibleTask`].
+ ///
+ /// Like [`Task`], a fallible task will poll the task's output until it is
+ /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being
+ /// dropped without being run. Resolves to the task's output when completed,
+ /// or [`None`] if it didn't complete.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use smol::{future, Executor};
+ /// use std::thread;
+ ///
+ /// let ex = Executor::new();
+ ///
+ /// // Spawn a future onto the executor.
+ /// let task = ex.spawn(async {
+ /// println!("Hello from a task!");
+ /// 1 + 2
+ /// })
+ /// .fallible();
+ ///
+ /// // Run an executor thread.
+ /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
+ ///
+ /// // Wait for the task's output.
+ /// assert_eq!(future::block_on(task), Some(3));
+ /// ```
+ ///
+ /// ```
+ /// use smol::future;
+ ///
+ /// // Schedule function which drops the runnable without running it.
+ /// let schedule = move |runnable| drop(runnable);
+ ///
+ /// // Create a task with the future and the schedule function.
+ /// let (runnable, task) = async_task::spawn(async {
+ /// println!("Hello from a task!");
+ /// 1 + 2
+ /// }, schedule);
+ /// runnable.schedule();
+ ///
+ /// // Wait for the task's output.
+ /// assert_eq!(future::block_on(task.fallible()), None);
+ /// ```
+ pub fn fallible(self) -> FallibleTask<T> {
+ FallibleTask { task: self }
+ }
+
+ /// Puts the task in canceled state.
+ fn set_canceled(&mut self) {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ let mut state = (*header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task has been completed or closed, it can't be canceled.
+ if state & (COMPLETED | CLOSED) != 0 {
+ break;
+ }
+
+ // If the task is not scheduled nor running, we'll need to schedule it.
+ let new = if state & (SCHEDULED | RUNNING) == 0 {
+ (state | SCHEDULED | CLOSED) + REFERENCE
+ } else {
+ state | CLOSED
+ };
+
+ // Mark the task as closed.
+ match (*header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the task is not scheduled nor running, schedule it one more time so
+ // that its future gets dropped by the executor.
+ if state & (SCHEDULED | RUNNING) == 0 {
+ ((*header).vtable.schedule)(ptr);
+ }
+
+ // Notify the awaiter that the task has been closed.
+ if state & AWAITER != 0 {
+ (*header).notify(None);
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ /// Puts the task in detached state.
+ fn set_detached(&mut self) -> Option<T> {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ // A place where the output will be stored in case it needs to be dropped.
+ let mut output = None;
+
+ // Optimistically assume the `Task` is being detached just after creating the task.
+ // This is a common case so if the `Task` is datached, the overhead of it is only one
+ // compare-exchange operation.
+ if let Err(mut state) = (*header).state.compare_exchange_weak(
+ SCHEDULED | TASK | REFERENCE,
+ SCHEDULED | REFERENCE,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ loop {
+ // If the task has been completed but not yet closed, that means its output
+ // must be dropped.
+ if state & COMPLETED != 0 && state & CLOSED == 0 {
+ // Mark the task as closed in order to grab its output.
+ match (*header).state.compare_exchange_weak(
+ state,
+ state | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Read the output.
+ output =
+ Some((((*header).vtable.get_output)(ptr) as *mut T).read());
+
+ // Update the state variable because we're continuing the loop.
+ state |= CLOSED;
+ }
+ Err(s) => state = s,
+ }
+ } else {
+ // If this is the last reference to the task and it's not closed, then
+ // close it and schedule one more time so that its future gets dropped by
+ // the executor.
+ let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
+ SCHEDULED | CLOSED | REFERENCE
+ } else {
+ state & !TASK
+ };
+
+ // Unset the `TASK` flag.
+ match (*header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If this is the last reference to the task, we need to either
+ // schedule dropping its future or destroy it.
+ if state & !(REFERENCE - 1) == 0 {
+ if state & CLOSED == 0 {
+ ((*header).vtable.schedule)(ptr);
+ } else {
+ ((*header).vtable.destroy)(ptr);
+ }
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ output
+ }
+ }
+
+ /// Polls the task to retrieve its output.
+ ///
+ /// Returns `Some` if the task has completed or `None` if it was closed.
+ ///
+ /// A task becomes closed in the following cases:
+ ///
+ /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
+ /// 2. Its output gets awaited by the `Task`.
+ /// 3. It panics while polling the future.
+ /// 4. It is completed and the `Task` gets dropped.
+ fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ let mut state = (*header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task has been closed, notify the awaiter and return `None`.
+ if state & CLOSED != 0 {
+ // If the task is scheduled or running, we need to wait until its future is
+ // dropped.
+ if state & (SCHEDULED | RUNNING) != 0 {
+ // Replace the waker with one associated with the current task.
+ (*header).register(cx.waker());
+
+ // Reload the state after registering. It is possible changes occurred just
+ // before registration so we need to check for that.
+ state = (*header).state.load(Ordering::Acquire);
+
+ // If the task is still scheduled or running, we need to wait because its
+ // future is not dropped yet.
+ if state & (SCHEDULED | RUNNING) != 0 {
+ return Poll::Pending;
+ }
+ }
+
+ // Even though the awaiter is most likely the current task, it could also be
+ // another task.
+ (*header).notify(Some(cx.waker()));
+ return Poll::Ready(None);
+ }
+
+ // If the task is not completed, register the current task.
+ if state & COMPLETED == 0 {
+ // Replace the waker with one associated with the current task.
+ (*header).register(cx.waker());
+
+ // Reload the state after registering. It is possible that the task became
+ // completed or closed just before registration so we need to check for that.
+ state = (*header).state.load(Ordering::Acquire);
+
+ // If the task has been closed, restart.
+ if state & CLOSED != 0 {
+ continue;
+ }
+
+ // If the task is still not completed, we're blocked on it.
+ if state & COMPLETED == 0 {
+ return Poll::Pending;
+ }
+ }
+
+ // Since the task is now completed, mark it as closed in order to grab its output.
+ match (*header).state.compare_exchange(
+ state,
+ state | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Notify the awaiter. Even though the awaiter is most likely the current
+ // task, it could also be another task.
+ if state & AWAITER != 0 {
+ (*header).notify(Some(cx.waker()));
+ }
+
+ // Take the output from the task.
+ let output = ((*header).vtable.get_output)(ptr) as *mut T;
+ return Poll::Ready(Some(output.read()));
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ fn header(&self) -> &Header {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+ unsafe { &*header }
+ }
+
+ /// Returns `true` if the current task is finished.
+ ///
+ /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
+ pub fn is_finished(&self) -> bool {
+ let ptr = self.ptr.as_ptr();
+ let header = ptr as *const Header;
+
+ unsafe {
+ let state = (*header).state.load(Ordering::Acquire);
+ state & (CLOSED | COMPLETED) != 0
+ }
+ }
+}
+
+impl<T> Drop for Task<T> {
+ fn drop(&mut self) {
+ self.set_canceled();
+ self.set_detached();
+ }
+}
+
+impl<T> Future for Task<T> {
+ type Output = T;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ match self.poll_task(cx) {
+ Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
+impl<T> fmt::Debug for Task<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Task")
+ .field("header", self.header())
+ .finish()
+ }
+}
+
+/// A spawned task with a fallible response.
+///
+/// This type behaves like [`Task`], however it produces an `Option<T>` when
+/// polled and will return `None` if the executor dropped its
+/// [`Runnable`][`super::Runnable`] without being run.
+///
+/// This can be useful to avoid the panic produced when polling the `Task`
+/// future if the executor dropped its `Runnable`.
+#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
+pub struct FallibleTask<T> {
+ task: Task<T>,
+}
+
+impl<T> FallibleTask<T> {
+ /// Detaches the task to let it keep running in the background.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use smol::{Executor, Timer};
+ /// use std::time::Duration;
+ ///
+ /// let ex = Executor::new();
+ ///
+ /// // Spawn a deamon future.
+ /// ex.spawn(async {
+ /// loop {
+ /// println!("I'm a daemon task looping forever.");
+ /// Timer::after(Duration::from_secs(1)).await;
+ /// }
+ /// })
+ /// .fallible()
+ /// .detach();
+ /// ```
+ pub fn detach(self) {
+ self.task.detach()
+ }
+
+ /// Cancels the task and waits for it to stop running.
+ ///
+ /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
+ /// it didn't complete.
+ ///
+ /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
+ /// canceling because it also waits for the task to stop running.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
+ /// use smol::{future, Executor, Timer};
+ /// use std::thread;
+ /// use std::time::Duration;
+ ///
+ /// let ex = Executor::new();
+ ///
+ /// // Spawn a deamon future.
+ /// let task = ex.spawn(async {
+ /// loop {
+ /// println!("Even though I'm in an infinite loop, you can still cancel me!");
+ /// Timer::after(Duration::from_secs(1)).await;
+ /// }
+ /// })
+ /// .fallible();
+ ///
+ /// // Run an executor thread.
+ /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
+ ///
+ /// future::block_on(async {
+ /// Timer::after(Duration::from_secs(3)).await;
+ /// task.cancel().await;
+ /// });
+ /// ```
+ pub async fn cancel(self) -> Option<T> {
+ self.task.cancel().await
+ }
+}
+
+impl<T> Future for FallibleTask<T> {
+ type Output = Option<T>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.task.poll_task(cx)
+ }
+}
+
+impl<T> fmt::Debug for FallibleTask<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FallibleTask")
+ .field("header", self.task.header())
+ .finish()
+ }
+}
diff --git a/third_party/rust/async-task/src/utils.rs b/third_party/rust/async-task/src/utils.rs
new file mode 100644
index 0000000000..189e9af046
--- /dev/null
+++ b/third_party/rust/async-task/src/utils.rs
@@ -0,0 +1,127 @@
+use core::alloc::Layout as StdLayout;
+use core::mem;
+
+/// Aborts the process.
+///
+/// To abort, this function simply panics while panicking.
+pub(crate) fn abort() -> ! {
+ struct Panic;
+
+ impl Drop for Panic {
+ fn drop(&mut self) {
+ panic!("aborting the process");
+ }
+ }
+
+ let _panic = Panic;
+ panic!("aborting the process");
+}
+
+/// Calls a function and aborts if it panics.
+///
+/// This is useful in unsafe code where we can't recover from panics.
+#[inline]
+pub(crate) fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
+ struct Bomb;
+
+ impl Drop for Bomb {
+ fn drop(&mut self) {
+ abort();
+ }
+ }
+
+ let bomb = Bomb;
+ let t = f();
+ mem::forget(bomb);
+ t
+}
+
+/// A version of `alloc::alloc::Layout` that can be used in the const
+/// position.
+#[derive(Clone, Copy, Debug)]
+pub(crate) struct Layout {
+ size: usize,
+ align: usize,
+}
+
+impl Layout {
+ /// Creates a new `Layout` with the given size and alignment.
+ #[inline]
+ pub(crate) const fn from_size_align(size: usize, align: usize) -> Self {
+ Self { size, align }
+ }
+
+ /// Creates a new `Layout` for the given sized type.
+ #[inline]
+ pub(crate) const fn new<T>() -> Self {
+ Self::from_size_align(mem::size_of::<T>(), mem::align_of::<T>())
+ }
+
+ /// Convert this into the standard library's layout type.
+ ///
+ /// # Safety
+ ///
+ /// - `align` must be non-zero and a power of two.
+ /// - When rounded up to the nearest multiple of `align`, the size
+ /// must not overflow.
+ #[inline]
+ pub(crate) const unsafe fn into_std(self) -> StdLayout {
+ StdLayout::from_size_align_unchecked(self.size, self.align)
+ }
+
+ /// Get the alignment of this layout.
+ #[inline]
+ pub(crate) const fn align(&self) -> usize {
+ self.align
+ }
+
+ /// Get the size of this layout.
+ #[inline]
+ pub(crate) const fn size(&self) -> usize {
+ self.size
+ }
+
+ /// Returns the layout for `a` followed by `b` and the offset of `b`.
+ ///
+ /// This function was adapted from the currently unstable `Layout::extend()`:
+ /// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.extend
+ #[inline]
+ pub(crate) const fn extend(self, other: Layout) -> Option<(Layout, usize)> {
+ let new_align = max(self.align(), other.align());
+ let pad = self.padding_needed_for(other.align());
+
+ let offset = leap!(self.size().checked_add(pad));
+ let new_size = leap!(offset.checked_add(other.size()));
+
+ // return None if any of the following are true:
+ // - align is 0 (implied false by is_power_of_two())
+ // - align is not a power of 2
+ // - size rounded up to align overflows
+ if !new_align.is_power_of_two() || new_size > core::usize::MAX - (new_align - 1) {
+ return None;
+ }
+
+ let layout = Layout::from_size_align(new_size, new_align);
+ Some((layout, offset))
+ }
+
+ /// Returns the padding after `layout` that aligns the following address to `align`.
+ ///
+ /// This function was adapted from the currently unstable `Layout::padding_needed_for()`:
+ /// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.padding_needed_for
+ #[inline]
+ pub(crate) const fn padding_needed_for(self, align: usize) -> usize {
+ let len = self.size();
+ let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1);
+ len_rounded_up.wrapping_sub(len)
+ }
+}
+
+#[inline]
+pub(crate) const fn max(left: usize, right: usize) -> usize {
+ if left > right {
+ left
+ } else {
+ right
+ }
+}