summaryrefslogtreecommitdiffstats
path: root/third_party/rust/async-task/src/raw.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/async-task/src/raw.rs')
-rw-r--r--third_party/rust/async-task/src/raw.rs707
1 files changed, 707 insertions, 0 deletions
diff --git a/third_party/rust/async-task/src/raw.rs b/third_party/rust/async-task/src/raw.rs
new file mode 100644
index 0000000000..bb031da3e3
--- /dev/null
+++ b/third_party/rust/async-task/src/raw.rs
@@ -0,0 +1,707 @@
+use alloc::alloc::Layout as StdLayout;
+use core::cell::UnsafeCell;
+use core::future::Future;
+use core::mem::{self, ManuallyDrop};
+use core::pin::Pin;
+use core::ptr::NonNull;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+
+use crate::header::Header;
+use crate::state::*;
+use crate::utils::{abort, abort_on_panic, max, Layout};
+use crate::Runnable;
+
+/// The vtable for a task.
+pub(crate) struct TaskVTable {
+ /// Schedules the task.
+ pub(crate) schedule: unsafe fn(*const ()),
+
+ /// Drops the future inside the task.
+ pub(crate) drop_future: unsafe fn(*const ()),
+
+ /// Returns a pointer to the output stored after completion.
+ pub(crate) get_output: unsafe fn(*const ()) -> *const (),
+
+ /// Drops the task reference (`Runnable` or `Waker`).
+ pub(crate) drop_ref: unsafe fn(ptr: *const ()),
+
+ /// Destroys the task.
+ pub(crate) destroy: unsafe fn(*const ()),
+
+ /// Runs the task.
+ pub(crate) run: unsafe fn(*const ()) -> bool,
+
+ /// Creates a new waker associated with the task.
+ pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,
+
+ /// The memory layout of the task. This information enables
+ /// debuggers to decode raw task memory blobs. Do not remove
+ /// the field, even if it appears to be unused.
+ #[allow(unused)]
+ pub(crate) layout_info: &'static Option<TaskLayout>,
+}
+
+/// Memory layout of a task.
+///
+/// This struct contains the following information:
+///
+/// 1. How to allocate and deallocate the task.
+/// 2. How to access the fields inside the task.
+#[derive(Clone, Copy)]
+pub(crate) struct TaskLayout {
+ /// Memory layout of the whole task.
+ pub(crate) layout: StdLayout,
+
+ /// Offset into the task at which the schedule function is stored.
+ pub(crate) offset_s: usize,
+
+ /// Offset into the task at which the future is stored.
+ pub(crate) offset_f: usize,
+
+ /// Offset into the task at which the output is stored.
+ pub(crate) offset_r: usize,
+}
+
+/// Raw pointers to the fields inside a task.
+pub(crate) struct RawTask<F, T, S> {
+ /// The task header.
+ pub(crate) header: *const Header,
+
+ /// The schedule function.
+ pub(crate) schedule: *const S,
+
+ /// The future.
+ pub(crate) future: *mut F,
+
+ /// The output of the future.
+ pub(crate) output: *mut T,
+}
+
+impl<F, T, S> Copy for RawTask<F, T, S> {}
+
+impl<F, T, S> Clone for RawTask<F, T, S> {
+ fn clone(&self) -> Self {
+ *self
+ }
+}
+
+impl<F, T, S> RawTask<F, T, S> {
+ const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout();
+
+ /// Computes the memory layout for a task.
+ #[inline]
+ const fn eval_task_layout() -> Option<TaskLayout> {
+ // Compute the layouts for `Header`, `S`, `F`, and `T`.
+ let layout_header = Layout::new::<Header>();
+ let layout_s = Layout::new::<S>();
+ let layout_f = Layout::new::<F>();
+ let layout_r = Layout::new::<T>();
+
+ // Compute the layout for `union { F, T }`.
+ let size_union = max(layout_f.size(), layout_r.size());
+ let align_union = max(layout_f.align(), layout_r.align());
+ let layout_union = Layout::from_size_align(size_union, align_union);
+
+ // Compute the layout for `Header` followed `S` and `union { F, T }`.
+ let layout = layout_header;
+ let (layout, offset_s) = leap!(layout.extend(layout_s));
+ let (layout, offset_union) = leap!(layout.extend(layout_union));
+ let offset_f = offset_union;
+ let offset_r = offset_union;
+
+ Some(TaskLayout {
+ layout: unsafe { layout.into_std() },
+ offset_s,
+ offset_f,
+ offset_r,
+ })
+ }
+}
+
+impl<F, T, S> RawTask<F, T, S>
+where
+ F: Future<Output = T>,
+ S: Fn(Runnable),
+{
+ const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ Self::clone_waker,
+ Self::wake,
+ Self::wake_by_ref,
+ Self::drop_waker,
+ );
+
+ /// Allocates a task with the given `future` and `schedule` function.
+ ///
+ /// It is assumed that initially only the `Runnable` and the `Task` exist.
+ pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
+ // Compute the layout of the task for allocation. Abort if the computation fails.
+ //
+ // n.b. notgull: task_layout now automatically aborts instead of panicking
+ let task_layout = Self::task_layout();
+
+ unsafe {
+ // Allocate enough space for the entire task.
+ let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
+ None => abort(),
+ Some(p) => p,
+ };
+
+ let raw = Self::from_ptr(ptr.as_ptr());
+
+ // Write the header as the first field of the task.
+ (raw.header as *mut Header).write(Header {
+ state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
+ awaiter: UnsafeCell::new(None),
+ vtable: &TaskVTable {
+ schedule: Self::schedule,
+ drop_future: Self::drop_future,
+ get_output: Self::get_output,
+ drop_ref: Self::drop_ref,
+ destroy: Self::destroy,
+ run: Self::run,
+ clone_waker: Self::clone_waker,
+ layout_info: &Self::TASK_LAYOUT,
+ },
+ });
+
+ // Write the schedule function as the third field of the task.
+ (raw.schedule as *mut S).write(schedule);
+
+ // Write the future as the fourth field of the task.
+ raw.future.write(future);
+
+ ptr
+ }
+ }
+
+ /// Creates a `RawTask` from a raw task pointer.
+ #[inline]
+ pub(crate) fn from_ptr(ptr: *const ()) -> Self {
+ let task_layout = Self::task_layout();
+ let p = ptr as *const u8;
+
+ unsafe {
+ Self {
+ header: p as *const Header,
+ schedule: p.add(task_layout.offset_s) as *const S,
+ future: p.add(task_layout.offset_f) as *mut F,
+ output: p.add(task_layout.offset_r) as *mut T,
+ }
+ }
+ }
+
+ /// Returns the layout of the task.
+ #[inline]
+ fn task_layout() -> TaskLayout {
+ match Self::TASK_LAYOUT {
+ Some(tl) => tl,
+ None => abort(),
+ }
+ }
+
+ /// Wakes a waker.
+ unsafe fn wake(ptr: *const ()) {
+ // This is just an optimization. If the schedule function has captured variables, then
+ // we'll do less reference counting if we wake the waker by reference and then drop it.
+ if mem::size_of::<S>() > 0 {
+ Self::wake_by_ref(ptr);
+ Self::drop_waker(ptr);
+ return;
+ }
+
+ let raw = Self::from_ptr(ptr);
+
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task is completed or closed, it can't be woken up.
+ if state & (COMPLETED | CLOSED) != 0 {
+ // Drop the waker.
+ Self::drop_waker(ptr);
+ break;
+ }
+
+ // If the task is already scheduled, we just need to synchronize with the thread that
+ // will run the task by "publishing" our current view of the memory.
+ if state & SCHEDULED != 0 {
+ // Update the state without actually modifying it.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ state,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Drop the waker.
+ Self::drop_waker(ptr);
+ break;
+ }
+ Err(s) => state = s,
+ }
+ } else {
+ // Mark the task as scheduled.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ state | SCHEDULED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the task is not yet scheduled and isn't currently running, now is the
+ // time to schedule it.
+ if state & RUNNING == 0 {
+ // Schedule the task.
+ Self::schedule(ptr);
+ } else {
+ // Drop the waker.
+ Self::drop_waker(ptr);
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ /// Wakes a waker by reference.
+ unsafe fn wake_by_ref(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task is completed or closed, it can't be woken up.
+ if state & (COMPLETED | CLOSED) != 0 {
+ break;
+ }
+
+ // If the task is already scheduled, we just need to synchronize with the thread that
+ // will run the task by "publishing" our current view of the memory.
+ if state & SCHEDULED != 0 {
+ // Update the state without actually modifying it.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ state,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => break,
+ Err(s) => state = s,
+ }
+ } else {
+ // If the task is not running, we can schedule right away.
+ let new = if state & RUNNING == 0 {
+ (state | SCHEDULED) + REFERENCE
+ } else {
+ state | SCHEDULED
+ };
+
+ // Mark the task as scheduled.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the task is not running, now is the time to schedule.
+ if state & RUNNING == 0 {
+ // If the reference count overflowed, abort.
+ if state > isize::max_value() as usize {
+ abort();
+ }
+
+ // Schedule the task. There is no need to call `Self::schedule(ptr)`
+ // because the schedule function cannot be destroyed while the waker is
+ // still alive.
+ let task = Runnable {
+ ptr: NonNull::new_unchecked(ptr as *mut ()),
+ };
+ (*raw.schedule)(task);
+ }
+
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ /// Clones a waker.
+ unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
+ let raw = Self::from_ptr(ptr);
+
+ // Increment the reference count. With any kind of reference-counted data structure,
+ // relaxed ordering is appropriate when incrementing the counter.
+ let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
+
+ // If the reference count overflowed, abort.
+ if state > isize::max_value() as usize {
+ abort();
+ }
+
+ RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
+ }
+
+ /// Drops a waker.
+ ///
+ /// This function will decrement the reference count. If it drops down to zero, the associated
+ /// `Task` has been dropped too, and the task has not been completed, then it will get
+ /// scheduled one more time so that its future gets dropped by the executor.
+ #[inline]
+ unsafe fn drop_waker(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // Decrement the reference count.
+ let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
+
+ // If this was the last reference to the task and the `Task` has been dropped too,
+ // then we need to decide how to destroy the task.
+ if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
+ if new & (COMPLETED | CLOSED) == 0 {
+ // If the task was not completed nor closed, close it and schedule one more time so
+ // that its future gets dropped by the executor.
+ (*raw.header)
+ .state
+ .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
+ Self::schedule(ptr);
+ } else {
+ // Otherwise, destroy the task right away.
+ Self::destroy(ptr);
+ }
+ }
+ }
+
+ /// Drops a task reference (`Runnable` or `Waker`).
+ ///
+ /// This function will decrement the reference count. If it drops down to zero and the
+ /// associated `Task` handle has been dropped too, then the task gets destroyed.
+ #[inline]
+ unsafe fn drop_ref(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // Decrement the reference count.
+ let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
+
+ // If this was the last reference to the task and the `Task` has been dropped too,
+ // then destroy the task.
+ if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
+ Self::destroy(ptr);
+ }
+ }
+
+ /// Schedules a task for running.
+ ///
+ /// This function doesn't modify the state of the task. It only passes the task reference to
+ /// its schedule function.
+ unsafe fn schedule(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // If the schedule function has captured variables, create a temporary waker that prevents
+ // the task from getting deallocated while the function is being invoked.
+ let _waker;
+ if mem::size_of::<S>() > 0 {
+ _waker = Waker::from_raw(Self::clone_waker(ptr));
+ }
+
+ let task = Runnable {
+ ptr: NonNull::new_unchecked(ptr as *mut ()),
+ };
+ (*raw.schedule)(task);
+ }
+
+ /// Drops the future inside a task.
+ #[inline]
+ unsafe fn drop_future(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+
+ // We need a safeguard against panics because the destructor can panic.
+ abort_on_panic(|| {
+ raw.future.drop_in_place();
+ })
+ }
+
+ /// Returns a pointer to the output inside a task.
+ unsafe fn get_output(ptr: *const ()) -> *const () {
+ let raw = Self::from_ptr(ptr);
+ raw.output as *const ()
+ }
+
+ /// Cleans up task's resources and deallocates it.
+ ///
+ /// The schedule function will be dropped, and the task will then get deallocated.
+ /// The task must be closed before this function is called.
+ #[inline]
+ unsafe fn destroy(ptr: *const ()) {
+ let raw = Self::from_ptr(ptr);
+ let task_layout = Self::task_layout();
+
+ // We need a safeguard against panics because destructors can panic.
+ abort_on_panic(|| {
+ // Drop the schedule function.
+ (raw.schedule as *mut S).drop_in_place();
+ });
+
+ // Finally, deallocate the memory reserved by the task.
+ alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
+ }
+
+ /// Runs a task.
+ ///
+ /// If polling its future panics, the task will be closed and the panic will be propagated into
+ /// the caller.
+ unsafe fn run(ptr: *const ()) -> bool {
+ let raw = Self::from_ptr(ptr);
+
+ // Create a context from the raw task pointer and the vtable inside the its header.
+ let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
+ let cx = &mut Context::from_waker(&waker);
+
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ // Update the task's state before polling its future.
+ loop {
+ // If the task has already been closed, drop the task reference and return.
+ if state & CLOSED != 0 {
+ // Drop the future.
+ Self::drop_future(ptr);
+
+ // Mark the task as unscheduled.
+ let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
+
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ Self::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ return false;
+ }
+
+ // Mark the task as unscheduled and running.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ (state & !SCHEDULED) | RUNNING,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // Update the state because we're continuing with polling the future.
+ state = (state & !SCHEDULED) | RUNNING;
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+
+ // Poll the inner future, but surround it with a guard that closes the task in case polling
+ // panics.
+ let guard = Guard(raw);
+ let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
+ mem::forget(guard);
+
+ match poll {
+ Poll::Ready(out) => {
+ // Replace the future with its output.
+ Self::drop_future(ptr);
+ raw.output.write(out);
+
+ // The task is now completed.
+ loop {
+ // If the `Task` is dropped, we'll need to close it and drop the output.
+ let new = if state & TASK == 0 {
+ (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
+ } else {
+ (state & !RUNNING & !SCHEDULED) | COMPLETED
+ };
+
+ // Mark the task as not running and completed.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(_) => {
+ // If the `Task` is dropped or if the task was closed while running,
+ // now it's time to drop the output.
+ if state & TASK == 0 || state & CLOSED != 0 {
+ // Drop the output.
+ abort_on_panic(|| raw.output.drop_in_place());
+ }
+
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ Self::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ Poll::Pending => {
+ let mut future_dropped = false;
+
+ // The task is still not completed.
+ loop {
+ // If the task was closed while running, we'll need to unschedule in case it
+ // was woken up and then destroy it.
+ let new = if state & CLOSED != 0 {
+ state & !RUNNING & !SCHEDULED
+ } else {
+ state & !RUNNING
+ };
+
+ if state & CLOSED != 0 && !future_dropped {
+ // The thread that closed the task didn't drop the future because it was
+ // running so now it's our responsibility to do so.
+ Self::drop_future(ptr);
+ future_dropped = true;
+ }
+
+ // Mark the task as not running.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ new,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(state) => {
+ // If the task was closed while running, we need to notify the awaiter.
+ // If the task was woken up while running, we need to schedule it.
+ // Otherwise, we just drop the task reference.
+ if state & CLOSED != 0 {
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ Self::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ } else if state & SCHEDULED != 0 {
+ // The thread that woke the task up didn't reschedule it because
+ // it was running so now it's our responsibility to do so.
+ Self::schedule(ptr);
+ return true;
+ } else {
+ // Drop the task reference.
+ Self::drop_ref(ptr);
+ }
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+
+ return false;
+
+ /// A guard that closes the task if polling its future panics.
+ struct Guard<F, T, S>(RawTask<F, T, S>)
+ where
+ F: Future<Output = T>,
+ S: Fn(Runnable);
+
+ impl<F, T, S> Drop for Guard<F, T, S>
+ where
+ F: Future<Output = T>,
+ S: Fn(Runnable),
+ {
+ fn drop(&mut self) {
+ let raw = self.0;
+ let ptr = raw.header as *const ();
+
+ unsafe {
+ let mut state = (*raw.header).state.load(Ordering::Acquire);
+
+ loop {
+ // If the task was closed while running, then unschedule it, drop its
+ // future, and drop the task reference.
+ if state & CLOSED != 0 {
+ // The thread that closed the task didn't drop the future because it
+ // was running so now it's our responsibility to do so.
+ RawTask::<F, T, S>::drop_future(ptr);
+
+ // Mark the task as not running and not scheduled.
+ (*raw.header)
+ .state
+ .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);
+
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ RawTask::<F, T, S>::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ break;
+ }
+
+ // Mark the task as not running, not scheduled, and closed.
+ match (*raw.header).state.compare_exchange_weak(
+ state,
+ (state & !RUNNING & !SCHEDULED) | CLOSED,
+ Ordering::AcqRel,
+ Ordering::Acquire,
+ ) {
+ Ok(state) => {
+ // Drop the future because the task is now closed.
+ RawTask::<F, T, S>::drop_future(ptr);
+
+ // Take the awaiter out.
+ let mut awaiter = None;
+ if state & AWAITER != 0 {
+ awaiter = (*raw.header).take(None);
+ }
+
+ // Drop the task reference.
+ RawTask::<F, T, S>::drop_ref(ptr);
+
+ // Notify the awaiter that the future has been dropped.
+ if let Some(w) = awaiter {
+ abort_on_panic(|| w.wake());
+ }
+ break;
+ }
+ Err(s) => state = s,
+ }
+ }
+ }
+ }
+ }
+ }
+}