summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs')
-rw-r--r--third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs189
1 files changed, 189 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs b/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs
new file mode 100644
index 0000000000..55b0ca5ac2
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs
@@ -0,0 +1,189 @@
+extern crate futures;
+
+use futures::prelude::*;
+use futures::future;
+use futures::unsync::oneshot::{channel, Canceled, spawn};
+
+mod support;
+use support::local_executor;
+
+#[test]
+fn smoke() {
+ let (tx, rx) = channel();
+ tx.send(33).unwrap();
+ assert_eq!(rx.wait().unwrap(), 33);
+}
+
+#[test]
+fn canceled() {
+ let (_, rx) = channel::<()>();
+ assert_eq!(rx.wait().unwrap_err(), Canceled);
+}
+
+#[test]
+fn poll_cancel() {
+ let (mut tx, _) = channel::<()>();
+ assert!(tx.poll_cancel().unwrap().is_ready());
+}
+
+#[test]
+fn tx_complete_rx_unparked() {
+ let (tx, rx) = channel();
+
+ let res = rx.join(future::lazy(move || {
+ tx.send(55).unwrap();
+ Ok(11)
+ }));
+ assert_eq!(res.wait().unwrap(), (55, 11));
+}
+
+#[test]
+fn tx_dropped_rx_unparked() {
+ let (tx, rx) = channel::<i32>();
+
+ let res = rx.join(future::lazy(move || {
+ let _tx = tx;
+ Ok(11)
+ }));
+ assert_eq!(res.wait().unwrap_err(), Canceled);
+}
+
+
+#[test]
+fn is_canceled() {
+ let (tx, rx) = channel::<u32>();
+ assert!(!tx.is_canceled());
+ drop(rx);
+ assert!(tx.is_canceled());
+}
+
+#[test]
+fn spawn_sends_items() {
+ let core = local_executor::Core::new();
+ let future = future::ok::<_, ()>(1);
+ let rx = spawn(future, &core);
+ assert_eq!(core.run(rx).unwrap(), 1);
+}
+
+#[test]
+fn spawn_kill_dead_stream() {
+ use std::thread;
+ use std::time::Duration;
+ use futures::future::Either;
+ use futures::sync::oneshot;
+
+ // a future which never returns anything (forever accepting incoming
+ // connections), but dropping it leads to observable side effects
+ // (like closing listening sockets, releasing limited resources,
+ // ...)
+ #[derive(Debug)]
+ struct Dead {
+ // when dropped you should get Err(oneshot::Canceled) on the
+ // receiving end
+ done: oneshot::Sender<()>,
+ }
+ impl Future for Dead {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ // need to implement a timeout for the test, as it would hang
+ // forever right now
+ let (timeout_tx, timeout_rx) = oneshot::channel();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(1000));
+ let _ = timeout_tx.send(());
+ });
+
+ let core = local_executor::Core::new();
+ let (done_tx, done_rx) = oneshot::channel();
+ let future = Dead{done: done_tx};
+ let rx = spawn(future, &core);
+ let res = core.run(
+ Ok::<_, ()>(())
+ .into_future()
+ .then(move |_| {
+ // now drop the spawned future: maybe some timeout exceeded,
+ // or some connection on this end was closed by the remote
+ // end.
+ drop(rx);
+ // and wait for the spawned future to release its resources
+ done_rx
+ })
+ .select2(timeout_rx)
+ );
+ match res {
+ Err(Either::A((oneshot::Canceled, _))) => (),
+ Ok(Either::B(((), _))) => {
+ panic!("dead future wasn't canceled (timeout)");
+ },
+ _ => {
+ panic!("dead future wasn't canceled (unexpected result)");
+ },
+ }
+}
+
+#[test]
+fn spawn_dont_kill_forgot_dead_stream() {
+ use std::thread;
+ use std::time::Duration;
+ use futures::future::Either;
+ use futures::sync::oneshot;
+
+ // a future which never returns anything (forever accepting incoming
+ // connections), but dropping it leads to observable side effects
+ // (like closing listening sockets, releasing limited resources,
+ // ...)
+ #[derive(Debug)]
+ struct Dead {
+ // when dropped you should get Err(oneshot::Canceled) on the
+ // receiving end
+ done: oneshot::Sender<()>,
+ }
+ impl Future for Dead {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ // need to implement a timeout for the test, as it would hang
+ // forever right now
+ let (timeout_tx, timeout_rx) = oneshot::channel();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(1000));
+ let _ = timeout_tx.send(());
+ });
+
+ let core = local_executor::Core::new();
+ let (done_tx, done_rx) = oneshot::channel();
+ let future = Dead{done: done_tx};
+ let rx = spawn(future, &core);
+ let res = core.run(
+ Ok::<_, ()>(())
+ .into_future()
+ .then(move |_| {
+ // forget the spawned future: should keep running, i.e. hit
+ // the timeout below.
+ rx.forget();
+ // and wait for the spawned future to release its resources
+ done_rx
+ })
+ .select2(timeout_rx)
+ );
+ match res {
+ Err(Either::A((oneshot::Canceled, _))) => {
+ panic!("forgotten dead future was canceled");
+ },
+ Ok(Either::B(((), _))) => (), // reached timeout
+ _ => {
+ panic!("forgotten dead future was canceled (unexpected result)");
+ },
+ }
+}