summaryrefslogtreecommitdiffstats
path: root/vendor/futures/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:29 +0000
commit631cd5845e8de329d0e227aaa707d7ea228b8f8f (patch)
treea1b87c8f8cad01cf18f7c5f57a08f102771ed303 /vendor/futures/tests
parentAdding debian version 1.69.0+dfsg1-1. (diff)
downloadrustc-631cd5845e8de329d0e227aaa707d7ea228b8f8f.tar.xz
rustc-631cd5845e8de329d0e227aaa707d7ea228b8f8f.zip
Merging upstream version 1.70.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/futures/tests')
-rw-r--r--vendor/futures/tests/async_await_macros.rs12
-rw-r--r--vendor/futures/tests/auto_traits.rs8
-rw-r--r--vendor/futures/tests/bilock.rs104
-rw-r--r--vendor/futures/tests/compat.rs1
-rw-r--r--vendor/futures/tests/eventual.rs22
-rw-r--r--vendor/futures/tests/future_join.rs32
-rw-r--r--vendor/futures/tests/future_join_all.rs25
-rw-r--r--vendor/futures/tests/future_shared.rs78
-rw-r--r--vendor/futures/tests/future_try_join_all.rs24
-rw-r--r--vendor/futures/tests/lock_mutex.rs49
-rw-r--r--vendor/futures/tests/ready_queue.rs2
-rw-r--r--vendor/futures/tests/sink.rs2
-rw-r--r--vendor/futures/tests/stream.rs388
-rw-r--r--vendor/futures/tests/stream_futures_ordered.rs88
-rw-r--r--vendor/futures/tests/stream_futures_unordered.rs16
-rw-r--r--vendor/futures/tests/stream_try_stream.rs98
16 files changed, 888 insertions, 61 deletions
diff --git a/vendor/futures/tests/async_await_macros.rs b/vendor/futures/tests/async_await_macros.rs
index ce1f3a337..82a617f2c 100644
--- a/vendor/futures/tests/async_await_macros.rs
+++ b/vendor/futures/tests/async_await_macros.rs
@@ -346,43 +346,47 @@ fn stream_select() {
});
}
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn join_size() {
let fut = async {
let ready = future::ready(0i32);
join!(ready)
};
- assert_eq!(mem::size_of_val(&fut), 16);
+ assert_eq!(mem::size_of_val(&fut), 24);
let fut = async {
let ready1 = future::ready(0i32);
let ready2 = future::ready(0i32);
join!(ready1, ready2)
};
- assert_eq!(mem::size_of_val(&fut), 28);
+ assert_eq!(mem::size_of_val(&fut), 40);
}
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn try_join_size() {
let fut = async {
let ready = future::ready(Ok::<i32, i32>(0));
try_join!(ready)
};
- assert_eq!(mem::size_of_val(&fut), 16);
+ assert_eq!(mem::size_of_val(&fut), 24);
let fut = async {
let ready1 = future::ready(Ok::<i32, i32>(0));
let ready2 = future::ready(Ok::<i32, i32>(0));
try_join!(ready1, ready2)
};
- assert_eq!(mem::size_of_val(&fut), 28);
+ assert_eq!(mem::size_of_val(&fut), 48);
}
+#[allow(clippy::let_underscore_future)]
#[test]
fn join_doesnt_require_unpin() {
let _ = async { join!(async {}, async {}) };
}
+#[allow(clippy::let_underscore_future)]
#[test]
fn try_join_doesnt_require_unpin() {
let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) };
diff --git a/vendor/futures/tests/auto_traits.rs b/vendor/futures/tests/auto_traits.rs
index b3d8b0077..5fc0f7d67 100644
--- a/vendor/futures/tests/auto_traits.rs
+++ b/vendor/futures/tests/auto_traits.rs
@@ -576,10 +576,10 @@ pub mod future {
// TryJoin3, TryJoin4, TryJoin5 are the same as TryJoin
- assert_impl!(TryJoinAll<SendTryFuture<()>>: Send);
+ assert_impl!(TryJoinAll<SendTryFuture<(), ()>>: Send);
assert_not_impl!(TryJoinAll<LocalTryFuture>: Send);
assert_not_impl!(TryJoinAll<SendTryFuture>: Send);
- assert_impl!(TryJoinAll<SyncTryFuture<()>>: Sync);
+ assert_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync);
assert_not_impl!(TryJoinAll<LocalTryFuture>: Sync);
assert_not_impl!(TryJoinAll<SyncTryFuture>: Sync);
assert_impl!(TryJoinAll<PinnedTryFuture>: Unpin);
@@ -1480,10 +1480,10 @@ pub mod stream {
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
assert_impl!(ReadyChunks<SendStream<()>>: Send);
- assert_not_impl!(ReadyChunks<SendStream>: Send);
+ assert_impl!(ReadyChunks<SendStream>: Send);
assert_not_impl!(ReadyChunks<LocalStream>: Send);
assert_impl!(ReadyChunks<SyncStream<()>>: Sync);
- assert_not_impl!(ReadyChunks<SyncStream>: Sync);
+ assert_impl!(ReadyChunks<SyncStream>: Sync);
assert_not_impl!(ReadyChunks<LocalStream>: Sync);
assert_impl!(ReadyChunks<UnpinStream>: Unpin);
assert_not_impl!(ReadyChunks<PinnedStream>: Unpin);
diff --git a/vendor/futures/tests/bilock.rs b/vendor/futures/tests/bilock.rs
new file mode 100644
index 000000000..b10348784
--- /dev/null
+++ b/vendor/futures/tests/bilock.rs
@@ -0,0 +1,104 @@
+#![cfg(feature = "bilock")]
+
+use futures::executor::block_on;
+use futures::future;
+use futures::stream;
+use futures::task::{Context, Poll};
+use futures::Future;
+use futures::StreamExt;
+use futures_test::task::noop_context;
+use futures_util::lock::BiLock;
+use std::pin::Pin;
+use std::thread;
+
+#[test]
+fn smoke() {
+ let future = future::lazy(|cx| {
+ let (a, b) = BiLock::new(1);
+
+ {
+ let mut lock = match a.poll_lock(cx) {
+ Poll::Ready(l) => l,
+ Poll::Pending => panic!("poll not ready"),
+ };
+ assert_eq!(*lock, 1);
+ *lock = 2;
+
+ assert!(b.poll_lock(cx).is_pending());
+ assert!(a.poll_lock(cx).is_pending());
+ }
+
+ assert!(b.poll_lock(cx).is_ready());
+ assert!(a.poll_lock(cx).is_ready());
+
+ {
+ let lock = match b.poll_lock(cx) {
+ Poll::Ready(l) => l,
+ Poll::Pending => panic!("poll not ready"),
+ };
+ assert_eq!(*lock, 2);
+ }
+
+ assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
+
+ Ok::<(), ()>(())
+ });
+
+ assert_eq!(block_on(future), Ok(()));
+}
+
+#[test]
+fn concurrent() {
+ const N: usize = 10000;
+ let mut cx = noop_context();
+ let (a, b) = BiLock::new(0);
+
+ let a = Increment { a: Some(a), remaining: N };
+ let b = stream::iter(0..N).fold(b, |b, _n| async {
+ let mut g = b.lock().await;
+ *g += 1;
+ drop(g);
+ b
+ });
+
+ let t1 = thread::spawn(move || block_on(a));
+ let b = block_on(b);
+ let a = t1.join().unwrap();
+
+ match a.poll_lock(&mut cx) {
+ Poll::Ready(l) => assert_eq!(*l, 2 * N),
+ Poll::Pending => panic!("poll not ready"),
+ }
+ match b.poll_lock(&mut cx) {
+ Poll::Ready(l) => assert_eq!(*l, 2 * N),
+ Poll::Pending => panic!("poll not ready"),
+ }
+
+ assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
+
+ struct Increment {
+ remaining: usize,
+ a: Option<BiLock<usize>>,
+ }
+
+ impl Future for Increment {
+ type Output = BiLock<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> {
+ loop {
+ if self.remaining == 0 {
+ return self.a.take().unwrap().into();
+ }
+
+ let a = self.a.as_mut().unwrap();
+ let mut a = match a.poll_lock(cx) {
+ Poll::Ready(l) => l,
+ Poll::Pending => return Poll::Pending,
+ };
+ *a += 1;
+ drop(a);
+ self.remaining -= 1;
+ }
+ }
+ }
+}
diff --git a/vendor/futures/tests/compat.rs b/vendor/futures/tests/compat.rs
index c4125d895..ac04a95ea 100644
--- a/vendor/futures/tests/compat.rs
+++ b/vendor/futures/tests/compat.rs
@@ -1,4 +1,5 @@
#![cfg(feature = "compat")]
+#![cfg(not(miri))] // Miri does not support epoll
use futures::compat::Future01CompatExt;
use futures::prelude::*;
diff --git a/vendor/futures/tests/eventual.rs b/vendor/futures/tests/eventual.rs
index bff000dd0..57a49b241 100644
--- a/vendor/futures/tests/eventual.rs
+++ b/vendor/futures/tests/eventual.rs
@@ -16,6 +16,8 @@ fn join1() {
run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
assert_eq!(rx.recv(), Ok((1, 2)));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -30,6 +32,8 @@ fn join2() {
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok((1, 2)));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -43,6 +47,8 @@ fn join3() {
assert_eq!(rx.recv(), Ok(1));
assert!(rx.recv().is_err());
drop(c2);
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -56,6 +62,8 @@ fn join4() {
assert!(rx.recv().is_ok());
drop(c2);
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -73,6 +81,8 @@ fn join5() {
c3.send(3).unwrap();
assert_eq!(rx.recv(), Ok(((1, 2), 3)));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -92,6 +102,8 @@ fn select1() {
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -111,6 +123,8 @@ fn select2() {
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -130,10 +144,14 @@ fn select3() {
drop(c2);
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
fn select4() {
+ const N: usize = if cfg!(miri) { 100 } else { 10000 };
+
let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
let t = thread::spawn(move || {
@@ -143,7 +161,7 @@ fn select4() {
});
let (tx2, rx2) = mpsc::channel();
- for _ in 0..10000 {
+ for _ in 0..N {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
@@ -156,4 +174,6 @@ fn select4() {
drop(tx);
t.join().unwrap();
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
diff --git a/vendor/futures/tests/future_join.rs b/vendor/futures/tests/future_join.rs
new file mode 100644
index 000000000..f5df9d777
--- /dev/null
+++ b/vendor/futures/tests/future_join.rs
@@ -0,0 +1,32 @@
+use futures::executor::block_on;
+use futures::future::Future;
+use std::task::Poll;
+
+/// This tests verifies (through miri) that self-referencing
+/// futures are not invalidated when joining them.
+#[test]
+fn futures_join_macro_self_referential() {
+ block_on(async { futures::join!(yield_now(), trouble()) });
+}
+
+async fn trouble() {
+ let lucky_number = 42;
+ let problematic_variable = &lucky_number;
+
+ yield_now().await;
+
+ // problematic dereference
+ let _ = { *problematic_variable };
+}
+
+fn yield_now() -> impl Future<Output = ()> {
+ let mut yielded = false;
+ std::future::poll_fn(move |cx| {
+ if core::mem::replace(&mut yielded, true) {
+ Poll::Ready(())
+ } else {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ })
+}
diff --git a/vendor/futures/tests/future_join_all.rs b/vendor/futures/tests/future_join_all.rs
index ae05a21b7..44486e1ca 100644
--- a/vendor/futures/tests/future_join_all.rs
+++ b/vendor/futures/tests/future_join_all.rs
@@ -1,22 +1,24 @@
use futures::executor::block_on;
use futures::future::{join_all, ready, Future, JoinAll};
+use futures::pin_mut;
use std::fmt::Debug;
-fn assert_done<T, F>(actual_fut: F, expected: T)
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
where
T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
{
- let output = block_on(actual_fut());
+ pin_mut!(actual_fut);
+ let output = block_on(actual_fut);
assert_eq!(output, expected);
}
#[test]
fn collect_collects() {
- assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
- assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
+ assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]);
+ assert_done(join_all(vec![ready(1)]), vec![1]);
// REVIEW: should this be implemented?
- // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
+ // assert_done(join_all(Vec::<i32>::new()), vec![]);
// TODO: needs more tests
}
@@ -25,18 +27,15 @@ fn collect_collects() {
fn join_all_iter_lifetime() {
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
// conservative type parameterization of `JoinAll`.
- fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> {
+ fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> {
let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
- Box::new(join_all(iter))
+ join_all(iter)
}
- assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
+ assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
}
#[test]
fn join_all_from_iter() {
- assert_done(
- || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
- vec![1, 2],
- )
+ assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2])
}
diff --git a/vendor/futures/tests/future_shared.rs b/vendor/futures/tests/future_shared.rs
index 718d6c41b..bd69c1d7c 100644
--- a/vendor/futures/tests/future_shared.rs
+++ b/vendor/futures/tests/future_shared.rs
@@ -3,6 +3,7 @@ use futures::executor::{block_on, LocalPool};
use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt};
use futures::task::LocalSpawn;
use std::cell::{Cell, RefCell};
+use std::panic::AssertUnwindSafe;
use std::rc::Rc;
use std::task::Poll;
use std::thread;
@@ -151,6 +152,52 @@ fn downgrade() {
}
#[test]
+fn ptr_eq() {
+ use future::FusedFuture;
+ use std::collections::hash_map::DefaultHasher;
+ use std::hash::Hasher;
+
+ let (tx, rx) = oneshot::channel::<i32>();
+ let shared = rx.shared();
+ let mut shared2 = shared.clone();
+ let mut hasher = DefaultHasher::new();
+ let mut hasher2 = DefaultHasher::new();
+
+ // Because these two futures share the same underlying future,
+ // `ptr_eq` should return true.
+ assert!(shared.ptr_eq(&shared2));
+ // Equivalence relations are symmetric
+ assert!(shared2.ptr_eq(&shared));
+
+ // If `ptr_eq` returns true, they should hash to the same value.
+ shared.ptr_hash(&mut hasher);
+ shared2.ptr_hash(&mut hasher2);
+ assert_eq!(hasher.finish(), hasher2.finish());
+
+ tx.send(42).unwrap();
+ assert_eq!(block_on(&mut shared2).unwrap(), 42);
+
+ // Now that `shared2` has completed, `ptr_eq` should return false.
+ assert!(shared2.is_terminated());
+ assert!(!shared.ptr_eq(&shared2));
+
+ // `ptr_eq` should continue to work for the other `Shared`.
+ let shared3 = shared.clone();
+ let mut hasher3 = DefaultHasher::new();
+ assert!(shared.ptr_eq(&shared3));
+
+ shared3.ptr_hash(&mut hasher3);
+ assert_eq!(hasher.finish(), hasher3.finish());
+
+ let (_tx, rx) = oneshot::channel::<i32>();
+ let shared4 = rx.shared();
+
+ // And `ptr_eq` should return false for two futures that don't share
+ // the underlying future.
+ assert!(!shared.ptr_eq(&shared4));
+}
+
+#[test]
fn dont_clone_in_single_owner_shared_future() {
let counter = CountClone(Rc::new(Cell::new(0)));
let (tx, rx) = oneshot::channel();
@@ -193,3 +240,34 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() {
// has returned pending
assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ()));
}
+
+#[test]
+#[should_panic(expected = "inner future panicked during poll")]
+fn panic_while_poll() {
+ let fut = futures::future::poll_fn::<i8, _>(|_cx| panic!("test")).shared();
+
+ let fut_captured = fut.clone();
+ std::panic::catch_unwind(AssertUnwindSafe(|| {
+ block_on(fut_captured);
+ }))
+ .unwrap_err();
+
+ block_on(fut);
+}
+
+#[test]
+#[should_panic(expected = "test_marker")]
+fn poll_while_panic() {
+ struct S;
+
+ impl Drop for S {
+ fn drop(&mut self) {
+ let fut = futures::future::ready(1).shared();
+ assert_eq!(block_on(fut.clone()), 1);
+ assert_eq!(block_on(fut), 1);
+ }
+ }
+
+ let _s = S {};
+ panic!("test_marker");
+}
diff --git a/vendor/futures/tests/future_try_join_all.rs b/vendor/futures/tests/future_try_join_all.rs
index a4b3bb76a..9a824872f 100644
--- a/vendor/futures/tests/future_try_join_all.rs
+++ b/vendor/futures/tests/future_try_join_all.rs
@@ -1,24 +1,26 @@
use futures::executor::block_on;
+use futures::pin_mut;
use futures_util::future::{err, ok, try_join_all, TryJoinAll};
use std::fmt::Debug;
use std::future::Future;
-fn assert_done<T, F>(actual_fut: F, expected: T)
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
where
T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
{
- let output = block_on(actual_fut());
+ pin_mut!(actual_fut);
+ let output = block_on(actual_fut);
assert_eq!(output, expected);
}
#[test]
fn collect_collects() {
- assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2]));
- assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2));
- assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1]));
+ assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2]));
+ assert_done(try_join_all(vec![ok(1), err(2)]), Err(2));
+ assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1]));
// REVIEW: should this be implemented?
- // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![]));
+ // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![]));
// TODO: needs more tests
}
@@ -27,18 +29,18 @@ fn collect_collects() {
fn try_join_all_iter_lifetime() {
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
// conservative type parameterization of `TryJoinAll`.
- fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> {
+ fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> {
let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
- Box::new(try_join_all(iter))
+ try_join_all(iter)
}
- assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
+ assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
}
#[test]
fn try_join_all_from_iter() {
assert_done(
- || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()),
+ vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(),
Ok::<_, usize>(vec![1, 2]),
)
}
diff --git a/vendor/futures/tests/lock_mutex.rs b/vendor/futures/tests/lock_mutex.rs
index 7c33864c7..c15e76bd8 100644
--- a/vendor/futures/tests/lock_mutex.rs
+++ b/vendor/futures/tests/lock_mutex.rs
@@ -36,31 +36,34 @@ fn mutex_wakes_waiters() {
#[test]
fn mutex_contested() {
- let (tx, mut rx) = mpsc::unbounded();
- let pool = ThreadPool::builder().pool_size(16).create().unwrap();
+ {
+ let (tx, mut rx) = mpsc::unbounded();
+ let pool = ThreadPool::builder().pool_size(16).create().unwrap();
- let tx = Arc::new(tx);
- let mutex = Arc::new(Mutex::new(0));
+ let tx = Arc::new(tx);
+ let mutex = Arc::new(Mutex::new(0));
- let num_tasks = 1000;
- for _ in 0..num_tasks {
- let tx = tx.clone();
- let mutex = mutex.clone();
- pool.spawn(async move {
- let mut lock = mutex.lock().await;
- ready(()).pending_once().await;
- *lock += 1;
- tx.unbounded_send(()).unwrap();
- drop(lock);
- })
- .unwrap();
- }
-
- block_on(async {
+ let num_tasks = 1000;
for _ in 0..num_tasks {
- rx.next().await.unwrap();
+ let tx = tx.clone();
+ let mutex = mutex.clone();
+ pool.spawn(async move {
+ let mut lock = mutex.lock().await;
+ ready(()).pending_once().await;
+ *lock += 1;
+ tx.unbounded_send(()).unwrap();
+ drop(lock);
+ })
+ .unwrap();
}
- let lock = mutex.lock().await;
- assert_eq!(num_tasks, *lock);
- })
+
+ block_on(async {
+ for _ in 0..num_tasks {
+ rx.next().await.unwrap();
+ }
+ let lock = mutex.lock().await;
+ assert_eq!(num_tasks, *lock);
+ });
+ }
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
diff --git a/vendor/futures/tests/ready_queue.rs b/vendor/futures/tests/ready_queue.rs
index 82901327f..c19d62593 100644
--- a/vendor/futures/tests/ready_queue.rs
+++ b/vendor/futures/tests/ready_queue.rs
@@ -93,7 +93,7 @@ fn dropping_ready_queue() {
#[test]
fn stress() {
- const ITER: usize = 300;
+ const ITER: usize = if cfg!(miri) { 30 } else { 300 };
for i in 0..ITER {
let n = (i % 10) + 1;
diff --git a/vendor/futures/tests/sink.rs b/vendor/futures/tests/sink.rs
index f3cf11b93..5b691e74c 100644
--- a/vendor/futures/tests/sink.rs
+++ b/vendor/futures/tests/sink.rs
@@ -138,7 +138,7 @@ impl<T: Unpin> ManualFlush<T> {
for task in self.waiting_tasks.drain(..) {
task.wake()
}
- mem::replace(&mut self.data, Vec::new())
+ mem::take(&mut self.data)
}
}
diff --git a/vendor/futures/tests/stream.rs b/vendor/futures/tests/stream.rs
index 0d453d175..79d8e233c 100644
--- a/vendor/futures/tests/stream.rs
+++ b/vendor/futures/tests/stream.rs
@@ -1,10 +1,20 @@
+use std::cell::Cell;
+use std::iter;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::task::Context;
+
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::{self, Future};
+use futures::lock::Mutex;
use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
-use futures::FutureExt;
+use futures::{ready, FutureExt};
+use futures_core::Stream;
+use futures_executor::ThreadPool;
use futures_test::task::noop_context;
#[test]
@@ -50,6 +60,345 @@ fn scan() {
}
#[test]
+fn flatten_unordered() {
+ use futures::executor::block_on;
+ use futures::stream::*;
+ use futures::task::*;
+ use std::convert::identity;
+ use std::pin::Pin;
+ use std::sync::atomic::{AtomicBool, Ordering};
+ use std::thread;
+ use std::time::Duration;
+
+ struct DataStream {
+ data: Vec<u8>,
+ polled: bool,
+ wake_immediately: bool,
+ }
+
+ impl Stream for DataStream {
+ type Item = u8;
+
+ fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+ if !self.polled {
+ if !self.wake_immediately {
+ let waker = ctx.waker().clone();
+ let sleep_time =
+ Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10);
+ thread::spawn(move || {
+ thread::sleep(sleep_time);
+ waker.wake_by_ref();
+ });
+ } else {
+ ctx.waker().wake_by_ref();
+ }
+ self.polled = true;
+ Poll::Pending
+ } else {
+ self.polled = false;
+ Poll::Ready(self.data.pop())
+ }
+ }
+ }
+
+ struct Interchanger {
+ polled: bool,
+ base: u8,
+ wake_immediately: bool,
+ }
+
+ impl Stream for Interchanger {
+ type Item = DataStream;
+
+ fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+ if !self.polled {
+ self.polled = true;
+ if !self.wake_immediately {
+ let waker = ctx.waker().clone();
+ let sleep_time = Duration::from_millis(self.base as u64);
+ thread::spawn(move || {
+ thread::sleep(sleep_time);
+ waker.wake_by_ref();
+ });
+ } else {
+ ctx.waker().wake_by_ref();
+ }
+ Poll::Pending
+ } else {
+ let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect();
+ self.base += 1;
+ self.polled = false;
+ Poll::Ready(Some(DataStream {
+ polled: false,
+ data,
+ wake_immediately: self.wake_immediately && self.base % 2 == 0,
+ }))
+ }
+ }
+ }
+
+ // basic behaviour
+ {
+ block_on(async {
+ let st = stream::iter(vec![
+ stream::iter(0..=4u8),
+ stream::iter(6..=10),
+ stream::iter(10..=12),
+ ]);
+
+ let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
+
+ assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
+ });
+
+ block_on(async {
+ let st = stream::iter(vec![
+ stream::iter(0..=4u8),
+ stream::iter(6..=10),
+ stream::iter(0..=2),
+ ]);
+
+ let mut fm_unordered = st
+ .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
+ });
+ }
+
+ // wake up immediately
+ {
+ block_on(async {
+ let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ .await;
+
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // wake up after delay
+ {
+ block_on(async {
+ let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ .await;
+
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
+ Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>(),
+ Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ );
+
+ fm_unordered.sort_unstable();
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, fl_unordered);
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // waker panics
+ {
+ let stream = Arc::new(Mutex::new(
+ Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity)),
+ ));
+
+ struct PanicWaker;
+
+ impl ArcWake for PanicWaker {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {
+ panic!("WAKE UP");
+ }
+ }
+
+ std::thread::spawn({
+ let stream = stream.clone();
+ move || {
+ let mut st = poll_fn(|cx| {
+ let mut lock = ready!(stream.lock().poll_unpin(cx));
+
+ let panic_waker = waker(Arc::new(PanicWaker));
+ let mut panic_cx = Context::from_waker(&panic_waker);
+ let _ = ready!(lock.poll_next_unpin(&mut panic_cx));
+
+ Poll::Ready(Some(()))
+ });
+
+ block_on(st.next())
+ }
+ })
+ .join()
+ .unwrap_err();
+
+ block_on(async move {
+ let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+ values.sort_unstable();
+
+ assert_eq!(values, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // stream panics
+ {
+ let st = stream::iter(iter::once(
+ once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(),
+ ))
+ .chain(
+ Interchanger { polled: false, base: 0, wake_immediately: true }
+ .map(|stream| stream.right_stream())
+ .take(10),
+ );
+
+ let stream = Arc::new(Mutex::new(st.flatten_unordered(10)));
+
+ std::thread::spawn({
+ let stream = stream.clone();
+ move || {
+ let mut st = poll_fn(|cx| {
+ let mut lock = ready!(stream.lock().poll_unpin(cx));
+ let data = ready!(lock.poll_next_unpin(cx));
+
+ Poll::Ready(data)
+ });
+
+ block_on(st.next())
+ }
+ })
+ .join()
+ .unwrap_err();
+
+ block_on(async move {
+ let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+ values.sort_unstable();
+
+ assert_eq!(values, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> {
+ let ready = Arc::new(AtomicBool::new(false));
+ let mut spawned = false;
+
+ future::poll_fn(move |cx| {
+ if !spawned {
+ let waker = cx.waker().clone();
+ let ready = ready.clone();
+
+ std::thread::spawn(move || {
+ std::thread::sleep(time);
+ ready.store(true, Ordering::Release);
+
+ waker.wake_by_ref()
+ });
+ spawned = true;
+ }
+
+ if ready.load(Ordering::Acquire) {
+ Poll::Ready(value.clone())
+ } else {
+ Poll::Pending
+ }
+ })
+ }
+
+ fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin
+ where
+ S::Item: Clone,
+ {
+ let inner = st
+ .then(|item| timeout(Duration::from_millis(50), item))
+ .enumerate()
+ .map(|(idx, value)| {
+ stream::once(if idx % 2 == 0 {
+ future::ready(value).left_future()
+ } else {
+ timeout(Duration::from_millis(100), value).right_future()
+ })
+ })
+ .flatten_unordered(None);
+
+ stream::once(future::ready(inner)).flatten_unordered(None)
+ }
+
+ // nested `flatten_unordered`
+ let te = ThreadPool::new().unwrap();
+ let base_handle = te
+ .spawn_with_handle(async move {
+ let fu = build_nested_fu(stream::iter(1..=10));
+
+ assert_eq!(fu.count().await, 10);
+ })
+ .unwrap();
+
+ block_on(base_handle);
+
+ let empty_state_move_handle = te
+ .spawn_with_handle(async move {
+ let mut fu = build_nested_fu(stream::iter(1..10));
+ {
+ let mut cx = noop_context();
+ let _ = fu.poll_next_unpin(&mut cx);
+ let _ = fu.poll_next_unpin(&mut cx);
+ }
+
+ assert_eq!(fu.count().await, 9);
+ })
+ .unwrap();
+
+ block_on(empty_state_move_handle);
+}
+
+#[test]
fn take_until() {
fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
let mut i = 0;
@@ -149,3 +498,40 @@ fn ready_chunks() {
assert_eq!(s.next().await.unwrap(), vec![4]);
});
}
+
+struct SlowStream {
+ times_should_poll: usize,
+ times_polled: Rc<Cell<usize>>,
+}
+impl Stream for SlowStream {
+ type Item = usize;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.times_polled.set(self.times_polled.get() + 1);
+ if self.times_polled.get() % 2 == 0 {
+ cx.waker().wake_by_ref();
+ return Poll::Pending;
+ }
+ if self.times_polled.get() >= self.times_should_poll {
+ return Poll::Ready(None);
+ }
+ Poll::Ready(Some(self.times_polled.get()))
+ }
+}
+
+#[test]
+fn select_with_strategy_doesnt_terminate_early() {
+ for side in [stream::PollNext::Left, stream::PollNext::Right] {
+ let times_should_poll = 10;
+ let count = Rc::new(Cell::new(0));
+ let b = stream::iter([10, 20]);
+
+ let mut selected = stream::select_with_strategy(
+ SlowStream { times_should_poll, times_polled: count.clone() },
+ b,
+ |_: &mut ()| side,
+ );
+ block_on(async move { while selected.next().await.is_some() {} });
+ assert_eq!(count.get(), times_should_poll + 1);
+ }
+}
diff --git a/vendor/futures/tests/stream_futures_ordered.rs b/vendor/futures/tests/stream_futures_ordered.rs
index 7506c65a6..5a4a3e22e 100644
--- a/vendor/futures/tests/stream_futures_ordered.rs
+++ b/vendor/futures/tests/stream_futures_ordered.rs
@@ -2,6 +2,7 @@ use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, Future, FutureExt, TryFutureExt};
use futures::stream::{FuturesOrdered, StreamExt};
+use futures::task::Poll;
use futures_test::task::noop_context;
use std::any::Any;
@@ -46,6 +47,69 @@ fn works_2() {
}
#[test]
+fn test_push_front() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_back(a_rx);
+ stream.push_back(b_rx);
+ stream.push_back(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // 1 and 2 should be received in order
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+
+ stream.push_front(d_rx);
+ d_tx.send(4).unwrap();
+
+ // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next
+ // and then 3 after it
+ assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+}
+
+#[test]
+fn test_push_back() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_back(a_rx);
+ stream.push_back(b_rx);
+ stream.push_back(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // All results should be received in order
+
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+
+ stream.push_back(d_rx);
+ d_tx.send(4).unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
+}
+
+#[test]
fn from_iterator() {
let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
.into_iter()
@@ -82,3 +146,27 @@ fn queue_never_unblocked() {
assert!(stream.poll_next_unpin(cx).is_pending());
assert!(stream.poll_next_unpin(cx).is_pending());
}
+
+#[test]
+fn test_push_front_negative() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_front(a_rx);
+ stream.push_front(b_rx);
+ stream.push_front(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // These should all be recieved in reverse order
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+}
diff --git a/vendor/futures/tests/stream_futures_unordered.rs b/vendor/futures/tests/stream_futures_unordered.rs
index 439c809be..b56828047 100644
--- a/vendor/futures/tests/stream_futures_unordered.rs
+++ b/vendor/futures/tests/stream_futures_unordered.rs
@@ -260,6 +260,20 @@ fn into_iter_len() {
}
#[test]
+fn into_iter_partial() {
+ let stream = vec![future::ready(1), future::ready(2), future::ready(3), future::ready(4)]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ let mut into_iter = stream.into_iter();
+ assert!(into_iter.next().is_some());
+ assert!(into_iter.next().is_some());
+ assert!(into_iter.next().is_some());
+ assert_eq!(into_iter.len(), 1);
+ // don't panic when iterator is dropped before completing
+}
+
+#[test]
fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
// asserting that it does not move.
@@ -340,7 +354,7 @@ fn polled_only_once_at_most_per_iteration() {
let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
assert!(tasks.poll_next_unpin(cx).is_pending());
- assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
+ assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
diff --git a/vendor/futures/tests/stream_try_stream.rs b/vendor/futures/tests/stream_try_stream.rs
index 194e74db7..b3d04b920 100644
--- a/vendor/futures/tests/stream_try_stream.rs
+++ b/vendor/futures/tests/stream_try_stream.rs
@@ -1,7 +1,12 @@
+use core::pin::Pin;
+
use futures::{
- stream::{self, StreamExt, TryStreamExt},
+ stream::{self, repeat, Repeat, StreamExt, TryStreamExt},
task::Poll,
+ Stream,
};
+use futures_executor::block_on;
+use futures_task::Context;
use futures_test::task::noop_context;
#[test]
@@ -36,3 +41,94 @@ fn try_take_while_after_err() {
.boxed();
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
}
+
+#[test]
+fn try_flatten_unordered() {
+ let test_st = stream::iter(1..7)
+ .map(|val: u32| {
+ if val % 2 == 0 {
+ Ok(stream::unfold((val, 1), |(val, pow)| async move {
+ Some((val.pow(pow), (val, pow + 1)))
+ })
+ .take(3)
+ .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) }))
+ } else {
+ Err(val)
+ }
+ })
+ .map_ok(Box::pin)
+ .try_flatten_unordered(None);
+
+ block_on(async move {
+ assert_eq!(
+ // All numbers can be divided by 16 and odds must be `Err`
+ // For all basic evens we must have powers from 1 to 3
+ vec![
+ Err(1),
+ Err(3),
+ Err(5),
+ Ok(2),
+ Ok(4),
+ Ok(6),
+ Ok(4),
+ Err(16),
+ Ok(36),
+ Ok(8),
+ Err(64),
+ Ok(216)
+ ],
+ test_st.collect::<Vec<_>>().await
+ )
+ });
+
+ #[derive(Clone, Debug)]
+ struct ErrorStream {
+ error_after: usize,
+ polled: usize,
+ }
+
+ impl Stream for ErrorStream {
+ type Item = Result<Repeat<Result<(), ()>>, ()>;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
+ if self.polled > self.error_after {
+ panic!("Polled after error");
+ } else {
+ let out =
+ if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) };
+ self.polled += 1;
+ Poll::Ready(Some(out))
+ }
+ }
+ }
+
+ block_on(async move {
+ let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None);
+ let mut ctr = 0;
+ while (st.try_next().await).is_ok() {
+ ctr += 1;
+ }
+ assert_eq!(ctr, 0);
+
+ assert_eq!(
+ ErrorStream { error_after: 10, polled: 0 }
+ .try_flatten_unordered(None)
+ .inspect_ok(|_| panic!("Unexpected `Ok`"))
+ .try_collect::<Vec<_>>()
+ .await,
+ Err(())
+ );
+
+ let mut taken = 0;
+ assert_eq!(
+ ErrorStream { error_after: 10, polled: 0 }
+ .map_ok(|st| st.take(3))
+ .try_flatten_unordered(1)
+ .inspect(|_| taken += 1)
+ .try_fold((), |(), res| async move { Ok(res) })
+ .await,
+ Err(())
+ );
+ assert_eq!(taken, 31);
+ })
+}