diff options
Diffstat (limited to 'third_party/rust/futures-channel/tests/oneshot.rs')
-rw-r--r-- | third_party/rust/futures-channel/tests/oneshot.rs | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/tests/oneshot.rs b/third_party/rust/futures-channel/tests/oneshot.rs new file mode 100644 index 0000000000..5194ce468f --- /dev/null +++ b/third_party/rust/futures-channel/tests/oneshot.rs @@ -0,0 +1,265 @@ +use futures::channel::oneshot::{self, Sender}; +use futures::executor::block_on; +use futures::future::{Future, FutureExt, poll_fn}; +use futures::task::{Context, Poll}; +use futures_test::task::panic_waker_ref; +use std::pin::Pin; +use std::sync::mpsc; +use std::thread; + +#[test] +fn smoke_poll() { + let (mut tx, rx) = oneshot::channel::<u32>(); + let mut rx = Some(rx); + let f = poll_fn(|cx| { + assert!(tx.poll_canceled(cx).is_pending()); + assert!(tx.poll_canceled(cx).is_pending()); + drop(rx.take()); + assert!(tx.poll_canceled(cx).is_ready()); + assert!(tx.poll_canceled(cx).is_ready()); + Poll::Ready(()) + }); + + block_on(f); +} + +#[test] +fn cancel_notifies() { + let (tx, rx) = oneshot::channel::<u32>(); + + let t = thread::spawn(|| { + block_on(WaitForCancel { tx }); + }); + drop(rx); + t.join().unwrap(); +} + +struct WaitForCancel { + tx: Sender<u32>, +} + +impl Future for WaitForCancel { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.tx.poll_canceled(cx) + } +} + +#[test] +fn cancel_lots() { + let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); + let t = thread::spawn(move || { + for (tx, tx2) in rx { + block_on(WaitForCancel { tx }); + tx2.send(()).unwrap(); + } + }); + + for _ in 0..20000 { + let (otx, orx) = oneshot::channel::<u32>(); + let (tx2, rx2) = mpsc::channel(); + tx.send((otx, tx2)).unwrap(); + drop(orx); + rx2.recv().unwrap(); + } + drop(tx); + + t.join().unwrap(); +} + +#[test] +fn cancel_after_sender_drop_doesnt_notify() { + let (mut tx, rx) = oneshot::channel::<u32>(); + let mut cx = Context::from_waker(panic_waker_ref()); + assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending); + drop(tx); + drop(rx); +} + +#[test] +fn close() { + let (mut tx, mut rx) = oneshot::channel::<u32>(); + rx.close(); + block_on(poll_fn(|cx| { + match rx.poll_unpin(cx) { + Poll::Ready(Err(_)) => {}, + _ => panic!(), + }; + assert!(tx.poll_canceled(cx).is_ready()); + Poll::Ready(()) + })); +} + +#[test] +fn close_wakes() { + let (tx, mut rx) = oneshot::channel::<u32>(); + let (tx2, rx2) = mpsc::channel(); + let t = thread::spawn(move || { + rx.close(); + rx2.recv().unwrap(); + }); + block_on(WaitForCancel { tx }); + tx2.send(()).unwrap(); + t.join().unwrap(); +} + +#[test] +fn is_canceled() { + let (tx, rx) = oneshot::channel::<u32>(); + assert!(!tx.is_canceled()); + drop(rx); + assert!(tx.is_canceled()); +} + +#[test] +fn cancel_sends() { + let (tx, rx) = mpsc::channel::<Sender<_>>(); + let t = thread::spawn(move || { + for otx in rx { + let _ = otx.send(42); + } + }); + + for _ in 0..20000 { + let (otx, mut orx) = oneshot::channel::<u32>(); + tx.send(otx).unwrap(); + + orx.close(); + let _ = block_on(orx); + } + + drop(tx); + t.join().unwrap(); +} + +// #[test] +// fn spawn_sends_items() { +// let core = local_executor::Core::new(); +// let future = ok::<_, ()>(1); +// let rx = spawn(future, &core); +// assert_eq!(core.run(rx).unwrap(), 1); +// } +// +// #[test] +// fn spawn_kill_dead_stream() { +// use std::thread; +// use std::time::Duration; +// use futures::future::Either; +// use futures::sync::oneshot; +// +// // a future which never returns anything (forever accepting incoming +// // connections), but dropping it leads to observable side effects +// // (like closing listening sockets, releasing limited resources, +// // ...) +// #[derive(Debug)] +// struct Dead { +// // when dropped you should get Err(oneshot::Canceled) on the +// // receiving end +// done: oneshot::Sender<()>, +// } +// impl Future for Dead { +// type Item = (); +// type Error = (); +// +// fn poll(&mut self) -> Poll<Self::Item, Self::Error> { +// Ok(Poll::Pending) +// } +// } +// +// // need to implement a timeout for the test, as it would hang +// // forever right now +// let (timeout_tx, timeout_rx) = oneshot::channel(); +// thread::spawn(move || { +// thread::sleep(Duration::from_millis(1000)); +// let _ = timeout_tx.send(()); +// }); +// +// let core = local_executor::Core::new(); +// let (done_tx, done_rx) = oneshot::channel(); +// let future = Dead{done: done_tx}; +// let rx = spawn(future, &core); +// let res = core.run( +// Ok::<_, ()>(()) +// .into_future() +// .then(move |_| { +// // now drop the spawned future: maybe some timeout exceeded, +// // or some connection on this end was closed by the remote +// // end. +// drop(rx); +// // and wait for the spawned future to release its resources +// done_rx +// }) +// .select2(timeout_rx) +// ); +// match res { +// Err(Either::A((oneshot::Canceled, _))) => (), +// Ok(Either::B(((), _))) => { +// panic!("dead future wasn't canceled (timeout)"); +// }, +// _ => { +// panic!("dead future wasn't canceled (unexpected result)"); +// }, +// } +// } +// +// #[test] +// fn spawn_dont_kill_forgot_dead_stream() { +// use std::thread; +// use std::time::Duration; +// use futures::future::Either; +// use futures::sync::oneshot; +// +// // a future which never returns anything (forever accepting incoming +// // connections), but dropping it leads to observable side effects +// // (like closing listening sockets, releasing limited resources, +// // ...) +// #[derive(Debug)] +// struct Dead { +// // when dropped you should get Err(oneshot::Canceled) on the +// // receiving end +// done: oneshot::Sender<()>, +// } +// impl Future for Dead { +// type Item = (); +// type Error = (); +// +// fn poll(&mut self) -> Poll<Self::Item, Self::Error> { +// Ok(Poll::Pending) +// } +// } +// +// // need to implement a timeout for the test, as it would hang +// // forever right now +// let (timeout_tx, timeout_rx) = oneshot::channel(); +// thread::spawn(move || { +// thread::sleep(Duration::from_millis(1000)); +// let _ = timeout_tx.send(()); +// }); +// +// let core = local_executor::Core::new(); +// let (done_tx, done_rx) = oneshot::channel(); +// let future = Dead{done: done_tx}; +// let rx = spawn(future, &core); +// let res = core.run( +// Ok::<_, ()>(()) +// .into_future() +// .then(move |_| { +// // forget the spawned future: should keep running, i.e. hit +// // the timeout below. +// rx.forget(); +// // and wait for the spawned future to release its resources +// done_rx +// }) +// .select2(timeout_rx) +// ); +// match res { +// Err(Either::A((oneshot::Canceled, _))) => { +// panic!("forgotten dead future was canceled"); +// }, +// Ok(Either::B(((), _))) => (), // reached timeout +// _ => { +// panic!("forgotten dead future was canceled (unexpected result)"); +// }, +// } +// } |