summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.29/tests/unsync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.29/tests/unsync.rs')
-rw-r--r--third_party/rust/futures-0.1.29/tests/unsync.rs265
1 files changed, 265 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.29/tests/unsync.rs b/third_party/rust/futures-0.1.29/tests/unsync.rs
new file mode 100644
index 0000000000..3d11085980
--- /dev/null
+++ b/third_party/rust/futures-0.1.29/tests/unsync.rs
@@ -0,0 +1,265 @@
+#![cfg(feature = "use_std")]
+
+extern crate futures;
+
+mod support;
+
+use futures::prelude::*;
+use futures::unsync::oneshot;
+use futures::unsync::mpsc::{self, SendError};
+use futures::future::lazy;
+use futures::stream::{iter_ok, unfold};
+
+use support::local_executor::Core;
+
+#[test]
+fn mpsc_send_recv() {
+ let (tx, rx) = mpsc::channel::<i32>(1);
+ let mut rx = rx.wait();
+
+ tx.send(42).wait().unwrap();
+
+ assert_eq!(rx.next(), Some(Ok(42)));
+ assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn mpsc_rx_notready() {
+ let (_tx, mut rx) = mpsc::channel::<i32>(1);
+
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::NotReady);
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_rx_end() {
+ let (_, mut rx) = mpsc::channel::<i32>(1);
+
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::Ready(None));
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_tx_clone_weak_rc() {
+ let (tx, mut rx) = mpsc::channel::<i32>(1); // rc = 1
+
+ let tx_clone = tx.clone(); // rc = 2
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::NotReady);
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+
+ drop(tx); // rc = 1
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::NotReady);
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+
+ drop(tx_clone); // rc = 0
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::Ready(None));
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_tx_notready() {
+ let (tx, _rx) = mpsc::channel::<i32>(1);
+ let tx = tx.send(1).wait().unwrap();
+ lazy(move || {
+ assert!(tx.send(2).poll().unwrap().is_not_ready());
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_tx_err() {
+ let (tx, _) = mpsc::channel::<i32>(1);
+ lazy(move || {
+ assert!(tx.send(2).poll().is_err());
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_backpressure() {
+ let (tx, rx) = mpsc::channel::<i32>(1);
+ lazy(move || {
+ iter_ok(vec![1, 2, 3])
+ .forward(tx)
+ .map_err(|e: SendError<i32>| panic!("{}", e))
+ .join(rx.take(3).collect().map(|xs| {
+ assert_eq!(xs, [1, 2, 3]);
+ }))
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_unbounded() {
+ let (tx, rx) = mpsc::unbounded::<i32>();
+ lazy(move || {
+ iter_ok(vec![1, 2, 3])
+ .forward(tx)
+ .map_err(|e: SendError<i32>| panic!("{}", e))
+ .join(rx.take(3).collect().map(|xs| {
+ assert_eq!(xs, [1, 2, 3]);
+ }))
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_recv_unpark() {
+ let core = Core::new();
+ let (tx, rx) = mpsc::channel::<i32>(1);
+ let tx2 = tx.clone();
+ core.spawn(rx.collect().map(|xs| assert_eq!(xs, [1, 2])));
+ core.spawn(lazy(move || tx.send(1).map(|_| ()).map_err(|e| panic!("{}", e))));
+ core.run(lazy(move || tx2.send(2))).unwrap();
+}
+
+#[test]
+fn mpsc_send_unpark() {
+ let core = Core::new();
+ let (tx, rx) = mpsc::channel::<i32>(1);
+ let (donetx, donerx) = oneshot::channel();
+ core.spawn(iter_ok(vec![1, 2]).forward(tx)
+ .then(|x: Result<_, SendError<i32>>| {
+ assert!(x.is_err());
+ donetx.send(()).unwrap();
+ Ok(())
+ }));
+ core.spawn(lazy(move || { let _ = rx; Ok(()) }));
+ core.run(donerx).unwrap();
+}
+
+#[test]
+fn spawn_sends_items() {
+ let core = Core::new();
+ let stream = unfold(0, |i| Some(Ok::<_,u8>((i, i + 1))));
+ let rx = mpsc::spawn(stream, &core, 1);
+ assert_eq!(core.run(rx.take(4).collect()).unwrap(),
+ [0, 1, 2, 3]);
+}
+
+#[test]
+fn spawn_kill_dead_stream() {
+ use std::thread;
+ use std::time::Duration;
+ use futures::future::Either;
+
+ // a stream which never returns anything (maybe a remote end isn't
+ // responding), but dropping it leads to observable side effects
+ // (like closing connections, releasing limited resources, ...)
+ #[derive(Debug)]
+ struct Dead {
+ // when dropped you should get Err(oneshot::Canceled) on the
+ // receiving end
+ done: oneshot::Sender<()>,
+ }
+ impl Stream for Dead {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ // need to implement a timeout for the test, as it would hang
+ // forever right now
+ let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(1000));
+ let _ = timeout_tx.send(());
+ });
+
+ let core = Core::new();
+ let (done_tx, done_rx) = oneshot::channel();
+ let stream = Dead{done: done_tx};
+ let rx = mpsc::spawn(stream, &core, 1);
+ let res = core.run(
+ Ok::<_, ()>(())
+ .into_future()
+ .then(move |_| {
+ // now drop the spawned stream: maybe some timeout exceeded,
+ // or some connection on this end was closed by the remote
+ // end.
+ drop(rx);
+ // and wait for the spawned stream to release its resources
+ done_rx
+ })
+ .select2(timeout_rx)
+ );
+ match res {
+ Err(Either::A((oneshot::Canceled, _))) => (),
+ _ => {
+ panic!("dead stream wasn't canceled");
+ },
+ }
+}
+
+
+/// Test case for PR #768 (issue #766).
+/// The issue was:
+/// Given that an empty channel is polled by the Receiver, and the only Sender
+/// gets dropped without sending anything, then the Receiver would get stuck.
+
+#[test]
+fn dropped_sender_of_unused_channel_notifies_receiver() {
+ let core = Core::new();
+ type FUTURE = Box<futures::Future<Item=u8, Error=()>>;
+
+ // Constructs the channel which we want to test, and two futures which
+ // act on that channel.
+ let pair = |reverse| -> Vec<FUTURE> {
+ // This is the channel which we want to test.
+ let (tx, rx) = mpsc::channel::<u8>(1);
+ let mut futures: Vec<FUTURE> = vec![
+ Box::new(futures::stream::iter_ok(vec![])
+ .forward(tx)
+ .map_err(|_: mpsc::SendError<u8>| ())
+ .map(|_| 42)
+ ),
+ Box::new(rx.fold((), |_, _| Ok(()))
+ .map(|_| 24)
+ ),
+ ];
+ if reverse {
+ futures.reverse();
+ }
+ futures
+ };
+
+ let make_test_future = |reverse| -> Box<Future<Item=Vec<u8>, Error=()>> {
+ let f = futures::future::join_all(pair(reverse));
+
+ // Use a timeout. This is not meant to test the `sync::oneshot` but
+ // merely uses it to implement this timeout.
+ let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel::<Vec<u8>>();
+ std::thread::spawn(move || {
+ std::thread::sleep(std::time::Duration::from_millis(1000));
+ let x = timeout_tx.send(vec![0]);
+ assert!(x.is_err(), "Test timed out.");
+ });
+
+ Box::new(f.select(timeout_rx.map_err(|_|()))
+ .map_err(|x| x.0)
+ .map(|x| x.0)
+ )
+ };
+
+ // The order of the tested futures is important to test fix of PR #768.
+ // We want future_2 to poll on the Receiver before the Sender is dropped.
+ let result = core.run(make_test_future(false));
+ assert!(result.is_ok());
+ assert_eq!(vec![42, 24], result.unwrap());
+
+ // Test also the other ordering:
+ let result = core.run(make_test_future(true));
+ assert!(result.is_ok());
+ assert_eq!(vec![24, 42], result.unwrap());
+}