use alloc::alloc::Layout as StdLayout; use core::cell::UnsafeCell; use core::future::Future; use core::mem::{self, ManuallyDrop}; use core::pin::Pin; use core::ptr::NonNull; use core::sync::atomic::{AtomicUsize, Ordering}; use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use crate::header::Header; use crate::state::*; use crate::utils::{abort, abort_on_panic, max, Layout}; use crate::Runnable; /// The vtable for a task. pub(crate) struct TaskVTable { /// Schedules the task. pub(crate) schedule: unsafe fn(*const ()), /// Drops the future inside the task. pub(crate) drop_future: unsafe fn(*const ()), /// Returns a pointer to the output stored after completion. pub(crate) get_output: unsafe fn(*const ()) -> *const (), /// Drops the task reference (`Runnable` or `Waker`). pub(crate) drop_ref: unsafe fn(ptr: *const ()), /// Destroys the task. pub(crate) destroy: unsafe fn(*const ()), /// Runs the task. pub(crate) run: unsafe fn(*const ()) -> bool, /// Creates a new waker associated with the task. pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker, /// The memory layout of the task. This information enables /// debuggers to decode raw task memory blobs. Do not remove /// the field, even if it appears to be unused. #[allow(unused)] pub(crate) layout_info: &'static Option, } /// Memory layout of a task. /// /// This struct contains the following information: /// /// 1. How to allocate and deallocate the task. /// 2. How to access the fields inside the task. #[derive(Clone, Copy)] pub(crate) struct TaskLayout { /// Memory layout of the whole task. pub(crate) layout: StdLayout, /// Offset into the task at which the schedule function is stored. pub(crate) offset_s: usize, /// Offset into the task at which the future is stored. pub(crate) offset_f: usize, /// Offset into the task at which the output is stored. pub(crate) offset_r: usize, } /// Raw pointers to the fields inside a task. pub(crate) struct RawTask { /// The task header. pub(crate) header: *const Header, /// The schedule function. pub(crate) schedule: *const S, /// The future. pub(crate) future: *mut F, /// The output of the future. pub(crate) output: *mut T, } impl Copy for RawTask {} impl Clone for RawTask { fn clone(&self) -> Self { *self } } impl RawTask { const TASK_LAYOUT: Option = Self::eval_task_layout(); /// Computes the memory layout for a task. #[inline] const fn eval_task_layout() -> Option { // Compute the layouts for `Header`, `S`, `F`, and `T`. let layout_header = Layout::new::
(); let layout_s = Layout::new::(); let layout_f = Layout::new::(); let layout_r = Layout::new::(); // Compute the layout for `union { F, T }`. let size_union = max(layout_f.size(), layout_r.size()); let align_union = max(layout_f.align(), layout_r.align()); let layout_union = Layout::from_size_align(size_union, align_union); // Compute the layout for `Header` followed `S` and `union { F, T }`. let layout = layout_header; let (layout, offset_s) = leap!(layout.extend(layout_s)); let (layout, offset_union) = leap!(layout.extend(layout_union)); let offset_f = offset_union; let offset_r = offset_union; Some(TaskLayout { layout: unsafe { layout.into_std() }, offset_s, offset_f, offset_r, }) } } impl RawTask where F: Future, S: Fn(Runnable), { const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( Self::clone_waker, Self::wake, Self::wake_by_ref, Self::drop_waker, ); /// Allocates a task with the given `future` and `schedule` function. /// /// It is assumed that initially only the `Runnable` and the `Task` exist. pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { // Compute the layout of the task for allocation. Abort if the computation fails. // // n.b. notgull: task_layout now automatically aborts instead of panicking let task_layout = Self::task_layout(); unsafe { // Allocate enough space for the entire task. let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) { None => abort(), Some(p) => p, }; let raw = Self::from_ptr(ptr.as_ptr()); // Write the header as the first field of the task. (raw.header as *mut Header).write(Header { state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), awaiter: UnsafeCell::new(None), vtable: &TaskVTable { schedule: Self::schedule, drop_future: Self::drop_future, get_output: Self::get_output, drop_ref: Self::drop_ref, destroy: Self::destroy, run: Self::run, clone_waker: Self::clone_waker, layout_info: &Self::TASK_LAYOUT, }, }); // Write the schedule function as the third field of the task. (raw.schedule as *mut S).write(schedule); // Write the future as the fourth field of the task. raw.future.write(future); ptr } } /// Creates a `RawTask` from a raw task pointer. #[inline] pub(crate) fn from_ptr(ptr: *const ()) -> Self { let task_layout = Self::task_layout(); let p = ptr as *const u8; unsafe { Self { header: p as *const Header, schedule: p.add(task_layout.offset_s) as *const S, future: p.add(task_layout.offset_f) as *mut F, output: p.add(task_layout.offset_r) as *mut T, } } } /// Returns the layout of the task. #[inline] fn task_layout() -> TaskLayout { match Self::TASK_LAYOUT { Some(tl) => tl, None => abort(), } } /// Wakes a waker. unsafe fn wake(ptr: *const ()) { // This is just an optimization. If the schedule function has captured variables, then // we'll do less reference counting if we wake the waker by reference and then drop it. if mem::size_of::() > 0 { Self::wake_by_ref(ptr); Self::drop_waker(ptr); return; } let raw = Self::from_ptr(ptr); let mut state = (*raw.header).state.load(Ordering::Acquire); loop { // If the task is completed or closed, it can't be woken up. if state & (COMPLETED | CLOSED) != 0 { // Drop the waker. Self::drop_waker(ptr); break; } // If the task is already scheduled, we just need to synchronize with the thread that // will run the task by "publishing" our current view of the memory. if state & SCHEDULED != 0 { // Update the state without actually modifying it. match (*raw.header).state.compare_exchange_weak( state, state, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // Drop the waker. Self::drop_waker(ptr); break; } Err(s) => state = s, } } else { // Mark the task as scheduled. match (*raw.header).state.compare_exchange_weak( state, state | SCHEDULED, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // If the task is not yet scheduled and isn't currently running, now is the // time to schedule it. if state & RUNNING == 0 { // Schedule the task. Self::schedule(ptr); } else { // Drop the waker. Self::drop_waker(ptr); } break; } Err(s) => state = s, } } } } /// Wakes a waker by reference. unsafe fn wake_by_ref(ptr: *const ()) { let raw = Self::from_ptr(ptr); let mut state = (*raw.header).state.load(Ordering::Acquire); loop { // If the task is completed or closed, it can't be woken up. if state & (COMPLETED | CLOSED) != 0 { break; } // If the task is already scheduled, we just need to synchronize with the thread that // will run the task by "publishing" our current view of the memory. if state & SCHEDULED != 0 { // Update the state without actually modifying it. match (*raw.header).state.compare_exchange_weak( state, state, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => break, Err(s) => state = s, } } else { // If the task is not running, we can schedule right away. let new = if state & RUNNING == 0 { (state | SCHEDULED) + REFERENCE } else { state | SCHEDULED }; // Mark the task as scheduled. match (*raw.header).state.compare_exchange_weak( state, new, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // If the task is not running, now is the time to schedule. if state & RUNNING == 0 { // If the reference count overflowed, abort. if state > isize::max_value() as usize { abort(); } // Schedule the task. There is no need to call `Self::schedule(ptr)` // because the schedule function cannot be destroyed while the waker is // still alive. let task = Runnable { ptr: NonNull::new_unchecked(ptr as *mut ()), }; (*raw.schedule)(task); } break; } Err(s) => state = s, } } } } /// Clones a waker. unsafe fn clone_waker(ptr: *const ()) -> RawWaker { let raw = Self::from_ptr(ptr); // Increment the reference count. With any kind of reference-counted data structure, // relaxed ordering is appropriate when incrementing the counter. let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); // If the reference count overflowed, abort. if state > isize::max_value() as usize { abort(); } RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) } /// Drops a waker. /// /// This function will decrement the reference count. If it drops down to zero, the associated /// `Task` has been dropped too, and the task has not been completed, then it will get /// scheduled one more time so that its future gets dropped by the executor. #[inline] unsafe fn drop_waker(ptr: *const ()) { let raw = Self::from_ptr(ptr); // Decrement the reference count. let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; // If this was the last reference to the task and the `Task` has been dropped too, // then we need to decide how to destroy the task. if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { if new & (COMPLETED | CLOSED) == 0 { // If the task was not completed nor closed, close it and schedule one more time so // that its future gets dropped by the executor. (*raw.header) .state .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); Self::schedule(ptr); } else { // Otherwise, destroy the task right away. Self::destroy(ptr); } } } /// Drops a task reference (`Runnable` or `Waker`). /// /// This function will decrement the reference count. If it drops down to zero and the /// associated `Task` handle has been dropped too, then the task gets destroyed. #[inline] unsafe fn drop_ref(ptr: *const ()) { let raw = Self::from_ptr(ptr); // Decrement the reference count. let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; // If this was the last reference to the task and the `Task` has been dropped too, // then destroy the task. if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { Self::destroy(ptr); } } /// Schedules a task for running. /// /// This function doesn't modify the state of the task. It only passes the task reference to /// its schedule function. unsafe fn schedule(ptr: *const ()) { let raw = Self::from_ptr(ptr); // If the schedule function has captured variables, create a temporary waker that prevents // the task from getting deallocated while the function is being invoked. let _waker; if mem::size_of::() > 0 { _waker = Waker::from_raw(Self::clone_waker(ptr)); } let task = Runnable { ptr: NonNull::new_unchecked(ptr as *mut ()), }; (*raw.schedule)(task); } /// Drops the future inside a task. #[inline] unsafe fn drop_future(ptr: *const ()) { let raw = Self::from_ptr(ptr); // We need a safeguard against panics because the destructor can panic. abort_on_panic(|| { raw.future.drop_in_place(); }) } /// Returns a pointer to the output inside a task. unsafe fn get_output(ptr: *const ()) -> *const () { let raw = Self::from_ptr(ptr); raw.output as *const () } /// Cleans up task's resources and deallocates it. /// /// The schedule function will be dropped, and the task will then get deallocated. /// The task must be closed before this function is called. #[inline] unsafe fn destroy(ptr: *const ()) { let raw = Self::from_ptr(ptr); let task_layout = Self::task_layout(); // We need a safeguard against panics because destructors can panic. abort_on_panic(|| { // Drop the schedule function. (raw.schedule as *mut S).drop_in_place(); }); // Finally, deallocate the memory reserved by the task. alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); } /// Runs a task. /// /// If polling its future panics, the task will be closed and the panic will be propagated into /// the caller. unsafe fn run(ptr: *const ()) -> bool { let raw = Self::from_ptr(ptr); // Create a context from the raw task pointer and the vtable inside the its header. let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE))); let cx = &mut Context::from_waker(&waker); let mut state = (*raw.header).state.load(Ordering::Acquire); // Update the task's state before polling its future. loop { // If the task has already been closed, drop the task reference and return. if state & CLOSED != 0 { // Drop the future. Self::drop_future(ptr); // Mark the task as unscheduled. let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { awaiter = (*raw.header).take(None); } // Drop the task reference. Self::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { abort_on_panic(|| w.wake()); } return false; } // Mark the task as unscheduled and running. match (*raw.header).state.compare_exchange_weak( state, (state & !SCHEDULED) | RUNNING, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // Update the state because we're continuing with polling the future. state = (state & !SCHEDULED) | RUNNING; break; } Err(s) => state = s, } } // Poll the inner future, but surround it with a guard that closes the task in case polling // panics. let guard = Guard(raw); let poll = ::poll(Pin::new_unchecked(&mut *raw.future), cx); mem::forget(guard); match poll { Poll::Ready(out) => { // Replace the future with its output. Self::drop_future(ptr); raw.output.write(out); // The task is now completed. loop { // If the `Task` is dropped, we'll need to close it and drop the output. let new = if state & TASK == 0 { (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED } else { (state & !RUNNING & !SCHEDULED) | COMPLETED }; // Mark the task as not running and completed. match (*raw.header).state.compare_exchange_weak( state, new, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // If the `Task` is dropped or if the task was closed while running, // now it's time to drop the output. if state & TASK == 0 || state & CLOSED != 0 { // Drop the output. abort_on_panic(|| raw.output.drop_in_place()); } // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { awaiter = (*raw.header).take(None); } // Drop the task reference. Self::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { abort_on_panic(|| w.wake()); } break; } Err(s) => state = s, } } } Poll::Pending => { let mut future_dropped = false; // The task is still not completed. loop { // If the task was closed while running, we'll need to unschedule in case it // was woken up and then destroy it. let new = if state & CLOSED != 0 { state & !RUNNING & !SCHEDULED } else { state & !RUNNING }; if state & CLOSED != 0 && !future_dropped { // The thread that closed the task didn't drop the future because it was // running so now it's our responsibility to do so. Self::drop_future(ptr); future_dropped = true; } // Mark the task as not running. match (*raw.header).state.compare_exchange_weak( state, new, Ordering::AcqRel, Ordering::Acquire, ) { Ok(state) => { // If the task was closed while running, we need to notify the awaiter. // If the task was woken up while running, we need to schedule it. // Otherwise, we just drop the task reference. if state & CLOSED != 0 { // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { awaiter = (*raw.header).take(None); } // Drop the task reference. Self::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { abort_on_panic(|| w.wake()); } } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. Self::schedule(ptr); return true; } else { // Drop the task reference. Self::drop_ref(ptr); } break; } Err(s) => state = s, } } } } return false; /// A guard that closes the task if polling its future panics. struct Guard(RawTask) where F: Future, S: Fn(Runnable); impl Drop for Guard where F: Future, S: Fn(Runnable), { fn drop(&mut self) { let raw = self.0; let ptr = raw.header as *const (); unsafe { let mut state = (*raw.header).state.load(Ordering::Acquire); loop { // If the task was closed while running, then unschedule it, drop its // future, and drop the task reference. if state & CLOSED != 0 { // The thread that closed the task didn't drop the future because it // was running so now it's our responsibility to do so. RawTask::::drop_future(ptr); // Mark the task as not running and not scheduled. (*raw.header) .state .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { awaiter = (*raw.header).take(None); } // Drop the task reference. RawTask::::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { abort_on_panic(|| w.wake()); } break; } // Mark the task as not running, not scheduled, and closed. match (*raw.header).state.compare_exchange_weak( state, (state & !RUNNING & !SCHEDULED) | CLOSED, Ordering::AcqRel, Ordering::Acquire, ) { Ok(state) => { // Drop the future because the task is now closed. RawTask::::drop_future(ptr); // Take the awaiter out. let mut awaiter = None; if state & AWAITER != 0 { awaiter = (*raw.header).take(None); } // Drop the task reference. RawTask::::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { abort_on_panic(|| w.wake()); } break; } Err(s) => state = s, } } } } } } }