use std::future::Future; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use async_task::Runnable; use smol::future; // Creates a future with event counters. // // Usage: `future!(f, POLL, DROP)` // // The future `f` always returns `Poll::Ready`. // When it gets polled, `POLL` is incremented. // When it gets dropped, `DROP` is incremented. macro_rules! future { ($name:pat, $poll:ident, $drop:ident) => { static $poll: AtomicUsize = AtomicUsize::new(0); static $drop: AtomicUsize = AtomicUsize::new(0); let $name = { struct Fut(Box); impl Future for Fut { type Output = Box; fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { $poll.fetch_add(1, Ordering::SeqCst); Poll::Ready(Box::new(0)) } } impl Drop for Fut { fn drop(&mut self) { $drop.fetch_add(1, Ordering::SeqCst); } } Fut(Box::new(0)) }; }; } // Creates a schedule function with event counters. // // Usage: `schedule!(s, SCHED, DROP)` // // The schedule function `s` does nothing. // When it gets invoked, `SCHED` is incremented. // When it gets dropped, `DROP` is incremented. macro_rules! schedule { ($name:pat, $sched:ident, $drop:ident) => { static $drop: AtomicUsize = AtomicUsize::new(0); static $sched: AtomicUsize = AtomicUsize::new(0); let $name = { struct Guard(Box); impl Drop for Guard { fn drop(&mut self) { $drop.fetch_add(1, Ordering::SeqCst); } } let guard = Guard(Box::new(0)); move |_runnable| { let _ = &guard; $sched.fetch_add(1, Ordering::SeqCst); } }; }; } fn try_await(f: impl Future) -> Option { future::block_on(future::poll_once(f)) } #[test] fn drop_and_detach() { future!(f, POLL, DROP_F); schedule!(s, SCHEDULE, DROP_S); let (runnable, task) = async_task::spawn(f, s); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 0); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); drop(runnable); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); task.detach(); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 1); } #[test] fn detach_and_drop() { future!(f, POLL, DROP_F); schedule!(s, SCHEDULE, DROP_S); let (runnable, task) = async_task::spawn(f, s); task.detach(); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 0); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); drop(runnable); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 1); } #[test] fn detach_and_run() { future!(f, POLL, DROP_F); schedule!(s, SCHEDULE, DROP_S); let (runnable, task) = async_task::spawn(f, s); task.detach(); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 0); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); runnable.run(); assert_eq!(POLL.load(Ordering::SeqCst), 1); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 1); } #[test] fn run_and_detach() { future!(f, POLL, DROP_F); schedule!(s, SCHEDULE, DROP_S); let (runnable, task) = async_task::spawn(f, s); runnable.run(); assert_eq!(POLL.load(Ordering::SeqCst), 1); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); task.detach(); assert_eq!(POLL.load(Ordering::SeqCst), 1); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 1); } #[test] fn cancel_and_run() { future!(f, POLL, DROP_F); schedule!(s, SCHEDULE, DROP_S); let (runnable, task) = async_task::spawn(f, s); drop(task); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 0); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); runnable.run(); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 1); } #[test] fn run_and_cancel() { future!(f, POLL, DROP_F); schedule!(s, SCHEDULE, DROP_S); let (runnable, task) = async_task::spawn(f, s); runnable.run(); assert_eq!(POLL.load(Ordering::SeqCst), 1); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); drop(task); assert_eq!(POLL.load(Ordering::SeqCst), 1); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 1); } #[test] fn cancel_join() { future!(f, POLL, DROP_F); schedule!(s, SCHEDULE, DROP_S); let (runnable, mut task) = async_task::spawn(f, s); assert!(try_await(&mut task).is_none()); assert_eq!(POLL.load(Ordering::SeqCst), 0); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 0); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); runnable.run(); assert_eq!(POLL.load(Ordering::SeqCst), 1); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); assert!(try_await(&mut task).is_some()); assert_eq!(POLL.load(Ordering::SeqCst), 1); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 0); drop(task); assert_eq!(POLL.load(Ordering::SeqCst), 1); assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); assert_eq!(DROP_F.load(Ordering::SeqCst), 1); assert_eq!(DROP_S.load(Ordering::SeqCst), 1); } #[test] fn schedule() { let (s, r) = flume::unbounded(); let schedule = move |runnable| s.send(runnable).unwrap(); let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule); assert!(r.is_empty()); runnable.schedule(); let runnable = r.recv().unwrap(); assert!(r.is_empty()); runnable.schedule(); let runnable = r.recv().unwrap(); assert!(r.is_empty()); runnable.schedule(); r.recv().unwrap(); } #[test] fn schedule_counter() { static COUNT: AtomicUsize = AtomicUsize::new(0); let (s, r) = flume::unbounded(); let schedule = move |runnable: Runnable| { COUNT.fetch_add(1, Ordering::SeqCst); s.send(runnable).unwrap(); }; let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule); runnable.schedule(); r.recv().unwrap().schedule(); r.recv().unwrap().schedule(); assert_eq!(COUNT.load(Ordering::SeqCst), 3); r.recv().unwrap(); } #[test] fn drop_inside_schedule() { struct DropGuard(AtomicUsize); impl Drop for DropGuard { fn drop(&mut self) { self.0.fetch_add(1, Ordering::SeqCst); } } let guard = DropGuard(AtomicUsize::new(0)); let (runnable, _) = async_task::spawn(async {}, move |runnable| { assert_eq!(guard.0.load(Ordering::SeqCst), 0); drop(runnable); assert_eq!(guard.0.load(Ordering::SeqCst), 0); }); runnable.schedule(); } #[test] fn waker() { let (s, r) = flume::unbounded(); let schedule = move |runnable| s.send(runnable).unwrap(); let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule); assert!(r.is_empty()); let waker = runnable.waker(); runnable.run(); waker.wake_by_ref(); let runnable = r.recv().unwrap(); runnable.run(); waker.wake(); r.recv().unwrap(); }