diff options
Diffstat (limited to 'third_party/rust/futures-0.1.29/tests/unsync.rs')
-rw-r--r-- | third_party/rust/futures-0.1.29/tests/unsync.rs | 265 |
1 files changed, 265 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.29/tests/unsync.rs b/third_party/rust/futures-0.1.29/tests/unsync.rs new file mode 100644 index 0000000000..3d11085980 --- /dev/null +++ b/third_party/rust/futures-0.1.29/tests/unsync.rs @@ -0,0 +1,265 @@ +#![cfg(feature = "use_std")] + +extern crate futures; + +mod support; + +use futures::prelude::*; +use futures::unsync::oneshot; +use futures::unsync::mpsc::{self, SendError}; +use futures::future::lazy; +use futures::stream::{iter_ok, unfold}; + +use support::local_executor::Core; + +#[test] +fn mpsc_send_recv() { + let (tx, rx) = mpsc::channel::<i32>(1); + let mut rx = rx.wait(); + + tx.send(42).wait().unwrap(); + + assert_eq!(rx.next(), Some(Ok(42))); + assert_eq!(rx.next(), None); +} + +#[test] +fn mpsc_rx_notready() { + let (_tx, mut rx) = mpsc::channel::<i32>(1); + + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::NotReady); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_rx_end() { + let (_, mut rx) = mpsc::channel::<i32>(1); + + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::Ready(None)); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_tx_clone_weak_rc() { + let (tx, mut rx) = mpsc::channel::<i32>(1); // rc = 1 + + let tx_clone = tx.clone(); // rc = 2 + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::NotReady); + Ok(()) as Result<(), ()> + }).wait().unwrap(); + + drop(tx); // rc = 1 + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::NotReady); + Ok(()) as Result<(), ()> + }).wait().unwrap(); + + drop(tx_clone); // rc = 0 + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::Ready(None)); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_tx_notready() { + let (tx, _rx) = mpsc::channel::<i32>(1); + let tx = tx.send(1).wait().unwrap(); + lazy(move || { + assert!(tx.send(2).poll().unwrap().is_not_ready()); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_tx_err() { + let (tx, _) = mpsc::channel::<i32>(1); + lazy(move || { + assert!(tx.send(2).poll().is_err()); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_backpressure() { + let (tx, rx) = mpsc::channel::<i32>(1); + lazy(move || { + iter_ok(vec![1, 2, 3]) + .forward(tx) + .map_err(|e: SendError<i32>| panic!("{}", e)) + .join(rx.take(3).collect().map(|xs| { + assert_eq!(xs, [1, 2, 3]); + })) + }).wait().unwrap(); +} + +#[test] +fn mpsc_unbounded() { + let (tx, rx) = mpsc::unbounded::<i32>(); + lazy(move || { + iter_ok(vec![1, 2, 3]) + .forward(tx) + .map_err(|e: SendError<i32>| panic!("{}", e)) + .join(rx.take(3).collect().map(|xs| { + assert_eq!(xs, [1, 2, 3]); + })) + }).wait().unwrap(); +} + +#[test] +fn mpsc_recv_unpark() { + let core = Core::new(); + let (tx, rx) = mpsc::channel::<i32>(1); + let tx2 = tx.clone(); + core.spawn(rx.collect().map(|xs| assert_eq!(xs, [1, 2]))); + core.spawn(lazy(move || tx.send(1).map(|_| ()).map_err(|e| panic!("{}", e)))); + core.run(lazy(move || tx2.send(2))).unwrap(); +} + +#[test] +fn mpsc_send_unpark() { + let core = Core::new(); + let (tx, rx) = mpsc::channel::<i32>(1); + let (donetx, donerx) = oneshot::channel(); + core.spawn(iter_ok(vec![1, 2]).forward(tx) + .then(|x: Result<_, SendError<i32>>| { + assert!(x.is_err()); + donetx.send(()).unwrap(); + Ok(()) + })); + core.spawn(lazy(move || { let _ = rx; Ok(()) })); + core.run(donerx).unwrap(); +} + +#[test] +fn spawn_sends_items() { + let core = Core::new(); + let stream = unfold(0, |i| Some(Ok::<_,u8>((i, i + 1)))); + let rx = mpsc::spawn(stream, &core, 1); + assert_eq!(core.run(rx.take(4).collect()).unwrap(), + [0, 1, 2, 3]); +} + +#[test] +fn spawn_kill_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + + // a stream which never returns anything (maybe a remote end isn't + // responding), but dropping it leads to observable side effects + // (like closing connections, releasing limited resources, ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Stream for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let stream = Dead{done: done_tx}; + let rx = mpsc::spawn(stream, &core, 1); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // now drop the spawned stream: maybe some timeout exceeded, + // or some connection on this end was closed by the remote + // end. + drop(rx); + // and wait for the spawned stream to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => (), + _ => { + panic!("dead stream wasn't canceled"); + }, + } +} + + +/// Test case for PR #768 (issue #766). +/// The issue was: +/// Given that an empty channel is polled by the Receiver, and the only Sender +/// gets dropped without sending anything, then the Receiver would get stuck. + +#[test] +fn dropped_sender_of_unused_channel_notifies_receiver() { + let core = Core::new(); + type FUTURE = Box<futures::Future<Item=u8, Error=()>>; + + // Constructs the channel which we want to test, and two futures which + // act on that channel. + let pair = |reverse| -> Vec<FUTURE> { + // This is the channel which we want to test. + let (tx, rx) = mpsc::channel::<u8>(1); + let mut futures: Vec<FUTURE> = vec![ + Box::new(futures::stream::iter_ok(vec![]) + .forward(tx) + .map_err(|_: mpsc::SendError<u8>| ()) + .map(|_| 42) + ), + Box::new(rx.fold((), |_, _| Ok(())) + .map(|_| 24) + ), + ]; + if reverse { + futures.reverse(); + } + futures + }; + + let make_test_future = |reverse| -> Box<Future<Item=Vec<u8>, Error=()>> { + let f = futures::future::join_all(pair(reverse)); + + // Use a timeout. This is not meant to test the `sync::oneshot` but + // merely uses it to implement this timeout. + let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel::<Vec<u8>>(); + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(1000)); + let x = timeout_tx.send(vec![0]); + assert!(x.is_err(), "Test timed out."); + }); + + Box::new(f.select(timeout_rx.map_err(|_|())) + .map_err(|x| x.0) + .map(|x| x.0) + ) + }; + + // The order of the tested futures is important to test fix of PR #768. + // We want future_2 to poll on the Receiver before the Sender is dropped. + let result = core.run(make_test_future(false)); + assert!(result.is_ok()); + assert_eq!(vec![42, 24], result.unwrap()); + + // Test also the other ordering: + let result = core.run(make_test_future(true)); + assert!(result.is_ok()); + assert_eq!(vec![24, 42], result.unwrap()); +} |