496 lines
12 KiB
Rust
496 lines
12 KiB
Rust
use futures::channel::oneshot;
|
|
use futures::executor::LocalPool;
|
|
use futures::future::{self, lazy, poll_fn, Future};
|
|
use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
|
|
use std::cell::{Cell, RefCell};
|
|
use std::pin::Pin;
|
|
use std::rc::Rc;
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
use std::sync::Arc;
|
|
use std::thread;
|
|
use std::time::Duration;
|
|
|
|
/// A future that never resolves: every poll reports `Poll::Pending`.
///
/// The wrapped `Rc<()>` is never read. Presumably it exists only to make the
/// type `!Send`, so that it has to be spawned as a local task — TODO confirm.
struct Pending(Rc<()>);

impl Future for Pending {
    type Output = ();

    /// Always answers `Poll::Pending`; the context (and its waker) is ignored,
    /// so nothing will ever schedule this future again.
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Pending
    }
}

/// Builds a fresh [`Pending`] future with its own `Rc` marker.
fn pending() -> Pending {
    let marker = Rc::new(());
    Pending(marker)
}
|
|
|
|
/// `run_until` must drive the future it is given: the lazy closure has to
/// have run exactly once by the time the call returns.
#[test]
fn run_until_single_future() {
    let mut counter = 0;

    {
        let mut pool = LocalPool::new();
        // The closure borrows `counter` mutably; the inner scope ends that
        // borrow before the assertion reads the value.
        pool.run_until(lazy(|_| counter += 1));
    }

    assert_eq!(counter, 1);
}
|
|
|
|
/// A spawned task that never completes must not stop `run_until` from
/// returning once the future passed to it is done.
#[test]
fn run_until_ignores_spawned() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let never_done = Box::pin(pending());
    spawner.spawn_local_obj(never_done.into()).unwrap();

    pool.run_until(lazy(|_| ()));
}
|
|
|
|
/// `run_until` must also poll spawned tasks: the awaited receiver can only
/// resolve after the spawned task has sent on the channel.
#[test]
fn run_until_executes_spawned() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let (sender, receiver) = oneshot::channel();

    let notify = lazy(move |_| sender.send(()).unwrap());
    spawner.spawn_local_obj(Box::pin(notify).into()).unwrap();

    pool.run_until(receiver).unwrap();
}
|
|
|
|
/// Calling `run` on a pool with no tasks must return, and must keep doing so
/// when called a second time.
#[test]
fn run_returns_if_empty() {
    let mut pool = LocalPool::new();
    for _ in 0..2 {
        pool.run();
    }
}
|
|
|
|
/// A task spawned from inside another running task must be executed by the
/// same `run` call: the inner task's increment has to be visible afterwards.
#[test]
fn run_executes_spawned() {
    let mut pool = LocalPool::new();
    let outer_spawner = pool.spawner();
    let inner_spawner = pool.spawner();

    let counter = Rc::new(Cell::new(0));
    let task_counter = Rc::clone(&counter);

    // The inner task is only handed to the pool once the outer task runs.
    let inner = lazy(move |_| task_counter.set(task_counter.get() + 1));
    let outer = lazy(move |_| {
        inner_spawner.spawn_local_obj(Box::pin(inner).into()).unwrap();
    });
    outer_spawner.spawn_local_obj(Box::pin(outer).into()).unwrap();

    pool.run();

    assert_eq!(counter.get(), 1);
}
|
|
|
|
/// Spawns `ITER` immediately-ready tasks and checks that a single `run`
/// completes every one of them.
#[test]
fn run_spawn_many() {
    const ITER: usize = 200;

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let completed = Rc::new(Cell::new(0));

    (0..ITER).for_each(|_| {
        let completed = Rc::clone(&completed);
        let bump = lazy(move |_| completed.set(completed.get() + 1));
        spawner.spawn_local_obj(Box::pin(bump).into()).unwrap();
    });

    pool.run();

    assert_eq!(completed.get(), ITER);
}
|
|
|
|
/// With no tasks in the pool, `try_run_one` must return `false`.
#[test]
fn try_run_one_returns_if_empty() {
    let mut pool = LocalPool::new();
    let completed_one = pool.try_run_one();
    assert!(!completed_one);
}
|
|
|
|
/// Each `try_run_one` call must complete exactly one ready task, even when
/// ready tasks are interleaved with tasks that never finish, and must return
/// `false` once only never-finishing tasks remain.
#[test]
fn try_run_one_executes_one_ready() {
    const ITER: usize = 200;

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let completed = Rc::new(Cell::new(0));

    for _ in 0..ITER {
        // Every ready task is sandwiched between two tasks that stay pending.
        spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();

        let completed = Rc::clone(&completed);
        let bump = lazy(move |_| completed.set(completed.get() + 1));
        spawner.spawn_local_obj(Box::pin(bump).into()).unwrap();

        spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();
    }

    for done_so_far in 0..ITER {
        assert_eq!(completed.get(), done_so_far);
        assert!(pool.try_run_one());
        assert_eq!(completed.get(), done_so_far + 1);
    }

    // Nothing that can complete is left.
    assert!(!pool.try_run_one());
}
|
|
|
|
/// `try_run_one` must return `false` when the only task was polled but did
/// not complete, and `true` on the call in which it finally completes.
///
/// The task counts its polls, stashes its waker on every poll, and becomes
/// ready on poll number `ITER`. Between calls the test wakes the task through
/// the stashed waker so that the next `try_run_one` polls it again.
#[test]
fn try_run_one_returns_on_no_progress() {
    const ITER: usize = 10;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    // Shared slot through which the task hands its most recent waker to the test.
    let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
    {
        let cnt = cnt.clone();
        let waker = waker.clone();
        spawn
            .spawn_local_obj(
                Box::pin(poll_fn(move |ctx| {
                    cnt.set(cnt.get() + 1);
                    waker.set(Some(ctx.waker().clone()));
                    if cnt.get() == ITER {
                        Poll::Ready(())
                    } else {
                        Poll::Pending
                    }
                }))
                .into(),
            )
            .unwrap();
    }

    for i in 0..ITER - 1 {
        assert_eq!(cnt.get(), i);
        // Polled once, still pending: no future was completed.
        assert!(!pool.try_run_one());
        assert_eq!(cnt.get(), i + 1);
        // A missing waker means the task was not polled; fail with a message
        // that says so instead of a bare `unwrap` panic.
        waker
            .take()
            .expect("the task should have stored its waker when it was polled")
            .wake();
    }
    assert!(pool.try_run_one());
    assert_eq!(cnt.get(), ITER);
}
|
|
|
|
/// A task spawned while `try_run_one` is polling another task must be run
/// within that same call: both the parent's and the child's increments have
/// to be visible afterwards.
#[test]
fn try_run_one_runs_sub_futures() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let total = Rc::new(Cell::new(0));

    let child_spawner = spawner.clone();
    let parent_total = Rc::clone(&total);
    // The parent never completes; each poll bumps the counter and spawns a
    // child that bumps it once more.
    let parent = poll_fn(move |_| {
        parent_total.set(parent_total.get() + 1);

        let child_total = Rc::clone(&parent_total);
        let child = lazy(move |_| child_total.set(child_total.get() + 1));
        child_spawner.spawn_local_obj(Box::pin(child).into()).unwrap();

        Poll::Pending
    });
    spawner.spawn_local_obj(Box::pin(parent).into()).unwrap();

    // The return value is intentionally not checked here.
    pool.try_run_one();
    assert_eq!(total.get(), 2);
}
|
|
|
|
/// Calling `run_until_stalled` on a pool with no tasks must return, and must
/// keep doing so when called a second time.
#[test]
fn run_until_stalled_returns_if_empty() {
    let mut pool = LocalPool::new();
    for _ in 0..2 {
        pool.run_until_stalled();
    }
}
|
|
|
|
/// `run_until_stalled` can be called repeatedly on the same pool: each round
/// spawns one ready task and the call must complete it.
#[test]
fn run_until_stalled_returns_multiple_times() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let total = Rc::new(Cell::new(0));

    for expected in 1..=2 {
        let task_total = Rc::clone(&total);
        let bump = lazy(move |_| task_total.set(task_total.get() + 1));
        spawner.spawn_local_obj(Box::pin(bump).into()).unwrap();

        pool.run_until_stalled();
        assert_eq!(total.get(), expected);
    }
}
|
|
|
|
/// A task spawned while `run_until_stalled` is polling another task must be
/// run within that same call: both the parent's and the child's increments
/// have to be visible afterwards.
#[test]
fn run_until_stalled_runs_spawned_sub_futures() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let total = Rc::new(Cell::new(0));

    let child_spawner = spawner.clone();
    let parent_total = Rc::clone(&total);
    // The parent never completes; each poll bumps the counter and spawns a
    // child that bumps it once more.
    let parent = poll_fn(move |_| {
        parent_total.set(parent_total.get() + 1);

        let child_total = Rc::clone(&parent_total);
        let child = lazy(move |_| child_total.set(child_total.get() + 1));
        child_spawner.spawn_local_obj(Box::pin(child).into()).unwrap();

        Poll::Pending
    });
    spawner.spawn_local_obj(Box::pin(parent).into()).unwrap();

    pool.run_until_stalled();
    assert_eq!(total.get(), 2);
}
|
|
|
|
/// Every `run_until_stalled` call must complete all tasks that are ready at
/// that point, while the never-finishing tasks mixed in between are left
/// alone. The round count is reduced when running under Miri.
#[test]
fn run_until_stalled_executes_all_ready() {
    const ITER: usize = if cfg!(miri) { 50 } else { 200 };
    const PER_ITER: usize = 3;

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let completed = Rc::new(Cell::new(0));

    for round in 0..ITER {
        for _ in 0..PER_ITER {
            spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();

            let completed = Rc::clone(&completed);
            let bump = lazy(move |_| completed.set(completed.get() + 1));
            spawner.spawn_local_obj(Box::pin(bump).into()).unwrap();

            // Also add some pending tasks to check that they are ignored.
            spawner.spawn_local_obj(Box::pin(pending()).into()).unwrap();
        }

        let before = round * PER_ITER;
        assert_eq!(completed.get(), before);
        pool.run_until_stalled();
        assert_eq!(completed.get(), before + PER_ITER);
    }
}
|
|
|
|
/// Expected to panic: a task running on one `LocalPool` creates a second
/// pool and calls `run` on it while the first pool is still running.
#[test]
#[should_panic]
fn nesting_run() {
    let mut outer = LocalPool::new();
    let spawner = outer.spawner();

    let reenter = lazy(|_| {
        let mut inner = LocalPool::new();
        inner.run();
    });
    spawner.spawn_obj(Box::pin(reenter).into()).unwrap();

    outer.run();
}
|
|
|
|
/// Expected to panic: a task running on one `LocalPool` creates a second
/// pool and calls `run_until_stalled` on it while the first pool is still
/// inside `run`.
#[test]
#[should_panic]
fn nesting_run_run_until_stalled() {
    let mut outer = LocalPool::new();
    let spawner = outer.spawner();

    let reenter = lazy(|_| {
        let mut inner = LocalPool::new();
        inner.run_until_stalled();
    });
    spawner.spawn_obj(Box::pin(reenter).into()).unwrap();

    outer.run();
}
|
|
|
|
/// Two self-waking tasks share a pair of poll counters. Task 0 asserts on
/// every poll that the counters differ by at most one, i.e. that the pool
/// alternates between the two tasks instead of favouring one of them.
#[test]
fn tasks_are_scheduled_fairly() {
    /// Self-waking task that increments its own slot of the shared counters.
    struct Spin {
        state: Rc<RefCell<[i32; 2]>>,
        idx: usize,
    }

    impl Future for Spin {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let slot = self.idx;
            let mut counters = self.state.borrow_mut();

            if slot == 0 {
                // The two tasks must never drift more than one poll apart.
                let gap = (counters[0] - counters[1]).abs();
                assert!(gap <= 1);

                // Task 0 stops early, after 50 polls.
                if counters[0] >= 50 {
                    return Poll::Ready(());
                }
            }

            counters[slot] += 1;

            if counters[slot] < 100 {
                // Ask to be polled again right away.
                cx.waker().wake_by_ref();
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        }
    }

    let state = Rc::new(RefCell::new([0, 0]));
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let first = Spin { state: Rc::clone(&state), idx: 0 };
    spawner.spawn_local_obj(Box::pin(first).into()).unwrap();

    let second = Spin { state, idx: 1 };
    spawner.spawn_local_obj(Box::pin(second).into()).unwrap();

    pool.run();
}
|
|
|
|
/// Checks that user code calling `thread::park` / `unpark` from inside a
/// future does not disturb the executor: `block_on` must still poll the
/// future a second time after it woke itself.
#[test]
fn park_unpark_independence() {
    let mut polled_once = false;

    let future = future::poll_fn(move |cx| {
        if polled_once {
            return Poll::Ready(());
        }
        polled_once = true;
        cx.waker().clone().wake(); // (*)

        // User code that temporarily parks the polling thread until a helper
        // thread flips the latch and unparks it.
        let parked_thread = thread::current();
        let latch = Arc::new(AtomicBool::new(false));
        let signal = Arc::clone(&latch);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            signal.store(true, Ordering::SeqCst);
            parked_thread.unpark()
        });
        // `park` may return before the latch is set, so re-check in a loop.
        loop {
            if latch.load(Ordering::Relaxed) {
                break;
            }
            thread::park();
        }

        Poll::Pending // Expect to be polled again because of (*).
    });

    futures::executor::block_on(future)
}
|
|
|
|
/// A future that never completes but wakes itself while a shared budget of
/// wakeups lasts.
///
/// `wakeups_remaining` may be shared between several instances; each poll
/// that finds it non-zero decrements it by one and wakes the task.
struct SelfWaking {
    wakeups_remaining: Rc<RefCell<usize>>,
}

