summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-executor/tests/local_pool.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-executor/tests/local_pool.rs')
-rw-r--r--third_party/rust/futures-executor/tests/local_pool.rs496
1 files changed, 496 insertions, 0 deletions
diff --git a/third_party/rust/futures-executor/tests/local_pool.rs b/third_party/rust/futures-executor/tests/local_pool.rs
new file mode 100644
index 0000000000..72ce74b744
--- /dev/null
+++ b/third_party/rust/futures-executor/tests/local_pool.rs
@@ -0,0 +1,496 @@
+use futures::channel::oneshot;
+use futures::executor::LocalPool;
+use futures::future::{self, lazy, poll_fn, Future};
+use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
+use std::cell::{Cell, RefCell};
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+struct Pending(Rc<()>);
+
+impl Future for Pending {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ Poll::Pending
+ }
+}
+
+fn pending() -> Pending {
+ Pending(Rc::new(()))
+}
+
+#[test]
+fn run_until_single_future() {
+ let mut cnt = 0;
+
+ {
+ let mut pool = LocalPool::new();
+ let fut = lazy(|_| {
+ cnt += 1;
+ });
+ pool.run_until(fut);
+ }
+
+ assert_eq!(cnt, 1);
+}
+
+#[test]
+fn run_until_ignores_spawned() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+ pool.run_until(lazy(|_| ()));
+}
+
+#[test]
+fn run_until_executes_spawned() {
+ let (tx, rx) = oneshot::channel();
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ tx.send(()).unwrap();
+ }))
+ .into(),
+ )
+ .unwrap();
+ pool.run_until(rx).unwrap();
+}
+
+#[test]
+fn run_returns_if_empty() {
+ let mut pool = LocalPool::new();
+ pool.run();
+ pool.run();
+}
+
+#[test]
+fn run_executes_spawned() {
+ let cnt = Rc::new(Cell::new(0));
+ let cnt2 = cnt.clone();
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ let spawn2 = pool.spawner();
+
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ spawn2
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt2.set(cnt2.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.run();
+
+ assert_eq!(cnt.get(), 1);
+}
+
+#[test]
+fn run_spawn_many() {
+ const ITER: usize = 200;
+
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ for _ in 0..ITER {
+ let cnt = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt.set(cnt.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
+ }
+
+ pool.run();
+
+ assert_eq!(cnt.get(), ITER);
+}
+
+#[test]
+fn try_run_one_returns_if_empty() {
+ let mut pool = LocalPool::new();
+ assert!(!pool.try_run_one());
+}
+
+#[test]
+fn try_run_one_executes_one_ready() {
+ const ITER: usize = 200;
+
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ for _ in 0..ITER {
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+
+ let cnt = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt.set(cnt.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+ }
+
+ for i in 0..ITER {
+ assert_eq!(cnt.get(), i);
+ assert!(pool.try_run_one());
+ assert_eq!(cnt.get(), i + 1);
+ }
+ assert!(!pool.try_run_one());
+}
+
+#[test]
+fn try_run_one_returns_on_no_progress() {
+ const ITER: usize = 10;
+
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
+ {
+ let cnt = cnt.clone();
+ let waker = waker.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(poll_fn(move |ctx| {
+ cnt.set(cnt.get() + 1);
+ waker.set(Some(ctx.waker().clone()));
+ if cnt.get() == ITER {
+ Poll::Ready(())
+ } else {
+ Poll::Pending
+ }
+ }))
+ .into(),
+ )
+ .unwrap();
+ }
+
+ for i in 0..ITER - 1 {
+ assert_eq!(cnt.get(), i);
+ assert!(!pool.try_run_one());
+ assert_eq!(cnt.get(), i + 1);
+ let w = waker.take();
+ assert!(w.is_some());
+ w.unwrap().wake();
+ }
+ assert!(pool.try_run_one());
+ assert_eq!(cnt.get(), ITER);
+}
+
+#[test]
+fn try_run_one_runs_sub_futures() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ let cnt = Rc::new(Cell::new(0));
+
+ let inner_spawner = spawn.clone();
+ let cnt1 = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(poll_fn(move |_| {
+ cnt1.set(cnt1.get() + 1);
+
+ let cnt2 = cnt1.clone();
+ inner_spawner
+ .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
+ .unwrap();
+
+ Poll::Pending
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.try_run_one();
+ assert_eq!(cnt.get(), 2);
+}
+
+#[test]
+fn run_until_stalled_returns_if_empty() {
+ let mut pool = LocalPool::new();
+ pool.run_until_stalled();
+ pool.run_until_stalled();
+}
+
+#[test]
+fn run_until_stalled_returns_multiple_times() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ let cnt = Rc::new(Cell::new(0));
+
+ let cnt1 = cnt.clone();
+ spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap();
+ pool.run_until_stalled();
+ assert_eq!(cnt.get(), 1);
+
+ let cnt2 = cnt.clone();
+ spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap();
+ pool.run_until_stalled();
+ assert_eq!(cnt.get(), 2);
+}
+
+#[test]
+fn run_until_stalled_runs_spawned_sub_futures() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+ let cnt = Rc::new(Cell::new(0));
+
+ let inner_spawner = spawn.clone();
+ let cnt1 = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(poll_fn(move |_| {
+ cnt1.set(cnt1.get() + 1);
+
+ let cnt2 = cnt1.clone();
+ inner_spawner
+ .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
+ .unwrap();
+
+ Poll::Pending
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.run_until_stalled();
+ assert_eq!(cnt.get(), 2);
+}
+
+#[test]
+fn run_until_stalled_executes_all_ready() {
+ const ITER: usize = if cfg!(miri) { 50 } else { 200 };
+ const PER_ITER: usize = 3;
+
+ let cnt = Rc::new(Cell::new(0));
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ for i in 0..ITER {
+ for _ in 0..PER_ITER {
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+
+ let cnt = cnt.clone();
+ spawn
+ .spawn_local_obj(
+ Box::pin(lazy(move |_| {
+ cnt.set(cnt.get() + 1);
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ // also add some pending tasks to test if they are ignored
+ spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
+ }
+ assert_eq!(cnt.get(), i * PER_ITER);
+ pool.run_until_stalled();
+ assert_eq!(cnt.get(), (i + 1) * PER_ITER);
+ }
+}
+
+#[test]
+#[should_panic]
+fn nesting_run() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ spawn
+ .spawn_obj(
+ Box::pin(lazy(|_| {
+ let mut pool = LocalPool::new();
+ pool.run();
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.run();
+}
+
+#[test]
+#[should_panic]
+fn nesting_run_run_until_stalled() {
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ spawn
+ .spawn_obj(
+ Box::pin(lazy(|_| {
+ let mut pool = LocalPool::new();
+ pool.run_until_stalled();
+ }))
+ .into(),
+ )
+ .unwrap();
+
+ pool.run();
+}
+
+#[test]
+fn tasks_are_scheduled_fairly() {
+ let state = Rc::new(RefCell::new([0, 0]));
+
+ struct Spin {
+ state: Rc<RefCell<[i32; 2]>>,
+ idx: usize,
+ }
+
+ impl Future for Spin {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+ let mut state = self.state.borrow_mut();
+
+ if self.idx == 0 {
+ let diff = state[0] - state[1];
+
+ assert!(diff.abs() <= 1);
+
+ if state[0] >= 50 {
+ return Poll::Ready(());
+ }
+ }
+
+ state[self.idx] += 1;
+
+ if state[self.idx] >= 100 {
+ return Poll::Ready(());
+ }
+
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+
+ let mut pool = LocalPool::new();
+ let spawn = pool.spawner();
+
+ spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap();
+
+ spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap();
+
+ pool.run();
+}
+
+// Tests that the use of park/unpark in user-code has no
+// effect on the expected behavior of the executor.
+#[test]
+fn park_unpark_independence() {
+ let mut done = false;
+
+ let future = future::poll_fn(move |cx| {
+ if done {
+ return Poll::Ready(());
+ }
+ done = true;
+ cx.waker().clone().wake(); // (*)
+ // some user-code that temporarily parks the thread
+ let test = thread::current();
+ let latch = Arc::new(AtomicBool::new(false));
+ let signal = latch.clone();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(10));
+ signal.store(true, Ordering::SeqCst);
+ test.unpark()
+ });
+ while !latch.load(Ordering::Relaxed) {
+ thread::park();
+ }
+ Poll::Pending // Expect to be called again due to (*).
+ });
+
+ futures::executor::block_on(future)
+}
+
+struct SelfWaking {
+ wakeups_remaining: Rc<RefCell<usize>>,
+}
+
+impl Future for SelfWaking {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if *self.wakeups_remaining.borrow() != 0 {
+ *self.wakeups_remaining.borrow_mut() -= 1;
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ }
+}
+
+/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
+///
+/// The issue was that self-waking futures could cause `run_until_stalled`
+/// to exit early, even when progress could still be made.
+#[test]
+fn self_waking_run_until_stalled() {
+ let wakeups_remaining = Rc::new(RefCell::new(10));
+
+ let mut pool = LocalPool::new();
+ let spawner = pool.spawner();
+ for _ in 0..3 {
+ let wakeups_remaining = Rc::clone(&wakeups_remaining);
+ spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
+ }
+
+ // This should keep polling until there are no more wakeups.
+ pool.run_until_stalled();
+
+ assert_eq!(*wakeups_remaining.borrow(), 0);
+}
+
+/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
+///
+/// The issue was that self-waking futures could cause `try_run_one`
+/// to exit early, even when progress could still be made.
+#[test]
+fn self_waking_try_run_one() {
+ let wakeups_remaining = Rc::new(RefCell::new(10));
+
+ let mut pool = LocalPool::new();
+ let spawner = pool.spawner();
+ for _ in 0..3 {
+ let wakeups_remaining = Rc::clone(&wakeups_remaining);
+ spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
+ }
+
+ spawner.spawn(future::ready(())).unwrap();
+
+ // The `ready` future should complete.
+ assert!(pool.try_run_one());
+
+ // The self-waking futures are each polled once.
+ assert_eq!(*wakeups_remaining.borrow(), 7);
+}