diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs b/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs new file mode 100644 index 0000000000..55b0ca5ac2 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs @@ -0,0 +1,189 @@ +extern crate futures; + +use futures::prelude::*; +use futures::future; +use futures::unsync::oneshot::{channel, Canceled, spawn}; + +mod support; +use support::local_executor; + +#[test] +fn smoke() { + let (tx, rx) = channel(); + tx.send(33).unwrap(); + assert_eq!(rx.wait().unwrap(), 33); +} + +#[test] +fn canceled() { + let (_, rx) = channel::<()>(); + assert_eq!(rx.wait().unwrap_err(), Canceled); +} + +#[test] +fn poll_cancel() { + let (mut tx, _) = channel::<()>(); + assert!(tx.poll_cancel().unwrap().is_ready()); +} + +#[test] +fn tx_complete_rx_unparked() { + let (tx, rx) = channel(); + + let res = rx.join(future::lazy(move || { + tx.send(55).unwrap(); + Ok(11) + })); + assert_eq!(res.wait().unwrap(), (55, 11)); +} + +#[test] +fn tx_dropped_rx_unparked() { + let (tx, rx) = channel::<i32>(); + + let res = rx.join(future::lazy(move || { + let _tx = tx; + Ok(11) + })); + assert_eq!(res.wait().unwrap_err(), Canceled); +} + + +#[test] +fn is_canceled() { + let (tx, rx) = channel::<u32>(); + assert!(!tx.is_canceled()); + drop(rx); + assert!(tx.is_canceled()); +} + +#[test] +fn spawn_sends_items() { + let core = local_executor::Core::new(); + let future = future::ok::<_, ()>(1); + let rx = spawn(future, &core); + assert_eq!(core.run(rx).unwrap(), 1); +} + +#[test] +fn spawn_kill_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + use futures::sync::oneshot; + + // a future which never returns anything (forever accepting incoming + // connections), but dropping it leads to observable side effects + // (like closing listening sockets, releasing limited resources, + // ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Future for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = local_executor::Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let future = Dead{done: done_tx}; + let rx = spawn(future, &core); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // now drop the spawned future: maybe some timeout exceeded, + // or some connection on this end was closed by the remote + // end. + drop(rx); + // and wait for the spawned future to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => (), + Ok(Either::B(((), _))) => { + panic!("dead future wasn't canceled (timeout)"); + }, + _ => { + panic!("dead future wasn't canceled (unexpected result)"); + }, + } +} + +#[test] +fn spawn_dont_kill_forgot_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + use futures::sync::oneshot; + + // a future which never returns anything (forever accepting incoming + // connections), but dropping it leads to observable side effects + // (like closing listening sockets, releasing limited resources, + // ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Future for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = local_executor::Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let future = Dead{done: done_tx}; + let rx = spawn(future, &core); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // forget the spawned future: should keep running, i.e. hit + // the timeout below. + rx.forget(); + // and wait for the spawned future to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => { + panic!("forgotten dead future was canceled"); + }, + Ok(Either::B(((), _))) => (), // reached timeout + _ => { + panic!("forgotten dead future was canceled (unexpected result)"); + }, + } +} |