use core::fmt; use core::future::Future; use core::marker::{PhantomData, Unpin}; use core::mem; use core::pin::Pin; use core::ptr::NonNull; use core::sync::atomic::Ordering; use core::task::{Context, Poll}; use crate::header::Header; use crate::state::*; /// A spawned task. /// /// A [`Task`] can be awaited to retrieve the output of its future. /// /// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the /// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a /// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()] /// method. /// /// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor /// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking /// [`run()`][`super::Runnable::run()`]. /// /// # Examples /// /// ``` /// use smol::{future, Executor}; /// use std::thread; /// /// let ex = Executor::new(); /// /// // Spawn a future onto the executor. /// let task = ex.spawn(async { /// println!("Hello from a task!"); /// 1 + 2 /// }); /// /// // Run an executor thread. /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); /// /// // Wait for the task's output. /// assert_eq!(future::block_on(task), 3); /// ``` #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] pub struct Task { /// A raw task pointer. pub(crate) ptr: NonNull<()>, /// A marker capturing generic type `T`. pub(crate) _marker: PhantomData, } unsafe impl Send for Task {} unsafe impl Sync for Task {} impl Unpin for Task {} #[cfg(feature = "std")] impl std::panic::UnwindSafe for Task {} #[cfg(feature = "std")] impl std::panic::RefUnwindSafe for Task {} impl Task { /// Detaches the task to let it keep running in the background. /// /// # Examples /// /// ``` /// use smol::{Executor, Timer}; /// use std::time::Duration; /// /// let ex = Executor::new(); /// /// // Spawn a deamon future. /// ex.spawn(async { /// loop { /// println!("I'm a daemon task looping forever."); /// Timer::after(Duration::from_secs(1)).await; /// } /// }) /// .detach(); /// ``` pub fn detach(self) { let mut this = self; let _out = this.set_detached(); mem::forget(this); } /// Cancels the task and waits for it to stop running. /// /// Returns the task's output if it was completed just before it got canceled, or [`None`] if /// it didn't complete. /// /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of /// canceling because it also waits for the task to stop running. /// /// # Examples /// /// ``` /// # if cfg!(miri) { return; } // Miri does not support epoll /// use smol::{future, Executor, Timer}; /// use std::thread; /// use std::time::Duration; /// /// let ex = Executor::new(); /// /// // Spawn a deamon future. /// let task = ex.spawn(async { /// loop { /// println!("Even though I'm in an infinite loop, you can still cancel me!"); /// Timer::after(Duration::from_secs(1)).await; /// } /// }); /// /// // Run an executor thread. /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); /// /// future::block_on(async { /// Timer::after(Duration::from_secs(3)).await; /// task.cancel().await; /// }); /// ``` pub async fn cancel(self) -> Option { let mut this = self; this.set_canceled(); this.fallible().await } /// Converts this task into a [`FallibleTask`]. /// /// Like [`Task`], a fallible task will poll the task's output until it is /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being /// dropped without being run. Resolves to the task's output when completed, /// or [`None`] if it didn't complete. /// /// # Examples /// /// ``` /// use smol::{future, Executor}; /// use std::thread; /// /// let ex = Executor::new(); /// /// // Spawn a future onto the executor. /// let task = ex.spawn(async { /// println!("Hello from a task!"); /// 1 + 2 /// }) /// .fallible(); /// /// // Run an executor thread. /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); /// /// // Wait for the task's output. /// assert_eq!(future::block_on(task), Some(3)); /// ``` /// /// ``` /// use smol::future; /// /// // Schedule function which drops the runnable without running it. /// let schedule = move |runnable| drop(runnable); /// /// // Create a task with the future and the schedule function. /// let (runnable, task) = async_task::spawn(async { /// println!("Hello from a task!"); /// 1 + 2 /// }, schedule); /// runnable.schedule(); /// /// // Wait for the task's output. /// assert_eq!(future::block_on(task.fallible()), None); /// ``` pub fn fallible(self) -> FallibleTask { FallibleTask { task: self } } /// Puts the task in canceled state. fn set_canceled(&mut self) { let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { let mut state = (*header).state.load(Ordering::Acquire); loop { // If the task has been completed or closed, it can't be canceled. if state & (COMPLETED | CLOSED) != 0 { break; } // If the task is not scheduled nor running, we'll need to schedule it. let new = if state & (SCHEDULED | RUNNING) == 0 { (state | SCHEDULED | CLOSED) + REFERENCE } else { state | CLOSED }; // Mark the task as closed. match (*header).state.compare_exchange_weak( state, new, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // If the task is not scheduled nor running, schedule it one more time so // that its future gets dropped by the executor. if state & (SCHEDULED | RUNNING) == 0 { ((*header).vtable.schedule)(ptr); } // Notify the awaiter that the task has been closed. if state & AWAITER != 0 { (*header).notify(None); } break; } Err(s) => state = s, } } } } /// Puts the task in detached state. fn set_detached(&mut self) -> Option { let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { // A place where the output will be stored in case it needs to be dropped. let mut output = None; // Optimistically assume the `Task` is being detached just after creating the task. // This is a common case so if the `Task` is datached, the overhead of it is only one // compare-exchange operation. if let Err(mut state) = (*header).state.compare_exchange_weak( SCHEDULED | TASK | REFERENCE, SCHEDULED | REFERENCE, Ordering::AcqRel, Ordering::Acquire, ) { loop { // If the task has been completed but not yet closed, that means its output // must be dropped. if state & COMPLETED != 0 && state & CLOSED == 0 { // Mark the task as closed in order to grab its output. match (*header).state.compare_exchange_weak( state, state | CLOSED, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // Read the output. output = Some((((*header).vtable.get_output)(ptr) as *mut T).read()); // Update the state variable because we're continuing the loop. state |= CLOSED; } Err(s) => state = s, } } else { // If this is the last reference to the task and it's not closed, then // close it and schedule one more time so that its future gets dropped by // the executor. let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { SCHEDULED | CLOSED | REFERENCE } else { state & !TASK }; // Unset the `TASK` flag. match (*header).state.compare_exchange_weak( state, new, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // If this is the last reference to the task, we need to either // schedule dropping its future or destroy it. if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 { ((*header).vtable.schedule)(ptr); } else { ((*header).vtable.destroy)(ptr); } } break; } Err(s) => state = s, } } } } output } } /// Polls the task to retrieve its output. /// /// Returns `Some` if the task has completed or `None` if it was closed. /// /// A task becomes closed in the following cases: /// /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. /// 2. Its output gets awaited by the `Task`. /// 3. It panics while polling the future. /// 4. It is completed and the `Task` gets dropped. fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll> { let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { let mut state = (*header).state.load(Ordering::Acquire); loop { // If the task has been closed, notify the awaiter and return `None`. if state & CLOSED != 0 { // If the task is scheduled or running, we need to wait until its future is // dropped. if state & (SCHEDULED | RUNNING) != 0 { // Replace the waker with one associated with the current task. (*header).register(cx.waker()); // Reload the state after registering. It is possible changes occurred just // before registration so we need to check for that. state = (*header).state.load(Ordering::Acquire); // If the task is still scheduled or running, we need to wait because its // future is not dropped yet. if state & (SCHEDULED | RUNNING) != 0 { return Poll::Pending; } } // Even though the awaiter is most likely the current task, it could also be // another task. (*header).notify(Some(cx.waker())); return Poll::Ready(None); } // If the task is not completed, register the current task. if state & COMPLETED == 0 { // Replace the waker with one associated with the current task. (*header).register(cx.waker()); // Reload the state after registering. It is possible that the task became // completed or closed just before registration so we need to check for that. state = (*header).state.load(Ordering::Acquire); // If the task has been closed, restart. if state & CLOSED != 0 { continue; } // If the task is still not completed, we're blocked on it. if state & COMPLETED == 0 { return Poll::Pending; } } // Since the task is now completed, mark it as closed in order to grab its output. match (*header).state.compare_exchange( state, state | CLOSED, Ordering::AcqRel, Ordering::Acquire, ) { Ok(_) => { // Notify the awaiter. Even though the awaiter is most likely the current // task, it could also be another task. if state & AWAITER != 0 { (*header).notify(Some(cx.waker())); } // Take the output from the task. let output = ((*header).vtable.get_output)(ptr) as *mut T; return Poll::Ready(Some(output.read())); } Err(s) => state = s, } } } } fn header(&self) -> &Header { let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { &*header } } /// Returns `true` if the current task is finished. /// /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. pub fn is_finished(&self) -> bool { let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { let state = (*header).state.load(Ordering::Acquire); state & (CLOSED | COMPLETED) != 0 } } } impl Drop for Task { fn drop(&mut self) { self.set_canceled(); self.set_detached(); } } impl Future for Task { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.poll_task(cx) { Poll::Ready(t) => Poll::Ready(t.expect("task has failed")), Poll::Pending => Poll::Pending, } } } impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Task") .field("header", self.header()) .finish() } } /// A spawned task with a fallible response. /// /// This type behaves like [`Task`], however it produces an `Option` when /// polled and will return `None` if the executor dropped its /// [`Runnable`][`super::Runnable`] without being run. /// /// This can be useful to avoid the panic produced when polling the `Task` /// future if the executor dropped its `Runnable`. #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] pub struct FallibleTask { task: Task, } impl FallibleTask { /// Detaches the task to let it keep running in the background. /// /// # Examples /// /// ``` /// use smol::{Executor, Timer}; /// use std::time::Duration; /// /// let ex = Executor::new(); /// /// // Spawn a deamon future. /// ex.spawn(async { /// loop { /// println!("I'm a daemon task looping forever."); /// Timer::after(Duration::from_secs(1)).await; /// } /// }) /// .fallible() /// .detach(); /// ``` pub fn detach(self) { self.task.detach() } /// Cancels the task and waits for it to stop running. /// /// Returns the task's output if it was completed just before it got canceled, or [`None`] if /// it didn't complete. /// /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of /// canceling because it also waits for the task to stop running. /// /// # Examples /// /// ``` /// # if cfg!(miri) { return; } // Miri does not support epoll /// use smol::{future, Executor, Timer}; /// use std::thread; /// use std::time::Duration; /// /// let ex = Executor::new(); /// /// // Spawn a deamon future. /// let task = ex.spawn(async { /// loop { /// println!("Even though I'm in an infinite loop, you can still cancel me!"); /// Timer::after(Duration::from_secs(1)).await; /// } /// }) /// .fallible(); /// /// // Run an executor thread. /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); /// /// future::block_on(async { /// Timer::after(Duration::from_secs(3)).await; /// task.cancel().await; /// }); /// ``` pub async fn cancel(self) -> Option { self.task.cancel().await } } impl Future for FallibleTask { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.task.poll_task(cx) } } impl fmt::Debug for FallibleTask { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FallibleTask") .field("header", self.task.header()) .finish() } }