diff options
Diffstat (limited to 'third_party/rust/async-task/src/raw.rs')
-rw-r--r-- | third_party/rust/async-task/src/raw.rs | 707 |
1 files changed, 707 insertions, 0 deletions
diff --git a/third_party/rust/async-task/src/raw.rs b/third_party/rust/async-task/src/raw.rs new file mode 100644 index 0000000000..bb031da3e3 --- /dev/null +++ b/third_party/rust/async-task/src/raw.rs @@ -0,0 +1,707 @@ +use alloc::alloc::Layout as StdLayout; +use core::cell::UnsafeCell; +use core::future::Future; +use core::mem::{self, ManuallyDrop}; +use core::pin::Pin; +use core::ptr::NonNull; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +use crate::header::Header; +use crate::state::*; +use crate::utils::{abort, abort_on_panic, max, Layout}; +use crate::Runnable; + +/// The vtable for a task. +pub(crate) struct TaskVTable { + /// Schedules the task. + pub(crate) schedule: unsafe fn(*const ()), + + /// Drops the future inside the task. + pub(crate) drop_future: unsafe fn(*const ()), + + /// Returns a pointer to the output stored after completion. + pub(crate) get_output: unsafe fn(*const ()) -> *const (), + + /// Drops the task reference (`Runnable` or `Waker`). + pub(crate) drop_ref: unsafe fn(ptr: *const ()), + + /// Destroys the task. + pub(crate) destroy: unsafe fn(*const ()), + + /// Runs the task. + pub(crate) run: unsafe fn(*const ()) -> bool, + + /// Creates a new waker associated with the task. + pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker, + + /// The memory layout of the task. This information enables + /// debuggers to decode raw task memory blobs. Do not remove + /// the field, even if it appears to be unused. + #[allow(unused)] + pub(crate) layout_info: &'static Option<TaskLayout>, +} + +/// Memory layout of a task. +/// +/// This struct contains the following information: +/// +/// 1. How to allocate and deallocate the task. +/// 2. How to access the fields inside the task. +#[derive(Clone, Copy)] +pub(crate) struct TaskLayout { + /// Memory layout of the whole task. + pub(crate) layout: StdLayout, + + /// Offset into the task at which the schedule function is stored. + pub(crate) offset_s: usize, + + /// Offset into the task at which the future is stored. + pub(crate) offset_f: usize, + + /// Offset into the task at which the output is stored. + pub(crate) offset_r: usize, +} + +/// Raw pointers to the fields inside a task. +pub(crate) struct RawTask<F, T, S> { + /// The task header. + pub(crate) header: *const Header, + + /// The schedule function. + pub(crate) schedule: *const S, + + /// The future. + pub(crate) future: *mut F, + + /// The output of the future. + pub(crate) output: *mut T, +} + +impl<F, T, S> Copy for RawTask<F, T, S> {} + +impl<F, T, S> Clone for RawTask<F, T, S> { + fn clone(&self) -> Self { + *self + } +} + +impl<F, T, S> RawTask<F, T, S> { + const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout(); + + /// Computes the memory layout for a task. + #[inline] + const fn eval_task_layout() -> Option<TaskLayout> { + // Compute the layouts for `Header`, `S`, `F`, and `T`. + let layout_header = Layout::new::<Header>(); + let layout_s = Layout::new::<S>(); + let layout_f = Layout::new::<F>(); + let layout_r = Layout::new::<T>(); + + // Compute the layout for `union { F, T }`. + let size_union = max(layout_f.size(), layout_r.size()); + let align_union = max(layout_f.align(), layout_r.align()); + let layout_union = Layout::from_size_align(size_union, align_union); + + // Compute the layout for `Header` followed `S` and `union { F, T }`. + let layout = layout_header; + let (layout, offset_s) = leap!(layout.extend(layout_s)); + let (layout, offset_union) = leap!(layout.extend(layout_union)); + let offset_f = offset_union; + let offset_r = offset_union; + + Some(TaskLayout { + layout: unsafe { layout.into_std() }, + offset_s, + offset_f, + offset_r, + }) + } +} + +impl<F, T, S> RawTask<F, T, S> +where + F: Future<Output = T>, + S: Fn(Runnable), +{ + const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + Self::clone_waker, + Self::wake, + Self::wake_by_ref, + Self::drop_waker, + ); + + /// Allocates a task with the given `future` and `schedule` function. + /// + /// It is assumed that initially only the `Runnable` and the `Task` exist. + pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { + // Compute the layout of the task for allocation. Abort if the computation fails. + // + // n.b. notgull: task_layout now automatically aborts instead of panicking + let task_layout = Self::task_layout(); + + unsafe { + // Allocate enough space for the entire task. + let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) { + None => abort(), + Some(p) => p, + }; + + let raw = Self::from_ptr(ptr.as_ptr()); + + // Write the header as the first field of the task. + (raw.header as *mut Header).write(Header { + state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), + awaiter: UnsafeCell::new(None), + vtable: &TaskVTable { + schedule: Self::schedule, + drop_future: Self::drop_future, + get_output: Self::get_output, + drop_ref: Self::drop_ref, + destroy: Self::destroy, + run: Self::run, + clone_waker: Self::clone_waker, + layout_info: &Self::TASK_LAYOUT, + }, + }); + + // Write the schedule function as the third field of the task. + (raw.schedule as *mut S).write(schedule); + + // Write the future as the fourth field of the task. + raw.future.write(future); + + ptr + } + } + + /// Creates a `RawTask` from a raw task pointer. + #[inline] + pub(crate) fn from_ptr(ptr: *const ()) -> Self { + let task_layout = Self::task_layout(); + let p = ptr as *const u8; + + unsafe { + Self { + header: p as *const Header, + schedule: p.add(task_layout.offset_s) as *const S, + future: p.add(task_layout.offset_f) as *mut F, + output: p.add(task_layout.offset_r) as *mut T, + } + } + } + + /// Returns the layout of the task. + #[inline] + fn task_layout() -> TaskLayout { + match Self::TASK_LAYOUT { + Some(tl) => tl, + None => abort(), + } + } + + /// Wakes a waker. + unsafe fn wake(ptr: *const ()) { + // This is just an optimization. If the schedule function has captured variables, then + // we'll do less reference counting if we wake the waker by reference and then drop it. + if mem::size_of::<S>() > 0 { + Self::wake_by_ref(ptr); + Self::drop_waker(ptr); + return; + } + + let raw = Self::from_ptr(ptr); + + let mut state = (*raw.header).state.load(Ordering::Acquire); + + loop { + // If the task is completed or closed, it can't be woken up. + if state & (COMPLETED | CLOSED) != 0 { + // Drop the waker. + Self::drop_waker(ptr); + break; + } + + // If the task is already scheduled, we just need to synchronize with the thread that + // will run the task by "publishing" our current view of the memory. + if state & SCHEDULED != 0 { + // Update the state without actually modifying it. + match (*raw.header).state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Drop the waker. + Self::drop_waker(ptr); + break; + } + Err(s) => state = s, + } + } else { + // Mark the task as scheduled. + match (*raw.header).state.compare_exchange_weak( + state, + state | SCHEDULED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not yet scheduled and isn't currently running, now is the + // time to schedule it. + if state & RUNNING == 0 { + // Schedule the task. + Self::schedule(ptr); + } else { + // Drop the waker. + Self::drop_waker(ptr); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Wakes a waker by reference. + unsafe fn wake_by_ref(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + let mut state = (*raw.header).state.load(Ordering::Acquire); + + loop { + // If the task is completed or closed, it can't be woken up. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // If the task is already scheduled, we just need to synchronize with the thread that + // will run the task by "publishing" our current view of the memory. + if state & SCHEDULED != 0 { + // Update the state without actually modifying it. + match (*raw.header).state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } else { + // If the task is not running, we can schedule right away. + let new = if state & RUNNING == 0 { + (state | SCHEDULED) + REFERENCE + } else { + state | SCHEDULED + }; + + // Mark the task as scheduled. + match (*raw.header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not running, now is the time to schedule. + if state & RUNNING == 0 { + // If the reference count overflowed, abort. + if state > isize::max_value() as usize { + abort(); + } + + // Schedule the task. There is no need to call `Self::schedule(ptr)` + // because the schedule function cannot be destroyed while the waker is + // still alive. + let task = Runnable { + ptr: NonNull::new_unchecked(ptr as *mut ()), + }; + (*raw.schedule)(task); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Clones a waker. + unsafe fn clone_waker(ptr: *const ()) -> RawWaker { + let raw = Self::from_ptr(ptr); + + // Increment the reference count. With any kind of reference-counted data structure, + // relaxed ordering is appropriate when incrementing the counter. + let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); + + // If the reference count overflowed, abort. + if state > isize::max_value() as usize { + abort(); + } + + RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) + } + + /// Drops a waker. + /// + /// This function will decrement the reference count. If it drops down to zero, the associated + /// `Task` has been dropped too, and the task has not been completed, then it will get + /// scheduled one more time so that its future gets dropped by the executor. + #[inline] + unsafe fn drop_waker(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // Decrement the reference count. + let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; + + // If this was the last reference to the task and the `Task` has been dropped too, + // then we need to decide how to destroy the task. + if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { + if new & (COMPLETED | CLOSED) == 0 { + // If the task was not completed nor closed, close it and schedule one more time so + // that its future gets dropped by the executor. + (*raw.header) + .state + .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); + Self::schedule(ptr); + } else { + // Otherwise, destroy the task right away. + Self::destroy(ptr); + } + } + } + + /// Drops a task reference (`Runnable` or `Waker`). + /// + /// This function will decrement the reference count. If it drops down to zero and the + /// associated `Task` handle has been dropped too, then the task gets destroyed. + #[inline] + unsafe fn drop_ref(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // Decrement the reference count. + let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; + + // If this was the last reference to the task and the `Task` has been dropped too, + // then destroy the task. + if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { + Self::destroy(ptr); + } + } + + /// Schedules a task for running. + /// + /// This function doesn't modify the state of the task. It only passes the task reference to + /// its schedule function. + unsafe fn schedule(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // If the schedule function has captured variables, create a temporary waker that prevents + // the task from getting deallocated while the function is being invoked. + let _waker; + if mem::size_of::<S>() > 0 { + _waker = Waker::from_raw(Self::clone_waker(ptr)); + } + + let task = Runnable { + ptr: NonNull::new_unchecked(ptr as *mut ()), + }; + (*raw.schedule)(task); + } + + /// Drops the future inside a task. + #[inline] + unsafe fn drop_future(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // We need a safeguard against panics because the destructor can panic. + abort_on_panic(|| { + raw.future.drop_in_place(); + }) + } + + /// Returns a pointer to the output inside a task. + unsafe fn get_output(ptr: *const ()) -> *const () { + let raw = Self::from_ptr(ptr); + raw.output as *const () + } + + /// Cleans up task's resources and deallocates it. + /// + /// The schedule function will be dropped, and the task will then get deallocated. + /// The task must be closed before this function is called. + #[inline] + unsafe fn destroy(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + let task_layout = Self::task_layout(); + + // We need a safeguard against panics because destructors can panic. + abort_on_panic(|| { + // Drop the schedule function. + (raw.schedule as *mut S).drop_in_place(); + }); + + // Finally, deallocate the memory reserved by the task. + alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); + } + + /// Runs a task. + /// + /// If polling its future panics, the task will be closed and the panic will be propagated into + /// the caller. + unsafe fn run(ptr: *const ()) -> bool { + let raw = Self::from_ptr(ptr); + + // Create a context from the raw task pointer and the vtable inside the its header. + let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE))); + let cx = &mut Context::from_waker(&waker); + + let mut state = (*raw.header).state.load(Ordering::Acquire); + + // Update the task's state before polling its future. + loop { + // If the task has already been closed, drop the task reference and return. + if state & CLOSED != 0 { + // Drop the future. + Self::drop_future(ptr); + + // Mark the task as unscheduled. + let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + Self::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + return false; + } + + // Mark the task as unscheduled and running. + match (*raw.header).state.compare_exchange_weak( + state, + (state & !SCHEDULED) | RUNNING, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Update the state because we're continuing with polling the future. + state = (state & !SCHEDULED) | RUNNING; + break; + } + Err(s) => state = s, + } + } + + // Poll the inner future, but surround it with a guard that closes the task in case polling + // panics. + let guard = Guard(raw); + let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx); + mem::forget(guard); + + match poll { + Poll::Ready(out) => { + // Replace the future with its output. + Self::drop_future(ptr); + raw.output.write(out); + + // The task is now completed. + loop { + // If the `Task` is dropped, we'll need to close it and drop the output. + let new = if state & TASK == 0 { + (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED + } else { + (state & !RUNNING & !SCHEDULED) | COMPLETED + }; + + // Mark the task as not running and completed. + match (*raw.header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the `Task` is dropped or if the task was closed while running, + // now it's time to drop the output. + if state & TASK == 0 || state & CLOSED != 0 { + // Drop the output. + abort_on_panic(|| raw.output.drop_in_place()); + } + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + Self::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + Err(s) => state = s, + } + } + } + Poll::Pending => { + let mut future_dropped = false; + + // The task is still not completed. + loop { + // If the task was closed while running, we'll need to unschedule in case it + // was woken up and then destroy it. + let new = if state & CLOSED != 0 { + state & !RUNNING & !SCHEDULED + } else { + state & !RUNNING + }; + + if state & CLOSED != 0 && !future_dropped { + // The thread that closed the task didn't drop the future because it was + // running so now it's our responsibility to do so. + Self::drop_future(ptr); + future_dropped = true; + } + + // Mark the task as not running. + match (*raw.header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // If the task was closed while running, we need to notify the awaiter. + // If the task was woken up while running, we need to schedule it. + // Otherwise, we just drop the task reference. + if state & CLOSED != 0 { + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + Self::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + } else if state & SCHEDULED != 0 { + // The thread that woke the task up didn't reschedule it because + // it was running so now it's our responsibility to do so. + Self::schedule(ptr); + return true; + } else { + // Drop the task reference. + Self::drop_ref(ptr); + } + break; + } + Err(s) => state = s, + } + } + } + } + + return false; + + /// A guard that closes the task if polling its future panics. + struct Guard<F, T, S>(RawTask<F, T, S>) + where + F: Future<Output = T>, + S: Fn(Runnable); + + impl<F, T, S> Drop for Guard<F, T, S> + where + F: Future<Output = T>, + S: Fn(Runnable), + { + fn drop(&mut self) { + let raw = self.0; + let ptr = raw.header as *const (); + + unsafe { + let mut state = (*raw.header).state.load(Ordering::Acquire); + + loop { + // If the task was closed while running, then unschedule it, drop its + // future, and drop the task reference. + if state & CLOSED != 0 { + // The thread that closed the task didn't drop the future because it + // was running so now it's our responsibility to do so. + RawTask::<F, T, S>::drop_future(ptr); + + // Mark the task as not running and not scheduled. + (*raw.header) + .state + .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + RawTask::<F, T, S>::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + + // Mark the task as not running, not scheduled, and closed. + match (*raw.header).state.compare_exchange_weak( + state, + (state & !RUNNING & !SCHEDULED) | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // Drop the future because the task is now closed. + RawTask::<F, T, S>::drop_future(ptr); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + RawTask::<F, T, S>::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + Err(s) => state = s, + } + } + } + } + } + } +} |