impl Future for SelfWaking {
    type Output = ();

    /// Consumes one wakeup from the budget (if any is left), wakes the task
    /// in that case, and always returns `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Release the `RefCell` borrow before invoking the waker.
        let should_wake = {
            let mut budget = self.wakeups_remaining.borrow_mut();
            match *budget {
                0 => false,
                _ => {
                    *budget -= 1;
                    true
                }
            }
        };

        if should_wake {
            cx.waker().wake_by_ref();
        }

        Poll::Pending
    }
}
|
|
|
|
/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// Self-waking futures used to make `run_until_stalled` return early even
/// though more progress was possible. Three such futures share a budget of
/// ten wakeups, and the whole budget must be used up by a single call.
#[test]
fn self_waking_run_until_stalled() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let wakeups_remaining = Rc::new(RefCell::new(10));

    (0..3).for_each(|_| {
        let task = SelfWaking { wakeups_remaining: Rc::clone(&wakeups_remaining) };
        spawner.spawn_local(task).unwrap();
    });

    // Polling has to continue until no wakeups are left.
    pool.run_until_stalled();

    assert_eq!(*wakeups_remaining.borrow(), 0);
}
|
|
|
|
/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// Self-waking futures used to make `try_run_one` return early even though
/// more progress was possible. Three such futures are spawned ahead of one
/// immediately-ready future; `try_run_one` must still reach and complete the
/// ready one.
#[test]
fn self_waking_try_run_one() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    let wakeups_remaining = Rc::new(RefCell::new(10));

    (0..3).for_each(|_| {
        let task = SelfWaking { wakeups_remaining: Rc::clone(&wakeups_remaining) };
        spawner.spawn_local(task).unwrap();
    });

    spawner.spawn(future::ready(())).unwrap();

    // The `ready` future should complete.
    assert!(pool.try_run_one());

    // Each of the three self-waking futures was polled exactly once.
    assert_eq!(*wakeups_remaining.borrow(), 7);
}
|