use futures::channel::oneshot; use futures::executor::LocalPool; use futures::future::{self, lazy, poll_fn, Future}; use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker}; use std::cell::{Cell, RefCell}; use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use std::time::Duration; struct Pending(Rc<()>); impl Future for Pending { type Output = (); fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { Poll::Pending } } fn pending() -> Pending { Pending(Rc::new(())) } #[test] fn run_until_single_future() { let mut cnt = 0; { let mut pool = LocalPool::new(); let fut = lazy(|_| { cnt += 1; }); pool.run_until(fut); } assert_eq!(cnt, 1); } #[test] fn run_until_ignores_spawned() { let mut pool = LocalPool::new(); let spawn = pool.spawner(); spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); pool.run_until(lazy(|_| ())); } #[test] fn run_until_executes_spawned() { let (tx, rx) = oneshot::channel(); let mut pool = LocalPool::new(); let spawn = pool.spawner(); spawn .spawn_local_obj( Box::pin(lazy(move |_| { tx.send(()).unwrap(); })) .into(), ) .unwrap(); pool.run_until(rx).unwrap(); } #[test] fn run_returns_if_empty() { let mut pool = LocalPool::new(); pool.run(); pool.run(); } #[test] fn run_executes_spawned() { let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); let mut pool = LocalPool::new(); let spawn = pool.spawner(); let spawn2 = pool.spawner(); spawn .spawn_local_obj( Box::pin(lazy(move |_| { spawn2 .spawn_local_obj( Box::pin(lazy(move |_| { cnt2.set(cnt2.get() + 1); })) .into(), ) .unwrap(); })) .into(), ) .unwrap(); pool.run(); assert_eq!(cnt.get(), 1); } #[test] fn run_spawn_many() { const ITER: usize = 200; let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); let spawn = pool.spawner(); for _ in 0..ITER { let cnt = cnt.clone(); spawn .spawn_local_obj( Box::pin(lazy(move |_| { cnt.set(cnt.get() + 1); })) .into(), ) .unwrap(); } pool.run(); assert_eq!(cnt.get(), ITER); } #[test] fn try_run_one_returns_if_empty() { let mut pool = LocalPool::new(); assert!(!pool.try_run_one()); } #[test] fn try_run_one_executes_one_ready() { const ITER: usize = 200; let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); let spawn = pool.spawner(); for _ in 0..ITER { spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); let cnt = cnt.clone(); spawn .spawn_local_obj( Box::pin(lazy(move |_| { cnt.set(cnt.get() + 1); })) .into(), ) .unwrap(); spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); } for i in 0..ITER { assert_eq!(cnt.get(), i); assert!(pool.try_run_one()); assert_eq!(cnt.get(), i + 1); } assert!(!pool.try_run_one()); } #[test] fn try_run_one_returns_on_no_progress() { const ITER: usize = 10; let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); let spawn = pool.spawner(); let waker: Rc>> = Rc::new(Cell::new(None)); { let cnt = cnt.clone(); let waker = waker.clone(); spawn .spawn_local_obj( Box::pin(poll_fn(move |ctx| { cnt.set(cnt.get() + 1); waker.set(Some(ctx.waker().clone())); if cnt.get() == ITER { Poll::Ready(()) } else { Poll::Pending } })) .into(), ) .unwrap(); } for i in 0..ITER - 1 { assert_eq!(cnt.get(), i); assert!(!pool.try_run_one()); assert_eq!(cnt.get(), i + 1); let w = waker.take(); assert!(w.is_some()); w.unwrap().wake(); } assert!(pool.try_run_one()); assert_eq!(cnt.get(), ITER); } #[test] fn try_run_one_runs_sub_futures() { let mut pool = LocalPool::new(); let spawn = pool.spawner(); let cnt = Rc::new(Cell::new(0)); let inner_spawner = spawn.clone(); let cnt1 = cnt.clone(); spawn .spawn_local_obj( Box::pin(poll_fn(move |_| { cnt1.set(cnt1.get() + 1); let cnt2 = cnt1.clone(); inner_spawner .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()) .unwrap(); Poll::Pending })) .into(), ) .unwrap(); pool.try_run_one(); assert_eq!(cnt.get(), 2); } #[test] fn run_until_stalled_returns_if_empty() { let mut pool = LocalPool::new(); pool.run_until_stalled(); pool.run_until_stalled(); } #[test] fn run_until_stalled_returns_multiple_times() { let mut pool = LocalPool::new(); let spawn = pool.spawner(); let cnt = Rc::new(Cell::new(0)); let cnt1 = cnt.clone(); spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap(); pool.run_until_stalled(); assert_eq!(cnt.get(), 1); let cnt2 = cnt.clone(); spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap(); pool.run_until_stalled(); assert_eq!(cnt.get(), 2); } #[test] fn run_until_stalled_runs_spawned_sub_futures() { let mut pool = LocalPool::new(); let spawn = pool.spawner(); let cnt = Rc::new(Cell::new(0)); let inner_spawner = spawn.clone(); let cnt1 = cnt.clone(); spawn .spawn_local_obj( Box::pin(poll_fn(move |_| { cnt1.set(cnt1.get() + 1); let cnt2 = cnt1.clone(); inner_spawner .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()) .unwrap(); Poll::Pending })) .into(), ) .unwrap(); pool.run_until_stalled(); assert_eq!(cnt.get(), 2); } #[test] fn run_until_stalled_executes_all_ready() { const ITER: usize = if cfg!(miri) { 50 } else { 200 }; const PER_ITER: usize = 3; let cnt = Rc::new(Cell::new(0)); let mut pool = LocalPool::new(); let spawn = pool.spawner(); for i in 0..ITER { for _ in 0..PER_ITER { spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); let cnt = cnt.clone(); spawn .spawn_local_obj( Box::pin(lazy(move |_| { cnt.set(cnt.get() + 1); })) .into(), ) .unwrap(); // also add some pending tasks to test if they are ignored spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap(); } assert_eq!(cnt.get(), i * PER_ITER); pool.run_until_stalled(); assert_eq!(cnt.get(), (i + 1) * PER_ITER); } } #[test] #[should_panic] fn nesting_run() { let mut pool = LocalPool::new(); let spawn = pool.spawner(); spawn .spawn_obj( Box::pin(lazy(|_| { let mut pool = LocalPool::new(); pool.run(); })) .into(), ) .unwrap(); pool.run(); } #[test] #[should_panic] fn nesting_run_run_until_stalled() { let mut pool = LocalPool::new(); let spawn = pool.spawner(); spawn .spawn_obj( Box::pin(lazy(|_| { let mut pool = LocalPool::new(); pool.run_until_stalled(); })) .into(), ) .unwrap(); pool.run(); } #[test] fn tasks_are_scheduled_fairly() { let state = Rc::new(RefCell::new([0, 0])); struct Spin { state: Rc>, idx: usize, } impl Future for Spin { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { let mut state = self.state.borrow_mut(); if self.idx == 0 { let diff = state[0] - state[1]; assert!(diff.abs() <= 1); if state[0] >= 50 { return Poll::Ready(()); } } state[self.idx] += 1; if state[self.idx] >= 100 { return Poll::Ready(()); } cx.waker().wake_by_ref(); Poll::Pending } } let mut pool = LocalPool::new(); let spawn = pool.spawner(); spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap(); spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap(); pool.run(); } // Tests that the use of park/unpark in user-code has no // effect on the expected behavior of the executor. #[test] fn park_unpark_independence() { let mut done = false; let future = future::poll_fn(move |cx| { if done { return Poll::Ready(()); } done = true; cx.waker().clone().wake(); // (*) // some user-code that temporarily parks the thread let test = thread::current(); let latch = Arc::new(AtomicBool::new(false)); let signal = latch.clone(); thread::spawn(move || { thread::sleep(Duration::from_millis(10)); signal.store(true, Ordering::SeqCst); test.unpark() }); while !latch.load(Ordering::Relaxed) { thread::park(); } Poll::Pending // Expect to be called again due to (*). }); futures::executor::block_on(future) } struct SelfWaking { wakeups_remaining: Rc>, } impl Future for SelfWaking { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if *self.wakeups_remaining.borrow() != 0 { *self.wakeups_remaining.borrow_mut() -= 1; cx.waker().wake_by_ref(); } Poll::Pending } } /// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 /// /// The issue was that self-waking futures could cause `run_until_stalled` /// to exit early, even when progress could still be made. #[test] fn self_waking_run_until_stalled() { let wakeups_remaining = Rc::new(RefCell::new(10)); let mut pool = LocalPool::new(); let spawner = pool.spawner(); for _ in 0..3 { let wakeups_remaining = Rc::clone(&wakeups_remaining); spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); } // This should keep polling until there are no more wakeups. pool.run_until_stalled(); assert_eq!(*wakeups_remaining.borrow(), 0); } /// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 /// /// The issue was that self-waking futures could cause `try_run_one` /// to exit early, even when progress could still be made. #[test] fn self_waking_try_run_one() { let wakeups_remaining = Rc::new(RefCell::new(10)); let mut pool = LocalPool::new(); let spawner = pool.spawner(); for _ in 0..3 { let wakeups_remaining = Rc::clone(&wakeups_remaining); spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); } spawner.spawn(future::ready(())).unwrap(); // The `ready` future should complete. assert!(pool.try_run_one()); // The self-waking futures are each polled once. assert_eq!(*wakeups_remaining.borrow(), 7); }