diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:20:29 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:20:29 +0000 |
commit | 631cd5845e8de329d0e227aaa707d7ea228b8f8f (patch) | |
tree | a1b87c8f8cad01cf18f7c5f57a08f102771ed303 /vendor/futures/tests | |
parent | Adding debian version 1.69.0+dfsg1-1. (diff) | |
download | rustc-631cd5845e8de329d0e227aaa707d7ea228b8f8f.tar.xz rustc-631cd5845e8de329d0e227aaa707d7ea228b8f8f.zip |
Merging upstream version 1.70.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/futures/tests')
-rw-r--r-- | vendor/futures/tests/async_await_macros.rs | 12 | ||||
-rw-r--r-- | vendor/futures/tests/auto_traits.rs | 8 | ||||
-rw-r--r-- | vendor/futures/tests/bilock.rs | 104 | ||||
-rw-r--r-- | vendor/futures/tests/compat.rs | 1 | ||||
-rw-r--r-- | vendor/futures/tests/eventual.rs | 22 | ||||
-rw-r--r-- | vendor/futures/tests/future_join.rs | 32 | ||||
-rw-r--r-- | vendor/futures/tests/future_join_all.rs | 25 | ||||
-rw-r--r-- | vendor/futures/tests/future_shared.rs | 78 | ||||
-rw-r--r-- | vendor/futures/tests/future_try_join_all.rs | 24 | ||||
-rw-r--r-- | vendor/futures/tests/lock_mutex.rs | 49 | ||||
-rw-r--r-- | vendor/futures/tests/ready_queue.rs | 2 | ||||
-rw-r--r-- | vendor/futures/tests/sink.rs | 2 | ||||
-rw-r--r-- | vendor/futures/tests/stream.rs | 388 | ||||
-rw-r--r-- | vendor/futures/tests/stream_futures_ordered.rs | 88 | ||||
-rw-r--r-- | vendor/futures/tests/stream_futures_unordered.rs | 16 | ||||
-rw-r--r-- | vendor/futures/tests/stream_try_stream.rs | 98 |
16 files changed, 888 insertions, 61 deletions
diff --git a/vendor/futures/tests/async_await_macros.rs b/vendor/futures/tests/async_await_macros.rs index ce1f3a337..82a617f2c 100644 --- a/vendor/futures/tests/async_await_macros.rs +++ b/vendor/futures/tests/async_await_macros.rs @@ -346,43 +346,47 @@ fn stream_select() { }); } +#[cfg_attr(not(target_pointer_width = "64"), ignore)] #[test] fn join_size() { let fut = async { let ready = future::ready(0i32); join!(ready) }; - assert_eq!(mem::size_of_val(&fut), 16); + assert_eq!(mem::size_of_val(&fut), 24); let fut = async { let ready1 = future::ready(0i32); let ready2 = future::ready(0i32); join!(ready1, ready2) }; - assert_eq!(mem::size_of_val(&fut), 28); + assert_eq!(mem::size_of_val(&fut), 40); } +#[cfg_attr(not(target_pointer_width = "64"), ignore)] #[test] fn try_join_size() { let fut = async { let ready = future::ready(Ok::<i32, i32>(0)); try_join!(ready) }; - assert_eq!(mem::size_of_val(&fut), 16); + assert_eq!(mem::size_of_val(&fut), 24); let fut = async { let ready1 = future::ready(Ok::<i32, i32>(0)); let ready2 = future::ready(Ok::<i32, i32>(0)); try_join!(ready1, ready2) }; - assert_eq!(mem::size_of_val(&fut), 28); + assert_eq!(mem::size_of_val(&fut), 48); } +#[allow(clippy::let_underscore_future)] #[test] fn join_doesnt_require_unpin() { let _ = async { join!(async {}, async {}) }; } +#[allow(clippy::let_underscore_future)] #[test] fn try_join_doesnt_require_unpin() { let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) }; diff --git a/vendor/futures/tests/auto_traits.rs b/vendor/futures/tests/auto_traits.rs index b3d8b0077..5fc0f7d67 100644 --- a/vendor/futures/tests/auto_traits.rs +++ b/vendor/futures/tests/auto_traits.rs @@ -576,10 +576,10 @@ pub mod future { // TryJoin3, TryJoin4, TryJoin5 are the same as TryJoin - assert_impl!(TryJoinAll<SendTryFuture<()>>: Send); + assert_impl!(TryJoinAll<SendTryFuture<(), ()>>: Send); assert_not_impl!(TryJoinAll<LocalTryFuture>: Send); assert_not_impl!(TryJoinAll<SendTryFuture>: Send); - assert_impl!(TryJoinAll<SyncTryFuture<()>>: Sync); + assert_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync); assert_not_impl!(TryJoinAll<LocalTryFuture>: Sync); assert_not_impl!(TryJoinAll<SyncTryFuture>: Sync); assert_impl!(TryJoinAll<PinnedTryFuture>: Unpin); @@ -1480,10 +1480,10 @@ pub mod stream { assert_not_impl!(PollImmediate<PinnedStream>: Unpin); assert_impl!(ReadyChunks<SendStream<()>>: Send); - assert_not_impl!(ReadyChunks<SendStream>: Send); + assert_impl!(ReadyChunks<SendStream>: Send); assert_not_impl!(ReadyChunks<LocalStream>: Send); assert_impl!(ReadyChunks<SyncStream<()>>: Sync); - assert_not_impl!(ReadyChunks<SyncStream>: Sync); + assert_impl!(ReadyChunks<SyncStream>: Sync); assert_not_impl!(ReadyChunks<LocalStream>: Sync); assert_impl!(ReadyChunks<UnpinStream>: Unpin); assert_not_impl!(ReadyChunks<PinnedStream>: Unpin); diff --git a/vendor/futures/tests/bilock.rs b/vendor/futures/tests/bilock.rs new file mode 100644 index 000000000..b10348784 --- /dev/null +++ b/vendor/futures/tests/bilock.rs @@ -0,0 +1,104 @@ +#![cfg(feature = "bilock")] + +use futures::executor::block_on; +use futures::future; +use futures::stream; +use futures::task::{Context, Poll}; +use futures::Future; +use futures::StreamExt; +use futures_test::task::noop_context; +use futures_util::lock::BiLock; +use std::pin::Pin; +use std::thread; + +#[test] +fn smoke() { + let future = future::lazy(|cx| { + let (a, b) = BiLock::new(1); + + { + let mut lock = match a.poll_lock(cx) { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 1); + *lock = 2; + + assert!(b.poll_lock(cx).is_pending()); + assert!(a.poll_lock(cx).is_pending()); + } + + assert!(b.poll_lock(cx).is_ready()); + assert!(a.poll_lock(cx).is_ready()); + + { + let lock = match b.poll_lock(cx) { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 2); + } + + assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2); + + Ok::<(), ()>(()) + }); + + assert_eq!(block_on(future), Ok(())); +} + +#[test] +fn concurrent() { + const N: usize = 10000; + let mut cx = noop_context(); + let (a, b) = BiLock::new(0); + + let a = Increment { a: Some(a), remaining: N }; + let b = stream::iter(0..N).fold(b, |b, _n| async { + let mut g = b.lock().await; + *g += 1; + drop(g); + b + }); + + let t1 = thread::spawn(move || block_on(a)); + let b = block_on(b); + let a = t1.join().unwrap(); + + match a.poll_lock(&mut cx) { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + match b.poll_lock(&mut cx) { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + + assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N); + + struct Increment { + remaining: usize, + a: Option<BiLock<usize>>, + } + + impl Future for Increment { + type Output = BiLock<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> { + loop { + if self.remaining == 0 { + return self.a.take().unwrap().into(); + } + + let a = self.a.as_mut().unwrap(); + let mut a = match a.poll_lock(cx) { + Poll::Ready(l) => l, + Poll::Pending => return Poll::Pending, + }; + *a += 1; + drop(a); + self.remaining -= 1; + } + } + } +} diff --git a/vendor/futures/tests/compat.rs b/vendor/futures/tests/compat.rs index c4125d895..ac04a95ea 100644 --- a/vendor/futures/tests/compat.rs +++ b/vendor/futures/tests/compat.rs @@ -1,4 +1,5 @@ #![cfg(feature = "compat")] +#![cfg(not(miri))] // Miri does not support epoll use futures::compat::Future01CompatExt; use futures::prelude::*; diff --git a/vendor/futures/tests/eventual.rs b/vendor/futures/tests/eventual.rs index bff000dd0..57a49b241 100644 --- a/vendor/futures/tests/eventual.rs +++ b/vendor/futures/tests/eventual.rs @@ -16,6 +16,8 @@ fn join1() { run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap())); assert_eq!(rx.recv(), Ok((1, 2))); assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } #[test] @@ -30,6 +32,8 @@ fn join2() { c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok((1, 2))); assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } #[test] @@ -43,6 +47,8 @@ fn join3() { assert_eq!(rx.recv(), Ok(1)); assert!(rx.recv().is_err()); drop(c2); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } #[test] @@ -56,6 +62,8 @@ fn join4() { assert!(rx.recv().is_ok()); drop(c2); assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } #[test] @@ -73,6 +81,8 @@ fn join5() { c3.send(3).unwrap(); assert_eq!(rx.recv(), Ok(((1, 2), 3))); assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } #[test] @@ -92,6 +102,8 @@ fn select1() { c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } #[test] @@ -111,6 +123,8 @@ fn select2() { c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } #[test] @@ -130,10 +144,14 @@ fn select3() { drop(c2); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } #[test] fn select4() { + const N: usize = if cfg!(miri) { 100 } else { 10000 }; + let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>(); let t = thread::spawn(move || { @@ -143,7 +161,7 @@ fn select4() { }); let (tx2, rx2) = mpsc::channel(); - for _ in 0..10000 { + for _ in 0..N { let (c1, p1) = oneshot::channel::<i32>(); let (c2, p2) = oneshot::channel::<i32>(); @@ -156,4 +174,6 @@ fn select4() { drop(tx); t.join().unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } diff --git a/vendor/futures/tests/future_join.rs b/vendor/futures/tests/future_join.rs new file mode 100644 index 000000000..f5df9d777 --- /dev/null +++ b/vendor/futures/tests/future_join.rs @@ -0,0 +1,32 @@ +use futures::executor::block_on; +use futures::future::Future; +use std::task::Poll; + +/// This tests verifies (through miri) that self-referencing +/// futures are not invalidated when joining them. +#[test] +fn futures_join_macro_self_referential() { + block_on(async { futures::join!(yield_now(), trouble()) }); +} + +async fn trouble() { + let lucky_number = 42; + let problematic_variable = &lucky_number; + + yield_now().await; + + // problematic dereference + let _ = { *problematic_variable }; +} + +fn yield_now() -> impl Future<Output = ()> { + let mut yielded = false; + std::future::poll_fn(move |cx| { + if core::mem::replace(&mut yielded, true) { + Poll::Ready(()) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) +} diff --git a/vendor/futures/tests/future_join_all.rs b/vendor/futures/tests/future_join_all.rs index ae05a21b7..44486e1ca 100644 --- a/vendor/futures/tests/future_join_all.rs +++ b/vendor/futures/tests/future_join_all.rs @@ -1,22 +1,24 @@ use futures::executor::block_on; use futures::future::{join_all, ready, Future, JoinAll}; +use futures::pin_mut; use std::fmt::Debug; -fn assert_done<T, F>(actual_fut: F, expected: T) +#[track_caller] +fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T) where T: PartialEq + Debug, - F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, { - let output = block_on(actual_fut()); + pin_mut!(actual_fut); + let output = block_on(actual_fut); assert_eq!(output, expected); } #[test] fn collect_collects() { - assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); - assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); + assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]); + assert_done(join_all(vec![ready(1)]), vec![1]); // REVIEW: should this be implemented? - // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]); + // assert_done(join_all(Vec::<i32>::new()), vec![]); // TODO: needs more tests } @@ -25,18 +27,15 @@ fn collect_collects() { fn join_all_iter_lifetime() { // In futures-rs version 0.1, this function would fail to typecheck due to an overly // conservative type parameterization of `JoinAll`. - fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> { + fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> { let iter = bufs.into_iter().map(|b| ready::<usize>(b.len())); - Box::new(join_all(iter)) + join_all(iter) } - assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]); + assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]); } #[test] fn join_all_from_iter() { - assert_done( - || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()), - vec![1, 2], - ) + assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2]) } diff --git a/vendor/futures/tests/future_shared.rs b/vendor/futures/tests/future_shared.rs index 718d6c41b..bd69c1d7c 100644 --- a/vendor/futures/tests/future_shared.rs +++ b/vendor/futures/tests/future_shared.rs @@ -3,6 +3,7 @@ use futures::executor::{block_on, LocalPool}; use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt}; use futures::task::LocalSpawn; use std::cell::{Cell, RefCell}; +use std::panic::AssertUnwindSafe; use std::rc::Rc; use std::task::Poll; use std::thread; @@ -151,6 +152,52 @@ fn downgrade() { } #[test] +fn ptr_eq() { + use future::FusedFuture; + use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; + + let (tx, rx) = oneshot::channel::<i32>(); + let shared = rx.shared(); + let mut shared2 = shared.clone(); + let mut hasher = DefaultHasher::new(); + let mut hasher2 = DefaultHasher::new(); + + // Because these two futures share the same underlying future, + // `ptr_eq` should return true. + assert!(shared.ptr_eq(&shared2)); + // Equivalence relations are symmetric + assert!(shared2.ptr_eq(&shared)); + + // If `ptr_eq` returns true, they should hash to the same value. + shared.ptr_hash(&mut hasher); + shared2.ptr_hash(&mut hasher2); + assert_eq!(hasher.finish(), hasher2.finish()); + + tx.send(42).unwrap(); + assert_eq!(block_on(&mut shared2).unwrap(), 42); + + // Now that `shared2` has completed, `ptr_eq` should return false. + assert!(shared2.is_terminated()); + assert!(!shared.ptr_eq(&shared2)); + + // `ptr_eq` should continue to work for the other `Shared`. + let shared3 = shared.clone(); + let mut hasher3 = DefaultHasher::new(); + assert!(shared.ptr_eq(&shared3)); + + shared3.ptr_hash(&mut hasher3); + assert_eq!(hasher.finish(), hasher3.finish()); + + let (_tx, rx) = oneshot::channel::<i32>(); + let shared4 = rx.shared(); + + // And `ptr_eq` should return false for two futures that don't share + // the underlying future. + assert!(!shared.ptr_eq(&shared4)); +} + +#[test] fn dont_clone_in_single_owner_shared_future() { let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); @@ -193,3 +240,34 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() { // has returned pending assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ())); } + +#[test] +#[should_panic(expected = "inner future panicked during poll")] +fn panic_while_poll() { + let fut = futures::future::poll_fn::<i8, _>(|_cx| panic!("test")).shared(); + + let fut_captured = fut.clone(); + std::panic::catch_unwind(AssertUnwindSafe(|| { + block_on(fut_captured); + })) + .unwrap_err(); + + block_on(fut); +} + +#[test] +#[should_panic(expected = "test_marker")] +fn poll_while_panic() { + struct S; + + impl Drop for S { + fn drop(&mut self) { + let fut = futures::future::ready(1).shared(); + assert_eq!(block_on(fut.clone()), 1); + assert_eq!(block_on(fut), 1); + } + } + + let _s = S {}; + panic!("test_marker"); +} diff --git a/vendor/futures/tests/future_try_join_all.rs b/vendor/futures/tests/future_try_join_all.rs index a4b3bb76a..9a824872f 100644 --- a/vendor/futures/tests/future_try_join_all.rs +++ b/vendor/futures/tests/future_try_join_all.rs @@ -1,24 +1,26 @@ use futures::executor::block_on; +use futures::pin_mut; use futures_util::future::{err, ok, try_join_all, TryJoinAll}; use std::fmt::Debug; use std::future::Future; -fn assert_done<T, F>(actual_fut: F, expected: T) +#[track_caller] +fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T) where T: PartialEq + Debug, - F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>, { - let output = block_on(actual_fut()); + pin_mut!(actual_fut); + let output = block_on(actual_fut); assert_eq!(output, expected); } #[test] fn collect_collects() { - assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2])); - assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2)); - assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1])); + assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2])); + assert_done(try_join_all(vec![ok(1), err(2)]), Err(2)); + assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1])); // REVIEW: should this be implemented? - // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![])); + // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![])); // TODO: needs more tests } @@ -27,18 +29,18 @@ fn collect_collects() { fn try_join_all_iter_lifetime() { // In futures-rs version 0.1, this function would fail to typecheck due to an overly // conservative type parameterization of `TryJoinAll`. - fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> { + fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> { let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len())); - Box::new(try_join_all(iter)) + try_join_all(iter) } - assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); + assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); } #[test] fn try_join_all_from_iter() { assert_done( - || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()), + vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(), Ok::<_, usize>(vec![1, 2]), ) } diff --git a/vendor/futures/tests/lock_mutex.rs b/vendor/futures/tests/lock_mutex.rs index 7c33864c7..c15e76bd8 100644 --- a/vendor/futures/tests/lock_mutex.rs +++ b/vendor/futures/tests/lock_mutex.rs @@ -36,31 +36,34 @@ fn mutex_wakes_waiters() { #[test] fn mutex_contested() { - let (tx, mut rx) = mpsc::unbounded(); - let pool = ThreadPool::builder().pool_size(16).create().unwrap(); + { + let (tx, mut rx) = mpsc::unbounded(); + let pool = ThreadPool::builder().pool_size(16).create().unwrap(); - let tx = Arc::new(tx); - let mutex = Arc::new(Mutex::new(0)); + let tx = Arc::new(tx); + let mutex = Arc::new(Mutex::new(0)); - let num_tasks = 1000; - for _ in 0..num_tasks { - let tx = tx.clone(); - let mutex = mutex.clone(); - pool.spawn(async move { - let mut lock = mutex.lock().await; - ready(()).pending_once().await; - *lock += 1; - tx.unbounded_send(()).unwrap(); - drop(lock); - }) - .unwrap(); - } - - block_on(async { + let num_tasks = 1000; for _ in 0..num_tasks { - rx.next().await.unwrap(); + let tx = tx.clone(); + let mutex = mutex.clone(); + pool.spawn(async move { + let mut lock = mutex.lock().await; + ready(()).pending_once().await; + *lock += 1; + tx.unbounded_send(()).unwrap(); + drop(lock); + }) + .unwrap(); } - let lock = mutex.lock().await; - assert_eq!(num_tasks, *lock); - }) + + block_on(async { + for _ in 0..num_tasks { + rx.next().await.unwrap(); + } + let lock = mutex.lock().await; + assert_eq!(num_tasks, *lock); + }); + } + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 } diff --git a/vendor/futures/tests/ready_queue.rs b/vendor/futures/tests/ready_queue.rs index 82901327f..c19d62593 100644 --- a/vendor/futures/tests/ready_queue.rs +++ b/vendor/futures/tests/ready_queue.rs @@ -93,7 +93,7 @@ fn dropping_ready_queue() { #[test] fn stress() { - const ITER: usize = 300; + const ITER: usize = if cfg!(miri) { 30 } else { 300 }; for i in 0..ITER { let n = (i % 10) + 1; diff --git a/vendor/futures/tests/sink.rs b/vendor/futures/tests/sink.rs index f3cf11b93..5b691e74c 100644 --- a/vendor/futures/tests/sink.rs +++ b/vendor/futures/tests/sink.rs @@ -138,7 +138,7 @@ impl<T: Unpin> ManualFlush<T> { for task in self.waiting_tasks.drain(..) { task.wake() } - mem::replace(&mut self.data, Vec::new()) + mem::take(&mut self.data) } } diff --git a/vendor/futures/tests/stream.rs b/vendor/futures/tests/stream.rs index 0d453d175..79d8e233c 100644 --- a/vendor/futures/tests/stream.rs +++ b/vendor/futures/tests/stream.rs @@ -1,10 +1,20 @@ +use std::cell::Cell; +use std::iter; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::task::Context; + use futures::channel::mpsc; use futures::executor::block_on; use futures::future::{self, Future}; +use futures::lock::Mutex; use futures::sink::SinkExt; use futures::stream::{self, StreamExt}; use futures::task::Poll; -use futures::FutureExt; +use futures::{ready, FutureExt}; +use futures_core::Stream; +use futures_executor::ThreadPool; use futures_test::task::noop_context; #[test] @@ -50,6 +60,345 @@ fn scan() { } #[test] +fn flatten_unordered() { + use futures::executor::block_on; + use futures::stream::*; + use futures::task::*; + use std::convert::identity; + use std::pin::Pin; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + use std::time::Duration; + + struct DataStream { + data: Vec<u8>, + polled: bool, + wake_immediately: bool, + } + + impl Stream for DataStream { + type Item = u8; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { + if !self.polled { + if !self.wake_immediately { + let waker = ctx.waker().clone(); + let sleep_time = + Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10); + thread::spawn(move || { + thread::sleep(sleep_time); + waker.wake_by_ref(); + }); + } else { + ctx.waker().wake_by_ref(); + } + self.polled = true; + Poll::Pending + } else { + self.polled = false; + Poll::Ready(self.data.pop()) + } + } + } + + struct Interchanger { + polled: bool, + base: u8, + wake_immediately: bool, + } + + impl Stream for Interchanger { + type Item = DataStream; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { + if !self.polled { + self.polled = true; + if !self.wake_immediately { + let waker = ctx.waker().clone(); + let sleep_time = Duration::from_millis(self.base as u64); + thread::spawn(move || { + thread::sleep(sleep_time); + waker.wake_by_ref(); + }); + } else { + ctx.waker().wake_by_ref(); + } + Poll::Pending + } else { + let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect(); + self.base += 1; + self.polled = false; + Poll::Ready(Some(DataStream { + polled: false, + data, + wake_immediately: self.wake_immediately && self.base % 2 == 0, + })) + } + } + } + + // basic behaviour + { + block_on(async { + let st = stream::iter(vec![ + stream::iter(0..=4u8), + stream::iter(6..=10), + stream::iter(10..=12), + ]); + + let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await; + + assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]); + }); + + block_on(async { + let st = stream::iter(vec![ + stream::iter(0..=4u8), + stream::iter(6..=10), + stream::iter(0..=2), + ]); + + let mut fm_unordered = st + .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0))) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]); + }); + } + + // wake up immediately + { + block_on(async { + let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + .await; + + fl_unordered.sort_unstable(); + + assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + } + + // wake up after delay + { + block_on(async { + let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + .await; + + fl_unordered.sort_unstable(); + + assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let (mut fm_unordered, mut fl_unordered) = futures_util::join!( + Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>(), + Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + ); + + fm_unordered.sort_unstable(); + fl_unordered.sort_unstable(); + + assert_eq!(fm_unordered, fl_unordered); + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + } + + // waker panics + { + let stream = Arc::new(Mutex::new( + Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)), + )); + + struct PanicWaker; + + impl ArcWake for PanicWaker { + fn wake_by_ref(_arc_self: &Arc<Self>) { + panic!("WAKE UP"); + } + } + + std::thread::spawn({ + let stream = stream.clone(); + move || { + let mut st = poll_fn(|cx| { + let mut lock = ready!(stream.lock().poll_unpin(cx)); + + let panic_waker = waker(Arc::new(PanicWaker)); + let mut panic_cx = Context::from_waker(&panic_waker); + let _ = ready!(lock.poll_next_unpin(&mut panic_cx)); + + Poll::Ready(Some(())) + }); + + block_on(st.next()) + } + }) + .join() + .unwrap_err(); + + block_on(async move { + let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; + values.sort_unstable(); + + assert_eq!(values, (0..60).collect::<Vec<u8>>()); + }); + } + + // stream panics + { + let st = stream::iter(iter::once( + once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(), + )) + .chain( + Interchanger { polled: false, base: 0, wake_immediately: true } + .map(|stream| stream.right_stream()) + .take(10), + ); + + let stream = Arc::new(Mutex::new(st.flatten_unordered(10))); + + std::thread::spawn({ + let stream = stream.clone(); + move || { + let mut st = poll_fn(|cx| { + let mut lock = ready!(stream.lock().poll_unpin(cx)); + let data = ready!(lock.poll_next_unpin(cx)); + + Poll::Ready(data) + }); + + block_on(st.next()) + } + }) + .join() + .unwrap_err(); + + block_on(async move { + let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; + values.sort_unstable(); + + assert_eq!(values, (0..60).collect::<Vec<u8>>()); + }); + } + + fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> { + let ready = Arc::new(AtomicBool::new(false)); + let mut spawned = false; + + future::poll_fn(move |cx| { + if !spawned { + let waker = cx.waker().clone(); + let ready = ready.clone(); + + std::thread::spawn(move || { + std::thread::sleep(time); + ready.store(true, Ordering::Release); + + waker.wake_by_ref() + }); + spawned = true; + } + + if ready.load(Ordering::Acquire) { + Poll::Ready(value.clone()) + } else { + Poll::Pending + } + }) + } + + fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin + where + S::Item: Clone, + { + let inner = st + .then(|item| timeout(Duration::from_millis(50), item)) + .enumerate() + .map(|(idx, value)| { + stream::once(if idx % 2 == 0 { + future::ready(value).left_future() + } else { + timeout(Duration::from_millis(100), value).right_future() + }) + }) + .flatten_unordered(None); + + stream::once(future::ready(inner)).flatten_unordered(None) + } + + // nested `flatten_unordered` + let te = ThreadPool::new().unwrap(); + let base_handle = te + .spawn_with_handle(async move { + let fu = build_nested_fu(stream::iter(1..=10)); + + assert_eq!(fu.count().await, 10); + }) + .unwrap(); + + block_on(base_handle); + + let empty_state_move_handle = te + .spawn_with_handle(async move { + let mut fu = build_nested_fu(stream::iter(1..10)); + { + let mut cx = noop_context(); + let _ = fu.poll_next_unpin(&mut cx); + let _ = fu.poll_next_unpin(&mut cx); + } + + assert_eq!(fu.count().await, 9); + }) + .unwrap(); + + block_on(empty_state_move_handle); +} + +#[test] fn take_until() { fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> { let mut i = 0; @@ -149,3 +498,40 @@ fn ready_chunks() { assert_eq!(s.next().await.unwrap(), vec![4]); }); } + +struct SlowStream { + times_should_poll: usize, + times_polled: Rc<Cell<usize>>, +} +impl Stream for SlowStream { + type Item = usize; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.times_polled.set(self.times_polled.get() + 1); + if self.times_polled.get() % 2 == 0 { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + if self.times_polled.get() >= self.times_should_poll { + return Poll::Ready(None); + } + Poll::Ready(Some(self.times_polled.get())) + } +} + +#[test] +fn select_with_strategy_doesnt_terminate_early() { + for side in [stream::PollNext::Left, stream::PollNext::Right] { + let times_should_poll = 10; + let count = Rc::new(Cell::new(0)); + let b = stream::iter([10, 20]); + + let mut selected = stream::select_with_strategy( + SlowStream { times_should_poll, times_polled: count.clone() }, + b, + |_: &mut ()| side, + ); + block_on(async move { while selected.next().await.is_some() {} }); + assert_eq!(count.get(), times_should_poll + 1); + } +} diff --git a/vendor/futures/tests/stream_futures_ordered.rs b/vendor/futures/tests/stream_futures_ordered.rs index 7506c65a6..5a4a3e22e 100644 --- a/vendor/futures/tests/stream_futures_ordered.rs +++ b/vendor/futures/tests/stream_futures_ordered.rs @@ -2,6 +2,7 @@ use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; use futures::future::{self, join, Future, FutureExt, TryFutureExt}; use futures::stream::{FuturesOrdered, StreamExt}; +use futures::task::Poll; use futures_test::task::noop_context; use std::any::Any; @@ -46,6 +47,69 @@ fn works_2() { } #[test] +fn test_push_front() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + let (d_tx, d_rx) = oneshot::channel::<i32>(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // 1 and 2 should be received in order + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_front(d_rx); + d_tx.send(4).unwrap(); + + // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next + // and then 3 after it + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); +} + +#[test] +fn test_push_back() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + let (d_tx, d_rx) = oneshot::channel::<i32>(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // All results should be received in order + + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_back(d_rx); + d_tx.send(4).unwrap(); + + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); +} + +#[test] fn from_iterator() { let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] .into_iter() @@ -82,3 +146,27 @@ fn queue_never_unblocked() { assert!(stream.poll_next_unpin(cx).is_pending()); assert!(stream.poll_next_unpin(cx).is_pending()); } + +#[test] +fn test_push_front_negative() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_front(a_rx); + stream.push_front(b_rx); + stream.push_front(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // These should all be recieved in reverse order + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); +} diff --git a/vendor/futures/tests/stream_futures_unordered.rs b/vendor/futures/tests/stream_futures_unordered.rs index 439c809be..b56828047 100644 --- a/vendor/futures/tests/stream_futures_unordered.rs +++ b/vendor/futures/tests/stream_futures_unordered.rs @@ -260,6 +260,20 @@ fn into_iter_len() { } #[test] +fn into_iter_partial() { + let stream = vec![future::ready(1), future::ready(2), future::ready(3), future::ready(4)] + .into_iter() + .collect::<FuturesUnordered<_>>(); + + let mut into_iter = stream.into_iter(); + assert!(into_iter.next().is_some()); + assert!(into_iter.next().is_some()); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + // don't panic when iterator is dropped before completing +} + +#[test] fn futures_not_moved_after_poll() { // Future that will be ready after being polled twice, // asserting that it does not move. @@ -340,7 +354,7 @@ fn polled_only_once_at_most_per_iteration() { let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); assert!(tasks.poll_next_unpin(cx).is_pending()); - assert_eq!(32, tasks.iter().filter(|f| f.polled).count()); + assert_eq!(33, tasks.iter().filter(|f| f.polled).count()); let mut tasks = FuturesUnordered::<F>::new(); assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); diff --git a/vendor/futures/tests/stream_try_stream.rs b/vendor/futures/tests/stream_try_stream.rs index 194e74db7..b3d04b920 100644 --- a/vendor/futures/tests/stream_try_stream.rs +++ b/vendor/futures/tests/stream_try_stream.rs @@ -1,7 +1,12 @@ +use core::pin::Pin; + use futures::{ - stream::{self, StreamExt, TryStreamExt}, + stream::{self, repeat, Repeat, StreamExt, TryStreamExt}, task::Poll, + Stream, }; +use futures_executor::block_on; +use futures_task::Context; use futures_test::task::noop_context; #[test] @@ -36,3 +41,94 @@ fn try_take_while_after_err() { .boxed(); assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); } + +#[test] +fn try_flatten_unordered() { + let test_st = stream::iter(1..7) + .map(|val: u32| { + if val % 2 == 0 { + Ok(stream::unfold((val, 1), |(val, pow)| async move { + Some((val.pow(pow), (val, pow + 1))) + }) + .take(3) + .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) })) + } else { + Err(val) + } + }) + .map_ok(Box::pin) + .try_flatten_unordered(None); + + block_on(async move { + assert_eq!( + // All numbers can be divided by 16 and odds must be `Err` + // For all basic evens we must have powers from 1 to 3 + vec![ + Err(1), + Err(3), + Err(5), + Ok(2), + Ok(4), + Ok(6), + Ok(4), + Err(16), + Ok(36), + Ok(8), + Err(64), + Ok(216) + ], + test_st.collect::<Vec<_>>().await + ) + }); + + #[derive(Clone, Debug)] + struct ErrorStream { + error_after: usize, + polled: usize, + } + + impl Stream for ErrorStream { + type Item = Result<Repeat<Result<(), ()>>, ()>; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> { + if self.polled > self.error_after { + panic!("Polled after error"); + } else { + let out = + if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) }; + self.polled += 1; + Poll::Ready(Some(out)) + } + } + } + + block_on(async move { + let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None); + let mut ctr = 0; + while (st.try_next().await).is_ok() { + ctr += 1; + } + assert_eq!(ctr, 0); + + assert_eq!( + ErrorStream { error_after: 10, polled: 0 } + .try_flatten_unordered(None) + .inspect_ok(|_| panic!("Unexpected `Ok`")) + .try_collect::<Vec<_>>() + .await, + Err(()) + ); + + let mut taken = 0; + assert_eq!( + ErrorStream { error_after: 10, polled: 0 } + .map_ok(|st| st.take(3)) + .try_flatten_unordered(1) + .inspect(|_| taken += 1) + .try_fold((), |(), res| async move { Ok(res) }) + .await, + Err(()) + ); + assert_eq!(taken, 31); + }) +} |