summaryrefslogtreecommitdiffstats
path: root/vendor/futures/tests/ready_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--vendor/futures/tests/ready_queue.rs148
1 files changed, 148 insertions, 0 deletions
diff --git a/vendor/futures/tests/ready_queue.rs b/vendor/futures/tests/ready_queue.rs
new file mode 100644
index 000000000..82901327f
--- /dev/null
+++ b/vendor/futures/tests/ready_queue.rs
@@ -0,0 +1,148 @@
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future;
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+use std::panic::{self, AssertUnwindSafe};
+use std::sync::{Arc, Barrier};
+use std::thread;
+
+#[test]
+fn basic_usage() {
+ block_on(future::lazy(move |cx| {
+ let mut queue = FuturesUnordered::new();
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = oneshot::channel();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ tx2.send("hello").unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx));
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ tx1.send("world").unwrap();
+ tx3.send("world2").unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
+ }));
+}
+
+#[test]
+fn resolving_errors() {
+ block_on(future::lazy(move |cx| {
+ let mut queue = FuturesUnordered::new();
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = oneshot::channel();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ drop(tx2);
+
+ assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ drop(tx1);
+ tx3.send("world2").unwrap();
+
+ assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
+ }));
+}
+
+#[test]
+fn dropping_ready_queue() {
+ block_on(future::lazy(move |_| {
+ let queue = FuturesUnordered::new();
+ let (mut tx1, rx1) = oneshot::channel::<()>();
+ let (mut tx2, rx2) = oneshot::channel::<()>();
+ let (mut tx3, rx3) = oneshot::channel::<()>();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ {
+ let cx = &mut noop_context();
+ assert!(!tx1.poll_canceled(cx).is_ready());
+ assert!(!tx2.poll_canceled(cx).is_ready());
+ assert!(!tx3.poll_canceled(cx).is_ready());
+
+ drop(queue);
+
+ assert!(tx1.poll_canceled(cx).is_ready());
+ assert!(tx2.poll_canceled(cx).is_ready());
+ assert!(tx3.poll_canceled(cx).is_ready());
+ }
+ }));
+}
+
+#[test]
+fn stress() {
+ const ITER: usize = 300;
+
+ for i in 0..ITER {
+ let n = (i % 10) + 1;
+
+ let mut queue = FuturesUnordered::new();
+
+ for _ in 0..5 {
+ let barrier = Arc::new(Barrier::new(n + 1));
+
+ for num in 0..n {
+ let barrier = barrier.clone();
+ let (tx, rx) = oneshot::channel();
+
+ queue.push(rx);
+
+ thread::spawn(move || {
+ barrier.wait();
+ tx.send(num).unwrap();
+ });
+ }
+
+ barrier.wait();
+
+ let mut sync = block_on_stream(queue);
+
+ let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect();
+
+ assert_eq!(rx.len(), n);
+
+ rx.sort_unstable();
+
+ for (i, x) in rx.into_iter().enumerate() {
+ assert_eq!(i, x);
+ }
+
+ queue = sync.into_inner();
+ }
+ }
+}
+
+#[test]
+fn panicking_future_dropped() {
+ block_on(future::lazy(move |cx| {
+ let mut queue = FuturesUnordered::new();
+ queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));
+
+ let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
+ assert!(r.is_err());
+ assert!(queue.is_empty());
+ assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
+ }));
+}