summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-channel/tests/oneshot.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-channel/tests/oneshot.rs')
-rw-r--r--third_party/rust/futures-channel/tests/oneshot.rs265
1 files changed, 265 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/tests/oneshot.rs b/third_party/rust/futures-channel/tests/oneshot.rs
new file mode 100644
index 0000000000..5194ce468f
--- /dev/null
+++ b/third_party/rust/futures-channel/tests/oneshot.rs
@@ -0,0 +1,265 @@
+use futures::channel::oneshot::{self, Sender};
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt, poll_fn};
+use futures::task::{Context, Poll};
+use futures_test::task::panic_waker_ref;
+use std::pin::Pin;
+use std::sync::mpsc;
+use std::thread;
+
+#[test]
+fn smoke_poll() {
+ let (mut tx, rx) = oneshot::channel::<u32>();
+ let mut rx = Some(rx);
+ let f = poll_fn(|cx| {
+ assert!(tx.poll_canceled(cx).is_pending());
+ assert!(tx.poll_canceled(cx).is_pending());
+ drop(rx.take());
+ assert!(tx.poll_canceled(cx).is_ready());
+ assert!(tx.poll_canceled(cx).is_ready());
+ Poll::Ready(())
+ });
+
+ block_on(f);
+}
+
+#[test]
+fn cancel_notifies() {
+ let (tx, rx) = oneshot::channel::<u32>();
+
+ let t = thread::spawn(|| {
+ block_on(WaitForCancel { tx });
+ });
+ drop(rx);
+ t.join().unwrap();
+}
+
+struct WaitForCancel {
+ tx: Sender<u32>,
+}
+
+impl Future for WaitForCancel {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.tx.poll_canceled(cx)
+ }
+}
+
+#[test]
+fn cancel_lots() {
+ let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
+ let t = thread::spawn(move || {
+ for (tx, tx2) in rx {
+ block_on(WaitForCancel { tx });
+ tx2.send(()).unwrap();
+ }
+ });
+
+ for _ in 0..20000 {
+ let (otx, orx) = oneshot::channel::<u32>();
+ let (tx2, rx2) = mpsc::channel();
+ tx.send((otx, tx2)).unwrap();
+ drop(orx);
+ rx2.recv().unwrap();
+ }
+ drop(tx);
+
+ t.join().unwrap();
+}
+
+#[test]
+fn cancel_after_sender_drop_doesnt_notify() {
+ let (mut tx, rx) = oneshot::channel::<u32>();
+ let mut cx = Context::from_waker(panic_waker_ref());
+ assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending);
+ drop(tx);
+ drop(rx);
+}
+
+#[test]
+fn close() {
+ let (mut tx, mut rx) = oneshot::channel::<u32>();
+ rx.close();
+ block_on(poll_fn(|cx| {
+ match rx.poll_unpin(cx) {
+ Poll::Ready(Err(_)) => {},
+ _ => panic!(),
+ };
+ assert!(tx.poll_canceled(cx).is_ready());
+ Poll::Ready(())
+ }));
+}
+
+#[test]
+fn close_wakes() {
+ let (tx, mut rx) = oneshot::channel::<u32>();
+ let (tx2, rx2) = mpsc::channel();
+ let t = thread::spawn(move || {
+ rx.close();
+ rx2.recv().unwrap();
+ });
+ block_on(WaitForCancel { tx });
+ tx2.send(()).unwrap();
+ t.join().unwrap();
+}
+
+#[test]
+fn is_canceled() {
+ let (tx, rx) = oneshot::channel::<u32>();
+ assert!(!tx.is_canceled());
+ drop(rx);
+ assert!(tx.is_canceled());
+}
+
+#[test]
+fn cancel_sends() {
+ let (tx, rx) = mpsc::channel::<Sender<_>>();
+ let t = thread::spawn(move || {
+ for otx in rx {
+ let _ = otx.send(42);
+ }
+ });
+
+ for _ in 0..20000 {
+ let (otx, mut orx) = oneshot::channel::<u32>();
+ tx.send(otx).unwrap();
+
+ orx.close();
+ let _ = block_on(orx);
+ }
+
+ drop(tx);
+ t.join().unwrap();
+}
+
+// #[test]
+// fn spawn_sends_items() {
+// let core = local_executor::Core::new();
+// let future = ok::<_, ()>(1);
+// let rx = spawn(future, &core);
+// assert_eq!(core.run(rx).unwrap(), 1);
+// }
+//
+// #[test]
+// fn spawn_kill_dead_stream() {
+// use std::thread;
+// use std::time::Duration;
+// use futures::future::Either;
+// use futures::sync::oneshot;
+//
+// // a future which never returns anything (forever accepting incoming
+// // connections), but dropping it leads to observable side effects
+// // (like closing listening sockets, releasing limited resources,
+// // ...)
+// #[derive(Debug)]
+// struct Dead {
+// // when dropped you should get Err(oneshot::Canceled) on the
+// // receiving end
+// done: oneshot::Sender<()>,
+// }
+// impl Future for Dead {
+// type Item = ();
+// type Error = ();
+//
+// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+// Ok(Poll::Pending)
+// }
+// }
+//
+// // need to implement a timeout for the test, as it would hang
+// // forever right now
+// let (timeout_tx, timeout_rx) = oneshot::channel();
+// thread::spawn(move || {
+// thread::sleep(Duration::from_millis(1000));
+// let _ = timeout_tx.send(());
+// });
+//
+// let core = local_executor::Core::new();
+// let (done_tx, done_rx) = oneshot::channel();
+// let future = Dead{done: done_tx};
+// let rx = spawn(future, &core);
+// let res = core.run(
+// Ok::<_, ()>(())
+// .into_future()
+// .then(move |_| {
+// // now drop the spawned future: maybe some timeout exceeded,
+// // or some connection on this end was closed by the remote
+// // end.
+// drop(rx);
+// // and wait for the spawned future to release its resources
+// done_rx
+// })
+// .select2(timeout_rx)
+// );
+// match res {
+// Err(Either::A((oneshot::Canceled, _))) => (),
+// Ok(Either::B(((), _))) => {
+// panic!("dead future wasn't canceled (timeout)");
+// },
+// _ => {
+// panic!("dead future wasn't canceled (unexpected result)");
+// },
+// }
+// }
+//
+// #[test]
+// fn spawn_dont_kill_forgot_dead_stream() {
+// use std::thread;
+// use std::time::Duration;
+// use futures::future::Either;
+// use futures::sync::oneshot;
+//
+// // a future which never returns anything (forever accepting incoming
+// // connections), but dropping it leads to observable side effects
+// // (like closing listening sockets, releasing limited resources,
+// // ...)
+// #[derive(Debug)]
+// struct Dead {
+// // when dropped you should get Err(oneshot::Canceled) on the
+// // receiving end
+// done: oneshot::Sender<()>,
+// }
+// impl Future for Dead {
+// type Item = ();
+// type Error = ();
+//
+// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+// Ok(Poll::Pending)
+// }
+// }
+//
+// // need to implement a timeout for the test, as it would hang
+// // forever right now
+// let (timeout_tx, timeout_rx) = oneshot::channel();
+// thread::spawn(move || {
+// thread::sleep(Duration::from_millis(1000));
+// let _ = timeout_tx.send(());
+// });
+//
+// let core = local_executor::Core::new();
+// let (done_tx, done_rx) = oneshot::channel();
+// let future = Dead{done: done_tx};
+// let rx = spawn(future, &core);
+// let res = core.run(
+// Ok::<_, ()>(())
+// .into_future()
+// .then(move |_| {
+// // forget the spawned future: should keep running, i.e. hit
+// // the timeout below.
+// rx.forget();
+// // and wait for the spawned future to release its resources
+// done_rx
+// })
+// .select2(timeout_rx)
+// );
+// match res {
+// Err(Either::A((oneshot::Canceled, _))) => {
+// panic!("forgotten dead future was canceled");
+// },
+// Ok(Either::B(((), _))) => (), // reached timeout
+// _ => {
+// panic!("forgotten dead future was canceled (unexpected result)");
+// },
+// }
+// }