diff options
Diffstat (limited to 'third_party/rust/tokio-current-thread/tests/current_thread.rs')
-rw-r--r-- | third_party/rust/tokio-current-thread/tests/current_thread.rs | 837 |
1 files changed, 837 insertions, 0 deletions
diff --git a/third_party/rust/tokio-current-thread/tests/current_thread.rs b/third_party/rust/tokio-current-thread/tests/current_thread.rs new file mode 100644 index 0000000000..0ed0ca2467 --- /dev/null +++ b/third_party/rust/tokio-current-thread/tests/current_thread.rs @@ -0,0 +1,837 @@ +extern crate futures; +extern crate tokio_current_thread; +extern crate tokio_executor; + +use tokio_current_thread::{block_on_all, CurrentThread}; + +use std::any::Any; +use std::cell::{Cell, RefCell}; +use std::rc::Rc; +use std::thread; +use std::time::Duration; + +use futures::future::{self, lazy}; +use futures::task; +// This is not actually unused --- we need this trait to be in scope for +// the tests that sue TaskExecutor::current().execute(). The compiler +// doesn't realise that. +#[allow(unused_imports)] +use futures::future::Executor as _futures_Executor; +use futures::prelude::*; +use futures::sync::oneshot; + +mod from_block_on_all { + use super::*; + fn test<F: Fn(Box<Future<Item = (), Error = ()>>) + 'static>(spawn: F) { + let cnt = Rc::new(Cell::new(0)); + let c = cnt.clone(); + + let msg = tokio_current_thread::block_on_all(lazy(move || { + c.set(1 + c.get()); + + // Spawn! + spawn(Box::new(lazy(move || { + c.set(1 + c.get()); + Ok::<(), ()>(()) + }))); + + Ok::<_, ()>("hello") + })) + .unwrap(); + + assert_eq!(2, cnt.get()); + assert_eq!(msg, "hello"); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn) + } + + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }); + } +} + +#[test] +fn block_waits() { + let (tx, rx) = oneshot::channel(); + + thread::spawn(|| { + thread::sleep(Duration::from_millis(1000)); + tx.send(()).unwrap(); + }); + + let cnt = Rc::new(Cell::new(0)); + let cnt2 = cnt.clone(); + + block_on_all(rx.then(move |_| { + cnt.set(1 + cnt.get()); + Ok::<_, ()>(()) + })) + .unwrap(); + + assert_eq!(1, cnt2.get()); +} + +#[test] +fn spawn_many() { + const ITER: usize = 200; + + let cnt = Rc::new(Cell::new(0)); + let mut tokio_current_thread = CurrentThread::new(); + + for _ in 0..ITER { + let cnt = cnt.clone(); + tokio_current_thread.spawn(lazy(move || { + cnt.set(1 + cnt.get()); + Ok::<(), ()>(()) + })); + } + + tokio_current_thread.run().unwrap(); + + assert_eq!(cnt.get(), ITER); +} + +mod does_not_set_global_executor_by_default { + use super::*; + + fn test<F: Fn(Box<Future<Item = (), Error = ()> + Send>) -> Result<(), E> + 'static, E>( + spawn: F, + ) { + block_on_all(lazy(|| { + spawn(Box::new(lazy(|| ok()))).unwrap_err(); + ok() + })) + .unwrap() + } + + #[test] + fn spawn() { + use tokio_executor::Executor; + test(|f| tokio_executor::DefaultExecutor::current().spawn(f)) + } + + #[test] + fn execute() { + test(|f| tokio_executor::DefaultExecutor::current().execute(f)) + } +} + +mod from_block_on_future { + use super::*; + + fn test<F: Fn(Box<Future<Item = (), Error = ()>>)>(spawn: F) { + let cnt = Rc::new(Cell::new(0)); + + let mut tokio_current_thread = CurrentThread::new(); + + tokio_current_thread + .block_on(lazy(|| { + let cnt = cnt.clone(); + + spawn(Box::new(lazy(move || { + cnt.set(1 + cnt.get()); + Ok(()) + }))); + + Ok::<_, ()>(()) + })) + .unwrap(); + + tokio_current_thread.run().unwrap(); + + assert_eq!(1, cnt.get()); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn); + } + + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }); + } +} + +struct Never(Rc<()>); + +impl Future for Never { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::NotReady) + } +} + +mod outstanding_tasks_are_dropped_when_executor_is_dropped { + use super::*; + + fn test<F, G>(spawn: F, dotspawn: G) + where + F: Fn(Box<Future<Item = (), Error = ()>>) + 'static, + G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>), + { + let mut rc = Rc::new(()); + + let mut tokio_current_thread = CurrentThread::new(); + dotspawn(&mut tokio_current_thread, Box::new(Never(rc.clone()))); + + drop(tokio_current_thread); + + // Ensure the daemon is dropped + assert!(Rc::get_mut(&mut rc).is_some()); + + // Using the global spawn fn + + let mut rc = Rc::new(()); + + let mut tokio_current_thread = CurrentThread::new(); + + tokio_current_thread + .block_on(lazy(|| { + spawn(Box::new(Never(rc.clone()))); + Ok::<_, ()>(()) + })) + .unwrap(); + + drop(tokio_current_thread); + + // Ensure the daemon is dropped + assert!(Rc::get_mut(&mut rc).is_some()); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn, |rt, f| { + rt.spawn(f); + }) + } + + #[test] + fn execute() { + test( + |f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }, + // Note: `CurrentThread` doesn't currently implement + // `futures::Executor`, so we'll call `.spawn(...)` rather than + // `.execute(...)` for now. If `CurrentThread` is changed to + // implement Executor, change this to `.execute(...).unwrap()`. + |rt, f| { + rt.spawn(f); + }, + ); + } +} + +#[test] +#[should_panic] +fn nesting_run() { + block_on_all(lazy(|| { + block_on_all(lazy(|| ok())).unwrap(); + + ok() + })) + .unwrap(); +} + +mod run_in_future { + use super::*; + + #[test] + #[should_panic] + fn spawn() { + block_on_all(lazy(|| { + tokio_current_thread::spawn(lazy(|| { + block_on_all(lazy(|| ok())).unwrap(); + ok() + })); + ok() + })) + .unwrap(); + } + + #[test] + #[should_panic] + fn execute() { + block_on_all(lazy(|| { + tokio_current_thread::TaskExecutor::current() + .execute(lazy(|| { + block_on_all(lazy(|| ok())).unwrap(); + ok() + })) + .unwrap(); + ok() + })) + .unwrap(); + } +} + +#[test] +fn tick_on_infini_future() { + let num = Rc::new(Cell::new(0)); + + struct Infini { + num: Rc<Cell<usize>>, + } + + impl Future for Infini { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + self.num.set(1 + self.num.get()); + task::current().notify(); + Ok(Async::NotReady) + } + } + + CurrentThread::new() + .spawn(Infini { num: num.clone() }) + .turn(None) + .unwrap(); + + assert_eq!(1, num.get()); +} + +mod tasks_are_scheduled_fairly { + use super::*; + struct Spin { + state: Rc<RefCell<[i32; 2]>>, + idx: usize, + } + + impl Future for Spin { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let mut state = self.state.borrow_mut(); + + if self.idx == 0 { + let diff = state[0] - state[1]; + + assert!(diff.abs() <= 1); + + if state[0] >= 50 { + return Ok(().into()); + } + } + + state[self.idx] += 1; + + if state[self.idx] >= 100 { + return Ok(().into()); + } + + task::current().notify(); + Ok(Async::NotReady) + } + } + + fn test<F: Fn(Spin)>(spawn: F) { + let state = Rc::new(RefCell::new([0, 0])); + + block_on_all(lazy(|| { + spawn(Spin { + state: state.clone(), + idx: 0, + }); + + spawn(Spin { + state: state, + idx: 1, + }); + + ok() + })) + .unwrap(); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn) + } + + #[test] + fn execute() { + test(|f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }) + } +} + +mod and_turn { + use super::*; + + fn test<F, G>(spawn: F, dotspawn: G) + where + F: Fn(Box<Future<Item = (), Error = ()>>) + 'static, + G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>), + { + let cnt = Rc::new(Cell::new(0)); + let c = cnt.clone(); + + let mut tokio_current_thread = CurrentThread::new(); + + // Spawn a basic task to get the executor to turn + dotspawn(&mut tokio_current_thread, Box::new(lazy(move || Ok(())))); + + // Turn once... + tokio_current_thread.turn(None).unwrap(); + + dotspawn( + &mut tokio_current_thread, + Box::new(lazy(move || { + c.set(1 + c.get()); + + // Spawn! + spawn(Box::new(lazy(move || { + c.set(1 + c.get()); + Ok::<(), ()>(()) + }))); + + Ok(()) + })), + ); + + // This does not run the newly spawned thread + tokio_current_thread.turn(None).unwrap(); + assert_eq!(1, cnt.get()); + + // This runs the newly spawned thread + tokio_current_thread.turn(None).unwrap(); + assert_eq!(2, cnt.get()); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn, |rt, f| { + rt.spawn(f); + }) + } + + #[test] + fn execute() { + test( + |f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }, + // Note: `CurrentThread` doesn't currently implement + // `futures::Executor`, so we'll call `.spawn(...)` rather than + // `.execute(...)` for now. If `CurrentThread` is changed to + // implement Executor, change this to `.execute(...).unwrap()`. + |rt, f| { + rt.spawn(f); + }, + ); + } + +} + +mod in_drop { + use super::*; + struct OnDrop<F: FnOnce()>(Option<F>); + + impl<F: FnOnce()> Drop for OnDrop<F> { + fn drop(&mut self) { + (self.0.take().unwrap())(); + } + } + + struct MyFuture { + _data: Box<Any>, + } + + impl Future for MyFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(().into()) + } + } + + fn test<F, G>(spawn: F, dotspawn: G) + where + F: Fn(Box<Future<Item = (), Error = ()>>) + 'static, + G: Fn(&mut CurrentThread, Box<Future<Item = (), Error = ()>>), + { + let mut tokio_current_thread = CurrentThread::new(); + + let (tx, rx) = oneshot::channel(); + + dotspawn( + &mut tokio_current_thread, + Box::new(MyFuture { + _data: Box::new(OnDrop(Some(move || { + spawn(Box::new(lazy(move || { + tx.send(()).unwrap(); + Ok(()) + }))); + }))), + }), + ); + + tokio_current_thread.block_on(rx).unwrap(); + tokio_current_thread.run().unwrap(); + } + + #[test] + fn spawn() { + test(tokio_current_thread::spawn, |rt, f| { + rt.spawn(f); + }) + } + + #[test] + fn execute() { + test( + |f| { + tokio_current_thread::TaskExecutor::current() + .execute(f) + .unwrap(); + }, + // Note: `CurrentThread` doesn't currently implement + // `futures::Executor`, so we'll call `.spawn(...)` rather than + // `.execute(...)` for now. If `CurrentThread` is changed to + // implement Executor, change this to `.execute(...).unwrap()`. + |rt, f| { + rt.spawn(f); + }, + ); + } + +} + +#[test] +fn hammer_turn() { + use futures::sync::mpsc; + + const ITER: usize = 100; + const N: usize = 100; + const THREADS: usize = 4; + + for _ in 0..ITER { + let mut ths = vec![]; + + // Add some jitter + for _ in 0..THREADS { + let th = thread::spawn(|| { + let mut tokio_current_thread = CurrentThread::new(); + + let (tx, rx) = mpsc::unbounded(); + + tokio_current_thread.spawn({ + let cnt = Rc::new(Cell::new(0)); + let c = cnt.clone(); + + rx.for_each(move |_| { + c.set(1 + c.get()); + Ok(()) + }) + .map_err(|e| panic!("err={:?}", e)) + .map(move |v| { + assert_eq!(N, cnt.get()); + v + }) + }); + + thread::spawn(move || { + for _ in 0..N { + tx.unbounded_send(()).unwrap(); + thread::yield_now(); + } + }); + + while !tokio_current_thread.is_idle() { + tokio_current_thread.turn(None).unwrap(); + } + }); + + ths.push(th); + } + + for th in ths { + th.join().unwrap(); + } + } +} + +#[test] +fn turn_has_polled() { + let mut tokio_current_thread = CurrentThread::new(); + + // Spawn oneshot receiver + let (sender, receiver) = oneshot::channel::<()>(); + tokio_current_thread.spawn(receiver.then(|_| Ok(()))); + + // Turn once... + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + + // Should've polled the receiver once, but considered it not ready + assert!(res.has_polled()); + + // Turn another time + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + + // Should've polled nothing, the receiver is not ready yet + assert!(!res.has_polled()); + + // Make the receiver ready + sender.send(()).unwrap(); + + // Turn another time + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + + // Should've polled the receiver, it's ready now + assert!(res.has_polled()); + + // Now the executor should be empty + assert!(tokio_current_thread.is_idle()); + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + + // So should've polled nothing + assert!(!res.has_polled()); +} + +// Our own mock Park that is never really waiting and the only +// thing it does is to send, on request, something (once) to a oneshot +// channel +struct MyPark { + sender: Option<oneshot::Sender<()>>, + send_now: Rc<Cell<bool>>, +} + +struct MyUnpark; + +impl tokio_executor::park::Park for MyPark { + type Unpark = MyUnpark; + type Error = (); + + fn unpark(&self) -> Self::Unpark { + MyUnpark + } + + fn park(&mut self) -> Result<(), Self::Error> { + // If called twice with send_now, this will intentionally panic + if self.send_now.get() { + self.sender.take().unwrap().send(()).unwrap(); + } + + Ok(()) + } + + fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> { + self.park() + } +} + +impl tokio_executor::park::Unpark for MyUnpark { + fn unpark(&self) {} +} + +#[test] +fn turn_fair() { + let send_now = Rc::new(Cell::new(false)); + + let (sender, receiver) = oneshot::channel::<()>(); + let (sender_2, receiver_2) = oneshot::channel::<()>(); + let (sender_3, receiver_3) = oneshot::channel::<()>(); + + let my_park = MyPark { + sender: Some(sender_3), + send_now: send_now.clone(), + }; + + let mut tokio_current_thread = CurrentThread::new_with_park(my_park); + + let receiver_1_done = Rc::new(Cell::new(false)); + let receiver_1_done_clone = receiver_1_done.clone(); + + // Once an item is received on the oneshot channel, it will immediately + // immediately make the second oneshot channel ready + tokio_current_thread.spawn(receiver.map_err(|_| unreachable!()).and_then(move |_| { + sender_2.send(()).unwrap(); + receiver_1_done_clone.set(true); + + Ok(()) + })); + + let receiver_2_done = Rc::new(Cell::new(false)); + let receiver_2_done_clone = receiver_2_done.clone(); + + tokio_current_thread.spawn(receiver_2.map_err(|_| unreachable!()).and_then(move |_| { + receiver_2_done_clone.set(true); + Ok(()) + })); + + // The third receiver is only woken up from our Park implementation, it simulates + // e.g. a socket that first has to be polled to know if it is ready now + let receiver_3_done = Rc::new(Cell::new(false)); + let receiver_3_done_clone = receiver_3_done.clone(); + + tokio_current_thread.spawn(receiver_3.map_err(|_| unreachable!()).and_then(move |_| { + receiver_3_done_clone.set(true); + Ok(()) + })); + + // First turn should've polled both and considered them not ready + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + assert!(res.has_polled()); + + // Next turn should've polled nothing + let res = tokio_current_thread + .turn(Some(Duration::from_millis(0))) + .unwrap(); + assert!(!res.has_polled()); + + assert!(!receiver_1_done.get()); + assert!(!receiver_2_done.get()); + assert!(!receiver_3_done.get()); + + // After this the receiver future will wake up the second receiver future, + // so there are pending futures again + sender.send(()).unwrap(); + + // Now the first receiver should be done, the second receiver should be ready + // to be polled again and the socket not yet + let res = tokio_current_thread.turn(None).unwrap(); + assert!(res.has_polled()); + + assert!(receiver_1_done.get()); + assert!(!receiver_2_done.get()); + assert!(!receiver_3_done.get()); + + // Now let our park implementation know that it should send something to sender 3 + send_now.set(true); + + // This should resolve the second receiver directly, but also poll the socket + // and read the packet from it. If it didn't do both here, we would handle + // futures that are woken up from the reactor and directly unfairly and would + // favour the ones that are woken up directly. + let res = tokio_current_thread.turn(None).unwrap(); + assert!(res.has_polled()); + + assert!(receiver_1_done.get()); + assert!(receiver_2_done.get()); + assert!(receiver_3_done.get()); + + // Don't send again + send_now.set(false); + + // Now we should be idle and turning should not poll anything + assert!(tokio_current_thread.is_idle()); + let res = tokio_current_thread.turn(None).unwrap(); + assert!(!res.has_polled()); +} + +#[test] +fn spawn_from_other_thread() { + let mut current_thread = CurrentThread::new(); + + let handle = current_thread.handle(); + let (sender, receiver) = oneshot::channel::<()>(); + + thread::spawn(move || { + handle + .spawn(lazy(move || { + sender.send(()).unwrap(); + Ok(()) + })) + .unwrap(); + }); + + let _ = current_thread.block_on(receiver).unwrap(); +} + +#[test] +fn spawn_from_other_thread_unpark() { + use std::sync::mpsc::channel as mpsc_channel; + + let mut current_thread = CurrentThread::new(); + + let handle = current_thread.handle(); + let (sender_1, receiver_1) = oneshot::channel::<()>(); + let (sender_2, receiver_2) = mpsc_channel::<()>(); + + thread::spawn(move || { + let _ = receiver_2.recv().unwrap(); + + handle + .spawn(lazy(move || { + sender_1.send(()).unwrap(); + Ok(()) + })) + .unwrap(); + }); + + // Ensure that unparking the executor works correctly. It will first + // check if there are new futures (there are none), then execute the + // lazy future below which will cause the future to be spawned from + // the other thread. Then the executor will park but should be woken + // up because *now* we have a new future to schedule + let _ = current_thread + .block_on( + lazy(move || { + sender_2.send(()).unwrap(); + Ok(()) + }) + .and_then(|_| receiver_1), + ) + .unwrap(); +} + +#[test] +fn spawn_from_executor_with_handle() { + let mut current_thread = CurrentThread::new(); + let handle = current_thread.handle(); + let (tx, rx) = oneshot::channel(); + + current_thread.spawn(lazy(move || { + handle + .spawn(lazy(move || { + tx.send(()).unwrap(); + Ok(()) + })) + .unwrap(); + Ok::<_, ()>(()) + })); + + current_thread.run(); + + rx.wait().unwrap(); +} + +fn ok() -> future::FutureResult<(), ()> { + future::ok(()) +} |