diff options
Diffstat (limited to 'third_party/rust/futures/tests')
56 files changed, 7487 insertions, 0 deletions
diff --git a/third_party/rust/futures/tests/_require_features.rs b/third_party/rust/futures/tests/_require_features.rs new file mode 100644 index 0000000000..8046cc99a4 --- /dev/null +++ b/third_party/rust/futures/tests/_require_features.rs @@ -0,0 +1,13 @@ +#[cfg(not(all( + feature = "std", + feature = "alloc", + feature = "async-await", + feature = "compat", + feature = "io-compat", + feature = "executor", + feature = "thread-pool", +)))] +compile_error!( + "`futures` tests must have all stable features activated: \ + use `--all-features` or `--features default,thread-pool,io-compat`" +); diff --git a/third_party/rust/futures/tests/async_await_macros.rs b/third_party/rust/futures/tests/async_await_macros.rs new file mode 100644 index 0000000000..82a617f2c2 --- /dev/null +++ b/third_party/rust/futures/tests/async_await_macros.rs @@ -0,0 +1,393 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, poll_fn, FutureExt}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; +use futures::{ + join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join, +}; +use std::mem; + +#[test] +fn poll_and_pending() { + let pending_once = async { pending!() }; + block_on(async { + pin_mut!(pending_once); + assert_eq!(Poll::Pending, poll!(&mut pending_once)); + assert_eq!(Poll::Ready(()), poll!(&mut pending_once)); + }); +} + +#[test] +fn join() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (tx2, rx2) = oneshot::channel::<i32>(); + + let fut = async { + let res = join!(rx1, rx2); + assert_eq!((Ok(1), Ok(2)), res); + }; + + block_on(async { + pin_mut!(fut); + assert_eq!(Poll::Pending, poll!(&mut fut)); + tx1.send(1).unwrap(); + assert_eq!(Poll::Pending, poll!(&mut fut)); + tx2.send(2).unwrap(); + assert_eq!(Poll::Ready(()), poll!(&mut fut)); + }); +} + +#[test] +fn select() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (_tx2, rx2) = oneshot::channel::<i32>(); + tx1.send(1).unwrap(); + let mut ran = false; + block_on(async { + select! { + res = rx1.fuse() => { + assert_eq!(Ok(1), res); + ran = true; + }, + _ = rx2.fuse() => unreachable!(), + } + }); + assert!(ran); +} + +#[test] +fn select_biased() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (_tx2, rx2) = oneshot::channel::<i32>(); + tx1.send(1).unwrap(); + let mut ran = false; + block_on(async { + select_biased! { + res = rx1.fuse() => { + assert_eq!(Ok(1), res); + ran = true; + }, + _ = rx2.fuse() => unreachable!(), + } + }); + assert!(ran); +} + +#[test] +fn select_streams() { + let (mut tx1, rx1) = mpsc::channel::<i32>(1); + let (mut tx2, rx2) = mpsc::channel::<i32>(1); + let mut rx1 = rx1.fuse(); + let mut rx2 = rx2.fuse(); + let mut ran = false; + let mut total = 0; + block_on(async { + let mut tx1_opt; + let mut tx2_opt; + select! { + _ = rx1.next() => panic!(), + _ = rx2.next() => panic!(), + default => { + tx1.send(2).await.unwrap(); + tx2.send(3).await.unwrap(); + tx1_opt = Some(tx1); + tx2_opt = Some(tx2); + } + complete => panic!(), + } + loop { + select! { + // runs first and again after default + x = rx1.next() => if let Some(x) = x { total += x; }, + // runs second and again after default + x = rx2.next() => if let Some(x) = x { total += x; }, + // runs third + default => { + assert_eq!(total, 5); + ran = true; + drop(tx1_opt.take().unwrap()); + drop(tx2_opt.take().unwrap()); + }, + // runs last + complete => break, + }; + } + }); + assert!(ran); +} + +#[test] +fn select_can_move_uncompleted_futures() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (tx2, rx2) = oneshot::channel::<i32>(); + tx1.send(1).unwrap(); + tx2.send(2).unwrap(); + let mut ran = false; + let mut rx1 = rx1.fuse(); + let mut rx2 = rx2.fuse(); + block_on(async { + select! { + res = rx1 => { + assert_eq!(Ok(1), res); + assert_eq!(Ok(2), rx2.await); + ran = true; + }, + res = rx2 => { + assert_eq!(Ok(2), res); + assert_eq!(Ok(1), rx1.await); + ran = true; + }, + } + }); + assert!(ran); +} + +#[test] +fn select_nested() { + let mut outer_fut = future::ready(1); + let mut inner_fut = future::ready(2); + let res = block_on(async { + select! { + x = outer_fut => { + select! { + y = inner_fut => x + y, + } + } + } + }); + assert_eq!(res, 3); +} + +#[cfg_attr(not(target_pointer_width = "64"), ignore)] +#[test] +fn select_size() { + let fut = async { + let mut ready = future::ready(0i32); + select! { + _ = ready => {}, + } + }; + assert_eq!(mem::size_of_val(&fut), 24); + + let fut = async { + let mut ready1 = future::ready(0i32); + let mut ready2 = future::ready(0i32); + select! { + _ = ready1 => {}, + _ = ready2 => {}, + } + }; + assert_eq!(mem::size_of_val(&fut), 40); +} + +#[test] +fn select_on_non_unpin_expressions() { + // The returned Future is !Unpin + let make_non_unpin_fut = || async { 5 }; + + let res = block_on(async { + let select_res; + select! { + value_1 = make_non_unpin_fut().fuse() => select_res = value_1, + value_2 = make_non_unpin_fut().fuse() => select_res = value_2, + }; + select_res + }); + assert_eq!(res, 5); +} + +#[test] +fn select_on_non_unpin_expressions_with_default() { + // The returned Future is !Unpin + let make_non_unpin_fut = || async { 5 }; + + let res = block_on(async { + let select_res; + select! { + value_1 = make_non_unpin_fut().fuse() => select_res = value_1, + value_2 = make_non_unpin_fut().fuse() => select_res = value_2, + default => select_res = 7, + }; + select_res + }); + assert_eq!(res, 5); +} + +#[cfg_attr(not(target_pointer_width = "64"), ignore)] +#[test] +fn select_on_non_unpin_size() { + // The returned Future is !Unpin + let make_non_unpin_fut = || async { 5 }; + + let fut = async { + let select_res; + select! { + value_1 = make_non_unpin_fut().fuse() => select_res = value_1, + value_2 = make_non_unpin_fut().fuse() => select_res = value_2, + }; + select_res + }; + + assert_eq!(32, mem::size_of_val(&fut)); +} + +#[test] +fn select_can_be_used_as_expression() { + block_on(async { + let res = select! { + x = future::ready(7) => x, + y = future::ready(3) => y + 1, + }; + assert!(res == 7 || res == 4); + }); +} + +#[test] +fn select_with_default_can_be_used_as_expression() { + fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> { + Poll::Pending + } + + block_on(async { + let res = select! { + x = poll_fn(poll_always_pending::<i32>).fuse() => x, + y = poll_fn(poll_always_pending::<i32>).fuse() => y + 1, + default => 99, + }; + assert_eq!(res, 99); + }); +} + +#[test] +fn select_with_complete_can_be_used_as_expression() { + block_on(async { + let res = select! { + x = future::pending::<i32>() => x, + y = future::pending::<i32>() => y + 1, + default => 99, + complete => 237, + }; + assert_eq!(res, 237); + }); +} + +#[test] +#[allow(unused_assignments)] +fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { + async fn require_mutable(_: &mut i32) {} + async fn async_noop() {} + + block_on(async { + let mut value = 234; + select! { + _ = require_mutable(&mut value).fuse() => { }, + _ = async_noop().fuse() => { + value += 5; + }, + } + }); +} + +#[test] +#[allow(unused_assignments)] +fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { + async fn require_mutable(_: &mut i32) {} + async fn async_noop() {} + + block_on(async { + let mut value = 234; + select! { + _ = require_mutable(&mut value).fuse() => { }, + _ = async_noop().fuse() => { + value += 5; + }, + default => { + value += 27; + }, + } + }); +} + +#[test] +#[allow(unused_assignments)] +fn stream_select() { + // stream_select! macro + block_on(async { + let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()); + + let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending()); + assert_eq!(endless_ones.next().await, Some(1)); + assert_eq!(endless_ones.next().await, Some(1)); + + let mut finite_list = + stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter())); + assert_eq!(finite_list.next().await, Some(1)); + assert_eq!(finite_list.next().await, Some(1)); + assert_eq!(finite_list.next().await, None); + + let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3)); + // Take 1000, and assert a somewhat even distribution of values. + // The fairness is randomized, but over 1000 samples we should be pretty close to even. + // This test may be a bit flaky. Feel free to adjust the margins as you see fit. + let mut count = 0; + let results = endless_mixed + .take_while(move |_| { + count += 1; + let ret = count < 1000; + async move { ret } + }) + .collect::<Vec<_>>() + .await; + assert!(results.iter().filter(|x| **x == 1).count() >= 299); + assert!(results.iter().filter(|x| **x == 2).count() >= 299); + assert!(results.iter().filter(|x| **x == 3).count() >= 299); + }); +} + +#[cfg_attr(not(target_pointer_width = "64"), ignore)] +#[test] +fn join_size() { + let fut = async { + let ready = future::ready(0i32); + join!(ready) + }; + assert_eq!(mem::size_of_val(&fut), 24); + + let fut = async { + let ready1 = future::ready(0i32); + let ready2 = future::ready(0i32); + join!(ready1, ready2) + }; + assert_eq!(mem::size_of_val(&fut), 40); +} + +#[cfg_attr(not(target_pointer_width = "64"), ignore)] +#[test] +fn try_join_size() { + let fut = async { + let ready = future::ready(Ok::<i32, i32>(0)); + try_join!(ready) + }; + assert_eq!(mem::size_of_val(&fut), 24); + + let fut = async { + let ready1 = future::ready(Ok::<i32, i32>(0)); + let ready2 = future::ready(Ok::<i32, i32>(0)); + try_join!(ready1, ready2) + }; + assert_eq!(mem::size_of_val(&fut), 48); +} + +#[allow(clippy::let_underscore_future)] +#[test] +fn join_doesnt_require_unpin() { + let _ = async { join!(async {}, async {}) }; +} + +#[allow(clippy::let_underscore_future)] +#[test] +fn try_join_doesnt_require_unpin() { + let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) }; +} diff --git a/third_party/rust/futures/tests/auto_traits.rs b/third_party/rust/futures/tests/auto_traits.rs new file mode 100644 index 0000000000..5fc0f7d675 --- /dev/null +++ b/third_party/rust/futures/tests/auto_traits.rs @@ -0,0 +1,1891 @@ +#![cfg(feature = "compat")] + +//! Assert Send/Sync/Unpin for all public types. + +use futures::{ + future::Future, + sink::Sink, + stream::Stream, + task::{Context, Poll}, +}; +use static_assertions::{assert_impl_all as assert_impl, assert_not_impl_all as assert_not_impl}; +use std::marker::PhantomPinned; +use std::{marker::PhantomData, pin::Pin}; + +pub type LocalFuture<T = *const ()> = Pin<Box<dyn Future<Output = T>>>; +pub type LocalTryFuture<T = *const (), E = *const ()> = LocalFuture<Result<T, E>>; +pub type SendFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Send>>; +pub type SendTryFuture<T = *const (), E = *const ()> = SendFuture<Result<T, E>>; +pub type SyncFuture<T = *const ()> = Pin<Box<dyn Future<Output = T> + Sync>>; +pub type SyncTryFuture<T = *const (), E = *const ()> = SyncFuture<Result<T, E>>; +pub type UnpinFuture<T = PhantomPinned> = LocalFuture<T>; +pub type UnpinTryFuture<T = PhantomPinned, E = PhantomPinned> = UnpinFuture<Result<T, E>>; +pub struct PinnedFuture<T = PhantomPinned>(PhantomPinned, PhantomData<T>); +impl<T> Future for PinnedFuture<T> { + type Output = T; + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> { + unimplemented!() + } +} +pub type PinnedTryFuture<T = PhantomPinned, E = PhantomPinned> = PinnedFuture<Result<T, E>>; + +pub type LocalStream<T = *const ()> = Pin<Box<dyn Stream<Item = T>>>; +pub type LocalTryStream<T = *const (), E = *const ()> = LocalStream<Result<T, E>>; +pub type SendStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Send>>; +pub type SendTryStream<T = *const (), E = *const ()> = SendStream<Result<T, E>>; +pub type SyncStream<T = *const ()> = Pin<Box<dyn Stream<Item = T> + Sync>>; +pub type SyncTryStream<T = *const (), E = *const ()> = SyncStream<Result<T, E>>; +pub type UnpinStream<T = PhantomPinned> = LocalStream<T>; +pub type UnpinTryStream<T = PhantomPinned, E = PhantomPinned> = UnpinStream<Result<T, E>>; +pub struct PinnedStream<T = PhantomPinned>(PhantomPinned, PhantomData<T>); +impl<T> Stream for PinnedStream<T> { + type Item = T; + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { + unimplemented!() + } +} +pub type PinnedTryStream<T = PhantomPinned, E = PhantomPinned> = PinnedStream<Result<T, E>>; + +pub type LocalSink<T = *const (), E = *const ()> = Pin<Box<dyn Sink<T, Error = E>>>; +pub type SendSink<T = *const (), E = *const ()> = Pin<Box<dyn Sink<T, Error = E> + Send>>; +pub type SyncSink<T = *const (), E = *const ()> = Pin<Box<dyn Sink<T, Error = E> + Sync>>; +pub type UnpinSink<T = PhantomPinned, E = PhantomPinned> = LocalSink<T, E>; +pub struct PinnedSink<T = PhantomPinned, E = PhantomPinned>(PhantomPinned, PhantomData<(T, E)>); +impl<T, E> Sink<T> for PinnedSink<T, E> { + type Error = E; + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + unimplemented!() + } + fn start_send(self: Pin<&mut Self>, _: T) -> Result<(), Self::Error> { + unimplemented!() + } + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + unimplemented!() + } + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + unimplemented!() + } +} + +/// Assert Send/Sync/Unpin for all public types in `futures::channel`. +pub mod channel { + use super::*; + use futures::channel::*; + + assert_impl!(mpsc::Receiver<()>: Send); + assert_not_impl!(mpsc::Receiver<*const ()>: Send); + assert_impl!(mpsc::Receiver<()>: Sync); + assert_not_impl!(mpsc::Receiver<*const ()>: Sync); + assert_impl!(mpsc::Receiver<PhantomPinned>: Unpin); + + assert_impl!(mpsc::SendError: Send); + assert_impl!(mpsc::SendError: Sync); + assert_impl!(mpsc::SendError: Unpin); + + assert_impl!(mpsc::Sender<()>: Send); + assert_not_impl!(mpsc::Sender<*const ()>: Send); + assert_impl!(mpsc::Sender<()>: Sync); + assert_not_impl!(mpsc::Sender<*const ()>: Sync); + assert_impl!(mpsc::Sender<PhantomPinned>: Unpin); + + assert_impl!(mpsc::TryRecvError: Send); + assert_impl!(mpsc::TryRecvError: Sync); + assert_impl!(mpsc::TryRecvError: Unpin); + + assert_impl!(mpsc::TrySendError<()>: Send); + assert_not_impl!(mpsc::TrySendError<*const ()>: Send); + assert_impl!(mpsc::TrySendError<()>: Sync); + assert_not_impl!(mpsc::TrySendError<*const ()>: Sync); + assert_impl!(mpsc::TrySendError<()>: Unpin); + assert_not_impl!(mpsc::TrySendError<PhantomPinned>: Unpin); + + assert_impl!(mpsc::UnboundedReceiver<()>: Send); + assert_not_impl!(mpsc::UnboundedReceiver<*const ()>: Send); + assert_impl!(mpsc::UnboundedReceiver<()>: Sync); + assert_not_impl!(mpsc::UnboundedReceiver<*const ()>: Sync); + assert_impl!(mpsc::UnboundedReceiver<PhantomPinned>: Unpin); + + assert_impl!(mpsc::UnboundedReceiver<()>: Send); + assert_not_impl!(mpsc::UnboundedReceiver<*const ()>: Send); + assert_impl!(mpsc::UnboundedReceiver<()>: Sync); + assert_not_impl!(mpsc::UnboundedReceiver<*const ()>: Sync); + assert_impl!(mpsc::UnboundedReceiver<PhantomPinned>: Unpin); + + assert_impl!(oneshot::Canceled: Send); + assert_impl!(oneshot::Canceled: Sync); + assert_impl!(oneshot::Canceled: Unpin); + + assert_impl!(oneshot::Cancellation<()>: Send); + assert_not_impl!(oneshot::Cancellation<*const ()>: Send); + assert_impl!(oneshot::Cancellation<()>: Sync); + assert_not_impl!(oneshot::Cancellation<*const ()>: Sync); + assert_impl!(oneshot::Cancellation<PhantomPinned>: Unpin); + + assert_impl!(oneshot::Receiver<()>: Send); + assert_not_impl!(oneshot::Receiver<*const ()>: Send); + assert_impl!(oneshot::Receiver<()>: Sync); + assert_not_impl!(oneshot::Receiver<*const ()>: Sync); + assert_impl!(oneshot::Receiver<PhantomPinned>: Unpin); + + assert_impl!(oneshot::Sender<()>: Send); + assert_not_impl!(oneshot::Sender<*const ()>: Send); + assert_impl!(oneshot::Sender<()>: Sync); + assert_not_impl!(oneshot::Sender<*const ()>: Sync); + assert_impl!(oneshot::Sender<PhantomPinned>: Unpin); +} + +/// Assert Send/Sync/Unpin for all public types in `futures::compat`. +pub mod compat { + use super::*; + use futures::compat::*; + + assert_impl!(Compat<()>: Send); + assert_not_impl!(Compat<*const ()>: Send); + assert_impl!(Compat<()>: Sync); + assert_not_impl!(Compat<*const ()>: Sync); + assert_impl!(Compat<()>: Unpin); + assert_not_impl!(Compat<PhantomPinned>: Unpin); + + assert_impl!(Compat01As03<()>: Send); + assert_not_impl!(Compat01As03<*const ()>: Send); + assert_not_impl!(Compat01As03<()>: Sync); + assert_impl!(Compat01As03<PhantomPinned>: Unpin); + + assert_impl!(Compat01As03Sink<(), ()>: Send); + assert_not_impl!(Compat01As03Sink<(), *const ()>: Send); + assert_not_impl!(Compat01As03Sink<*const (), ()>: Send); + assert_not_impl!(Compat01As03Sink<(), ()>: Sync); + assert_impl!(Compat01As03Sink<PhantomPinned, PhantomPinned>: Unpin); + + assert_impl!(CompatSink<(), *const ()>: Send); + assert_not_impl!(CompatSink<*const (), ()>: Send); + assert_impl!(CompatSink<(), *const ()>: Sync); + assert_not_impl!(CompatSink<*const (), ()>: Sync); + assert_impl!(CompatSink<(), PhantomPinned>: Unpin); + assert_not_impl!(CompatSink<PhantomPinned, ()>: Unpin); + + assert_impl!(Executor01As03<()>: Send); + assert_not_impl!(Executor01As03<*const ()>: Send); + assert_impl!(Executor01As03<()>: Sync); + assert_not_impl!(Executor01As03<*const ()>: Sync); + assert_impl!(Executor01As03<()>: Unpin); + assert_not_impl!(Executor01As03<PhantomPinned>: Unpin); + + assert_impl!(Executor01Future: Send); + assert_not_impl!(Executor01Future: Sync); + assert_impl!(Executor01Future: Unpin); +} + +/// Assert Send/Sync/Unpin for all public types in `futures::executor`. +pub mod executor { + use super::*; + use futures::executor::*; + + assert_impl!(BlockingStream<SendStream>: Send); + assert_not_impl!(BlockingStream<LocalStream>: Send); + assert_impl!(BlockingStream<SyncStream>: Sync); + assert_not_impl!(BlockingStream<LocalStream>: Sync); + assert_impl!(BlockingStream<UnpinStream>: Unpin); + // BlockingStream requires `S: Unpin` + // assert_not_impl!(BlockingStream<PinnedStream>: Unpin); + + assert_impl!(Enter: Send); + assert_impl!(Enter: Sync); + assert_impl!(Enter: Unpin); + + assert_impl!(EnterError: Send); + assert_impl!(EnterError: Sync); + assert_impl!(EnterError: Unpin); + + assert_not_impl!(LocalPool: Send); + assert_not_impl!(LocalPool: Sync); + assert_impl!(LocalPool: Unpin); + + assert_not_impl!(LocalSpawner: Send); + assert_not_impl!(LocalSpawner: Sync); + assert_impl!(LocalSpawner: Unpin); + + assert_impl!(ThreadPool: Send); + assert_impl!(ThreadPool: Sync); + assert_impl!(ThreadPool: Unpin); + + assert_impl!(ThreadPoolBuilder: Send); + assert_impl!(ThreadPoolBuilder: Sync); + assert_impl!(ThreadPoolBuilder: Unpin); +} + +/// Assert Send/Sync/Unpin for all public types in `futures::future`. +pub mod future { + use super::*; + use futures::future::*; + + assert_impl!(AbortHandle: Send); + assert_impl!(AbortHandle: Sync); + assert_impl!(AbortHandle: Unpin); + + assert_impl!(AbortRegistration: Send); + assert_impl!(AbortRegistration: Sync); + assert_impl!(AbortRegistration: Unpin); + + assert_impl!(Abortable<SendFuture>: Send); + assert_not_impl!(Abortable<LocalFuture>: Send); + assert_impl!(Abortable<SyncFuture>: Sync); + assert_not_impl!(Abortable<LocalFuture>: Sync); + assert_impl!(Abortable<UnpinFuture>: Unpin); + assert_not_impl!(Abortable<PinnedFuture>: Unpin); + + assert_impl!(Aborted: Send); + assert_impl!(Aborted: Sync); + assert_impl!(Aborted: Unpin); + + assert_impl!(AndThen<SendFuture, SendFuture, ()>: Send); + assert_not_impl!(AndThen<SendFuture, LocalFuture, ()>: Send); + assert_not_impl!(AndThen<LocalFuture, SendFuture, ()>: Send); + assert_not_impl!(AndThen<SendFuture, SendFuture, *const ()>: Send); + assert_impl!(AndThen<SyncFuture, SyncFuture, ()>: Sync); + assert_not_impl!(AndThen<SyncFuture, LocalFuture, ()>: Sync); + assert_not_impl!(AndThen<LocalFuture, SyncFuture, ()>: Sync); + assert_not_impl!(AndThen<SyncFuture, SyncFuture, *const ()>: Sync); + assert_impl!(AndThen<UnpinFuture, UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(AndThen<PinnedFuture, UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(AndThen<UnpinFuture, PinnedFuture, PhantomPinned>: Unpin); + + assert_impl!(CatchUnwind<SendFuture>: Send); + assert_not_impl!(CatchUnwind<LocalFuture>: Send); + assert_impl!(CatchUnwind<SyncFuture>: Sync); + assert_not_impl!(CatchUnwind<LocalFuture>: Sync); + assert_impl!(CatchUnwind<UnpinFuture>: Unpin); + assert_not_impl!(CatchUnwind<PinnedFuture>: Unpin); + + assert_impl!(ErrInto<SendTryFuture, *const ()>: Send); + assert_not_impl!(ErrInto<LocalTryFuture, ()>: Send); + assert_impl!(ErrInto<SyncTryFuture, *const ()>: Sync); + assert_not_impl!(ErrInto<LocalTryFuture, ()>: Sync); + assert_impl!(ErrInto<UnpinTryFuture, PhantomPinned>: Unpin); + assert_not_impl!(ErrInto<PinnedTryFuture, PhantomPinned>: Unpin); + + assert_impl!(Flatten<SendFuture<()>>: Send); + assert_not_impl!(Flatten<LocalFuture>: Send); + assert_not_impl!(Flatten<SendFuture>: Send); + assert_impl!(Flatten<SyncFuture<()>>: Sync); + assert_not_impl!(Flatten<LocalFuture>: Sync); + assert_not_impl!(Flatten<SyncFuture>: Sync); + assert_impl!(Flatten<UnpinFuture<()>>: Unpin); + assert_not_impl!(Flatten<PinnedFuture>: Unpin); + assert_not_impl!(Flatten<UnpinFuture>: Unpin); + + assert_impl!(FlattenSink<SendFuture, ()>: Send); + assert_not_impl!(FlattenSink<SendFuture, *const ()>: Send); + assert_not_impl!(FlattenSink<LocalFuture, ()>: Send); + assert_impl!(FlattenSink<SyncFuture, ()>: Sync); + assert_not_impl!(FlattenSink<SyncFuture, *const ()>: Sync); + assert_not_impl!(FlattenSink<LocalFuture, ()>: Sync); + assert_impl!(FlattenSink<UnpinFuture, ()>: Unpin); + assert_not_impl!(FlattenSink<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(FlattenSink<PinnedFuture, ()>: Unpin); + + assert_impl!(FlattenStream<SendFuture<()>>: Send); + assert_not_impl!(FlattenStream<LocalFuture>: Send); + assert_not_impl!(FlattenStream<SendFuture>: Send); + assert_impl!(FlattenStream<SyncFuture<()>>: Sync); + assert_not_impl!(FlattenStream<LocalFuture>: Sync); + assert_not_impl!(FlattenStream<SyncFuture>: Sync); + assert_impl!(FlattenStream<UnpinFuture<()>>: Unpin); + assert_not_impl!(FlattenStream<PinnedFuture>: Unpin); + assert_not_impl!(FlattenStream<UnpinFuture>: Unpin); + + assert_impl!(Fuse<SendFuture>: Send); + assert_not_impl!(Fuse<LocalFuture>: Send); + assert_impl!(Fuse<SyncFuture>: Sync); + assert_not_impl!(Fuse<LocalFuture>: Sync); + assert_impl!(Fuse<UnpinFuture>: Unpin); + assert_not_impl!(Fuse<PinnedFuture>: Unpin); + + assert_impl!(FutureObj<*const ()>: Send); + assert_not_impl!(FutureObj<()>: Sync); + assert_impl!(FutureObj<PhantomPinned>: Unpin); + + assert_impl!(Inspect<SendFuture, ()>: Send); + assert_not_impl!(Inspect<SendFuture, *const ()>: Send); + assert_not_impl!(Inspect<LocalFuture, ()>: Send); + assert_impl!(Inspect<SyncFuture, ()>: Sync); + assert_not_impl!(Inspect<SyncFuture, *const ()>: Sync); + assert_not_impl!(Inspect<LocalFuture, ()>: Sync); + assert_impl!(Inspect<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(Inspect<PhantomPinned, PhantomPinned>: Unpin); + + assert_impl!(InspectErr<SendFuture, ()>: Send); + assert_not_impl!(InspectErr<SendFuture, *const ()>: Send); + assert_not_impl!(InspectErr<LocalFuture, ()>: Send); + assert_impl!(InspectErr<SyncFuture, ()>: Sync); + assert_not_impl!(InspectErr<SyncFuture, *const ()>: Sync); + assert_not_impl!(InspectErr<LocalFuture, ()>: Sync); + assert_impl!(InspectErr<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(InspectErr<PhantomPinned, PhantomPinned>: Unpin); + + assert_impl!(InspectOk<SendFuture, ()>: Send); + assert_not_impl!(InspectOk<SendFuture, *const ()>: Send); + assert_not_impl!(InspectOk<LocalFuture, ()>: Send); + assert_impl!(InspectOk<SyncFuture, ()>: Sync); + assert_not_impl!(InspectOk<SyncFuture, *const ()>: Sync); + assert_not_impl!(InspectOk<LocalFuture, ()>: Sync); + assert_impl!(InspectOk<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(InspectOk<PhantomPinned, PhantomPinned>: Unpin); + + assert_impl!(IntoFuture<SendFuture>: Send); + assert_not_impl!(IntoFuture<LocalFuture>: Send); + assert_impl!(IntoFuture<SyncFuture>: Sync); + assert_not_impl!(IntoFuture<LocalFuture>: Sync); + assert_impl!(IntoFuture<UnpinFuture>: Unpin); + assert_not_impl!(IntoFuture<PinnedFuture>: Unpin); + + assert_impl!(IntoStream<SendFuture>: Send); + assert_not_impl!(IntoStream<LocalFuture>: Send); + assert_impl!(IntoStream<SyncFuture>: Sync); + assert_not_impl!(IntoStream<LocalFuture>: Sync); + assert_impl!(IntoStream<UnpinFuture>: Unpin); + assert_not_impl!(IntoStream<PinnedFuture>: Unpin); + + assert_impl!(Join<SendFuture<()>, SendFuture<()>>: Send); + assert_not_impl!(Join<SendFuture<()>, SendFuture>: Send); + assert_not_impl!(Join<SendFuture, SendFuture<()>>: Send); + assert_not_impl!(Join<SendFuture, LocalFuture>: Send); + assert_not_impl!(Join<LocalFuture, SendFuture>: Send); + assert_impl!(Join<SyncFuture<()>, SyncFuture<()>>: Sync); + assert_not_impl!(Join<SyncFuture<()>, SyncFuture>: Sync); + assert_not_impl!(Join<SyncFuture, SyncFuture<()>>: Sync); + assert_not_impl!(Join<SyncFuture, LocalFuture>: Sync); + assert_not_impl!(Join<LocalFuture, SyncFuture>: Sync); + assert_impl!(Join<UnpinFuture, UnpinFuture>: Unpin); + assert_not_impl!(Join<PinnedFuture, UnpinFuture>: Unpin); + assert_not_impl!(Join<UnpinFuture, PinnedFuture>: Unpin); + + // Join3, Join4, Join5 are the same as Join + + assert_impl!(JoinAll<SendFuture<()>>: Send); + assert_not_impl!(JoinAll<LocalFuture>: Send); + assert_not_impl!(JoinAll<SendFuture>: Send); + assert_impl!(JoinAll<SyncFuture<()>>: Sync); + assert_not_impl!(JoinAll<LocalFuture>: Sync); + assert_not_impl!(JoinAll<SyncFuture>: Sync); + assert_impl!(JoinAll<PinnedFuture>: Unpin); + + assert_impl!(Lazy<()>: Send); + assert_not_impl!(Lazy<*const ()>: Send); + assert_impl!(Lazy<()>: Sync); + assert_not_impl!(Lazy<*const ()>: Sync); + assert_impl!(Lazy<PhantomPinned>: Unpin); + + assert_not_impl!(LocalFutureObj<()>: Send); + assert_not_impl!(LocalFutureObj<()>: Sync); + assert_impl!(LocalFutureObj<PhantomPinned>: Unpin); + + assert_impl!(Map<SendFuture, ()>: Send); + assert_not_impl!(Map<SendFuture, *const ()>: Send); + assert_not_impl!(Map<LocalFuture, ()>: Send); + assert_impl!(Map<SyncFuture, ()>: Sync); + assert_not_impl!(Map<SyncFuture, *const ()>: Sync); + assert_not_impl!(Map<LocalFuture, ()>: Sync); + assert_impl!(Map<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(Map<PhantomPinned, ()>: Unpin); + + assert_impl!(MapErr<SendFuture, ()>: Send); + assert_not_impl!(MapErr<SendFuture, *const ()>: Send); + assert_not_impl!(MapErr<LocalFuture, ()>: Send); + assert_impl!(MapErr<SyncFuture, ()>: Sync); + assert_not_impl!(MapErr<SyncFuture, *const ()>: Sync); + assert_not_impl!(MapErr<LocalFuture, ()>: Sync); + assert_impl!(MapErr<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(MapErr<PhantomPinned, ()>: Unpin); + + assert_impl!(MapInto<SendFuture, *const ()>: Send); + assert_not_impl!(MapInto<LocalFuture, ()>: Send); + assert_impl!(MapInto<SyncFuture, *const ()>: Sync); + assert_not_impl!(MapInto<LocalFuture, ()>: Sync); + assert_impl!(MapInto<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(MapInto<PhantomPinned, ()>: Unpin); + + assert_impl!(MapOk<SendFuture, ()>: Send); + assert_not_impl!(MapOk<SendFuture, *const ()>: Send); + assert_not_impl!(MapOk<LocalFuture, ()>: Send); + assert_impl!(MapOk<SyncFuture, ()>: Sync); + assert_not_impl!(MapOk<SyncFuture, *const ()>: Sync); + assert_not_impl!(MapOk<LocalFuture, ()>: Sync); + assert_impl!(MapOk<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(MapOk<PhantomPinned, ()>: Unpin); + + assert_impl!(MapOkOrElse<SendFuture, (), ()>: Send); + assert_not_impl!(MapOkOrElse<SendFuture, (), *const ()>: Send); + assert_not_impl!(MapOkOrElse<SendFuture, *const (), ()>: Send); + assert_not_impl!(MapOkOrElse<LocalFuture, (), ()>: Send); + assert_impl!(MapOkOrElse<SyncFuture, (), ()>: Sync); + assert_not_impl!(MapOkOrElse<SyncFuture, (), *const ()>: Sync); + assert_not_impl!(MapOkOrElse<SyncFuture, *const (), ()>: Sync); + assert_not_impl!(MapOkOrElse<LocalFuture, (), ()>: Sync); + assert_impl!(MapOkOrElse<UnpinFuture, PhantomPinned, PhantomPinned>: Unpin); + assert_not_impl!(MapOkOrElse<PhantomPinned, (), ()>: Unpin); + + assert_impl!(NeverError<SendFuture>: Send); + assert_not_impl!(NeverError<LocalFuture>: Send); + assert_impl!(NeverError<SyncFuture>: Sync); + assert_not_impl!(NeverError<LocalFuture>: Sync); + assert_impl!(NeverError<UnpinFuture>: Unpin); + assert_not_impl!(NeverError<PinnedFuture>: Unpin); + + assert_impl!(OkInto<SendFuture, *const ()>: Send); + assert_not_impl!(OkInto<LocalFuture, ()>: Send); + assert_impl!(OkInto<SyncFuture, *const ()>: Sync); + assert_not_impl!(OkInto<LocalFuture, ()>: Sync); + assert_impl!(OkInto<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(OkInto<PhantomPinned, ()>: Unpin); + + assert_impl!(OptionFuture<SendFuture>: Send); + assert_not_impl!(OptionFuture<LocalFuture>: Send); + assert_impl!(OptionFuture<SyncFuture>: Sync); + assert_not_impl!(OptionFuture<LocalFuture>: Sync); + assert_impl!(OptionFuture<UnpinFuture>: Unpin); + assert_not_impl!(OptionFuture<PinnedFuture>: Unpin); + + assert_impl!(OrElse<SendFuture, SendFuture, ()>: Send); + assert_not_impl!(OrElse<SendFuture, LocalFuture, ()>: Send); + assert_not_impl!(OrElse<LocalFuture, SendFuture, ()>: Send); + assert_not_impl!(OrElse<SendFuture, SendFuture, *const ()>: Send); + assert_impl!(OrElse<SyncFuture, SyncFuture, ()>: Sync); + assert_not_impl!(OrElse<SyncFuture, LocalFuture, ()>: Sync); + assert_not_impl!(OrElse<LocalFuture, SyncFuture, ()>: Sync); + assert_not_impl!(OrElse<SyncFuture, SyncFuture, *const ()>: Sync); + assert_impl!(OrElse<UnpinFuture, UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(OrElse<PinnedFuture, UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(OrElse<UnpinFuture, PinnedFuture, PhantomPinned>: Unpin); + + assert_impl!(Pending<()>: Send); + assert_not_impl!(Pending<*const ()>: Send); + assert_impl!(Pending<()>: Sync); + assert_not_impl!(Pending<*const ()>: Sync); + assert_impl!(Pending<PhantomPinned>: Unpin); + + assert_impl!(PollFn<()>: Send); + assert_not_impl!(PollFn<*const ()>: Send); + assert_impl!(PollFn<()>: Sync); + assert_not_impl!(PollFn<*const ()>: Sync); + assert_impl!(PollFn<PhantomPinned>: Unpin); + + assert_impl!(PollImmediate<SendStream>: Send); + assert_not_impl!(PollImmediate<LocalStream<()>>: Send); + assert_impl!(PollImmediate<SyncStream>: Sync); + assert_not_impl!(PollImmediate<LocalStream<()>>: Sync); + assert_impl!(PollImmediate<UnpinStream>: Unpin); + assert_not_impl!(PollImmediate<PinnedStream>: Unpin); + + assert_impl!(Ready<()>: Send); + assert_not_impl!(Ready<*const ()>: Send); + assert_impl!(Ready<()>: Sync); + assert_not_impl!(Ready<*const ()>: Sync); + assert_impl!(Ready<PhantomPinned>: Unpin); + + assert_impl!(Remote<SendFuture<()>>: Send); + assert_not_impl!(Remote<LocalFuture>: Send); + assert_not_impl!(Remote<SendFuture>: Send); + assert_impl!(Remote<SyncFuture<()>>: Sync); + assert_not_impl!(Remote<LocalFuture>: Sync); + assert_not_impl!(Remote<SyncFuture>: Sync); + assert_impl!(Remote<UnpinFuture>: Unpin); + assert_not_impl!(Remote<PinnedFuture>: Unpin); + + assert_impl!(RemoteHandle<()>: Send); + assert_not_impl!(RemoteHandle<*const ()>: Send); + assert_impl!(RemoteHandle<()>: Sync); + assert_not_impl!(RemoteHandle<*const ()>: Sync); + assert_impl!(RemoteHandle<PhantomPinned>: Unpin); + + assert_impl!(Select<SendFuture, SendFuture>: Send); + assert_not_impl!(Select<SendFuture, LocalFuture>: Send); + assert_not_impl!(Select<LocalFuture, SendFuture>: Send); + assert_impl!(Select<SyncFuture, SyncFuture>: Sync); + assert_not_impl!(Select<SyncFuture, LocalFuture>: Sync); + assert_not_impl!(Select<LocalFuture, SyncFuture>: Sync); + assert_impl!(Select<UnpinFuture, UnpinFuture>: Unpin); + assert_not_impl!(Select<PinnedFuture, UnpinFuture>: Unpin); + assert_not_impl!(Select<UnpinFuture, PinnedFuture>: Unpin); + + assert_impl!(SelectAll<SendFuture>: Send); + assert_not_impl!(SelectAll<LocalFuture>: Send); + assert_impl!(SelectAll<SyncFuture>: Sync); + assert_not_impl!(SelectAll<LocalFuture>: Sync); + assert_impl!(SelectAll<UnpinFuture>: Unpin); + assert_not_impl!(SelectAll<PinnedFuture>: Unpin); + + assert_impl!(SelectOk<SendFuture>: Send); + assert_not_impl!(SelectOk<LocalFuture>: Send); + assert_impl!(SelectOk<SyncFuture>: Sync); + assert_not_impl!(SelectOk<LocalFuture>: Sync); + assert_impl!(SelectOk<UnpinFuture>: Unpin); + assert_not_impl!(SelectOk<PinnedFuture>: Unpin); + + assert_impl!(Shared<SendFuture<()>>: Send); + assert_not_impl!(Shared<SendFuture>: Send); + assert_not_impl!(Shared<LocalFuture>: Send); + assert_not_impl!(Shared<SyncFuture<()>>: Sync); + assert_impl!(Shared<PinnedFuture>: Unpin); + + assert_impl!(Then<SendFuture, SendFuture, ()>: Send); + assert_not_impl!(Then<SendFuture, SendFuture, *const ()>: Send); + assert_not_impl!(Then<SendFuture, LocalFuture, ()>: Send); + assert_not_impl!(Then<LocalFuture, SendFuture, ()>: Send); + assert_impl!(Then<SyncFuture, SyncFuture, ()>: Sync); + assert_not_impl!(Then<SyncFuture, SyncFuture, *const ()>: Sync); + assert_not_impl!(Then<SyncFuture, LocalFuture, ()>: Sync); + assert_not_impl!(Then<LocalFuture, SyncFuture, ()>: Sync); + assert_impl!(Then<UnpinFuture, UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(Then<PinnedFuture, UnpinFuture, ()>: Unpin); + assert_not_impl!(Then<UnpinFuture, PinnedFuture, ()>: Unpin); + + assert_impl!(TryFlatten<SendTryFuture<()>, ()>: Send); + assert_not_impl!(TryFlatten<LocalTryFuture, ()>: Send); + assert_not_impl!(TryFlatten<SendTryFuture, *const ()>: Send); + assert_impl!(TryFlatten<SyncTryFuture<()>, ()>: Sync); + assert_not_impl!(TryFlatten<LocalTryFuture, ()>: Sync); + assert_not_impl!(TryFlatten<SyncTryFuture, *const ()>: Sync); + assert_impl!(TryFlatten<UnpinTryFuture<()>, ()>: Unpin); + assert_not_impl!(TryFlatten<PinnedTryFuture, ()>: Unpin); + assert_not_impl!(TryFlatten<UnpinTryFuture, PhantomPinned>: Unpin); + + assert_impl!(TryFlattenStream<SendTryFuture<()>>: Send); + assert_not_impl!(TryFlattenStream<LocalTryFuture>: Send); + assert_not_impl!(TryFlattenStream<SendTryFuture>: Send); + assert_impl!(TryFlattenStream<SyncTryFuture<()>>: Sync); + assert_not_impl!(TryFlattenStream<LocalTryFuture>: Sync); + assert_not_impl!(TryFlattenStream<SyncTryFuture>: Sync); + assert_impl!(TryFlattenStream<UnpinTryFuture<()>>: Unpin); + assert_not_impl!(TryFlattenStream<PinnedTryFuture>: Unpin); + assert_not_impl!(TryFlattenStream<UnpinTryFuture>: Unpin); + + assert_impl!(TryJoin<SendTryFuture<()>, SendTryFuture<()>>: Send); + assert_not_impl!(TryJoin<SendTryFuture<()>, SendTryFuture>: Send); + assert_not_impl!(TryJoin<SendTryFuture, SendTryFuture<()>>: Send); + assert_not_impl!(TryJoin<SendTryFuture, LocalTryFuture>: Send); + assert_not_impl!(TryJoin<LocalTryFuture, SendTryFuture>: Send); + assert_impl!(TryJoin<SyncTryFuture<()>, SyncTryFuture<()>>: Sync); + assert_not_impl!(TryJoin<SyncTryFuture<()>, SyncTryFuture>: Sync); + assert_not_impl!(TryJoin<SyncTryFuture, SyncTryFuture<()>>: Sync); + assert_not_impl!(TryJoin<SyncTryFuture, LocalTryFuture>: Sync); + assert_not_impl!(TryJoin<LocalTryFuture, SyncTryFuture>: Sync); + assert_impl!(TryJoin<UnpinTryFuture, UnpinTryFuture>: Unpin); + assert_not_impl!(TryJoin<PinnedTryFuture, UnpinTryFuture>: Unpin); + assert_not_impl!(TryJoin<UnpinTryFuture, PinnedTryFuture>: Unpin); + + // TryJoin3, TryJoin4, TryJoin5 are the same as TryJoin + + assert_impl!(TryJoinAll<SendTryFuture<(), ()>>: Send); + assert_not_impl!(TryJoinAll<LocalTryFuture>: Send); + assert_not_impl!(TryJoinAll<SendTryFuture>: Send); + assert_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync); + assert_not_impl!(TryJoinAll<LocalTryFuture>: Sync); + assert_not_impl!(TryJoinAll<SyncTryFuture>: Sync); + assert_impl!(TryJoinAll<PinnedTryFuture>: Unpin); + + assert_impl!(TrySelect<SendFuture, SendFuture>: Send); + assert_not_impl!(TrySelect<SendFuture, LocalFuture>: Send); + assert_not_impl!(TrySelect<LocalFuture, SendFuture>: Send); + assert_impl!(TrySelect<SyncFuture, SyncFuture>: Sync); + assert_not_impl!(TrySelect<SyncFuture, LocalFuture>: Sync); + assert_not_impl!(TrySelect<LocalFuture, SyncFuture>: Sync); + assert_impl!(TrySelect<UnpinFuture, UnpinFuture>: Unpin); + assert_not_impl!(TrySelect<PinnedFuture, UnpinFuture>: Unpin); + assert_not_impl!(TrySelect<UnpinFuture, PinnedFuture>: Unpin); + + assert_impl!(UnitError<SendFuture>: Send); + assert_not_impl!(UnitError<LocalFuture>: Send); + assert_impl!(UnitError<SyncFuture>: Sync); + assert_not_impl!(UnitError<LocalFuture>: Sync); + assert_impl!(UnitError<UnpinFuture>: Unpin); + assert_not_impl!(UnitError<PinnedFuture>: Unpin); + + assert_impl!(UnwrapOrElse<SendFuture, ()>: Send); + assert_not_impl!(UnwrapOrElse<SendFuture, *const ()>: Send); + assert_not_impl!(UnwrapOrElse<LocalFuture, ()>: Send); + assert_impl!(UnwrapOrElse<SyncFuture, ()>: Sync); + assert_not_impl!(UnwrapOrElse<SyncFuture, *const ()>: Sync); + assert_not_impl!(UnwrapOrElse<LocalFuture, ()>: Sync); + assert_impl!(UnwrapOrElse<UnpinFuture, PhantomPinned>: Unpin); + assert_not_impl!(UnwrapOrElse<PhantomPinned, ()>: Unpin); + + assert_impl!(WeakShared<SendFuture<()>>: Send); + assert_not_impl!(WeakShared<SendFuture>: Send); + assert_not_impl!(WeakShared<LocalFuture>: Send); + assert_not_impl!(WeakShared<SyncFuture<()>>: Sync); + assert_impl!(WeakShared<PinnedFuture>: Unpin); + + assert_impl!(Either<SendFuture, SendFuture>: Send); + assert_not_impl!(Either<SendFuture, LocalFuture>: Send); + assert_not_impl!(Either<LocalFuture, SendFuture>: Send); + assert_impl!(Either<SyncFuture, SyncFuture>: Sync); + assert_not_impl!(Either<SyncFuture, LocalFuture>: Sync); + assert_not_impl!(Either<LocalFuture, SyncFuture>: Sync); + assert_impl!(Either<UnpinFuture, UnpinFuture>: Unpin); + assert_not_impl!(Either<UnpinFuture, PinnedFuture>: Unpin); + assert_not_impl!(Either<PinnedFuture, UnpinFuture>: Unpin); + + assert_impl!(MaybeDone<SendFuture<()>>: Send); + assert_not_impl!(MaybeDone<SendFuture>: Send); + assert_not_impl!(MaybeDone<LocalFuture>: Send); + assert_impl!(MaybeDone<SyncFuture<()>>: Sync); + assert_not_impl!(MaybeDone<SyncFuture>: Sync); + assert_not_impl!(MaybeDone<LocalFuture>: Sync); + assert_impl!(MaybeDone<UnpinFuture>: Unpin); + assert_not_impl!(MaybeDone<PinnedFuture>: Unpin); + + assert_impl!(TryMaybeDone<SendTryFuture<()>>: Send); + assert_not_impl!(TryMaybeDone<SendTryFuture>: Send); + assert_not_impl!(TryMaybeDone<LocalTryFuture>: Send); + assert_impl!(TryMaybeDone<SyncTryFuture<()>>: Sync); + assert_not_impl!(TryMaybeDone<SyncTryFuture>: Sync); + assert_not_impl!(TryMaybeDone<LocalTryFuture>: Sync); + assert_impl!(TryMaybeDone<UnpinTryFuture>: Unpin); + assert_not_impl!(TryMaybeDone<PinnedTryFuture>: Unpin); +} + +/// Assert Send/Sync/Unpin for all public types in `futures::io`. +pub mod io { + use super::*; + use futures::io::{Sink, *}; + + assert_impl!(AllowStdIo<()>: Send); + assert_not_impl!(AllowStdIo<*const ()>: Send); + assert_impl!(AllowStdIo<()>: Sync); + assert_not_impl!(AllowStdIo<*const ()>: Sync); + assert_impl!(AllowStdIo<PhantomPinned>: Unpin); + + assert_impl!(BufReader<()>: Send); + assert_not_impl!(BufReader<*const ()>: Send); + assert_impl!(BufReader<()>: Sync); + assert_not_impl!(BufReader<*const ()>: Sync); + assert_impl!(BufReader<()>: Unpin); + assert_not_impl!(BufReader<PhantomPinned>: Unpin); + + assert_impl!(BufWriter<()>: Send); + assert_not_impl!(BufWriter<*const ()>: Send); + assert_impl!(BufWriter<()>: Sync); + assert_not_impl!(BufWriter<*const ()>: Sync); + assert_impl!(BufWriter<()>: Unpin); + assert_not_impl!(BufWriter<PhantomPinned>: Unpin); + + assert_impl!(Chain<(), ()>: Send); + assert_not_impl!(Chain<(), *const ()>: Send); + assert_not_impl!(Chain<*const (), ()>: Send); + assert_impl!(Chain<(), ()>: Sync); + assert_not_impl!(Chain<(), *const ()>: Sync); + assert_not_impl!(Chain<*const (), ()>: Sync); + assert_impl!(Chain<(), ()>: Unpin); + assert_not_impl!(Chain<(), PhantomPinned>: Unpin); + assert_not_impl!(Chain<PhantomPinned, ()>: Unpin); + + assert_impl!(Close<'_, ()>: Send); + assert_not_impl!(Close<'_, *const ()>: Send); + assert_impl!(Close<'_, ()>: Sync); + assert_not_impl!(Close<'_, *const ()>: Sync); + assert_impl!(Close<'_, ()>: Unpin); + assert_not_impl!(Close<'_, PhantomPinned>: Unpin); + + assert_impl!(Copy<(), ()>: Send); + assert_not_impl!(Copy<(), *const ()>: Send); + assert_not_impl!(Copy<*const (), ()>: Send); + assert_impl!(Copy<(), ()>: Sync); + assert_not_impl!(Copy<(), *const ()>: Sync); + assert_not_impl!(Copy<*const (), ()>: Sync); + assert_impl!(Copy<(), PhantomPinned>: Unpin); + assert_not_impl!(Copy<PhantomPinned, ()>: Unpin); + + assert_impl!(CopyBuf<(), ()>: Send); + assert_not_impl!(CopyBuf<(), *const ()>: Send); + assert_not_impl!(CopyBuf<*const (), ()>: Send); + assert_impl!(CopyBuf<(), ()>: Sync); + assert_not_impl!(CopyBuf<(), *const ()>: Sync); + assert_not_impl!(CopyBuf<*const (), ()>: Sync); + assert_impl!(CopyBuf<(), PhantomPinned>: Unpin); + assert_not_impl!(CopyBuf<PhantomPinned, ()>: Unpin); + + assert_impl!(Cursor<()>: Send); + assert_not_impl!(Cursor<*const ()>: Send); + assert_impl!(Cursor<()>: Sync); + assert_not_impl!(Cursor<*const ()>: Sync); + assert_impl!(Cursor<()>: Unpin); + assert_not_impl!(Cursor<PhantomPinned>: Unpin); + + assert_impl!(Empty: Send); + assert_impl!(Empty: Sync); + assert_impl!(Empty: Unpin); + + assert_impl!(FillBuf<'_, ()>: Send); + assert_not_impl!(FillBuf<'_, *const ()>: Send); + assert_impl!(FillBuf<'_, ()>: Sync); + assert_not_impl!(FillBuf<'_, *const ()>: Sync); + assert_impl!(FillBuf<'_, PhantomPinned>: Unpin); + + assert_impl!(Flush<'_, ()>: Send); + assert_not_impl!(Flush<'_, *const ()>: Send); + assert_impl!(Flush<'_, ()>: Sync); + assert_not_impl!(Flush<'_, *const ()>: Sync); + assert_impl!(Flush<'_, ()>: Unpin); + assert_not_impl!(Flush<'_, PhantomPinned>: Unpin); + + assert_impl!(IntoSink<(), ()>: Send); + assert_not_impl!(IntoSink<(), *const ()>: Send); + assert_not_impl!(IntoSink<*const (), ()>: Send); + assert_impl!(IntoSink<(), ()>: Sync); + assert_not_impl!(IntoSink<(), *const ()>: Sync); + assert_not_impl!(IntoSink<*const (), ()>: Sync); + assert_impl!(IntoSink<(), PhantomPinned>: Unpin); + assert_not_impl!(IntoSink<PhantomPinned, ()>: Unpin); + + assert_impl!(Lines<()>: Send); + assert_not_impl!(Lines<*const ()>: Send); + assert_impl!(Lines<()>: Sync); + assert_not_impl!(Lines<*const ()>: Sync); + assert_impl!(Lines<()>: Unpin); + assert_not_impl!(Lines<PhantomPinned>: Unpin); + + assert_impl!(Read<'_, ()>: Send); + assert_not_impl!(Read<'_, *const ()>: Send); + assert_impl!(Read<'_, ()>: Sync); + assert_not_impl!(Read<'_, *const ()>: Sync); + assert_impl!(Read<'_, ()>: Unpin); + assert_not_impl!(Read<'_, PhantomPinned>: Unpin); + + assert_impl!(ReadExact<'_, ()>: Send); + assert_not_impl!(ReadExact<'_, *const ()>: Send); + assert_impl!(ReadExact<'_, ()>: Sync); + assert_not_impl!(ReadExact<'_, *const ()>: Sync); + assert_impl!(ReadExact<'_, ()>: Unpin); + assert_not_impl!(ReadExact<'_, PhantomPinned>: Unpin); + + assert_impl!(ReadHalf<()>: Send); + assert_not_impl!(ReadHalf<*const ()>: Send); + assert_impl!(ReadHalf<()>: Sync); + assert_not_impl!(ReadHalf<*const ()>: Sync); + assert_impl!(ReadHalf<PhantomPinned>: Unpin); + + assert_impl!(ReadLine<'_, ()>: Send); + assert_not_impl!(ReadLine<'_, *const ()>: Send); + assert_impl!(ReadLine<'_, ()>: Sync); + assert_not_impl!(ReadLine<'_, *const ()>: Sync); + assert_impl!(ReadLine<'_, ()>: Unpin); + assert_not_impl!(ReadLine<'_, PhantomPinned>: Unpin); + + assert_impl!(ReadToEnd<'_, ()>: Send); + assert_not_impl!(ReadToEnd<'_, *const ()>: Send); + assert_impl!(ReadToEnd<'_, ()>: Sync); + assert_not_impl!(ReadToEnd<'_, *const ()>: Sync); + assert_impl!(ReadToEnd<'_, ()>: Unpin); + assert_not_impl!(ReadToEnd<'_, PhantomPinned>: Unpin); + + assert_impl!(ReadToString<'_, ()>: Send); + assert_not_impl!(ReadToString<'_, *const ()>: Send); + assert_impl!(ReadToString<'_, ()>: Sync); + assert_not_impl!(ReadToString<'_, *const ()>: Sync); + assert_impl!(ReadToString<'_, ()>: Unpin); + assert_not_impl!(ReadToString<'_, PhantomPinned>: Unpin); + + assert_impl!(ReadUntil<'_, ()>: Send); + assert_not_impl!(ReadUntil<'_, *const ()>: Send); + assert_impl!(ReadUntil<'_, ()>: Sync); + assert_not_impl!(ReadUntil<'_, *const ()>: Sync); + assert_impl!(ReadUntil<'_, ()>: Unpin); + assert_not_impl!(ReadUntil<'_, PhantomPinned>: Unpin); + + assert_impl!(ReadVectored<'_, ()>: Send); + assert_not_impl!(ReadVectored<'_, *const ()>: Send); + assert_impl!(ReadVectored<'_, ()>: Sync); + assert_not_impl!(ReadVectored<'_, *const ()>: Sync); + assert_impl!(ReadVectored<'_, ()>: Unpin); + assert_not_impl!(ReadVectored<'_, PhantomPinned>: Unpin); + + assert_impl!(Repeat: Send); + assert_impl!(Repeat: Sync); + assert_impl!(Repeat: Unpin); + + assert_impl!(ReuniteError<()>: Send); + assert_not_impl!(ReuniteError<*const ()>: Send); + assert_impl!(ReuniteError<()>: Sync); + assert_not_impl!(ReuniteError<*const ()>: Sync); + assert_impl!(ReuniteError<PhantomPinned>: Unpin); + + assert_impl!(Seek<'_, ()>: Send); + assert_not_impl!(Seek<'_, *const ()>: Send); + assert_impl!(Seek<'_, ()>: Sync); + assert_not_impl!(Seek<'_, *const ()>: Sync); + assert_impl!(Seek<'_, ()>: Unpin); + assert_not_impl!(Seek<'_, PhantomPinned>: Unpin); + + assert_impl!(SeeKRelative<'_, ()>: Send); + assert_not_impl!(SeeKRelative<'_, *const ()>: Send); + assert_impl!(SeeKRelative<'_, ()>: Sync); + assert_not_impl!(SeeKRelative<'_, *const ()>: Sync); + assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin); + + assert_impl!(Sink: Send); + assert_impl!(Sink: Sync); + assert_impl!(Sink: Unpin); + + assert_impl!(Take<()>: Send); + assert_not_impl!(Take<*const ()>: Send); + assert_impl!(Take<()>: Sync); + assert_not_impl!(Take<*const ()>: Sync); + assert_impl!(Take<()>: Unpin); + assert_not_impl!(Take<PhantomPinned>: Unpin); + + assert_impl!(Window<()>: Send); + assert_not_impl!(Window<*const ()>: Send); + assert_impl!(Window<()>: Sync); + assert_not_impl!(Window<*const ()>: Sync); + assert_impl!(Window<()>: Unpin); + assert_not_impl!(Window<PhantomPinned>: Unpin); + + assert_impl!(Write<'_, ()>: Send); + assert_not_impl!(Write<'_, *const ()>: Send); + assert_impl!(Write<'_, ()>: Sync); + assert_not_impl!(Write<'_, *const ()>: Sync); + assert_impl!(Write<'_, ()>: Unpin); + assert_not_impl!(Write<'_, PhantomPinned>: Unpin); + + assert_impl!(WriteAll<'_, ()>: Send); + assert_not_impl!(WriteAll<'_, *const ()>: Send); + assert_impl!(WriteAll<'_, ()>: Sync); + assert_not_impl!(WriteAll<'_, *const ()>: Sync); + assert_impl!(WriteAll<'_, ()>: Unpin); + assert_not_impl!(WriteAll<'_, PhantomPinned>: Unpin); + + #[cfg(feature = "write-all-vectored")] + assert_impl!(WriteAllVectored<'_, ()>: Send); + #[cfg(feature = "write-all-vectored")] + assert_not_impl!(WriteAllVectored<'_, *const ()>: Send); + #[cfg(feature = "write-all-vectored")] + assert_impl!(WriteAllVectored<'_, ()>: Sync); + #[cfg(feature = "write-all-vectored")] + assert_not_impl!(WriteAllVectored<'_, *const ()>: Sync); + #[cfg(feature = "write-all-vectored")] + assert_impl!(WriteAllVectored<'_, ()>: Unpin); + // WriteAllVectored requires `W: Unpin` + // #[cfg(feature = "write-all-vectored")] + // assert_not_impl!(WriteAllVectored<'_, PhantomPinned>: Unpin); + + assert_impl!(WriteHalf<()>: Send); + assert_not_impl!(WriteHalf<*const ()>: Send); + assert_impl!(WriteHalf<()>: Sync); + assert_not_impl!(WriteHalf<*const ()>: Sync); + assert_impl!(WriteHalf<PhantomPinned>: Unpin); + + assert_impl!(WriteVectored<'_, ()>: Send); + assert_not_impl!(WriteVectored<'_, *const ()>: Send); + assert_impl!(WriteVectored<'_, ()>: Sync); + assert_not_impl!(WriteVectored<'_, *const ()>: Sync); + assert_impl!(WriteVectored<'_, ()>: Unpin); + assert_not_impl!(WriteVectored<'_, PhantomPinned>: Unpin); +} + +/// Assert Send/Sync/Unpin for all public types in `futures::lock`. +pub mod lock { + use super::*; + use futures::lock::*; + + #[cfg(feature = "bilock")] + assert_impl!(BiLock<()>: Send); + #[cfg(feature = "bilock")] + assert_not_impl!(BiLock<*const ()>: Send); + #[cfg(feature = "bilock")] + assert_impl!(BiLock<()>: Sync); + #[cfg(feature = "bilock")] + assert_not_impl!(BiLock<*const ()>: Sync); + #[cfg(feature = "bilock")] + assert_impl!(BiLock<PhantomPinned>: Unpin); + + #[cfg(feature = "bilock")] + assert_impl!(BiLockAcquire<'_, ()>: Send); + #[cfg(feature = "bilock")] + assert_not_impl!(BiLockAcquire<'_, *const ()>: Send); + #[cfg(feature = "bilock")] + assert_impl!(BiLockAcquire<'_, ()>: Sync); + #[cfg(feature = "bilock")] + assert_not_impl!(BiLockAcquire<'_, *const ()>: Sync); + #[cfg(feature = "bilock")] + assert_impl!(BiLockAcquire<'_, PhantomPinned>: Unpin); + + #[cfg(feature = "bilock")] + assert_impl!(BiLockGuard<'_, ()>: Send); + #[cfg(feature = "bilock")] + assert_not_impl!(BiLockGuard<'_, *const ()>: Send); + #[cfg(feature = "bilock")] + assert_impl!(BiLockGuard<'_, ()>: Sync); + #[cfg(feature = "bilock")] + assert_not_impl!(BiLockGuard<'_, *const ()>: Sync); + #[cfg(feature = "bilock")] + assert_impl!(BiLockGuard<'_, PhantomPinned>: Unpin); + + assert_impl!(MappedMutexGuard<'_, (), ()>: Send); + assert_not_impl!(MappedMutexGuard<'_, (), *const ()>: Send); + assert_not_impl!(MappedMutexGuard<'_, *const (), ()>: Send); + assert_impl!(MappedMutexGuard<'_, (), ()>: Sync); + assert_not_impl!(MappedMutexGuard<'_, (), *const ()>: Sync); + assert_not_impl!(MappedMutexGuard<'_, *const (), ()>: Sync); + assert_impl!(MappedMutexGuard<'_, PhantomPinned, PhantomPinned>: Unpin); + + assert_impl!(Mutex<()>: Send); + assert_not_impl!(Mutex<*const ()>: Send); + assert_impl!(Mutex<()>: Sync); + assert_not_impl!(Mutex<*const ()>: Sync); + assert_impl!(Mutex<()>: Unpin); + assert_not_impl!(Mutex<PhantomPinned>: Unpin); + + assert_impl!(MutexGuard<'_, ()>: Send); + assert_not_impl!(MutexGuard<'_, *const ()>: Send); + assert_impl!(MutexGuard<'_, ()>: Sync); + assert_not_impl!(MutexGuard<'_, *const ()>: Sync); + assert_impl!(MutexGuard<'_, PhantomPinned>: Unpin); + + assert_impl!(MutexLockFuture<'_, ()>: Send); + assert_not_impl!(MutexLockFuture<'_, *const ()>: Send); + assert_impl!(MutexLockFuture<'_, *const ()>: Sync); + assert_impl!(MutexLockFuture<'_, PhantomPinned>: Unpin); + + #[cfg(feature = "bilock")] + assert_impl!(ReuniteError<()>: Send); + #[cfg(feature = "bilock")] + assert_not_impl!(ReuniteError<*const ()>: Send); + #[cfg(feature = "bilock")] + assert_impl!(ReuniteError<()>: Sync); + #[cfg(feature = "bilock")] + assert_not_impl!(ReuniteError<*const ()>: Sync); + #[cfg(feature = "bilock")] + assert_impl!(ReuniteError<PhantomPinned>: Unpin); +} + +/// Assert Send/Sync/Unpin for all public types in `futures::sink`. +pub mod sink { + use super::*; + use futures::sink::{self, *}; + use std::marker::Send; + + assert_impl!(Buffer<(), ()>: Send); + assert_not_impl!(Buffer<(), *const ()>: Send); + assert_not_impl!(Buffer<*const (), ()>: Send); + assert_impl!(Buffer<(), ()>: Sync); + assert_not_impl!(Buffer<(), *const ()>: Sync); + assert_not_impl!(Buffer<*const (), ()>: Sync); + assert_impl!(Buffer<(), PhantomPinned>: Unpin); + assert_not_impl!(Buffer<PhantomPinned, ()>: Unpin); + + assert_impl!(Close<'_, (), *const ()>: Send); + assert_not_impl!(Close<'_, *const (), ()>: Send); + assert_impl!(Close<'_, (), *const ()>: Sync); + assert_not_impl!(Close<'_, *const (), ()>: Sync); + assert_impl!(Close<'_, (), PhantomPinned>: Unpin); + assert_not_impl!(Close<'_, PhantomPinned, ()>: Unpin); + + assert_impl!(Drain<()>: Send); + assert_not_impl!(Drain<*const ()>: Send); + assert_impl!(Drain<()>: Sync); + assert_not_impl!(Drain<*const ()>: Sync); + assert_impl!(Drain<PhantomPinned>: Unpin); + + assert_impl!(Fanout<(), ()>: Send); + assert_not_impl!(Fanout<(), *const ()>: Send); + assert_not_impl!(Fanout<*const (), ()>: Send); + assert_impl!(Fanout<(), ()>: Sync); + assert_not_impl!(Fanout<(), *const ()>: Sync); + assert_not_impl!(Fanout<*const (), ()>: Sync); + assert_impl!(Fanout<(), ()>: Unpin); + assert_not_impl!(Fanout<(), PhantomPinned>: Unpin); + assert_not_impl!(Fanout<PhantomPinned, ()>: Unpin); + + assert_impl!(Feed<'_, (), ()>: Send); + assert_not_impl!(Feed<'_, (), *const ()>: Send); + assert_not_impl!(Feed<'_, *const (), ()>: Send); + assert_impl!(Feed<'_, (), ()>: Sync); + assert_not_impl!(Feed<'_, (), *const ()>: Sync); + assert_not_impl!(Feed<'_, *const (), ()>: Sync); + assert_impl!(Feed<'_, (), PhantomPinned>: Unpin); + assert_not_impl!(Feed<'_, PhantomPinned, ()>: Unpin); + + assert_impl!(Flush<'_, (), *const ()>: Send); + assert_not_impl!(Flush<'_, *const (), ()>: Send); + assert_impl!(Flush<'_, (), *const ()>: Sync); + assert_not_impl!(Flush<'_, *const (), ()>: Sync); + assert_impl!(Flush<'_, (), PhantomPinned>: Unpin); + assert_not_impl!(Flush<'_, PhantomPinned, ()>: Unpin); + + assert_impl!(sink::Send<'_, (), ()>: Send); + assert_not_impl!(sink::Send<'_, (), *const ()>: Send); + assert_not_impl!(sink::Send<'_, *const (), ()>: Send); + assert_impl!(sink::Send<'_, (), ()>: Sync); + assert_not_impl!(sink::Send<'_, (), *const ()>: Sync); + assert_not_impl!(sink::Send<'_, *const (), ()>: Sync); + assert_impl!(sink::Send<'_, (), PhantomPinned>: Unpin); + assert_not_impl!(sink::Send<'_, PhantomPinned, ()>: Unpin); + + assert_impl!(SendAll<'_, (), SendTryStream<()>>: Send); + assert_not_impl!(SendAll<'_, (), SendTryStream>: Send); + assert_not_impl!(SendAll<'_, (), LocalTryStream>: Send); + assert_not_impl!(SendAll<'_, *const (), SendTryStream<()>>: Send); + assert_impl!(SendAll<'_, (), SyncTryStream<()>>: Sync); + assert_not_impl!(SendAll<'_, (), SyncTryStream>: Sync); + assert_not_impl!(SendAll<'_, (), LocalTryStream>: Sync); + assert_not_impl!(SendAll<'_, *const (), SyncTryStream<()>>: Sync); + assert_impl!(SendAll<'_, (), UnpinTryStream>: Unpin); + assert_not_impl!(SendAll<'_, PhantomPinned, UnpinTryStream>: Unpin); + assert_not_impl!(SendAll<'_, (), PinnedTryStream>: Unpin); + + assert_impl!(SinkErrInto<SendSink, *const (), *const ()>: Send); + assert_not_impl!(SinkErrInto<LocalSink<()>, (), ()>: Send); + assert_impl!(SinkErrInto<SyncSink, *const (), *const ()>: Sync); + assert_not_impl!(SinkErrInto<LocalSink<()>, (), ()>: Sync); + assert_impl!(SinkErrInto<UnpinSink, PhantomPinned, PhantomPinned>: Unpin); + assert_not_impl!(SinkErrInto<PinnedSink<()>, (), ()>: Unpin); + + assert_impl!(SinkMapErr<SendSink, ()>: Send); + assert_not_impl!(SinkMapErr<SendSink, *const ()>: Send); + assert_not_impl!(SinkMapErr<LocalSink<()>, ()>: Send); + assert_impl!(SinkMapErr<SyncSink, ()>: Sync); + assert_not_impl!(SinkMapErr<SyncSink, *const ()>: Sync); + assert_not_impl!(SinkMapErr<LocalSink<()>, ()>: Sync); + assert_impl!(SinkMapErr<UnpinSink, PhantomPinned>: Unpin); + assert_not_impl!(SinkMapErr<PinnedSink<()>, ()>: Unpin); + + assert_impl!(Unfold<(), (), ()>: Send); + assert_not_impl!(Unfold<*const (), (), ()>: Send); + assert_not_impl!(Unfold<(), *const (), ()>: Send); + assert_not_impl!(Unfold<(), (), *const ()>: Send); + assert_impl!(Unfold<(), (), ()>: Sync); + assert_not_impl!(Unfold<*const (), (), ()>: Sync); + assert_not_impl!(Unfold<(), *const (), ()>: Sync); + assert_not_impl!(Unfold<(), (), *const ()>: Sync); + assert_impl!(Unfold<PhantomPinned, PhantomPinned, ()>: Unpin); + assert_not_impl!(Unfold<PinnedSink<()>, (), PhantomPinned>: Unpin); + + assert_impl!(With<(), *const (), *const (), (), ()>: Send); + assert_not_impl!(With<*const (), (), (), (), ()>: Send); + assert_not_impl!(With<(), (), (), *const (), ()>: Send); + assert_not_impl!(With<(), (), (), (), *const ()>: Send); + assert_impl!(With<(), *const (), *const (), (), ()>: Sync); + assert_not_impl!(With<*const (), (), (), (), ()>: Sync); + assert_not_impl!(With<(), (), (), *const (), ()>: Sync); + assert_not_impl!(With<(), (), (), (), *const ()>: Sync); + assert_impl!(With<(), PhantomPinned, PhantomPinned, (), PhantomPinned>: Unpin); + assert_not_impl!(With<PhantomPinned, (), (), (), ()>: Unpin); + assert_not_impl!(With<(), (), (), PhantomPinned, ()>: Unpin); + + assert_impl!(WithFlatMap<(), (), *const (), (), ()>: Send); + assert_not_impl!(WithFlatMap<*const (), (), (), (), ()>: Send); + assert_not_impl!(WithFlatMap<(), *const (), (), (), ()>: Send); + assert_not_impl!(WithFlatMap<(), (), (), *const (), ()>: Send); + assert_not_impl!(WithFlatMap<(), (), (), (), *const ()>: Send); + assert_impl!(WithFlatMap<(), (), *const (), (), ()>: Sync); + assert_not_impl!(WithFlatMap<*const (), (), (), (), ()>: Sync); + assert_not_impl!(WithFlatMap<(), *const (), (), (), ()>: Sync); + assert_not_impl!(WithFlatMap<(), (), (), *const (), ()>: Sync); + assert_not_impl!(WithFlatMap<(), (), (), (), *const ()>: Sync); + assert_impl!(WithFlatMap<(), PhantomPinned, PhantomPinned, (), PhantomPinned>: Unpin); + assert_not_impl!(WithFlatMap<PhantomPinned, (), (), (), ()>: Unpin); + assert_not_impl!(WithFlatMap<(), (), (), PhantomPinned, ()>: Unpin); +} + +/// Assert Send/Sync/Unpin for all public types in `futures::stream`. +pub mod stream { + use super::*; + use futures::{io, stream::*}; + + assert_impl!(AndThen<(), (), ()>: Send); + assert_not_impl!(AndThen<*const (), (), ()>: Send); + assert_not_impl!(AndThen<(), *const (), ()>: Send); + assert_not_impl!(AndThen<(), (), *const ()>: Send); + assert_impl!(AndThen<(), (), ()>: Sync); + assert_not_impl!(AndThen<*const (), (), ()>: Sync); + assert_not_impl!(AndThen<(), *const (), ()>: Sync); + assert_not_impl!(AndThen<(), (), *const ()>: Sync); + assert_impl!(AndThen<(), (), PhantomPinned>: Unpin); + assert_not_impl!(AndThen<PhantomPinned, (), ()>: Unpin); + assert_not_impl!(AndThen<(), PhantomPinned, ()>: Unpin); + + assert_impl!(BufferUnordered<SendStream<()>>: Send); + assert_not_impl!(BufferUnordered<SendStream>: Send); + assert_not_impl!(BufferUnordered<LocalStream>: Send); + assert_impl!(BufferUnordered<SyncStream<()>>: Sync); + assert_not_impl!(BufferUnordered<SyncStream>: Sync); + assert_not_impl!(BufferUnordered<LocalStream>: Sync); + assert_impl!(BufferUnordered<UnpinStream>: Unpin); + assert_not_impl!(BufferUnordered<PinnedStream>: Unpin); + + assert_impl!(Buffered<SendStream<SendFuture<()>>>: Send); + assert_not_impl!(Buffered<SendStream<SendFuture>>: Send); + assert_not_impl!(Buffered<SendStream<LocalFuture>>: Send); + assert_not_impl!(Buffered<LocalStream<SendFuture<()>>>: Send); + assert_impl!(Buffered<SyncStream<SyncFuture<()>>>: Sync); + assert_not_impl!(Buffered<SyncStream<SyncFuture>>: Sync); + assert_not_impl!(Buffered<SyncStream<LocalFuture>>: Sync); + assert_not_impl!(Buffered<LocalStream<SyncFuture<()>>>: Sync); + assert_impl!(Buffered<UnpinStream<PinnedFuture>>: Unpin); + assert_not_impl!(Buffered<PinnedStream<PinnedFuture>>: Unpin); + + assert_impl!(CatchUnwind<SendStream>: Send); + assert_not_impl!(CatchUnwind<LocalStream>: Send); + assert_impl!(CatchUnwind<SyncStream>: Sync); + assert_not_impl!(CatchUnwind<LocalStream>: Sync); + assert_impl!(CatchUnwind<UnpinStream>: Unpin); + assert_not_impl!(CatchUnwind<PinnedStream>: Unpin); + + assert_impl!(Chain<(), ()>: Send); + assert_not_impl!(Chain<(), *const ()>: Send); + assert_not_impl!(Chain<*const (), ()>: Send); + assert_impl!(Chain<(), ()>: Sync); + assert_not_impl!(Chain<(), *const ()>: Sync); + assert_not_impl!(Chain<*const (), ()>: Sync); + assert_impl!(Chain<(), ()>: Unpin); + assert_not_impl!(Chain<(), PhantomPinned>: Unpin); + assert_not_impl!(Chain<PhantomPinned, ()>: Unpin); + + assert_impl!(Chunks<SendStream<()>>: Send); + assert_not_impl!(Chunks<SendStream>: Send); + assert_not_impl!(Chunks<LocalStream>: Send); + assert_impl!(Chunks<SyncStream<()>>: Sync); + assert_not_impl!(Chunks<SyncStream>: Sync); + assert_not_impl!(Chunks<LocalStream>: Sync); + assert_impl!(Chunks<UnpinStream>: Unpin); + assert_not_impl!(Chunks<PinnedStream>: Unpin); + + assert_impl!(Collect<(), ()>: Send); + assert_not_impl!(Collect<*const (), ()>: Send); + assert_not_impl!(Collect<(), *const ()>: Send); + assert_impl!(Collect<(), ()>: Sync); + assert_not_impl!(Collect<*const (), ()>: Sync); + assert_not_impl!(Collect<(), *const ()>: Sync); + assert_impl!(Collect<(), PhantomPinned>: Unpin); + assert_not_impl!(Collect<PhantomPinned, ()>: Unpin); + + assert_impl!(Concat<SendStream<()>>: Send); + assert_not_impl!(Concat<SendStream>: Send); + assert_not_impl!(Concat<LocalStream>: Send); + assert_impl!(Concat<SyncStream<()>>: Sync); + assert_not_impl!(Concat<SyncStream>: Sync); + assert_not_impl!(Concat<LocalStream>: Sync); + assert_impl!(Concat<UnpinStream>: Unpin); + assert_not_impl!(Concat<PinnedStream>: Unpin); + + assert_impl!(Cycle<()>: Send); + assert_not_impl!(Cycle<*const ()>: Send); + assert_impl!(Cycle<()>: Sync); + assert_not_impl!(Cycle<*const ()>: Sync); + assert_impl!(Cycle<()>: Unpin); + assert_not_impl!(Cycle<PhantomPinned>: Unpin); + + assert_impl!(Empty<()>: Send); + assert_not_impl!(Empty<*const ()>: Send); + assert_impl!(Empty<()>: Sync); + assert_not_impl!(Empty<*const ()>: Sync); + assert_impl!(Empty<PhantomPinned>: Unpin); + + assert_impl!(Enumerate<()>: Send); + assert_not_impl!(Enumerate<*const ()>: Send); + assert_impl!(Enumerate<()>: Sync); + assert_not_impl!(Enumerate<*const ()>: Sync); + assert_impl!(Enumerate<()>: Unpin); + assert_not_impl!(Enumerate<PhantomPinned>: Unpin); + + assert_impl!(ErrInto<(), *const ()>: Send); + assert_not_impl!(ErrInto<*const (), ()>: Send); + assert_impl!(ErrInto<(), *const ()>: Sync); + assert_not_impl!(ErrInto<*const (), ()>: Sync); + assert_impl!(ErrInto<(), PhantomPinned>: Unpin); + assert_not_impl!(ErrInto<PhantomPinned, ()>: Unpin); + + assert_impl!(Filter<SendStream<()>, (), ()>: Send); + assert_not_impl!(Filter<LocalStream<()>, (), ()>: Send); + assert_not_impl!(Filter<SendStream, (), ()>: Send); + assert_not_impl!(Filter<SendStream<()>, *const (), ()>: Send); + assert_not_impl!(Filter<SendStream<()>, (), *const ()>: Send); + assert_impl!(Filter<SyncStream<()>, (), ()>: Sync); + assert_not_impl!(Filter<LocalStream<()>, (), ()>: Sync); + assert_not_impl!(Filter<SyncStream, (), ()>: Sync); + assert_not_impl!(Filter<SyncStream<()>, *const (), ()>: Sync); + assert_not_impl!(Filter<SyncStream<()>, (), *const ()>: Sync); + assert_impl!(Filter<UnpinStream, (), PhantomPinned>: Unpin); + assert_not_impl!(Filter<PinnedStream, (), ()>: Unpin); + assert_not_impl!(Filter<UnpinStream, PhantomPinned, ()>: Unpin); + + assert_impl!(FilterMap<(), (), ()>: Send); + assert_not_impl!(FilterMap<*const (), (), ()>: Send); + assert_not_impl!(FilterMap<(), *const (), ()>: Send); + assert_not_impl!(FilterMap<(), (), *const ()>: Send); + assert_impl!(FilterMap<(), (), ()>: Sync); + assert_not_impl!(FilterMap<*const (), (), ()>: Sync); + assert_not_impl!(FilterMap<(), *const (), ()>: Sync); + assert_not_impl!(FilterMap<(), (), *const ()>: Sync); + assert_impl!(FilterMap<(), (), PhantomPinned>: Unpin); + assert_not_impl!(FilterMap<PhantomPinned, (), ()>: Unpin); + assert_not_impl!(FilterMap<(), PhantomPinned, ()>: Unpin); + + assert_impl!(FlatMap<(), (), ()>: Send); + assert_not_impl!(FlatMap<*const (), (), ()>: Send); + assert_not_impl!(FlatMap<(), *const (), ()>: Send); + assert_not_impl!(FlatMap<(), (), *const ()>: Send); + assert_impl!(FlatMap<(), (), ()>: Sync); + assert_not_impl!(FlatMap<*const (), (), ()>: Sync); + assert_not_impl!(FlatMap<(), *const (), ()>: Sync); + assert_not_impl!(FlatMap<(), (), *const ()>: Sync); + assert_impl!(FlatMap<(), (), PhantomPinned>: Unpin); + assert_not_impl!(FlatMap<PhantomPinned, (), ()>: Unpin); + assert_not_impl!(FlatMap<(), PhantomPinned, ()>: Unpin); + + assert_impl!(Flatten<SendStream<()>>: Send); + assert_not_impl!(Flatten<SendStream>: Send); + assert_not_impl!(Flatten<SendStream>: Send); + assert_impl!(Flatten<SyncStream<()>>: Sync); + assert_not_impl!(Flatten<LocalStream<()>>: Sync); + assert_not_impl!(Flatten<LocalStream<()>>: Sync); + assert_impl!(Flatten<UnpinStream<()>>: Unpin); + assert_not_impl!(Flatten<UnpinStream>: Unpin); + assert_not_impl!(Flatten<PinnedStream>: Unpin); + + assert_impl!(Fold<(), (), (), ()>: Send); + assert_not_impl!(Fold<*const (), (), (), ()>: Send); + assert_not_impl!(Fold<(), *const (), (), ()>: Send); + assert_not_impl!(Fold<(), (), *const (), ()>: Send); + assert_not_impl!(Fold<(), (), (), *const ()>: Send); + assert_impl!(Fold<(), (), (), ()>: Sync); + assert_not_impl!(Fold<*const (), (), (), ()>: Sync); + assert_not_impl!(Fold<(), *const (), (), ()>: Sync); + assert_not_impl!(Fold<(), (), *const (), ()>: Sync); + assert_not_impl!(Fold<(), (), (), *const ()>: Sync); + assert_impl!(Fold<(), (), PhantomPinned, PhantomPinned>: Unpin); + assert_not_impl!(Fold<PhantomPinned, (), (), ()>: Unpin); + assert_not_impl!(Fold<(), PhantomPinned, (), ()>: Unpin); + + assert_impl!(ForEach<(), (), ()>: Send); + assert_not_impl!(ForEach<*const (), (), ()>: Send); + assert_not_impl!(ForEach<(), *const (), ()>: Send); + assert_not_impl!(ForEach<(), (), *const ()>: Send); + assert_impl!(ForEach<(), (), ()>: Sync); + assert_not_impl!(ForEach<*const (), (), ()>: Sync); + assert_not_impl!(ForEach<(), *const (), ()>: Sync); + assert_not_impl!(ForEach<(), (), *const ()>: Sync); + assert_impl!(ForEach<(), (), PhantomPinned>: Unpin); + assert_not_impl!(ForEach<PhantomPinned, (), ()>: Unpin); + assert_not_impl!(ForEach<(), PhantomPinned, ()>: Unpin); + + assert_impl!(ForEachConcurrent<(), (), ()>: Send); + assert_not_impl!(ForEachConcurrent<*const (), (), ()>: Send); + assert_not_impl!(ForEachConcurrent<(), *const (), ()>: Send); + assert_not_impl!(ForEachConcurrent<(), (), *const ()>: Send); + assert_impl!(ForEachConcurrent<(), (), ()>: Sync); + assert_not_impl!(ForEachConcurrent<*const (), (), ()>: Sync); + assert_not_impl!(ForEachConcurrent<(), *const (), ()>: Sync); + assert_not_impl!(ForEachConcurrent<(), (), *const ()>: Sync); + assert_impl!(ForEachConcurrent<(), PhantomPinned, PhantomPinned>: Unpin); + assert_not_impl!(ForEachConcurrent<PhantomPinned, (), ()>: Unpin); + + assert_impl!(Forward<SendTryStream<()>, ()>: Send); + assert_not_impl!(Forward<SendTryStream, ()>: Send); + assert_not_impl!(Forward<SendTryStream<()>, *const ()>: Send); + assert_not_impl!(Forward<LocalTryStream, ()>: Send); + assert_impl!(Forward<SyncTryStream<()>, ()>: Sync); + assert_not_impl!(Forward<SyncTryStream, ()>: Sync); + assert_not_impl!(Forward<SyncTryStream<()>, *const ()>: Sync); + assert_not_impl!(Forward<LocalTryStream, ()>: Sync); + assert_impl!(Forward<UnpinTryStream, ()>: Unpin); + assert_not_impl!(Forward<UnpinTryStream, PhantomPinned>: Unpin); + assert_not_impl!(Forward<PinnedTryStream, ()>: Unpin); + + assert_impl!(Fuse<()>: Send); + assert_not_impl!(Fuse<*const ()>: Send); + assert_impl!(Fuse<()>: Sync); + assert_not_impl!(Fuse<*const ()>: Sync); + assert_impl!(Fuse<()>: Unpin); + assert_not_impl!(Fuse<PhantomPinned>: Unpin); + + assert_impl!(FuturesOrdered<SendFuture<()>>: Send); + assert_not_impl!(FuturesOrdered<SendFuture>: Send); + assert_not_impl!(FuturesOrdered<SendFuture>: Send); + assert_impl!(FuturesOrdered<SyncFuture<()>>: Sync); + assert_not_impl!(FuturesOrdered<LocalFuture<()>>: Sync); + assert_not_impl!(FuturesOrdered<LocalFuture<()>>: Sync); + assert_impl!(FuturesOrdered<PinnedFuture>: Unpin); + + assert_impl!(FuturesUnordered<()>: Send); + assert_not_impl!(FuturesUnordered<*const ()>: Send); + assert_impl!(FuturesUnordered<()>: Sync); + assert_not_impl!(FuturesUnordered<*const ()>: Sync); + assert_impl!(FuturesUnordered<PhantomPinned>: Unpin); + + assert_impl!(Inspect<(), ()>: Send); + assert_not_impl!(Inspect<*const (), ()>: Send); + assert_not_impl!(Inspect<(), *const ()>: Send); + assert_impl!(Inspect<(), ()>: Sync); + assert_not_impl!(Inspect<*const (), ()>: Sync); + assert_not_impl!(Inspect<(), *const ()>: Sync); + assert_impl!(Inspect<(), PhantomPinned>: Unpin); + assert_not_impl!(Inspect<PhantomPinned, ()>: Unpin); + + assert_impl!(InspectErr<(), ()>: Send); + assert_not_impl!(InspectErr<*const (), ()>: Send); + assert_not_impl!(InspectErr<(), *const ()>: Send); + assert_impl!(InspectErr<(), ()>: Sync); + assert_not_impl!(InspectErr<*const (), ()>: Sync); + assert_not_impl!(InspectErr<(), *const ()>: Sync); + assert_impl!(InspectErr<(), PhantomPinned>: Unpin); + assert_not_impl!(InspectErr<PhantomPinned, ()>: Unpin); + + assert_impl!(InspectOk<(), ()>: Send); + assert_not_impl!(InspectOk<*const (), ()>: Send); + assert_not_impl!(InspectOk<(), *const ()>: Send); + assert_impl!(InspectOk<(), ()>: Sync); + assert_not_impl!(InspectOk<*const (), ()>: Sync); + assert_not_impl!(InspectOk<(), *const ()>: Sync); + assert_impl!(InspectOk<(), PhantomPinned>: Unpin); + assert_not_impl!(InspectOk<PhantomPinned, ()>: Unpin); + + assert_impl!(IntoAsyncRead<SendTryStream<Vec<u8>, io::Error>>: Send); + assert_not_impl!(IntoAsyncRead<LocalTryStream<Vec<u8>, io::Error>>: Send); + assert_impl!(IntoAsyncRead<SyncTryStream<Vec<u8>, io::Error>>: Sync); + assert_not_impl!(IntoAsyncRead<LocalTryStream<Vec<u8>, io::Error>>: Sync); + assert_impl!(IntoAsyncRead<UnpinTryStream<Vec<u8>, io::Error>>: Unpin); + // IntoAsyncRead requires `St: Unpin` + // assert_not_impl!(IntoAsyncRead<PinnedTryStream<Vec<u8>, io::Error>>: Unpin); + + assert_impl!(IntoStream<()>: Send); + assert_not_impl!(IntoStream<*const ()>: Send); + assert_impl!(IntoStream<()>: Sync); + assert_not_impl!(IntoStream<*const ()>: Sync); + assert_impl!(IntoStream<()>: Unpin); + assert_not_impl!(IntoStream<PhantomPinned>: Unpin); + + assert_impl!(Iter<()>: Send); + assert_not_impl!(Iter<*const ()>: Send); + assert_impl!(Iter<()>: Sync); + assert_not_impl!(Iter<*const ()>: Sync); + assert_impl!(Iter<PhantomPinned>: Unpin); + + assert_impl!(Map<(), ()>: Send); + assert_not_impl!(Map<*const (), ()>: Send); + assert_not_impl!(Map<(), *const ()>: Send); + assert_impl!(Map<(), ()>: Sync); + assert_not_impl!(Map<*const (), ()>: Sync); + assert_not_impl!(Map<(), *const ()>: Sync); + assert_impl!(Map<(), PhantomPinned>: Unpin); + assert_not_impl!(Map<PhantomPinned, ()>: Unpin); + + assert_impl!(MapErr<(), ()>: Send); + assert_not_impl!(MapErr<*const (), ()>: Send); + assert_not_impl!(MapErr<(), *const ()>: Send); + assert_impl!(MapErr<(), ()>: Sync); + assert_not_impl!(MapErr<*const (), ()>: Sync); + assert_not_impl!(MapErr<(), *const ()>: Sync); + assert_impl!(MapErr<(), PhantomPinned>: Unpin); + assert_not_impl!(MapErr<PhantomPinned, ()>: Unpin); + + assert_impl!(MapOk<(), ()>: Send); + assert_not_impl!(MapOk<*const (), ()>: Send); + assert_not_impl!(MapOk<(), *const ()>: Send); + assert_impl!(MapOk<(), ()>: Sync); + assert_not_impl!(MapOk<*const (), ()>: Sync); + assert_not_impl!(MapOk<(), *const ()>: Sync); + assert_impl!(MapOk<(), PhantomPinned>: Unpin); + assert_not_impl!(MapOk<PhantomPinned, ()>: Unpin); + + assert_impl!(Next<'_, ()>: Send); + assert_not_impl!(Next<'_, *const ()>: Send); + assert_impl!(Next<'_, ()>: Sync); + assert_not_impl!(Next<'_, *const ()>: Sync); + assert_impl!(Next<'_, ()>: Unpin); + assert_not_impl!(Next<'_, PhantomPinned>: Unpin); + + assert_impl!(NextIf<'_, SendStream<()>, ()>: Send); + assert_not_impl!(NextIf<'_, SendStream<()>, *const ()>: Send); + assert_not_impl!(NextIf<'_, SendStream, ()>: Send); + assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIf<'_, SyncStream<()>, ()>: Sync); + assert_not_impl!(NextIf<'_, SyncStream<()>, *const ()>: Sync); + assert_not_impl!(NextIf<'_, SyncStream, ()>: Sync); + assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIf<'_, PinnedStream, PhantomPinned>: Unpin); + + assert_impl!(NextIfEq<'_, SendStream<()>, ()>: Send); + assert_not_impl!(NextIfEq<'_, SendStream<()>, *const ()>: Send); + assert_not_impl!(NextIfEq<'_, SendStream, ()>: Send); + assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIfEq<'_, SyncStream<()>, ()>: Sync); + assert_not_impl!(NextIfEq<'_, SyncStream<()>, *const ()>: Sync); + assert_not_impl!(NextIfEq<'_, SyncStream, ()>: Sync); + assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send); + assert_impl!(NextIfEq<'_, PinnedStream, PhantomPinned>: Unpin); + + assert_impl!(Once<()>: Send); + assert_not_impl!(Once<*const ()>: Send); + assert_impl!(Once<()>: Sync); + assert_not_impl!(Once<*const ()>: Sync); + assert_impl!(Once<()>: Unpin); + assert_not_impl!(Once<PhantomPinned>: Unpin); + + assert_impl!(OrElse<(), (), ()>: Send); + assert_not_impl!(OrElse<*const (), (), ()>: Send); + assert_not_impl!(OrElse<(), *const (), ()>: Send); + assert_not_impl!(OrElse<(), (), *const ()>: Send); + assert_impl!(OrElse<(), (), ()>: Sync); + assert_not_impl!(OrElse<*const (), (), ()>: Sync); + assert_not_impl!(OrElse<(), *const (), ()>: Sync); + assert_not_impl!(OrElse<(), (), *const ()>: Sync); + assert_impl!(OrElse<(), (), PhantomPinned>: Unpin); + assert_not_impl!(OrElse<PhantomPinned, (), ()>: Unpin); + assert_not_impl!(OrElse<(), PhantomPinned, ()>: Unpin); + + assert_impl!(Peek<'_, SendStream<()>>: Send); + assert_not_impl!(Peek<'_, SendStream>: Send); + assert_not_impl!(Peek<'_, LocalStream<()>>: Send); + assert_impl!(Peek<'_, SyncStream<()>>: Sync); + assert_not_impl!(Peek<'_, SyncStream>: Sync); + assert_not_impl!(Peek<'_, LocalStream<()>>: Sync); + assert_impl!(Peek<'_, PinnedStream>: Unpin); + + assert_impl!(PeekMut<'_, SendStream<()>>: Send); + assert_not_impl!(PeekMut<'_, SendStream>: Send); + assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send); + assert_impl!(PeekMut<'_, SyncStream<()>>: Sync); + assert_not_impl!(PeekMut<'_, SyncStream>: Sync); + assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync); + assert_impl!(PeekMut<'_, PinnedStream>: Unpin); + + assert_impl!(Peekable<SendStream<()>>: Send); + assert_not_impl!(Peekable<SendStream>: Send); + assert_not_impl!(Peekable<LocalStream>: Send); + assert_impl!(Peekable<SyncStream<()>>: Sync); + assert_not_impl!(Peekable<SyncStream>: Sync); + assert_not_impl!(Peekable<LocalStream>: Sync); + assert_impl!(Peekable<UnpinStream>: Unpin); + assert_not_impl!(Peekable<PinnedStream>: Unpin); + + assert_impl!(Pending<()>: Send); + assert_not_impl!(Pending<*const ()>: Send); + assert_impl!(Pending<()>: Sync); + assert_not_impl!(Pending<*const ()>: Sync); + assert_impl!(Pending<PhantomPinned>: Unpin); + + assert_impl!(PollFn<()>: Send); + assert_not_impl!(PollFn<*const ()>: Send); + assert_impl!(PollFn<()>: Sync); + assert_not_impl!(PollFn<*const ()>: Sync); + assert_impl!(PollFn<PhantomPinned>: Unpin); + + assert_impl!(PollImmediate<SendStream>: Send); + assert_not_impl!(PollImmediate<LocalStream<()>>: Send); + assert_impl!(PollImmediate<SyncStream>: Sync); + assert_not_impl!(PollImmediate<LocalStream<()>>: Sync); + assert_impl!(PollImmediate<UnpinStream>: Unpin); + assert_not_impl!(PollImmediate<PinnedStream>: Unpin); + + assert_impl!(ReadyChunks<SendStream<()>>: Send); + assert_impl!(ReadyChunks<SendStream>: Send); + assert_not_impl!(ReadyChunks<LocalStream>: Send); + assert_impl!(ReadyChunks<SyncStream<()>>: Sync); + assert_impl!(ReadyChunks<SyncStream>: Sync); + assert_not_impl!(ReadyChunks<LocalStream>: Sync); + assert_impl!(ReadyChunks<UnpinStream>: Unpin); + assert_not_impl!(ReadyChunks<PinnedStream>: Unpin); + + assert_impl!(Repeat<()>: Send); + assert_not_impl!(Repeat<*const ()>: Send); + assert_impl!(Repeat<()>: Sync); + assert_not_impl!(Repeat<*const ()>: Sync); + assert_impl!(Repeat<PhantomPinned>: Unpin); + + assert_impl!(RepeatWith<()>: Send); + assert_not_impl!(RepeatWith<*const ()>: Send); + assert_impl!(RepeatWith<()>: Sync); + assert_not_impl!(RepeatWith<*const ()>: Sync); + // RepeatWith requires `F: FnMut() -> A` + assert_impl!(RepeatWith<fn() -> ()>: Unpin); + // assert_impl!(RepeatWith<PhantomPinned>: Unpin); + + assert_impl!(ReuniteError<(), ()>: Send); + assert_not_impl!(ReuniteError<*const (), ()>: Send); + assert_not_impl!(ReuniteError<(), *const ()>: Send); + assert_impl!(ReuniteError<(), ()>: Sync); + assert_not_impl!(ReuniteError<*const (), ()>: Sync); + assert_not_impl!(ReuniteError<(), *const ()>: Sync); + assert_impl!(ReuniteError<PhantomPinned, PhantomPinned>: Unpin); + + assert_impl!(Scan<SendStream, (), (), ()>: Send); + assert_not_impl!(Scan<LocalStream<()>, (), (), ()>: Send); + assert_not_impl!(Scan<SendStream<()>, *const (), (), ()>: Send); + assert_not_impl!(Scan<SendStream<()>, (), *const (), ()>: Send); + assert_not_impl!(Scan<SendStream<()>, (), (), *const ()>: Send); + assert_impl!(Scan<SyncStream, (), (), ()>: Sync); + assert_not_impl!(Scan<LocalStream<()>, (), (), ()>: Sync); + assert_not_impl!(Scan<SyncStream<()>, *const (), (), ()>: Sync); + assert_not_impl!(Scan<SyncStream<()>, (), *const (), ()>: Sync); + assert_not_impl!(Scan<SyncStream<()>, (), (), *const ()>: Sync); + assert_impl!(Scan<UnpinStream, PhantomPinned, (), PhantomPinned>: Unpin); + assert_not_impl!(Scan<PinnedStream, (), (), ()>: Unpin); + assert_not_impl!(Scan<UnpinStream, (), PhantomPinned, ()>: Unpin); + + assert_impl!(Select<(), ()>: Send); + assert_not_impl!(Select<*const (), ()>: Send); + assert_not_impl!(Select<(), *const ()>: Send); + assert_impl!(Select<(), ()>: Sync); + assert_not_impl!(Select<*const (), ()>: Sync); + assert_not_impl!(Select<(), *const ()>: Sync); + assert_impl!(Select<(), ()>: Unpin); + assert_not_impl!(Select<PhantomPinned, ()>: Unpin); + assert_not_impl!(Select<(), PhantomPinned>: Unpin); + + assert_impl!(SelectAll<()>: Send); + assert_not_impl!(SelectAll<*const ()>: Send); + assert_impl!(SelectAll<()>: Sync); + assert_not_impl!(SelectAll<*const ()>: Sync); + assert_impl!(SelectAll<PhantomPinned>: Unpin); + + assert_impl!(SelectNextSome<'_, ()>: Send); + assert_not_impl!(SelectNextSome<'_, *const ()>: Send); + assert_impl!(SelectNextSome<'_, ()>: Sync); + assert_not_impl!(SelectNextSome<'_, *const ()>: Sync); + assert_impl!(SelectNextSome<'_, PhantomPinned>: Unpin); + + assert_impl!(Skip<()>: Send); + assert_not_impl!(Skip<*const ()>: Send); + assert_impl!(Skip<()>: Sync); + assert_not_impl!(Skip<*const ()>: Sync); + assert_impl!(Skip<()>: Unpin); + assert_not_impl!(Skip<PhantomPinned>: Unpin); + + assert_impl!(SkipWhile<SendStream<()>, (), ()>: Send); + assert_not_impl!(SkipWhile<LocalStream<()>, (), ()>: Send); + assert_not_impl!(SkipWhile<SendStream, (), ()>: Send); + assert_not_impl!(SkipWhile<SendStream<()>, *const (), ()>: Send); + assert_not_impl!(SkipWhile<SendStream<()>, (), *const ()>: Send); + assert_impl!(SkipWhile<SyncStream<()>, (), ()>: Sync); + assert_not_impl!(SkipWhile<LocalStream<()>, (), ()>: Sync); + assert_not_impl!(SkipWhile<SyncStream, (), ()>: Sync); + assert_not_impl!(SkipWhile<SyncStream<()>, *const (), ()>: Sync); + assert_not_impl!(SkipWhile<SyncStream<()>, (), *const ()>: Sync); + assert_impl!(SkipWhile<UnpinStream, (), PhantomPinned>: Unpin); + assert_not_impl!(SkipWhile<PinnedStream, (), ()>: Unpin); + assert_not_impl!(SkipWhile<UnpinStream, PhantomPinned, ()>: Unpin); + + assert_impl!(SplitSink<(), ()>: Send); + assert_not_impl!(SplitSink<*const (), ()>: Send); + assert_not_impl!(SplitSink<(), *const ()>: Send); + assert_impl!(SplitSink<(), ()>: Sync); + assert_not_impl!(SplitSink<*const (), ()>: Sync); + assert_not_impl!(SplitSink<(), *const ()>: Sync); + assert_impl!(SplitSink<PhantomPinned, PhantomPinned>: Unpin); + + assert_impl!(SplitStream<()>: Send); + assert_not_impl!(SplitStream<*const ()>: Send); + assert_impl!(SplitStream<()>: Sync); + assert_not_impl!(SplitStream<*const ()>: Sync); + assert_impl!(SplitStream<PhantomPinned>: Unpin); + + assert_impl!(StreamFuture<()>: Send); + assert_not_impl!(StreamFuture<*const ()>: Send); + assert_impl!(StreamFuture<()>: Sync); + assert_not_impl!(StreamFuture<*const ()>: Sync); + assert_impl!(StreamFuture<()>: Unpin); + assert_not_impl!(StreamFuture<PhantomPinned>: Unpin); + + assert_impl!(Take<()>: Send); + assert_not_impl!(Take<*const ()>: Send); + assert_impl!(Take<()>: Sync); + assert_not_impl!(Take<*const ()>: Sync); + assert_impl!(Take<()>: Unpin); + assert_not_impl!(Take<PhantomPinned>: Unpin); + + assert_impl!(TakeUntil<SendStream, SendFuture<()>>: Send); + assert_not_impl!(TakeUntil<SendStream, SendFuture>: Send); + assert_not_impl!(TakeUntil<SendStream, LocalFuture<()>>: Send); + assert_not_impl!(TakeUntil<LocalStream, SendFuture<()>>: Send); + assert_impl!(TakeUntil<SyncStream, SyncFuture<()>>: Sync); + assert_not_impl!(TakeUntil<SyncStream, SyncFuture>: Sync); + assert_not_impl!(TakeUntil<SyncStream, LocalFuture<()>>: Sync); + assert_not_impl!(TakeUntil<LocalStream, SyncFuture<()>>: Sync); + assert_impl!(TakeUntil<UnpinStream, UnpinFuture>: Unpin); + assert_not_impl!(TakeUntil<PinnedStream, UnpinFuture>: Unpin); + assert_not_impl!(TakeUntil<UnpinStream, PinnedFuture>: Unpin); + + assert_impl!(TakeWhile<SendStream<()>, (), ()>: Send); + assert_not_impl!(TakeWhile<LocalStream<()>, (), ()>: Send); + assert_not_impl!(TakeWhile<SendStream, (), ()>: Send); + assert_not_impl!(TakeWhile<SendStream<()>, *const (), ()>: Send); + assert_not_impl!(TakeWhile<SendStream<()>, (), *const ()>: Send); + assert_impl!(TakeWhile<SyncStream<()>, (), ()>: Sync); + assert_not_impl!(TakeWhile<LocalStream<()>, (), ()>: Sync); + assert_not_impl!(TakeWhile<SyncStream, (), ()>: Sync); + assert_not_impl!(TakeWhile<SyncStream<()>, *const (), ()>: Sync); + assert_not_impl!(TakeWhile<SyncStream<()>, (), *const ()>: Sync); + assert_impl!(TakeWhile<UnpinStream, (), PhantomPinned>: Unpin); + assert_not_impl!(TakeWhile<PinnedStream, (), ()>: Unpin); + assert_not_impl!(TakeWhile<UnpinStream, PhantomPinned, ()>: Unpin); + + assert_impl!(Then<SendStream, (), ()>: Send); + assert_not_impl!(Then<LocalStream<()>, (), ()>: Send); + assert_not_impl!(Then<SendStream<()>, *const (), ()>: Send); + assert_not_impl!(Then<SendStream<()>, (), *const ()>: Send); + assert_impl!(Then<SyncStream, (), ()>: Sync); + assert_not_impl!(Then<LocalStream<()>, (), ()>: Sync); + assert_not_impl!(Then<SyncStream<()>, *const (), ()>: Sync); + assert_not_impl!(Then<SyncStream<()>, (), *const ()>: Sync); + assert_impl!(Then<UnpinStream, (), PhantomPinned>: Unpin); + assert_not_impl!(Then<PinnedStream, (), ()>: Unpin); + assert_not_impl!(Then<UnpinStream, PhantomPinned, ()>: Unpin); + + assert_impl!(TryBufferUnordered<SendTryStream<()>>: Send); + assert_not_impl!(TryBufferUnordered<SendTryStream>: Send); + assert_not_impl!(TryBufferUnordered<LocalTryStream>: Send); + assert_impl!(TryBufferUnordered<SyncTryStream<()>>: Sync); + assert_not_impl!(TryBufferUnordered<SyncTryStream>: Sync); + assert_not_impl!(TryBufferUnordered<LocalTryStream>: Sync); + assert_impl!(TryBufferUnordered<UnpinTryStream>: Unpin); + assert_not_impl!(TryBufferUnordered<PinnedTryStream>: Unpin); + + assert_impl!(TryBuffered<SendTryStream<SendTryFuture<(), ()>>>: Send); + assert_not_impl!(TryBuffered<SendTryStream<SendTryFuture<*const (), ()>>>: Send); + assert_not_impl!(TryBuffered<SendTryStream<SendTryFuture<(), *const ()>>>: Send); + assert_not_impl!(TryBuffered<SendTryStream<LocalTryFuture<(), ()>>>: Send); + assert_not_impl!(TryBuffered<LocalTryStream<SendTryFuture<(), ()>>>: Send); + assert_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), ()>>>: Sync); + assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<*const (), ()>>>: Sync); + assert_not_impl!(TryBuffered<SyncTryStream<SyncTryFuture<(), *const ()>>>: Sync); + assert_not_impl!(TryBuffered<SyncTryStream<LocalTryFuture<(), ()>>>: Sync); + assert_not_impl!(TryBuffered<LocalTryStream<SyncTryFuture<(), ()>>>: Sync); + assert_impl!(TryBuffered<UnpinTryStream<PinnedTryFuture>>: Unpin); + assert_not_impl!(TryBuffered<PinnedTryStream<UnpinTryFuture>>: Unpin); + + assert_impl!(TryCollect<(), ()>: Send); + assert_not_impl!(TryCollect<*const (), ()>: Send); + assert_not_impl!(TryCollect<(), *const ()>: Send); + assert_impl!(TryCollect<(), ()>: Sync); + assert_not_impl!(TryCollect<*const (), ()>: Sync); + assert_not_impl!(TryCollect<(), *const ()>: Sync); + assert_impl!(TryCollect<(), PhantomPinned>: Unpin); + assert_not_impl!(TryCollect<PhantomPinned, ()>: Unpin); + + assert_impl!(TryConcat<SendTryStream<()>>: Send); + assert_not_impl!(TryConcat<SendTryStream>: Send); + assert_not_impl!(TryConcat<LocalTryStream>: Send); + assert_impl!(TryConcat<SyncTryStream<()>>: Sync); + assert_not_impl!(TryConcat<SyncTryStream>: Sync); + assert_not_impl!(TryConcat<LocalTryStream>: Sync); + assert_impl!(TryConcat<UnpinTryStream>: Unpin); + assert_not_impl!(TryConcat<PinnedTryStream>: Unpin); + + assert_impl!(TryFilter<SendTryStream<()>, (), ()>: Send); + assert_not_impl!(TryFilter<LocalTryStream<()>, (), ()>: Send); + assert_not_impl!(TryFilter<SendTryStream, (), ()>: Send); + assert_not_impl!(TryFilter<SendTryStream<()>, *const (), ()>: Send); + assert_not_impl!(TryFilter<SendTryStream<()>, (), *const ()>: Send); + assert_impl!(TryFilter<SyncTryStream<()>, (), ()>: Sync); + assert_not_impl!(TryFilter<LocalTryStream<()>, (), ()>: Sync); + assert_not_impl!(TryFilter<SyncTryStream, (), ()>: Sync); + assert_not_impl!(TryFilter<SyncTryStream<()>, *const (), ()>: Sync); + assert_not_impl!(TryFilter<SyncTryStream<()>, (), *const ()>: Sync); + assert_impl!(TryFilter<UnpinTryStream, (), PhantomPinned>: Unpin); + assert_not_impl!(TryFilter<PinnedTryStream, (), ()>: Unpin); + assert_not_impl!(TryFilter<UnpinTryStream, PhantomPinned, ()>: Unpin); + + assert_impl!(TryFilterMap<(), (), ()>: Send); + assert_not_impl!(TryFilterMap<*const (), (), ()>: Send); + assert_not_impl!(TryFilterMap<(), *const (), ()>: Send); + assert_not_impl!(TryFilterMap<(), (), *const ()>: Send); + assert_impl!(TryFilterMap<(), (), ()>: Sync); + assert_not_impl!(TryFilterMap<*const (), (), ()>: Sync); + assert_not_impl!(TryFilterMap<(), *const (), ()>: Sync); + assert_not_impl!(TryFilterMap<(), (), *const ()>: Sync); + assert_impl!(TryFilterMap<(), (), PhantomPinned>: Unpin); + assert_not_impl!(TryFilterMap<PhantomPinned, (), ()>: Unpin); + assert_not_impl!(TryFilterMap<(), PhantomPinned, ()>: Unpin); + + assert_impl!(TryFlatten<SendTryStream<()>>: Send); + assert_not_impl!(TryFlatten<SendTryStream>: Send); + assert_not_impl!(TryFlatten<SendTryStream>: Send); + assert_impl!(TryFlatten<SyncTryStream<()>>: Sync); + assert_not_impl!(TryFlatten<LocalTryStream<()>>: Sync); + assert_not_impl!(TryFlatten<LocalTryStream<()>>: Sync); + assert_impl!(TryFlatten<UnpinTryStream<()>>: Unpin); + assert_not_impl!(TryFlatten<UnpinTryStream>: Unpin); + assert_not_impl!(TryFlatten<PinnedTryStream>: Unpin); + + assert_impl!(TryFold<(), (), (), ()>: Send); + assert_not_impl!(TryFold<*const (), (), (), ()>: Send); + assert_not_impl!(TryFold<(), *const (), (), ()>: Send); + assert_not_impl!(TryFold<(), (), *const (), ()>: Send); + assert_not_impl!(TryFold<(), (), (), *const ()>: Send); + assert_impl!(TryFold<(), (), (), ()>: Sync); + assert_not_impl!(TryFold<*const (), (), (), ()>: Sync); + assert_not_impl!(TryFold<(), *const (), (), ()>: Sync); + assert_not_impl!(TryFold<(), (), *const (), ()>: Sync); + assert_not_impl!(TryFold<(), (), (), *const ()>: Sync); + assert_impl!(TryFold<(), (), PhantomPinned, PhantomPinned>: Unpin); + assert_not_impl!(TryFold<PhantomPinned, (), (), ()>: Unpin); + assert_not_impl!(TryFold<(), PhantomPinned, (), ()>: Unpin); + + assert_impl!(TryForEach<(), (), ()>: Send); + assert_not_impl!(TryForEach<*const (), (), ()>: Send); + assert_not_impl!(TryForEach<(), *const (), ()>: Send); + assert_not_impl!(TryForEach<(), (), *const ()>: Send); + assert_impl!(TryForEach<(), (), ()>: Sync); + assert_not_impl!(TryForEach<*const (), (), ()>: Sync); + assert_not_impl!(TryForEach<(), *const (), ()>: Sync); + assert_not_impl!(TryForEach<(), (), *const ()>: Sync); + assert_impl!(TryForEach<(), (), PhantomPinned>: Unpin); + assert_not_impl!(TryForEach<PhantomPinned, (), ()>: Unpin); + assert_not_impl!(TryForEach<(), PhantomPinned, ()>: Unpin); + + assert_impl!(TryForEachConcurrent<(), (), ()>: Send); + assert_not_impl!(TryForEachConcurrent<*const (), (), ()>: Send); + assert_not_impl!(TryForEachConcurrent<(), *const (), ()>: Send); + assert_not_impl!(TryForEachConcurrent<(), (), *const ()>: Send); + assert_impl!(TryForEachConcurrent<(), (), ()>: Sync); + assert_not_impl!(TryForEachConcurrent<*const (), (), ()>: Sync); + assert_not_impl!(TryForEachConcurrent<(), *const (), ()>: Sync); + assert_not_impl!(TryForEachConcurrent<(), (), *const ()>: Sync); + assert_impl!(TryForEachConcurrent<(), PhantomPinned, PhantomPinned>: Unpin); + assert_not_impl!(TryForEachConcurrent<PhantomPinned, (), ()>: Unpin); + + assert_impl!(TryNext<'_, ()>: Send); + assert_not_impl!(TryNext<'_, *const ()>: Send); + assert_impl!(TryNext<'_, ()>: Sync); + assert_not_impl!(TryNext<'_, *const ()>: Sync); + assert_impl!(TryNext<'_, ()>: Unpin); + assert_not_impl!(TryNext<'_, PhantomPinned>: Unpin); + + assert_impl!(TrySkipWhile<SendTryStream<()>, (), ()>: Send); + assert_not_impl!(TrySkipWhile<LocalTryStream<()>, (), ()>: Send); + assert_not_impl!(TrySkipWhile<SendTryStream, (), ()>: Send); + assert_not_impl!(TrySkipWhile<SendTryStream<()>, *const (), ()>: Send); + assert_not_impl!(TrySkipWhile<SendTryStream<()>, (), *const ()>: Send); + assert_impl!(TrySkipWhile<SyncTryStream<()>, (), ()>: Sync); + assert_not_impl!(TrySkipWhile<LocalTryStream<()>, (), ()>: Sync); + assert_not_impl!(TrySkipWhile<SyncTryStream, (), ()>: Sync); + assert_not_impl!(TrySkipWhile<SyncTryStream<()>, *const (), ()>: Sync); + assert_not_impl!(TrySkipWhile<SyncTryStream<()>, (), *const ()>: Sync); + assert_impl!(TrySkipWhile<UnpinTryStream, (), PhantomPinned>: Unpin); + assert_not_impl!(TrySkipWhile<PinnedTryStream, (), ()>: Unpin); + assert_not_impl!(TrySkipWhile<UnpinTryStream, PhantomPinned, ()>: Unpin); + + assert_impl!(TryTakeWhile<SendTryStream<()>, (), ()>: Send); + assert_not_impl!(TryTakeWhile<LocalTryStream<()>, (), ()>: Send); + assert_not_impl!(TryTakeWhile<SendTryStream, (), ()>: Send); + assert_not_impl!(TryTakeWhile<SendTryStream<()>, *const (), ()>: Send); + assert_not_impl!(TryTakeWhile<SendTryStream<()>, (), *const ()>: Send); + assert_impl!(TryTakeWhile<SyncTryStream<()>, (), ()>: Sync); + assert_not_impl!(TryTakeWhile<LocalTryStream<()>, (), ()>: Sync); + assert_not_impl!(TryTakeWhile<SyncTryStream, (), ()>: Sync); + assert_not_impl!(TryTakeWhile<SyncTryStream<()>, *const (), ()>: Sync); + assert_not_impl!(TryTakeWhile<SyncTryStream<()>, (), *const ()>: Sync); + assert_impl!(TryTakeWhile<UnpinTryStream, (), PhantomPinned>: Unpin); + assert_not_impl!(TryTakeWhile<PinnedTryStream, (), ()>: Unpin); + assert_not_impl!(TryTakeWhile<UnpinTryStream, PhantomPinned, ()>: Unpin); + + assert_impl!(TryUnfold<(), (), ()>: Send); + assert_not_impl!(TryUnfold<*const (), (), ()>: Send); + assert_not_impl!(TryUnfold<(), *const (), ()>: Send); + assert_not_impl!(TryUnfold<(), (), *const ()>: Send); + assert_impl!(TryUnfold<(), (), ()>: Sync); + assert_not_impl!(TryUnfold<*const (), (), ()>: Sync); + assert_not_impl!(TryUnfold<(), *const (), ()>: Sync); + assert_not_impl!(TryUnfold<(), (), *const ()>: Sync); + assert_impl!(TryUnfold<PhantomPinned, PhantomPinned, ()>: Unpin); + assert_not_impl!(TryUnfold<(), (), PhantomPinned>: Unpin); + + assert_impl!(Unfold<(), (), ()>: Send); + assert_not_impl!(Unfold<*const (), (), ()>: Send); + assert_not_impl!(Unfold<(), *const (), ()>: Send); + assert_not_impl!(Unfold<(), (), *const ()>: Send); + assert_impl!(Unfold<(), (), ()>: Sync); + assert_not_impl!(Unfold<*const (), (), ()>: Sync); + assert_not_impl!(Unfold<(), *const (), ()>: Sync); + assert_not_impl!(Unfold<(), (), *const ()>: Sync); + assert_impl!(Unfold<PhantomPinned, PhantomPinned, ()>: Unpin); + assert_not_impl!(Unfold<(), (), PhantomPinned>: Unpin); + + assert_impl!(Unzip<(), (), ()>: Send); + assert_not_impl!(Unzip<*const (), (), ()>: Send); + assert_not_impl!(Unzip<(), *const (), ()>: Send); + assert_not_impl!(Unzip<(), (), *const ()>: Send); + assert_impl!(Unzip<(), (), ()>: Sync); + assert_not_impl!(Unzip<*const (), (), ()>: Sync); + assert_not_impl!(Unzip<(), *const (), ()>: Sync); + assert_not_impl!(Unzip<(), (), *const ()>: Sync); + assert_impl!(Unzip<(), PhantomPinned, PhantomPinned>: Unpin); + assert_not_impl!(Unzip<PhantomPinned, (), ()>: Unpin); + + assert_impl!(Zip<SendStream<()>, SendStream<()>>: Send); + assert_not_impl!(Zip<SendStream, SendStream<()>>: Send); + assert_not_impl!(Zip<SendStream<()>, SendStream>: Send); + assert_not_impl!(Zip<LocalStream, SendStream<()>>: Send); + assert_not_impl!(Zip<SendStream<()>, LocalStream>: Send); + assert_impl!(Zip<SyncStream<()>, SyncStream<()>>: Sync); + assert_not_impl!(Zip<SyncStream, SyncStream<()>>: Sync); + assert_not_impl!(Zip<SyncStream<()>, SyncStream>: Sync); + assert_not_impl!(Zip<LocalStream, SyncStream<()>>: Sync); + assert_not_impl!(Zip<SyncStream<()>, LocalStream>: Sync); + assert_impl!(Zip<UnpinStream, UnpinStream>: Unpin); + assert_not_impl!(Zip<UnpinStream, PinnedStream>: Unpin); + assert_not_impl!(Zip<PinnedStream, UnpinStream>: Unpin); + + assert_impl!(futures_unordered::Iter<()>: Send); + assert_not_impl!(futures_unordered::Iter<*const ()>: Send); + assert_impl!(futures_unordered::Iter<()>: Sync); + assert_not_impl!(futures_unordered::Iter<*const ()>: Sync); + assert_impl!(futures_unordered::Iter<()>: Unpin); + // The definition of futures_unordered::Iter has `Fut: Unpin` bounds. + // assert_not_impl!(futures_unordered::Iter<PhantomPinned>: Unpin); + + assert_impl!(futures_unordered::IterMut<()>: Send); + assert_not_impl!(futures_unordered::IterMut<*const ()>: Send); + assert_impl!(futures_unordered::IterMut<()>: Sync); + assert_not_impl!(futures_unordered::IterMut<*const ()>: Sync); + assert_impl!(futures_unordered::IterMut<()>: Unpin); + // The definition of futures_unordered::IterMut has `Fut: Unpin` bounds. + // assert_not_impl!(futures_unordered::IterMut<PhantomPinned>: Unpin); + + assert_impl!(futures_unordered::IterPinMut<()>: Send); + assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Send); + assert_impl!(futures_unordered::IterPinMut<()>: Sync); + assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync); + assert_impl!(futures_unordered::IterPinMut<PhantomPinned>: Unpin); + + assert_impl!(futures_unordered::IterPinRef<()>: Send); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send); + assert_impl!(futures_unordered::IterPinRef<()>: Sync); + assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync); + assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin); + + assert_impl!(futures_unordered::IntoIter<()>: Send); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send); + assert_impl!(futures_unordered::IntoIter<()>: Sync); + assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync); + // The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds. + // assert_not_impl!(futures_unordered::IntoIter<PhantomPinned>: Unpin); +} + +/// Assert Send/Sync/Unpin for all public types in `futures::task`. +pub mod task { + use super::*; + use futures::task::*; + + assert_impl!(AtomicWaker: Send); + assert_impl!(AtomicWaker: Sync); + assert_impl!(AtomicWaker: Unpin); + + assert_impl!(FutureObj<*const ()>: Send); + assert_not_impl!(FutureObj<()>: Sync); + assert_impl!(FutureObj<PhantomPinned>: Unpin); + + assert_not_impl!(LocalFutureObj<()>: Send); + assert_not_impl!(LocalFutureObj<()>: Sync); + assert_impl!(LocalFutureObj<PhantomPinned>: Unpin); + + assert_impl!(SpawnError: Send); + assert_impl!(SpawnError: Sync); + assert_impl!(SpawnError: Unpin); + + assert_impl!(WakerRef<'_>: Send); + assert_impl!(WakerRef<'_>: Sync); + assert_impl!(WakerRef<'_>: Unpin); +} diff --git a/third_party/rust/futures/tests/compat.rs b/third_party/rust/futures/tests/compat.rs new file mode 100644 index 0000000000..ac04a95ea8 --- /dev/null +++ b/third_party/rust/futures/tests/compat.rs @@ -0,0 +1,16 @@ +#![cfg(feature = "compat")] +#![cfg(not(miri))] // Miri does not support epoll + +use futures::compat::Future01CompatExt; +use futures::prelude::*; +use std::time::Instant; +use tokio::runtime::Runtime; +use tokio::timer::Delay; + +#[test] +fn can_use_01_futures_in_a_03_future_running_on_a_01_executor() { + let f = async { Delay::new(Instant::now()).compat().await }; + + let mut runtime = Runtime::new().unwrap(); + runtime.block_on(f.boxed().compat()).unwrap(); +} diff --git a/third_party/rust/futures/tests/eager_drop.rs b/third_party/rust/futures/tests/eager_drop.rs new file mode 100644 index 0000000000..992507774c --- /dev/null +++ b/third_party/rust/futures/tests/eager_drop.rs @@ -0,0 +1,121 @@ +use futures::channel::oneshot; +use futures::future::{self, Future, FutureExt, TryFutureExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use pin_project::pin_project; +use std::pin::Pin; +use std::sync::mpsc; + +#[test] +fn map_ok() { + // The closure given to `map_ok` should have been dropped by the time `map` + // runs. + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + future::ready::<Result<i32, i32>>(Err(1)) + .map_ok(move |_| { + let _tx1 = tx1; + panic!("should not run"); + }) + .map(move |_| { + assert!(rx1.recv().is_err()); + tx2.send(()).unwrap() + }) + .run_in_background(); + + rx2.recv().unwrap(); +} + +#[test] +fn map_err() { + // The closure given to `map_err` should have been dropped by the time `map` + // runs. + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + future::ready::<Result<i32, i32>>(Ok(1)) + .map_err(move |_| { + let _tx1 = tx1; + panic!("should not run"); + }) + .map(move |_| { + assert!(rx1.recv().is_err()); + tx2.send(()).unwrap() + }) + .run_in_background(); + + rx2.recv().unwrap(); +} + +#[pin_project] +struct FutureData<F, T> { + _data: T, + #[pin] + future: F, +} + +impl<F: Future, T: Send + 'static> Future for FutureData<F, T> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> { + self.project().future.poll(cx) + } +} + +#[test] +fn then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<()>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) } + .then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(()) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(()).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn and_then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) } + .and_then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Ok(())).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn or_else_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<Result<(), ()>>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); + + FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) } + .or_else(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready::<Result<(), ()>>(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Err(())).unwrap(); + rx2.recv().unwrap(); +} diff --git a/third_party/rust/futures/tests/eventual.rs b/third_party/rust/futures/tests/eventual.rs new file mode 100644 index 0000000000..57a49b2417 --- /dev/null +++ b/third_party/rust/futures/tests/eventual.rs @@ -0,0 +1,179 @@ +use futures::channel::oneshot; +use futures::executor::ThreadPool; +use futures::future::{self, ok, Future, FutureExt, TryFutureExt}; +use futures::task::SpawnExt; +use std::sync::mpsc; +use std::thread; + +fn run<F: Future + Send + 'static>(future: F) { + let tp = ThreadPool::new().unwrap(); + tp.spawn(future.map(drop)).unwrap(); +} + +#[test] +fn join1() { + let (tx, rx) = mpsc::channel(); + run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap())); + assert_eq!(rx.recv(), Ok((1, 2))); + assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} + +#[test] +fn join2() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap())); + assert!(rx.try_recv().is_err()); + c1.send(1).unwrap(); + assert!(rx.try_recv().is_err()); + c2.send(2).unwrap(); + assert_eq!(rx.recv(), Ok((1, 2))); + assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} + +#[test] +fn join3() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap())); + assert!(rx.try_recv().is_err()); + drop(c1); + assert_eq!(rx.recv(), Ok(1)); + assert!(rx.recv().is_err()); + drop(c2); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} + +#[test] +fn join4() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap())); + assert!(rx.try_recv().is_err()); + drop(c1); + assert!(rx.recv().is_ok()); + drop(c2); + assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} + +#[test] +fn join5() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (c3, p3) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap())); + assert!(rx.try_recv().is_err()); + c1.send(1).unwrap(); + assert!(rx.try_recv().is_err()); + c2.send(2).unwrap(); + assert!(rx.try_recv().is_err()); + c3.send(3).unwrap(); + assert_eq!(rx.recv(), Ok(((1, 2), 3))); + assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} + +#[test] +fn select1() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap())); + assert!(rx.try_recv().is_err()); + c1.send(1).unwrap(); + let (v, p2) = rx.recv().unwrap().into_inner(); + assert_eq!(v, 1); + assert!(rx.recv().is_err()); + + let (tx, rx) = mpsc::channel(); + run(p2.map_ok(move |v| tx.send(v).unwrap())); + c2.send(2).unwrap(); + assert_eq!(rx.recv(), Ok(2)); + assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} + +#[test] +fn select2() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap())); + assert!(rx.try_recv().is_err()); + drop(c1); + let (v, p2) = rx.recv().unwrap(); + assert_eq!(v, 1); + assert!(rx.recv().is_err()); + + let (tx, rx) = mpsc::channel(); + run(p2.map_ok(move |v| tx.send(v).unwrap())); + c2.send(2).unwrap(); + assert_eq!(rx.recv(), Ok(2)); + assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} + +#[test] +fn select3() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap())); + assert!(rx.try_recv().is_err()); + drop(c1); + let (v, p2) = rx.recv().unwrap(); + assert_eq!(v, 1); + assert!(rx.recv().is_err()); + + let (tx, rx) = mpsc::channel(); + run(p2.map_err(move |_v| tx.send(2).unwrap())); + drop(c2); + assert_eq!(rx.recv(), Ok(2)); + assert!(rx.recv().is_err()); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} + +#[test] +fn select4() { + const N: usize = if cfg!(miri) { 100 } else { 10000 }; + + let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>(); + + let t = thread::spawn(move || { + for c in rx { + c.send(1).unwrap(); + } + }); + + let (tx2, rx2) = mpsc::channel(); + for _ in 0..N { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + + let tx3 = tx2.clone(); + run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap())); + tx.send(c1).unwrap(); + rx2.recv().unwrap(); + drop(c2); + } + drop(tx); + + t.join().unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} diff --git a/third_party/rust/futures/tests/future_abortable.rs b/third_party/rust/futures/tests/future_abortable.rs new file mode 100644 index 0000000000..e119f0b719 --- /dev/null +++ b/third_party/rust/futures/tests/future_abortable.rs @@ -0,0 +1,44 @@ +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::{abortable, Aborted, FutureExt}; +use futures::task::{Context, Poll}; +use futures_test::task::new_count_waker; + +#[test] +fn abortable_works() { + let (_tx, a_rx) = oneshot::channel::<()>(); + let (abortable_rx, abort_handle) = abortable(a_rx); + + abort_handle.abort(); + assert!(abortable_rx.is_aborted()); + assert_eq!(Err(Aborted), block_on(abortable_rx)); +} + +#[test] +fn abortable_awakens() { + let (_tx, a_rx) = oneshot::channel::<()>(); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + assert_eq!(counter, 0); + assert_eq!(Poll::Pending, abortable_rx.poll_unpin(&mut cx)); + assert_eq!(counter, 0); + + abort_handle.abort(); + assert_eq!(counter, 1); + assert!(abortable_rx.is_aborted()); + assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx)); +} + +#[test] +fn abortable_resolves() { + let (tx, a_rx) = oneshot::channel::<()>(); + let (abortable_rx, _abort_handle) = abortable(a_rx); + + tx.send(()).unwrap(); + + assert!(!abortable_rx.is_aborted()); + assert_eq!(Ok(Ok(())), block_on(abortable_rx)); +} diff --git a/third_party/rust/futures/tests/future_basic_combinators.rs b/third_party/rust/futures/tests/future_basic_combinators.rs new file mode 100644 index 0000000000..372ab48b79 --- /dev/null +++ b/third_party/rust/futures/tests/future_basic_combinators.rs @@ -0,0 +1,104 @@ +use futures::future::{self, FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; +use std::sync::mpsc; + +#[test] +fn basic_future_combinators() { + let (tx1, rx) = mpsc::channel(); + let tx2 = tx1.clone(); + let tx3 = tx1.clone(); + + let fut = future::ready(1) + .then(move |x| { + tx1.send(x).unwrap(); // Send 1 + tx1.send(2).unwrap(); // Send 2 + future::ready(3) + }) + .map(move |x| { + tx2.send(x).unwrap(); // Send 3 + tx2.send(4).unwrap(); // Send 4 + 5 + }) + .map(move |x| { + tx3.send(x).unwrap(); // Send 5 + }); + + assert!(rx.try_recv().is_err()); // Not started yet + fut.run_in_background(); // Start it + for i in 1..=5 { + assert_eq!(rx.recv(), Ok(i)); + } // Check it + assert!(rx.recv().is_err()); // Should be done +} + +#[test] +fn basic_try_future_combinators() { + let (tx1, rx) = mpsc::channel(); + let tx2 = tx1.clone(); + let tx3 = tx1.clone(); + let tx4 = tx1.clone(); + let tx5 = tx1.clone(); + let tx6 = tx1.clone(); + let tx7 = tx1.clone(); + let tx8 = tx1.clone(); + let tx9 = tx1.clone(); + let tx10 = tx1.clone(); + + let fut = future::ready(Ok(1)) + .and_then(move |x: i32| { + tx1.send(x).unwrap(); // Send 1 + tx1.send(2).unwrap(); // Send 2 + future::ready(Ok(3)) + }) + .or_else(move |x: i32| { + tx2.send(x).unwrap(); // Should not run + tx2.send(-1).unwrap(); + future::ready(Ok(-1)) + }) + .map_ok(move |x: i32| { + tx3.send(x).unwrap(); // Send 3 + tx3.send(4).unwrap(); // Send 4 + 5 + }) + .map_err(move |x: i32| { + tx4.send(x).unwrap(); // Should not run + tx4.send(-1).unwrap(); + -1 + }) + .map(move |x: Result<i32, i32>| { + tx5.send(x.unwrap()).unwrap(); // Send 5 + tx5.send(6).unwrap(); // Send 6 + Err(7) // Now return errors! + }) + .and_then(move |x: i32| { + tx6.send(x).unwrap(); // Should not run + tx6.send(-1).unwrap(); + future::ready(Err(-1)) + }) + .or_else(move |x: i32| { + tx7.send(x).unwrap(); // Send 7 + tx7.send(8).unwrap(); // Send 8 + future::ready(Err(9)) + }) + .map_ok(move |x: i32| { + tx8.send(x).unwrap(); // Should not run + tx8.send(-1).unwrap(); + -1 + }) + .map_err(move |x: i32| { + tx9.send(x).unwrap(); // Send 9 + tx9.send(10).unwrap(); // Send 10 + 11 + }) + .map(move |x: Result<i32, i32>| { + tx10.send(x.err().unwrap()).unwrap(); // Send 11 + tx10.send(12).unwrap(); // Send 12 + }); + + assert!(rx.try_recv().is_err()); // Not started yet + fut.run_in_background(); // Start it + for i in 1..=12 { + assert_eq!(rx.recv(), Ok(i)); + } // Check it + assert!(rx.recv().is_err()); // Should be done +} diff --git a/third_party/rust/futures/tests/future_fuse.rs b/third_party/rust/futures/tests/future_fuse.rs new file mode 100644 index 0000000000..83f2c1ce9e --- /dev/null +++ b/third_party/rust/futures/tests/future_fuse.rs @@ -0,0 +1,12 @@ +use futures::future::{self, FutureExt}; +use futures::task::Context; +use futures_test::task::panic_waker; + +#[test] +fn fuse() { + let mut future = future::ready::<i32>(2).fuse(); + let waker = panic_waker(); + let mut cx = Context::from_waker(&waker); + assert!(future.poll_unpin(&mut cx).is_ready()); + assert!(future.poll_unpin(&mut cx).is_pending()); +} diff --git a/third_party/rust/futures/tests/future_inspect.rs b/third_party/rust/futures/tests/future_inspect.rs new file mode 100644 index 0000000000..eacd1f78a2 --- /dev/null +++ b/third_party/rust/futures/tests/future_inspect.rs @@ -0,0 +1,16 @@ +use futures::executor::block_on; +use futures::future::{self, FutureExt}; + +#[test] +fn smoke() { + let mut counter = 0; + + { + let work = future::ready::<i32>(40).inspect(|val| { + counter += *val; + }); + assert_eq!(block_on(work), 40); + } + + assert_eq!(counter, 40); +} diff --git a/third_party/rust/futures/tests/future_join.rs b/third_party/rust/futures/tests/future_join.rs new file mode 100644 index 0000000000..f5df9d7775 --- /dev/null +++ b/third_party/rust/futures/tests/future_join.rs @@ -0,0 +1,32 @@ +use futures::executor::block_on; +use futures::future::Future; +use std::task::Poll; + +/// This tests verifies (through miri) that self-referencing +/// futures are not invalidated when joining them. +#[test] +fn futures_join_macro_self_referential() { + block_on(async { futures::join!(yield_now(), trouble()) }); +} + +async fn trouble() { + let lucky_number = 42; + let problematic_variable = &lucky_number; + + yield_now().await; + + // problematic dereference + let _ = { *problematic_variable }; +} + +fn yield_now() -> impl Future<Output = ()> { + let mut yielded = false; + std::future::poll_fn(move |cx| { + if core::mem::replace(&mut yielded, true) { + Poll::Ready(()) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) +} diff --git a/third_party/rust/futures/tests/future_join_all.rs b/third_party/rust/futures/tests/future_join_all.rs new file mode 100644 index 0000000000..44486e1ca3 --- /dev/null +++ b/third_party/rust/futures/tests/future_join_all.rs @@ -0,0 +1,41 @@ +use futures::executor::block_on; +use futures::future::{join_all, ready, Future, JoinAll}; +use futures::pin_mut; +use std::fmt::Debug; + +#[track_caller] +fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T) +where + T: PartialEq + Debug, +{ + pin_mut!(actual_fut); + let output = block_on(actual_fut); + assert_eq!(output, expected); +} + +#[test] +fn collect_collects() { + assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]); + assert_done(join_all(vec![ready(1)]), vec![1]); + // REVIEW: should this be implemented? + // assert_done(join_all(Vec::<i32>::new()), vec![]); + + // TODO: needs more tests +} + +#[test] +fn join_all_iter_lifetime() { + // In futures-rs version 0.1, this function would fail to typecheck due to an overly + // conservative type parameterization of `JoinAll`. + fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> { + let iter = bufs.into_iter().map(|b| ready::<usize>(b.len())); + join_all(iter) + } + + assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]); +} + +#[test] +fn join_all_from_iter() { + assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2]) +} diff --git a/third_party/rust/futures/tests/future_obj.rs b/third_party/rust/futures/tests/future_obj.rs new file mode 100644 index 0000000000..0e5253464e --- /dev/null +++ b/third_party/rust/futures/tests/future_obj.rs @@ -0,0 +1,33 @@ +use futures::future::{Future, FutureExt, FutureObj}; +use futures::task::{Context, Poll}; +use std::pin::Pin; + +#[test] +fn dropping_does_not_segfault() { + FutureObj::new(async { String::new() }.boxed()); +} + +#[test] +fn dropping_drops_the_future() { + let mut times_dropped = 0; + + struct Inc<'a>(&'a mut u32); + + impl Future for Inc<'_> { + type Output = (); + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<()> { + unimplemented!() + } + } + + impl Drop for Inc<'_> { + fn drop(&mut self) { + *self.0 += 1; + } + } + + FutureObj::new(Inc(&mut times_dropped).boxed()); + + assert_eq!(times_dropped, 1); +} diff --git a/third_party/rust/futures/tests/future_select_all.rs b/third_party/rust/futures/tests/future_select_all.rs new file mode 100644 index 0000000000..299b479044 --- /dev/null +++ b/third_party/rust/futures/tests/future_select_all.rs @@ -0,0 +1,25 @@ +use futures::executor::block_on; +use futures::future::{ready, select_all}; +use std::collections::HashSet; + +#[test] +fn smoke() { + let v = vec![ready(1), ready(2), ready(3)]; + + let mut c = vec![1, 2, 3].into_iter().collect::<HashSet<_>>(); + + let (i, idx, v) = block_on(select_all(v)); + assert!(c.remove(&i)); + assert_eq!(idx, 0); + + let (i, idx, v) = block_on(select_all(v)); + assert!(c.remove(&i)); + assert_eq!(idx, 0); + + let (i, idx, v) = block_on(select_all(v)); + assert!(c.remove(&i)); + assert_eq!(idx, 0); + + assert!(c.is_empty()); + assert!(v.is_empty()); +} diff --git a/third_party/rust/futures/tests/future_select_ok.rs b/third_party/rust/futures/tests/future_select_ok.rs new file mode 100644 index 0000000000..8aec00362d --- /dev/null +++ b/third_party/rust/futures/tests/future_select_ok.rs @@ -0,0 +1,30 @@ +use futures::executor::block_on; +use futures::future::{err, ok, select_ok}; + +#[test] +fn ignore_err() { + let v = vec![err(1), err(2), ok(3), ok(4)]; + + let (i, v) = block_on(select_ok(v)).ok().unwrap(); + assert_eq!(i, 3); + + assert_eq!(v.len(), 1); + + let (i, v) = block_on(select_ok(v)).ok().unwrap(); + assert_eq!(i, 4); + + assert!(v.is_empty()); +} + +#[test] +fn last_err() { + let v = vec![ok(1), err(2), err(3)]; + + let (i, v) = block_on(select_ok(v)).ok().unwrap(); + assert_eq!(i, 1); + + assert_eq!(v.len(), 2); + + let i = block_on(select_ok(v)).err().unwrap(); + assert_eq!(i, 3); +} diff --git a/third_party/rust/futures/tests/future_shared.rs b/third_party/rust/futures/tests/future_shared.rs new file mode 100644 index 0000000000..bd69c1d7c1 --- /dev/null +++ b/third_party/rust/futures/tests/future_shared.rs @@ -0,0 +1,273 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, LocalPool}; +use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt}; +use futures::task::LocalSpawn; +use std::cell::{Cell, RefCell}; +use std::panic::AssertUnwindSafe; +use std::rc::Rc; +use std::task::Poll; +use std::thread; + +struct CountClone(Rc<Cell<i32>>); + +impl Clone for CountClone { + fn clone(&self) -> Self { + self.0.set(self.0.get() + 1); + Self(self.0.clone()) + } +} + +fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { + let (tx, rx) = oneshot::channel::<i32>(); + let f = rx.shared(); + let join_handles = (0..threads_number) + .map(|_| { + let cloned_future = f.clone(); + thread::spawn(move || { + assert_eq!(block_on(cloned_future).unwrap(), 6); + }) + }) + .collect::<Vec<_>>(); + + tx.send(6).unwrap(); + + assert_eq!(block_on(f).unwrap(), 6); + for join_handle in join_handles { + join_handle.join().unwrap(); + } +} + +#[test] +fn one_thread() { + send_shared_oneshot_and_wait_on_multiple_threads(1); +} + +#[test] +fn two_threads() { + send_shared_oneshot_and_wait_on_multiple_threads(2); +} + +#[test] +fn many_threads() { + send_shared_oneshot_and_wait_on_multiple_threads(1000); +} + +#[test] +fn drop_on_one_task_ok() { + let (tx, rx) = oneshot::channel::<u32>(); + let f1 = rx.shared(); + let f2 = f1.clone(); + + let (tx2, rx2) = oneshot::channel::<u32>(); + + let t1 = thread::spawn(|| { + let f = future::try_select(f1.map_err(|_| ()), rx2.map_err(|_| ())); + drop(block_on(f)); + }); + + let (tx3, rx3) = oneshot::channel::<u32>(); + + let t2 = thread::spawn(|| { + let _ = block_on(f2.map_ok(|x| tx3.send(x).unwrap()).map_err(|_| ())); + }); + + tx2.send(11).unwrap(); // cancel `f1` + t1.join().unwrap(); + + tx.send(42).unwrap(); // Should cause `f2` and then `rx3` to get resolved. + let result = block_on(rx3).unwrap(); + assert_eq!(result, 42); + t2.join().unwrap(); +} + +#[test] +fn drop_in_poll() { + let slot1 = Rc::new(RefCell::new(None)); + let slot2 = slot1.clone(); + + let future1 = future::lazy(move |_| { + slot2.replace(None); // Drop future + 1 + }) + .shared(); + + let future2 = LocalFutureObj::new(Box::new(future1.clone())); + slot1.replace(Some(future2)); + + assert_eq!(block_on(future1), 1); +} + +#[test] +fn peek() { + let mut local_pool = LocalPool::new(); + let spawn = &mut local_pool.spawner(); + + let (tx0, rx0) = oneshot::channel::<i32>(); + let f1 = rx0.shared(); + let f2 = f1.clone(); + + // Repeated calls on the original or clone do not change the outcome. + for _ in 0..2 { + assert!(f1.peek().is_none()); + assert!(f2.peek().is_none()); + } + + // Completing the underlying future has no effect, because the value has not been `poll`ed in. + tx0.send(42).unwrap(); + for _ in 0..2 { + assert!(f1.peek().is_none()); + assert!(f2.peek().is_none()); + } + + // Once the Shared has been polled, the value is peekable on the clone. + spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap(); + local_pool.run(); + for _ in 0..2 { + assert_eq!(*f2.peek().unwrap(), Ok(42)); + } +} + +#[test] +fn downgrade() { + let (tx, rx) = oneshot::channel::<i32>(); + let shared = rx.shared(); + // Since there are outstanding `Shared`s, we can get a `WeakShared`. + let weak = shared.downgrade().unwrap(); + // It should upgrade fine right now. + let mut shared2 = weak.upgrade().unwrap(); + + tx.send(42).unwrap(); + assert_eq!(block_on(shared).unwrap(), 42); + + // We should still be able to get a new `WeakShared` and upgrade it + // because `shared2` is outstanding. + assert!(shared2.downgrade().is_some()); + assert!(weak.upgrade().is_some()); + + assert_eq!(block_on(&mut shared2).unwrap(), 42); + // Now that all `Shared`s have been exhausted, we should not be able + // to get a new `WeakShared` or upgrade an existing one. + assert!(weak.upgrade().is_none()); + assert!(shared2.downgrade().is_none()); +} + +#[test] +fn ptr_eq() { + use future::FusedFuture; + use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; + + let (tx, rx) = oneshot::channel::<i32>(); + let shared = rx.shared(); + let mut shared2 = shared.clone(); + let mut hasher = DefaultHasher::new(); + let mut hasher2 = DefaultHasher::new(); + + // Because these two futures share the same underlying future, + // `ptr_eq` should return true. + assert!(shared.ptr_eq(&shared2)); + // Equivalence relations are symmetric + assert!(shared2.ptr_eq(&shared)); + + // If `ptr_eq` returns true, they should hash to the same value. + shared.ptr_hash(&mut hasher); + shared2.ptr_hash(&mut hasher2); + assert_eq!(hasher.finish(), hasher2.finish()); + + tx.send(42).unwrap(); + assert_eq!(block_on(&mut shared2).unwrap(), 42); + + // Now that `shared2` has completed, `ptr_eq` should return false. + assert!(shared2.is_terminated()); + assert!(!shared.ptr_eq(&shared2)); + + // `ptr_eq` should continue to work for the other `Shared`. + let shared3 = shared.clone(); + let mut hasher3 = DefaultHasher::new(); + assert!(shared.ptr_eq(&shared3)); + + shared3.ptr_hash(&mut hasher3); + assert_eq!(hasher.finish(), hasher3.finish()); + + let (_tx, rx) = oneshot::channel::<i32>(); + let shared4 = rx.shared(); + + // And `ptr_eq` should return false for two futures that don't share + // the underlying future. + assert!(!shared.ptr_eq(&shared4)); +} + +#[test] +fn dont_clone_in_single_owner_shared_future() { + let counter = CountClone(Rc::new(Cell::new(0))); + let (tx, rx) = oneshot::channel(); + + let rx = rx.shared(); + + tx.send(counter).ok().unwrap(); + + assert_eq!(block_on(rx).unwrap().0.get(), 0); +} + +#[test] +fn dont_do_unnecessary_clones_on_output() { + let counter = CountClone(Rc::new(Cell::new(0))); + let (tx, rx) = oneshot::channel(); + + let rx = rx.shared(); + + tx.send(counter).ok().unwrap(); + + assert_eq!(block_on(rx.clone()).unwrap().0.get(), 1); + assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2); + assert_eq!(block_on(rx).unwrap().0.get(), 2); +} + +#[test] +fn shared_future_that_wakes_itself_until_pending_is_returned() { + let proceed = Cell::new(false); + let fut = futures::future::poll_fn(|cx| { + if proceed.get() { + Poll::Ready(()) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + }) + .shared(); + + // The join future can only complete if the second future gets a chance to run after the first + // has returned pending + assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ())); +} + +#[test] +#[should_panic(expected = "inner future panicked during poll")] +fn panic_while_poll() { + let fut = futures::future::poll_fn::<i8, _>(|_cx| panic!("test")).shared(); + + let fut_captured = fut.clone(); + std::panic::catch_unwind(AssertUnwindSafe(|| { + block_on(fut_captured); + })) + .unwrap_err(); + + block_on(fut); +} + +#[test] +#[should_panic(expected = "test_marker")] +fn poll_while_panic() { + struct S; + + impl Drop for S { + fn drop(&mut self) { + let fut = futures::future::ready(1).shared(); + assert_eq!(block_on(fut.clone()), 1); + assert_eq!(block_on(fut), 1); + } + } + + let _s = S {}; + panic!("test_marker"); +} diff --git a/third_party/rust/futures/tests/future_try_flatten_stream.rs b/third_party/rust/futures/tests/future_try_flatten_stream.rs new file mode 100644 index 0000000000..82ae1baf2c --- /dev/null +++ b/third_party/rust/futures/tests/future_try_flatten_stream.rs @@ -0,0 +1,83 @@ +use futures::executor::block_on_stream; +use futures::future::{err, ok, TryFutureExt}; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::stream::{self, StreamExt}; +use futures::task::{Context, Poll}; +use std::marker::PhantomData; +use std::pin::Pin; + +#[test] +fn successful_future() { + let stream_items = vec![17, 19]; + let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok)); + + let stream = future_of_a_stream.try_flatten_stream(); + + let mut iter = block_on_stream(stream); + assert_eq!(Ok(17), iter.next().unwrap()); + assert_eq!(Ok(19), iter.next().unwrap()); + assert_eq!(None, iter.next()); +} + +#[test] +fn failed_future() { + struct PanickingStream<T, E> { + _marker: PhantomData<(T, E)>, + } + + impl<T, E> Stream for PanickingStream<T, E> { + type Item = Result<T, E>; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { + panic!() + } + } + + let future_of_a_stream = err::<PanickingStream<bool, u32>, _>(10); + let stream = future_of_a_stream.try_flatten_stream(); + let mut iter = block_on_stream(stream); + assert_eq!(Err(10), iter.next().unwrap()); + assert_eq!(None, iter.next()); +} + +#[test] +fn assert_impls() { + struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>); + + impl<T, E, Item> Stream for StreamSink<T, E, Item> { + type Item = Result<T, E>; + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { + panic!() + } + } + + impl<T, E, Item> Sink<Item> for StreamSink<T, E, Item> { + type Error = E; + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + panic!() + } + fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> { + panic!() + } + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + panic!() + } + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + panic!() + } + } + + fn assert_stream<S: Stream>(_: &S) {} + fn assert_sink<S: Sink<Item>, Item>(_: &S) {} + fn assert_stream_sink<S: Stream + Sink<Item>, Item>(_: &S) {} + + let s = ok(StreamSink::<(), (), ()>(PhantomData)).try_flatten_stream(); + assert_stream(&s); + assert_sink(&s); + assert_stream_sink(&s); + let s = ok(StreamSink::<(), (), ()>(PhantomData)).flatten_sink(); + assert_stream(&s); + assert_sink(&s); + assert_stream_sink(&s); +} diff --git a/third_party/rust/futures/tests/future_try_join_all.rs b/third_party/rust/futures/tests/future_try_join_all.rs new file mode 100644 index 0000000000..9a824872f7 --- /dev/null +++ b/third_party/rust/futures/tests/future_try_join_all.rs @@ -0,0 +1,46 @@ +use futures::executor::block_on; +use futures::pin_mut; +use futures_util::future::{err, ok, try_join_all, TryJoinAll}; +use std::fmt::Debug; +use std::future::Future; + +#[track_caller] +fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T) +where + T: PartialEq + Debug, +{ + pin_mut!(actual_fut); + let output = block_on(actual_fut); + assert_eq!(output, expected); +} + +#[test] +fn collect_collects() { + assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2])); + assert_done(try_join_all(vec![ok(1), err(2)]), Err(2)); + assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1])); + // REVIEW: should this be implemented? + // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![])); + + // TODO: needs more tests +} + +#[test] +fn try_join_all_iter_lifetime() { + // In futures-rs version 0.1, this function would fail to typecheck due to an overly + // conservative type parameterization of `TryJoinAll`. + fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> { + let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len())); + try_join_all(iter) + } + + assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); +} + +#[test] +fn try_join_all_from_iter() { + assert_done( + vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(), + Ok::<_, usize>(vec![1, 2]), + ) +} diff --git a/third_party/rust/futures/tests/io_buf_reader.rs b/third_party/rust/futures/tests/io_buf_reader.rs new file mode 100644 index 0000000000..717297ccea --- /dev/null +++ b/third_party/rust/futures/tests/io_buf_reader.rs @@ -0,0 +1,432 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{ + AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, + BufReader, SeekFrom, +}; +use futures::pin_mut; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use pin_project::pin_project; +use std::cmp; +use std::io; +use std::pin::Pin; + +// helper for maybe_pending_* tests +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +// https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719 +#[pin_project(!Unpin)] +struct Cursor<T> { + #[pin] + inner: futures::io::Cursor<T>, +} + +impl<T> Cursor<T> { + fn new(inner: T) -> Self { + Self { inner: futures::io::Cursor::new(inner) } + } +} + +impl AsyncRead for Cursor<&[u8]> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_read(cx, buf) + } +} + +impl AsyncBufRead for Cursor<&[u8]> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + self.project().inner.poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().inner.consume(amt) + } +} + +impl AsyncSeek for Cursor<&[u8]> { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { + self.project().inner.poll_seek(cx, pos) + } +} + +struct MaybePending<'a> { + inner: &'a [u8], + ready_read: bool, + ready_fill_buf: bool, +} + +impl<'a> MaybePending<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { inner, ready_read: false, ready_fill_buf: false } + } +} + +impl AsyncRead for MaybePending<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + if self.ready_read { + self.ready_read = false; + Pin::new(&mut self.inner).poll_read(cx, buf) + } else { + self.ready_read = true; + Poll::Pending + } + } +} + +impl AsyncBufRead for MaybePending<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + if self.ready_fill_buf { + self.ready_fill_buf = false; + if self.inner.is_empty() { + return Poll::Ready(Ok(&[])); + } + let len = cmp::min(2, self.inner.len()); + Poll::Ready(Ok(&self.inner[0..len])) + } else { + self.ready_fill_buf = true; + Poll::Pending + } + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.inner = &self.inner[amt..]; + } +} + +#[test] +fn test_buffered_reader() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, inner); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = reader.read(&mut buf).await.unwrap(); + assert_eq!(nread, 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + }); +} + +#[test] +fn test_buffered_reader_seek() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(2, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]); + reader.as_mut().consume(1); + assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); + }); +} + +#[test] +fn test_buffered_reader_seek_relative() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(2, Cursor::new(inner)); + pin_mut!(reader); + + assert!(reader.as_mut().seek_relative(3).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(0).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(1).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]); + assert!(reader.as_mut().seek_relative(-1).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); + assert!(reader.as_mut().seek_relative(2).await.is_ok()); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]); + }); +} + +#[test] +fn test_buffered_reader_invalidated_after_read() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(3, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); + reader.as_mut().consume(3); + + let mut buffer = [0, 0, 0, 0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 5); + assert_eq!(buffer, [0, 1, 2, 3, 4]); + + assert!(reader.as_mut().seek_relative(-2).await.is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); + assert_eq!(buffer, [3, 4]); + }); +} + +#[test] +fn test_buffered_reader_invalidated_after_seek() { + block_on(async { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(3, Cursor::new(inner)); + pin_mut!(reader); + + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); + reader.as_mut().consume(3); + + assert!(reader.seek(SeekFrom::Current(5)).await.is_ok()); + + assert!(reader.as_mut().seek_relative(-2).await.is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); + assert_eq!(buffer, [3, 4]); + }); +} + +#[test] +fn test_buffered_reader_seek_underflow() { + // gimmick reader that yields its position modulo 256 for each byte + struct PositionReader { + pos: u64, + } + impl io::Read for PositionReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + let len = buf.len(); + for x in buf { + *x = self.pos as u8; + self.pos = self.pos.wrapping_add(1); + } + Ok(len) + } + } + impl io::Seek for PositionReader { + fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { + match pos { + SeekFrom::Start(n) => { + self.pos = n; + } + SeekFrom::Current(n) => { + self.pos = self.pos.wrapping_add(n as u64); + } + SeekFrom::End(n) => { + self.pos = u64::MAX.wrapping_add(n as u64); + } + } + Ok(self.pos) + } + } + + block_on(async { + let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); + pin_mut!(reader); + assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]); + assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5); + assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); + // the following seek will require two underlying seeks + let expected = 9_223_372_036_854_775_802; + assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected); + assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); + // seeking to 0 should empty the buffer. + assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected); + assert_eq!(reader.get_ref().get_ref().pos, expected); + }); +} + +#[test] +fn test_short_reads() { + /// A dummy reader intended at testing short-reads propagation. + struct ShortReader { + lengths: Vec<usize>, + } + + impl io::Read for ShortReader { + fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { + if self.lengths.is_empty() { + Ok(0) + } else { + Ok(self.lengths.remove(0)) + } + } + } + + block_on(async { + let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; + let mut reader = BufReader::new(AllowStdIo::new(inner)); + let mut buf = [0, 0]; + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 2); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 1); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + assert_eq!(reader.read(&mut buf).await.unwrap(), 0); + }); +} + +#[test] +fn maybe_pending() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); + + let mut buf = [0, 0, 0]; + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = run(reader.read(&mut buf)); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(run(reader.read(&mut buf)).unwrap(), 0); +} + +#[test] +fn maybe_pending_buf_read() { + let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); + let mut reader = BufReader::with_capacity(2, inner); + let mut v = Vec::new(); + run(reader.read_until(3, &mut v)).unwrap(); + assert_eq!(v, [0, 1, 2, 3]); + v.clear(); + run(reader.read_until(1, &mut v)).unwrap(); + assert_eq!(v, [1]); + v.clear(); + run(reader.read_until(8, &mut v)).unwrap(); + assert_eq!(v, [0]); + v.clear(); + run(reader.read_until(9, &mut v)).unwrap(); + assert_eq!(v, []); +} + +// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 +#[test] +fn maybe_pending_seek() { + #[pin_project] + struct MaybePendingSeek<'a> { + #[pin] + inner: Cursor<&'a [u8]>, + ready: bool, + } + + impl<'a> MaybePendingSeek<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { inner: Cursor::new(inner), ready: true } + } + } + + impl AsyncRead for MaybePendingSeek<'_> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.project().inner.poll_read(cx, buf) + } + } + + impl AsyncBufRead for MaybePendingSeek<'_> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + self.project().inner.poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.project().inner.consume(amt) + } + } + + impl AsyncSeek for MaybePendingSeek<'_> { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { + if self.ready { + *self.as_mut().project().ready = false; + self.project().inner.poll_seek(cx, pos) + } else { + *self.project().ready = true; + Poll::Pending + } + } + } + + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); + pin_mut!(reader); + + assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3)); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); + assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); + assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..])); + Pin::new(&mut reader).consume(1); + assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); +} diff --git a/third_party/rust/futures/tests/io_buf_writer.rs b/third_party/rust/futures/tests/io_buf_writer.rs new file mode 100644 index 0000000000..b264cd54c2 --- /dev/null +++ b/third_party/rust/futures/tests/io_buf_writer.rs @@ -0,0 +1,239 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{ + AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom, +}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::io; +use std::pin::Pin; + +struct MaybePending { + inner: Vec<u8>, + ready: bool, +} + +impl MaybePending { + fn new(inner: Vec<u8>) -> Self { + Self { inner, ready: false } + } +} + +impl AsyncWrite for MaybePending { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready = true; + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_close(cx) + } +} + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +#[test] +fn buf_writer() { + let mut writer = BufWriter::with_capacity(2, Vec::new()); + + block_on(writer.write(&[0, 1])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.write(&[2])).unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.write(&[3])).unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + block_on(writer.write(&[4])).unwrap(); + block_on(writer.write(&[5])).unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + block_on(writer.write(&[6])).unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); + + block_on(writer.write(&[7, 8])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]); + + block_on(writer.write(&[9, 10, 11])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + block_on(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); +} + +#[test] +fn buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, Vec::new()); + block_on(w.write(&[0, 1])).unwrap(); + assert_eq!(*w.get_ref(), []); + block_on(w.flush()).unwrap(); + let w = w.into_inner(); + assert_eq!(w, [0, 1]); +} + +#[test] +fn buf_writer_seek() { + // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed, + // use `Vec::new` instead of `vec![0; 8]`. + let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8])); + block_on(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap(); + block_on(w.write_all(&[6, 7])).unwrap(); + assert_eq!(block_on(w.seek(SeekFrom::Current(0))).ok(), Some(8)); + assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(block_on(w.seek(SeekFrom::Start(2))).ok(), Some(2)); + block_on(w.write_all(&[8, 9])).unwrap(); + block_on(w.flush()).unwrap(); + assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); +} + +#[test] +fn maybe_pending_buf_writer() { + let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); + + run(writer.write(&[0, 1])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + run(writer.write(&[2])).unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + run(writer.write(&[3])).unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(&writer.get_ref().inner, &[0, 1]); + + run(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + run(writer.write(&[4])).unwrap(); + run(writer.write(&[5])).unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]); + + run(writer.write(&[6])).unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]); + + run(writer.write(&[7, 8])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]); + + run(writer.write(&[9, 10, 11])).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + run(writer.flush()).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); +} + +#[test] +fn maybe_pending_buf_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); + run(w.write(&[0, 1])).unwrap(); + assert_eq!(&w.get_ref().inner, &[]); + run(w.flush()).unwrap(); + let w = w.into_inner().inner; + assert_eq!(w, [0, 1]); +} + +#[test] +fn maybe_pending_buf_writer_seek() { + struct MaybePendingSeek { + inner: Cursor<Vec<u8>>, + ready_write: bool, + ready_seek: bool, + } + + impl MaybePendingSeek { + fn new(inner: Vec<u8>) -> Self { + Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false } + } + } + + impl AsyncWrite for MaybePendingSeek { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if self.ready_write { + self.ready_write = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready_write = true; + Poll::Pending + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Pin::new(&mut self.inner).poll_close(cx) + } + } + + impl AsyncSeek for MaybePendingSeek { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { + if self.ready_seek { + self.ready_seek = false; + Pin::new(&mut self.inner).poll_seek(cx, pos) + } else { + self.ready_seek = true; + Poll::Pending + } + } + } + + // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed, + // use `Vec::new` instead of `vec![0; 8]`. + let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(vec![0; 8])); + run(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap(); + run(w.write_all(&[6, 7])).unwrap(); + assert_eq!(run(w.seek(SeekFrom::Current(0))).ok(), Some(8)); + assert_eq!(&w.get_ref().inner.get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(run(w.seek(SeekFrom::Start(2))).ok(), Some(2)); + run(w.write_all(&[8, 9])).unwrap(); + run(w.flush()).unwrap(); + assert_eq!(&w.into_inner().inner.into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); +} diff --git a/third_party/rust/futures/tests/io_cursor.rs b/third_party/rust/futures/tests/io_cursor.rs new file mode 100644 index 0000000000..435ea5a155 --- /dev/null +++ b/third_party/rust/futures/tests/io_cursor.rs @@ -0,0 +1,30 @@ +use assert_matches::assert_matches; +use futures::executor::block_on; +use futures::future::lazy; +use futures::io::{AsyncWrite, Cursor}; +use futures::task::Poll; +use std::pin::Pin; + +#[test] +fn cursor_asyncwrite_vec() { + let mut cursor = Cursor::new(vec![0; 5]); + block_on(lazy(|cx| { + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(2))); + })); + assert_eq!(cursor.into_inner(), [1, 2, 3, 4, 5, 6, 6, 7]); +} + +#[test] +fn cursor_asyncwrite_box() { + let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice()); + block_on(lazy(|cx| { + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(1))); + assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(0))); + })); + assert_eq!(&*cursor.into_inner(), [1, 2, 3, 4, 5]); +} diff --git a/third_party/rust/futures/tests/io_line_writer.rs b/third_party/rust/futures/tests/io_line_writer.rs new file mode 100644 index 0000000000..b483e0ff77 --- /dev/null +++ b/third_party/rust/futures/tests/io_line_writer.rs @@ -0,0 +1,73 @@ +use futures::executor::block_on; +use futures::io::{AsyncWriteExt, LineWriter}; +use std::io; + +#[test] +fn line_writer() { + let mut writer = LineWriter::new(Vec::new()); + + block_on(writer.write(&[0])).unwrap(); + assert_eq!(*writer.get_ref(), []); + + block_on(writer.write(&[1])).unwrap(); + assert_eq!(*writer.get_ref(), []); + + block_on(writer.flush()).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1]); + + block_on(writer.write(&[0, b'\n', 1, b'\n', 2])).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']); + + block_on(writer.flush()).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]); + + block_on(writer.write(&[3, b'\n'])).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); +} + +#[test] +fn line_vectored() { + let mut line_writer = LineWriter::new(Vec::new()); + assert_eq!( + block_on(line_writer.write_vectored(&[ + io::IoSlice::new(&[]), + io::IoSlice::new(b"\n"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"a"), + ])) + .unwrap(), + 2 + ); + assert_eq!(line_writer.get_ref(), b"\n"); + + assert_eq!( + block_on(line_writer.write_vectored(&[ + io::IoSlice::new(&[]), + io::IoSlice::new(b"b"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"a"), + io::IoSlice::new(&[]), + io::IoSlice::new(b"c"), + ])) + .unwrap(), + 3 + ); + assert_eq!(line_writer.get_ref(), b"\n"); + block_on(line_writer.flush()).unwrap(); + assert_eq!(line_writer.get_ref(), b"\nabac"); + assert_eq!(block_on(line_writer.write_vectored(&[])).unwrap(), 0); + + assert_eq!( + block_on(line_writer.write_vectored(&[ + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + io::IoSlice::new(&[]), + ])) + .unwrap(), + 0 + ); + + assert_eq!(block_on(line_writer.write_vectored(&[io::IoSlice::new(b"a\nb")])).unwrap(), 3); + assert_eq!(line_writer.get_ref(), b"\nabaca\nb"); +} diff --git a/third_party/rust/futures/tests/io_lines.rs b/third_party/rust/futures/tests/io_lines.rs new file mode 100644 index 0000000000..5ce01a6945 --- /dev/null +++ b/third_party/rust/futures/tests/io_lines.rs @@ -0,0 +1,60 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +macro_rules! block_on_next { + ($expr:expr) => { + block_on($expr.next()).unwrap().unwrap() + }; +} + +macro_rules! run_next { + ($expr:expr) => { + run($expr.next()).unwrap().unwrap() + }; +} + +#[test] +fn lines() { + let buf = Cursor::new(&b"12\r"[..]); + let mut s = buf.lines(); + assert_eq!(block_on_next!(s), "12\r".to_string()); + assert!(block_on(s.next()).is_none()); + + let buf = Cursor::new(&b"12\r\n\n"[..]); + let mut s = buf.lines(); + assert_eq!(block_on_next!(s), "12".to_string()); + assert_eq!(block_on_next!(s), "".to_string()); + assert!(block_on(s.next()).is_none()); +} + +#[test] +fn maybe_pending() { + let buf = + stream::iter(vec![&b"12"[..], &b"\r"[..]]).map(Ok).into_async_read().interleave_pending(); + let mut s = buf.lines(); + assert_eq!(run_next!(s), "12\r".to_string()); + assert!(run(s.next()).is_none()); + + let buf = stream::iter(vec![&b"12"[..], &b"\r\n"[..], &b"\n"[..]]) + .map(Ok) + .into_async_read() + .interleave_pending(); + let mut s = buf.lines(); + assert_eq!(run_next!(s), "12".to_string()); + assert_eq!(run_next!(s), "".to_string()); + assert!(run(s.next()).is_none()); +} diff --git a/third_party/rust/futures/tests/io_read.rs b/third_party/rust/futures/tests/io_read.rs new file mode 100644 index 0000000000..d39a6ea790 --- /dev/null +++ b/third_party/rust/futures/tests/io_read.rs @@ -0,0 +1,64 @@ +use futures::io::AsyncRead; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct MockReader { + fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>, +} + +impl MockReader { + fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self { + Self { fun: Box::new(fun) } + } +} + +impl AsyncRead for MockReader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + (self.get_mut().fun)(buf) + } +} + +/// Verifies that the default implementation of `poll_read_vectored` +/// calls `poll_read` with an empty slice if no buffers are provided. +#[test] +fn read_vectored_no_buffers() { + let mut reader = MockReader::new(|buf| { + assert_eq!(buf, b""); + Err(io::ErrorKind::BrokenPipe.into()).into() + }); + let cx = &mut panic_context(); + let bufs = &mut []; + + let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs); + let res = res.map_err(|e| e.kind()); + assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe))) +} + +/// Verifies that the default implementation of `poll_read_vectored` +/// calls `poll_read` with the first non-empty buffer. +#[test] +fn read_vectored_first_non_empty() { + let mut reader = MockReader::new(|buf| { + assert_eq!(buf.len(), 4); + buf.copy_from_slice(b"four"); + Poll::Ready(Ok(4)) + }); + let cx = &mut panic_context(); + let mut buf = [0; 4]; + let bufs = &mut [ + io::IoSliceMut::new(&mut []), + io::IoSliceMut::new(&mut []), + io::IoSliceMut::new(&mut buf), + ]; + + let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs); + let res = res.map_err(|e| e.kind()); + assert_eq!(res, Poll::Ready(Ok(4))); + assert_eq!(buf, b"four"[..]); +} diff --git a/third_party/rust/futures/tests/io_read_exact.rs b/third_party/rust/futures/tests/io_read_exact.rs new file mode 100644 index 0000000000..6582e50b80 --- /dev/null +++ b/third_party/rust/futures/tests/io_read_exact.rs @@ -0,0 +1,17 @@ +use futures::executor::block_on; +use futures::io::AsyncReadExt; + +#[test] +fn read_exact() { + let mut reader: &[u8] = &[1, 2, 3, 4, 5]; + let mut out = [0u8; 3]; + + let res = block_on(reader.read_exact(&mut out)); // read 3 bytes out + assert!(res.is_ok()); + assert_eq!(out, [1, 2, 3]); + assert_eq!(reader.len(), 2); + + let res = block_on(reader.read_exact(&mut out)); // read another 3 bytes, but only 2 bytes left + assert!(res.is_err()); + assert_eq!(reader.len(), 0); +} diff --git a/third_party/rust/futures/tests/io_read_line.rs b/third_party/rust/futures/tests/io_read_line.rs new file mode 100644 index 0000000000..88a877928a --- /dev/null +++ b/third_party/rust/futures/tests/io_read_line.rs @@ -0,0 +1,58 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +#[test] +fn read_line() { + let mut buf = Cursor::new(b"12"); + let mut v = String::new(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2); + assert_eq!(v, "12"); + + let mut buf = Cursor::new(b"12\n\n"); + let mut v = String::new(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3); + assert_eq!(v, "12\n"); + v.clear(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1); + assert_eq!(v, "\n"); + v.clear(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); +} + +#[test] +fn maybe_pending() { + let mut buf = b"12".interleave_pending(); + let mut v = String::new(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2); + assert_eq!(v, "12"); + + let mut buf = + stream::iter(vec![&b"12"[..], &b"\n\n"[..]]).map(Ok).into_async_read().interleave_pending(); + let mut v = String::new(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3); + assert_eq!(v, "12\n"); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1); + assert_eq!(v, "\n"); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); +} diff --git a/third_party/rust/futures/tests/io_read_to_end.rs b/third_party/rust/futures/tests/io_read_to_end.rs new file mode 100644 index 0000000000..7122511fcb --- /dev/null +++ b/third_party/rust/futures/tests/io_read_to_end.rs @@ -0,0 +1,65 @@ +use futures::{ + executor::block_on, + io::{self, AsyncRead, AsyncReadExt}, + task::{Context, Poll}, +}; +use std::pin::Pin; + +#[test] +#[should_panic(expected = "assertion failed: n <= buf.len()")] +fn issue2310() { + struct MyRead { + first: bool, + } + + impl MyRead { + fn new() -> Self { + MyRead { first: false } + } + } + + impl AsyncRead for MyRead { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context, + _buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + Poll::Ready(if !self.first { + self.first = true; + // First iteration: return more than the buffer size + Ok(64) + } else { + // Second iteration: indicate that we are done + Ok(0) + }) + } + } + + struct VecWrapper { + inner: Vec<u8>, + } + + impl VecWrapper { + fn new() -> Self { + VecWrapper { inner: Vec::new() } + } + } + + impl Drop for VecWrapper { + fn drop(&mut self) { + // Observe uninitialized bytes + println!("{:?}", &self.inner); + // Overwrite heap contents + for b in &mut self.inner { + *b = 0x90; + } + } + } + + block_on(async { + let mut vec = VecWrapper::new(); + let mut read = MyRead::new(); + + read.read_to_end(&mut vec.inner).await.unwrap(); + }) +} diff --git a/third_party/rust/futures/tests/io_read_to_string.rs b/third_party/rust/futures/tests/io_read_to_string.rs new file mode 100644 index 0000000000..ae6aaa21d8 --- /dev/null +++ b/third_party/rust/futures/tests/io_read_to_string.rs @@ -0,0 +1,44 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +#[test] +fn read_to_string() { + let mut c = Cursor::new(&b""[..]); + let mut v = String::new(); + assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0); + assert_eq!(v, ""); + + let mut c = Cursor::new(&b"1"[..]); + let mut v = String::new(); + assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 1); + assert_eq!(v, "1"); + + let mut c = Cursor::new(&b"\xff"[..]); + let mut v = String::new(); + assert!(block_on(c.read_to_string(&mut v)).is_err()); +} + +#[test] +fn interleave_pending() { + fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } + } + let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]]) + .map(Ok) + .into_async_read() + .interleave_pending(); + + let mut v = String::new(); + assert_eq!(run(buf.read_to_string(&mut v)).unwrap(), 5); + assert_eq!(v, "12333"); +} diff --git a/third_party/rust/futures/tests/io_read_until.rs b/third_party/rust/futures/tests/io_read_until.rs new file mode 100644 index 0000000000..71f857f4b0 --- /dev/null +++ b/third_party/rust/futures/tests/io_read_until.rs @@ -0,0 +1,60 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run<F: Future + Unpin>(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +#[test] +fn read_until() { + let mut buf = Cursor::new(b"12"); + let mut v = Vec::new(); + assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2); + assert_eq!(v, b"12"); + + let mut buf = Cursor::new(b"1233"); + let mut v = Vec::new(); + assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 3); + assert_eq!(v, b"123"); + v.truncate(0); + assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 1); + assert_eq!(v, b"3"); + v.truncate(0); + assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 0); + assert_eq!(v, []); +} + +#[test] +fn maybe_pending() { + let mut buf = b"12".interleave_pending(); + let mut v = Vec::new(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2); + assert_eq!(v, b"12"); + + let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]]) + .map(Ok) + .into_async_read() + .interleave_pending(); + let mut v = Vec::new(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 3); + assert_eq!(v, b"123"); + v.clear(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1); + assert_eq!(v, b"3"); + v.clear(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1); + assert_eq!(v, b"3"); + v.clear(); + assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 0); + assert_eq!(v, []); +} diff --git a/third_party/rust/futures/tests/io_window.rs b/third_party/rust/futures/tests/io_window.rs new file mode 100644 index 0000000000..8f0d48bc94 --- /dev/null +++ b/third_party/rust/futures/tests/io_window.rs @@ -0,0 +1,30 @@ +#![allow(clippy::reversed_empty_ranges)] // This is intentional. + +use futures::io::Window; + +#[test] +fn set() { + let mut buffer = Window::new(&[1, 2, 3]); + buffer.set(..3); + assert_eq!(buffer.as_ref(), &[1, 2, 3]); + buffer.set(3..3); + assert_eq!(buffer.as_ref(), &[]); + buffer.set(3..=2); // == 3..3 + assert_eq!(buffer.as_ref(), &[]); + buffer.set(0..2); + assert_eq!(buffer.as_ref(), &[1, 2]); +} + +#[test] +#[should_panic] +fn set_panic_out_of_bounds() { + let mut buffer = Window::new(&[1, 2, 3]); + buffer.set(2..4); +} + +#[test] +#[should_panic] +fn set_panic_start_is_greater_than_end() { + let mut buffer = Window::new(&[1, 2, 3]); + buffer.set(3..2); +} diff --git a/third_party/rust/futures/tests/io_write.rs b/third_party/rust/futures/tests/io_write.rs new file mode 100644 index 0000000000..6af27553cb --- /dev/null +++ b/third_party/rust/futures/tests/io_write.rs @@ -0,0 +1,65 @@ +use futures::io::AsyncWrite; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct MockWriter { + fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>, +} + +impl MockWriter { + fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self { + Self { fun: Box::new(fun) } + } +} + +impl AsyncWrite for MockWriter { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + (self.get_mut().fun)(buf) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + panic!() + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + panic!() + } +} + +/// Verifies that the default implementation of `poll_write_vectored` +/// calls `poll_write` with an empty slice if no buffers are provided. +#[test] +fn write_vectored_no_buffers() { + let mut writer = MockWriter::new(|buf| { + assert_eq!(buf, b""); + Err(io::ErrorKind::BrokenPipe.into()).into() + }); + let cx = &mut panic_context(); + let bufs = &mut []; + + let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs); + let res = res.map_err(|e| e.kind()); + assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe))) +} + +/// Verifies that the default implementation of `poll_write_vectored` +/// calls `poll_write` with the first non-empty buffer. +#[test] +fn write_vectored_first_non_empty() { + let mut writer = MockWriter::new(|buf| { + assert_eq!(buf, b"four"); + Poll::Ready(Ok(4)) + }); + let cx = &mut panic_context(); + let bufs = &mut [io::IoSlice::new(&[]), io::IoSlice::new(&[]), io::IoSlice::new(b"four")]; + + let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs); + let res = res.map_err(|e| e.kind()); + assert_eq!(res, Poll::Ready(Ok(4))); +} diff --git a/third_party/rust/futures/tests/lock_mutex.rs b/third_party/rust/futures/tests/lock_mutex.rs new file mode 100644 index 0000000000..c15e76bd84 --- /dev/null +++ b/third_party/rust/futures/tests/lock_mutex.rs @@ -0,0 +1,69 @@ +use futures::channel::mpsc; +use futures::executor::{block_on, ThreadPool}; +use futures::future::{ready, FutureExt}; +use futures::lock::Mutex; +use futures::stream::StreamExt; +use futures::task::{Context, SpawnExt}; +use futures_test::future::FutureTestExt; +use futures_test::task::{new_count_waker, panic_context}; +use std::sync::Arc; + +#[test] +fn mutex_acquire_uncontested() { + let mutex = Mutex::new(()); + for _ in 0..10 { + assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready()); + } +} + +#[test] +fn mutex_wakes_waiters() { + let mutex = Mutex::new(()); + let (waker, counter) = new_count_waker(); + let lock = mutex.lock().poll_unpin(&mut panic_context()); + assert!(lock.is_ready()); + + let mut cx = Context::from_waker(&waker); + let mut waiter = mutex.lock(); + assert!(waiter.poll_unpin(&mut cx).is_pending()); + assert_eq!(counter, 0); + + drop(lock); + + assert_eq!(counter, 1); + assert!(waiter.poll_unpin(&mut panic_context()).is_ready()); +} + +#[test] +fn mutex_contested() { + { + let (tx, mut rx) = mpsc::unbounded(); + let pool = ThreadPool::builder().pool_size(16).create().unwrap(); + + let tx = Arc::new(tx); + let mutex = Arc::new(Mutex::new(0)); + + let num_tasks = 1000; + for _ in 0..num_tasks { + let tx = tx.clone(); + let mutex = mutex.clone(); + pool.spawn(async move { + let mut lock = mutex.lock().await; + ready(()).pending_once().await; + *lock += 1; + tx.unbounded_send(()).unwrap(); + drop(lock); + }) + .unwrap(); + } + + block_on(async { + for _ in 0..num_tasks { + rx.next().await.unwrap(); + } + let lock = mutex.lock().await; + assert_eq!(num_tasks, *lock); + }); + } + std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 +} diff --git a/third_party/rust/futures/tests/macro_comma_support.rs b/third_party/rust/futures/tests/macro_comma_support.rs new file mode 100644 index 0000000000..85871e98be --- /dev/null +++ b/third_party/rust/futures/tests/macro_comma_support.rs @@ -0,0 +1,43 @@ +use futures::{ + executor::block_on, + future::{self, FutureExt}, + join, ready, + task::Poll, + try_join, +}; + +#[test] +fn ready() { + block_on(future::poll_fn(|_| { + ready!(Poll::Ready(()),); + Poll::Ready(()) + })) +} + +#[test] +fn poll() { + use futures::poll; + + block_on(async { + let _ = poll!(async {}.boxed(),); + }) +} + +#[test] +fn join() { + block_on(async { + let future1 = async { 1 }; + let future2 = async { 2 }; + join!(future1, future2,); + }) +} + +#[test] +fn try_join() { + block_on(async { + let future1 = async { 1 }.never_error(); + let future2 = async { 2 }.never_error(); + try_join!(future1, future2,) + }) + .unwrap(); +} diff --git a/third_party/rust/futures/tests/object_safety.rs b/third_party/rust/futures/tests/object_safety.rs new file mode 100644 index 0000000000..30c892f5e6 --- /dev/null +++ b/third_party/rust/futures/tests/object_safety.rs @@ -0,0 +1,49 @@ +fn assert_is_object_safe<T>() {} + +#[test] +fn future() { + // `FutureExt`, `TryFutureExt` and `UnsafeFutureObj` are not object safe. + use futures::future::{FusedFuture, Future, TryFuture}; + + assert_is_object_safe::<&dyn Future<Output = ()>>(); + assert_is_object_safe::<&dyn FusedFuture<Output = ()>>(); + assert_is_object_safe::<&dyn TryFuture<Ok = (), Error = (), Output = Result<(), ()>>>(); +} + +#[test] +fn stream() { + // `StreamExt` and `TryStreamExt` are not object safe. + use futures::stream::{FusedStream, Stream, TryStream}; + + assert_is_object_safe::<&dyn Stream<Item = ()>>(); + assert_is_object_safe::<&dyn FusedStream<Item = ()>>(); + assert_is_object_safe::<&dyn TryStream<Ok = (), Error = (), Item = Result<(), ()>>>(); +} + +#[test] +fn sink() { + // `SinkExt` is not object safe. + use futures::sink::Sink; + + assert_is_object_safe::<&dyn Sink<(), Error = ()>>(); +} + +#[test] +fn io() { + // `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt` and `AsyncBufReadExt` are not object safe. + use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; + + assert_is_object_safe::<&dyn AsyncRead>(); + assert_is_object_safe::<&dyn AsyncWrite>(); + assert_is_object_safe::<&dyn AsyncSeek>(); + assert_is_object_safe::<&dyn AsyncBufRead>(); +} + +#[test] +fn task() { + // `ArcWake`, `SpawnExt` and `LocalSpawnExt` are not object safe. + use futures::task::{LocalSpawn, Spawn}; + + assert_is_object_safe::<&dyn Spawn>(); + assert_is_object_safe::<&dyn LocalSpawn>(); +} diff --git a/third_party/rust/futures/tests/oneshot.rs b/third_party/rust/futures/tests/oneshot.rs new file mode 100644 index 0000000000..34b78a33fb --- /dev/null +++ b/third_party/rust/futures/tests/oneshot.rs @@ -0,0 +1,78 @@ +use futures::channel::oneshot; +use futures::future::{FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; +use std::sync::mpsc; +use std::thread; + +#[test] +fn oneshot_send1() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (tx2, rx2) = mpsc::channel(); + + let t = thread::spawn(|| tx1.send(1).unwrap()); + rx1.map_ok(move |x| tx2.send(x)).run_in_background(); + assert_eq!(1, rx2.recv().unwrap()); + t.join().unwrap(); +} + +#[test] +fn oneshot_send2() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (tx2, rx2) = mpsc::channel(); + + thread::spawn(|| tx1.send(1).unwrap()).join().unwrap(); + rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background(); + assert_eq!(1, rx2.recv().unwrap()); +} + +#[test] +fn oneshot_send3() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (tx2, rx2) = mpsc::channel(); + + rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background(); + thread::spawn(|| tx1.send(1).unwrap()).join().unwrap(); + assert_eq!(1, rx2.recv().unwrap()); +} + +#[test] +fn oneshot_drop_tx1() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (tx2, rx2) = mpsc::channel(); + + drop(tx1); + rx1.map(move |result| tx2.send(result).unwrap()).run_in_background(); + + assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap()); +} + +#[test] +fn oneshot_drop_tx2() { + let (tx1, rx1) = oneshot::channel::<i32>(); + let (tx2, rx2) = mpsc::channel(); + + let t = thread::spawn(|| drop(tx1)); + rx1.map(move |result| tx2.send(result).unwrap()).run_in_background(); + t.join().unwrap(); + + assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap()); +} + +#[test] +fn oneshot_drop_rx() { + let (tx, rx) = oneshot::channel::<i32>(); + drop(rx); + assert_eq!(Err(2), tx.send(2)); +} + +#[test] +fn oneshot_debug() { + let (tx, rx) = oneshot::channel::<i32>(); + assert_eq!(format!("{:?}", tx), "Sender { complete: false }"); + assert_eq!(format!("{:?}", rx), "Receiver { complete: false }"); + drop(rx); + assert_eq!(format!("{:?}", tx), "Sender { complete: true }"); + let (tx, rx) = oneshot::channel::<i32>(); + drop(tx); + assert_eq!(format!("{:?}", rx), "Receiver { complete: true }"); +} diff --git a/third_party/rust/futures/tests/ready_queue.rs b/third_party/rust/futures/tests/ready_queue.rs new file mode 100644 index 0000000000..c19d62593c --- /dev/null +++ b/third_party/rust/futures/tests/ready_queue.rs @@ -0,0 +1,148 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; +use std::panic::{self, AssertUnwindSafe}; +use std::sync::{Arc, Barrier}; +use std::thread; + +#[test] +fn basic_usage() { + block_on(future::lazy(move |cx| { + let mut queue = FuturesUnordered::new(); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + + queue.push(rx1); + queue.push(rx2); + queue.push(rx3); + + assert!(!queue.poll_next_unpin(cx).is_ready()); + + tx2.send("hello").unwrap(); + + assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx)); + assert!(!queue.poll_next_unpin(cx).is_ready()); + + tx1.send("world").unwrap(); + tx3.send("world2").unwrap(); + + assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); +} + +#[test] +fn resolving_errors() { + block_on(future::lazy(move |cx| { + let mut queue = FuturesUnordered::new(); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + + queue.push(rx1); + queue.push(rx2); + queue.push(rx3); + + assert!(!queue.poll_next_unpin(cx).is_ready()); + + drop(tx2); + + assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert!(!queue.poll_next_unpin(cx).is_ready()); + + drop(tx1); + tx3.send("world2").unwrap(); + + assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); +} + +#[test] +fn dropping_ready_queue() { + block_on(future::lazy(move |_| { + let queue = FuturesUnordered::new(); + let (mut tx1, rx1) = oneshot::channel::<()>(); + let (mut tx2, rx2) = oneshot::channel::<()>(); + let (mut tx3, rx3) = oneshot::channel::<()>(); + + queue.push(rx1); + queue.push(rx2); + queue.push(rx3); + + { + let cx = &mut noop_context(); + assert!(!tx1.poll_canceled(cx).is_ready()); + assert!(!tx2.poll_canceled(cx).is_ready()); + assert!(!tx3.poll_canceled(cx).is_ready()); + + drop(queue); + + assert!(tx1.poll_canceled(cx).is_ready()); + assert!(tx2.poll_canceled(cx).is_ready()); + assert!(tx3.poll_canceled(cx).is_ready()); + } + })); +} + +#[test] +fn stress() { + const ITER: usize = if cfg!(miri) { 30 } else { 300 }; + + for i in 0..ITER { + let n = (i % 10) + 1; + + let mut queue = FuturesUnordered::new(); + + for _ in 0..5 { + let barrier = Arc::new(Barrier::new(n + 1)); + + for num in 0..n { + let barrier = barrier.clone(); + let (tx, rx) = oneshot::channel(); + + queue.push(rx); + + thread::spawn(move || { + barrier.wait(); + tx.send(num).unwrap(); + }); + } + + barrier.wait(); + + let mut sync = block_on_stream(queue); + + let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect(); + + assert_eq!(rx.len(), n); + + rx.sort_unstable(); + + for (i, x) in rx.into_iter().enumerate() { + assert_eq!(i, x); + } + + queue = sync.into_inner(); + } + } +} + +#[test] +fn panicking_future_dropped() { + block_on(future::lazy(move |cx| { + let mut queue = FuturesUnordered::new(); + queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() })); + + let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx))); + assert!(r.is_err()); + assert!(queue.is_empty()); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); +} diff --git a/third_party/rust/futures/tests/recurse.rs b/third_party/rust/futures/tests/recurse.rs new file mode 100644 index 0000000000..d81753c9d7 --- /dev/null +++ b/third_party/rust/futures/tests/recurse.rs @@ -0,0 +1,25 @@ +use futures::executor::block_on; +use futures::future::{self, BoxFuture, FutureExt}; +use std::sync::mpsc; +use std::thread; + +#[test] +fn lots() { + #[cfg(not(futures_sanitizer))] + const N: i32 = 1_000; + #[cfg(futures_sanitizer)] // If N is many, asan reports stack-overflow: https://gist.github.com/taiki-e/099446d21cbec69d4acbacf7a9646136 + const N: i32 = 100; + + fn do_it(input: (i32, i32)) -> BoxFuture<'static, i32> { + let (n, x) = input; + if n == 0 { + future::ready(x).boxed() + } else { + future::ready((n - 1, x + n)).then(do_it).boxed() + } + } + + let (tx, rx) = mpsc::channel(); + thread::spawn(|| block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap()))); + assert_eq!((0..=N).sum::<i32>(), rx.recv().unwrap()); +} diff --git a/third_party/rust/futures/tests/sink.rs b/third_party/rust/futures/tests/sink.rs new file mode 100644 index 0000000000..5b691e74c6 --- /dev/null +++ b/third_party/rust/futures/tests/sink.rs @@ -0,0 +1,554 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt}; +use futures::never::Never; +use futures::ready; +use futures::sink::{self, Sink, SinkErrInto, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{self, ArcWake, Context, Poll, Waker}; +use futures_test::task::panic_context; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::fmt; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +fn sassert_next<S>(s: &mut S, item: S::Item) +where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, +{ + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), + } +} + +fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), + Poll::Pending => panic!("Poll::Pending"), + } +} + +// An Unpark struct that records unpark events for inspection +struct Flag(AtomicBool); + +impl Flag { + fn new() -> Arc<Self> { + Arc::new(Self(AtomicBool::new(false))) + } + + fn take(&self) -> bool { + self.0.swap(false, Ordering::SeqCst) + } + + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) + } +} + +impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.set(true) + } +} + +fn flag_cx<F, R>(f: F) -> R +where + F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R, +{ + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) +} + +// Sends a value on an i32 channel sink +struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>); + +impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> { + fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) + } +} + +impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> { + type Output = Result<S, S::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; + } + Poll::Ready(Ok(inner.take().unwrap())) + } +} + +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush<T: Unpin> { + data: Vec<T>, + waiting_tasks: Vec<Waker>, +} + +impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); + } + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.poll_flush(cx) + } +} + +impl<T: Unpin> ManualFlush<T> { + fn new() -> Self { + Self { data: Vec::new(), waiting_tasks: Vec::new() } + } + + fn force_flush(&mut self) -> Vec<T> { + for task in self.waiting_tasks.drain(..) { + task.wake() + } + mem::take(&mut self.data) + } +} + +struct ManualAllow<T: Unpin> { + data: Vec<T>, + allow: Rc<Allow>, +} + +struct Allow { + flag: Cell<bool>, + tasks: RefCell<Vec<Waker>>, +} + +impl Allow { + fn new() -> Self { + Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) } + } + + fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false + } + } + + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); + } + } +} + +impl<T: Unpin> Sink<T> for ManualAllow<T> { + type Error = (); + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.allow.check(cx) { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.data.push(item); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } +} + +fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() }; + (manual_allow, allow) +} + +#[test] +fn either_sink() { + let mut s = + if true { Vec::<i32>::new().left_sink() } else { VecDeque::<i32>::new().right_sink() }; + + Pin::new(&mut s).start_send(0).unwrap(); +} + +#[test] +fn vec_sink() { + let mut v = Vec::new(); + Pin::new(&mut v).start_send(0).unwrap(); + Pin::new(&mut v).start_send(1).unwrap(); + assert_eq!(v, vec![0, 1]); + block_on(v.flush()).unwrap(); + assert_eq!(v, vec![0, 1]); +} + +#[test] +fn vecdeque_sink() { + let mut deque = VecDeque::new(); + Pin::new(&mut deque).start_send(2).unwrap(); + Pin::new(&mut deque).start_send(3).unwrap(); + + assert_eq!(deque.pop_front(), Some(2)); + assert_eq!(deque.pop_front(), Some(3)); + assert_eq!(deque.pop_front(), None); +} + +#[test] +fn send() { + let mut v = Vec::new(); + + block_on(v.send(0)).unwrap(); + assert_eq!(v, vec![0]); + + block_on(v.send(1)).unwrap(); + assert_eq!(v, vec![0, 1]); + + block_on(v.send(2)).unwrap(); + assert_eq!(v, vec![0, 1, 2]); +} + +#[test] +fn send_all() { + let mut v = Vec::new(); + + block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap(); + assert_eq!(v, vec![0, 1]); + + block_on(v.send_all(&mut stream::iter(vec![2, 3]).map(Ok))).unwrap(); + assert_eq!(v, vec![0, 1, 2, 3]); + + block_on(v.send_all(&mut stream::iter(vec![4, 5]).map(Ok))).unwrap(); + assert_eq!(v, vec![0, 1, 2, 3, 4, 5]); +} + +// Test that `start_send` on an `mpsc` channel does indeed block when the +// channel is full +#[test] +fn mpsc_blocking_start_send() { + let (mut tx, mut rx) = mpsc::channel::<i32>(0); + + block_on(future::lazy(|_| { + tx.start_send(0).unwrap(); + + flag_cx(|flag, cx| { + let mut task = StartSendFut::new(tx, 1); + + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.take()); + sassert_next(&mut rx, 0); + assert!(flag.take()); + unwrap(task.poll_unpin(cx)); + assert!(!flag.take()); + sassert_next(&mut rx, 1); + }) + })); +} + +// test `flush` by using `with` to make the first insertion into a sink block +// until a oneshot is completed +#[test] +fn with_flush() { + let (tx, rx) = oneshot::channel(); + let mut block = rx.boxed(); + let mut sink = Vec::new().with(|elem| { + mem::replace(&mut block, future::ok(()).boxed()) + .map_ok(move |()| elem + 1) + .map_err(|_| -> Never { panic!() }) + }); + + assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(())); + + flag_cx(|flag, cx| { + let mut task = sink.flush(); + assert!(task.poll_unpin(cx).is_pending()); + tx.send(()).unwrap(); + assert!(flag.take()); + + unwrap(task.poll_unpin(cx)); + + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[1, 2]); + }) +} + +// test simple use of with to change data +#[test] +fn with_as_map() { + let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2)); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + block_on(sink.send(2)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 2, 4]); +} + +// test simple use of with_flat_map +#[test] +fn with_flat_map() { + let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + block_on(sink.send(2)).unwrap(); + block_on(sink.send(3)).unwrap(); + assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]); +} + +// Check that `with` propagates `poll_ready` to the inner sink. +// Regression test for the issue #1834. +#[test] +fn with_propagates_poll_ready() { + let (tx, mut rx) = mpsc::channel::<i32>(0); + let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10)); + + block_on(future::lazy(|_| { + flag_cx(|flag, cx| { + let mut tx = Pin::new(&mut tx); + + // Should be ready for the first item. + assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(tx.as_mut().start_send(0), Ok(())); + + // Should be ready for the second item only after the first one is received. + assert_eq!(tx.as_mut().poll_ready(cx), Poll::Pending); + assert!(!flag.take()); + sassert_next(&mut rx, 10); + assert!(flag.take()); + assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(tx.as_mut().start_send(1), Ok(())); + }) + })); +} + +// test that the `with` sink doesn't require the underlying sink to flush, +// but doesn't claim to be flushed until the underlying sink is +#[test] +fn with_flush_propagate() { + let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>); + flag_cx(|flag, cx| { + unwrap(Pin::new(&mut sink).poll_ready(cx)); + Pin::new(&mut sink).start_send(Some(0)).unwrap(); + unwrap(Pin::new(&mut sink).poll_ready(cx)); + Pin::new(&mut sink).start_send(Some(1)).unwrap(); + + { + let mut task = sink.flush(); + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.take()); + } + assert_eq!(sink.get_mut().force_flush(), vec![0, 1]); + assert!(flag.take()); + unwrap(sink.flush().poll_unpin(cx)); + }) +} + +// test that `Clone` is implemented on `with` sinks +#[test] +fn with_implements_clone() { + let (mut tx, rx) = mpsc::channel(5); + + { + let mut is_positive = tx.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0)); + + let mut is_long = + tx.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5)); + + block_on(is_positive.clone().send(-1)).unwrap(); + block_on(is_long.clone().send("123456")).unwrap(); + block_on(is_long.send("123")).unwrap(); + block_on(is_positive.send(1)).unwrap(); + } + + block_on(tx.send(false)).unwrap(); + + block_on(tx.close()).unwrap(); + + assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![false, true, false, true, false]); +} + +// test that a buffer is a no-nop around a sink that always accepts sends +#[test] +fn buffer_noop() { + let mut sink = Vec::new().buffer(0); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); + + let mut sink = Vec::new().buffer(1); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); +} + +// test basic buffer functionality, including both filling up to capacity, +// and writing out when the underlying sink is ready +#[test] +fn buffer() { + let (sink, allow) = manual_allow::<i32>(); + let sink = sink.buffer(2); + + let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); + let mut sink = block_on(StartSendFut::new(sink, 1)).unwrap(); + + flag_cx(|flag, cx| { + let mut task = sink.send(2); + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.take()); + allow.start(); + assert!(flag.take()); + unwrap(task.poll_unpin(cx)); + assert_eq!(sink.get_ref().data, vec![0, 1, 2]); + }) +} + +#[test] +fn fanout_smoke() { + let sink1 = Vec::new(); + let sink2 = Vec::new(); + let mut sink = sink1.fanout(sink2); + block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap(); + let (sink1, sink2) = sink.into_inner(); + assert_eq!(sink1, vec![1, 2, 3]); + assert_eq!(sink2, vec![1, 2, 3]); +} + +#[test] +fn fanout_backpressure() { + let (left_send, mut left_recv) = mpsc::channel(0); + let (right_send, mut right_recv) = mpsc::channel(0); + let sink = left_send.fanout(right_send); + + let mut sink = block_on(StartSendFut::new(sink, 0)).unwrap(); + + flag_cx(|flag, cx| { + let mut task = sink.send(2); + assert!(!flag.take()); + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(left_recv.next()), Some(0)); + assert!(flag.take()); + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(right_recv.next()), Some(0)); + assert!(flag.take()); + + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(left_recv.next()), Some(2)); + assert!(flag.take()); + assert!(task.poll_unpin(cx).is_pending()); + assert_eq!(block_on(right_recv.next()), Some(2)); + assert!(flag.take()); + + unwrap(task.poll_unpin(cx)); + // make sure receivers live until end of test to prevent send errors + drop(left_recv); + drop(right_recv); + }) +} + +#[test] +fn sink_map_err() { + { + let cx = &mut panic_context(); + let (tx, _rx) = mpsc::channel(1); + let mut tx = tx.sink_map_err(|_| ()); + assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); + assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!(Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), Err(())); +} + +#[test] +fn sink_unfold() { + block_on(poll_fn(|cx| { + let (tx, mut rx) = mpsc::channel(1); + let unfold = sink::unfold((), |(), i: i32| { + let mut tx = tx.clone(); + async move { + tx.send(i).await.unwrap(); + Ok::<_, String>(()) + } + }); + futures::pin_mut!(unfold); + assert_eq!(unfold.as_mut().start_send(1), Ok(())); + assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(()))); + assert_eq!(rx.try_next().unwrap(), Some(1)); + + assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(unfold.as_mut().start_send(2), Ok(())); + assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(unfold.as_mut().start_send(3), Ok(())); + assert_eq!(rx.try_next().unwrap(), Some(2)); + assert!(rx.try_next().is_err()); + assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(()))); + assert_eq!(unfold.as_mut().start_send(4), Ok(())); + assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full + assert_eq!(rx.try_next().unwrap(), Some(3)); + assert_eq!(rx.try_next().unwrap(), Some(4)); + + Poll::Ready(()) + })) +} + +#[test] +fn err_into() { + #[derive(Copy, Clone, Debug, PartialEq, Eq)] + struct ErrIntoTest; + + impl From<mpsc::SendError> for ErrIntoTest { + fn from(_: mpsc::SendError) -> Self { + Self + } + } + + { + let cx = &mut panic_context(); + let (tx, _rx) = mpsc::channel(1); + let mut tx: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = tx.sink_err_into(); + assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); + assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!(Pin::new(&mut tx.sink_err_into()).start_send(()), Err(ErrIntoTest)); +} diff --git a/third_party/rust/futures/tests/sink_fanout.rs b/third_party/rust/futures/tests/sink_fanout.rs new file mode 100644 index 0000000000..e57b2d8c7b --- /dev/null +++ b/third_party/rust/futures/tests/sink_fanout.rs @@ -0,0 +1,24 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::join3; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; + +#[test] +fn it_works() { + let (tx1, rx1) = mpsc::channel(1); + let (tx2, rx2) = mpsc::channel(2); + let tx = tx1.fanout(tx2).sink_map_err(|_| ()); + + let src = stream::iter((0..10).map(Ok)); + let fwd = src.forward(tx); + + let collect_fut1 = rx1.collect::<Vec<_>>(); + let collect_fut2 = rx2.collect::<Vec<_>>(); + let (_, vec1, vec2) = block_on(join3(fwd, collect_fut1, collect_fut2)); + + let expected = (0..10).collect::<Vec<_>>(); + + assert_eq!(vec1, expected); + assert_eq!(vec2, expected); +} diff --git a/third_party/rust/futures/tests/stream.rs b/third_party/rust/futures/tests/stream.rs new file mode 100644 index 0000000000..5cde45833f --- /dev/null +++ b/third_party/rust/futures/tests/stream.rs @@ -0,0 +1,463 @@ +use std::cell::Cell; +use std::iter; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::task::Context; + +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::{self, Future}; +use futures::lock::Mutex; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; +use futures::task::Poll; +use futures::{ready, FutureExt}; +use futures_core::Stream; +use futures_test::task::noop_context; + +#[test] +fn select() { + fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) { + let a = stream::iter(a); + let b = stream::iter(b); + let vec = block_on(stream::select(a, b).collect::<Vec<_>>()); + assert_eq!(vec, expected); + } + + select_and_compare(vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]); + select_and_compare(vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]); + select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]); +} + +#[test] +fn flat_map() { + block_on(async { + let st = + stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]); + + let values: Vec<_> = + st.flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))).collect().await; + + assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]); + }); +} + +#[test] +fn scan() { + block_on(async { + let values = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) + .scan(1, |state, e| { + *state += 1; + futures::future::ready(if e < *state { Some(e) } else { None }) + }) + .collect::<Vec<_>>() + .await; + + assert_eq!(values, vec![1u8, 2, 3, 4]); + }); +} + +#[test] +fn flatten_unordered() { + use futures::executor::block_on; + use futures::stream::*; + use futures::task::*; + use std::convert::identity; + use std::pin::Pin; + use std::thread; + use std::time::Duration; + + struct DataStream { + data: Vec<u8>, + polled: bool, + wake_immediately: bool, + } + + impl Stream for DataStream { + type Item = u8; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { + if !self.polled { + if !self.wake_immediately { + let waker = ctx.waker().clone(); + let sleep_time = + Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10); + thread::spawn(move || { + thread::sleep(sleep_time); + waker.wake_by_ref(); + }); + } else { + ctx.waker().wake_by_ref(); + } + self.polled = true; + Poll::Pending + } else { + self.polled = false; + Poll::Ready(self.data.pop()) + } + } + } + + struct Interchanger { + polled: bool, + base: u8, + wake_immediately: bool, + } + + impl Stream for Interchanger { + type Item = DataStream; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { + if !self.polled { + self.polled = true; + if !self.wake_immediately { + let waker = ctx.waker().clone(); + let sleep_time = Duration::from_millis(self.base as u64); + thread::spawn(move || { + thread::sleep(sleep_time); + waker.wake_by_ref(); + }); + } else { + ctx.waker().wake_by_ref(); + } + Poll::Pending + } else { + let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect(); + self.base += 1; + self.polled = false; + Poll::Ready(Some(DataStream { + polled: false, + data, + wake_immediately: self.wake_immediately && self.base % 2 == 0, + })) + } + } + } + + // basic behaviour + { + block_on(async { + let st = stream::iter(vec![ + stream::iter(0..=4u8), + stream::iter(6..=10), + stream::iter(10..=12), + ]); + + let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await; + + assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]); + }); + + block_on(async { + let st = stream::iter(vec![ + stream::iter(0..=4u8), + stream::iter(6..=10), + stream::iter(0..=2), + ]); + + let mut fm_unordered = st + .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0))) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]); + }); + } + + // wake up immediately + { + block_on(async { + let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + .await; + + fl_unordered.sort_unstable(); + + assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + } + + // wake up after delay + { + block_on(async { + let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + .await; + + fl_unordered.sort_unstable(); + + assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>() + .await; + + fm_unordered.sort_unstable(); + + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + + block_on(async { + let (mut fm_unordered, mut fl_unordered) = futures_util::join!( + Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)) + .collect::<Vec<_>>(), + Interchanger { polled: false, base: 0, wake_immediately: false } + .take(10) + .map(|s| s.map(identity)) + .flatten_unordered(10) + .collect::<Vec<_>>() + ); + + fm_unordered.sort_unstable(); + fl_unordered.sort_unstable(); + + assert_eq!(fm_unordered, fl_unordered); + assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); + }); + } + + // waker panics + { + let stream = Arc::new(Mutex::new( + Interchanger { polled: false, base: 0, wake_immediately: true } + .take(10) + .flat_map_unordered(10, |s| s.map(identity)), + )); + + struct PanicWaker; + + impl ArcWake for PanicWaker { + fn wake_by_ref(_arc_self: &Arc<Self>) { + panic!("WAKE UP"); + } + } + + std::thread::spawn({ + let stream = stream.clone(); + move || { + let mut st = poll_fn(|cx| { + let mut lock = ready!(stream.lock().poll_unpin(cx)); + + let panic_waker = waker(Arc::new(PanicWaker)); + let mut panic_cx = Context::from_waker(&panic_waker); + let _ = ready!(lock.poll_next_unpin(&mut panic_cx)); + + Poll::Ready(Some(())) + }); + + block_on(st.next()) + } + }) + .join() + .unwrap_err(); + + block_on(async move { + let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; + values.sort_unstable(); + + assert_eq!(values, (0..60).collect::<Vec<u8>>()); + }); + } + + // stream panics + { + let st = stream::iter(iter::once( + once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(), + )) + .chain( + Interchanger { polled: false, base: 0, wake_immediately: true } + .map(|stream| stream.right_stream()) + .take(10), + ); + + let stream = Arc::new(Mutex::new(st.flatten_unordered(10))); + + std::thread::spawn({ + let stream = stream.clone(); + move || { + let mut st = poll_fn(|cx| { + let mut lock = ready!(stream.lock().poll_unpin(cx)); + let data = ready!(lock.poll_next_unpin(cx)); + + Poll::Ready(data) + }); + + block_on(st.next()) + } + }) + .join() + .unwrap_err(); + + block_on(async move { + let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; + values.sort_unstable(); + + assert_eq!(values, (0..60).collect::<Vec<u8>>()); + }); + } +} + +#[test] +fn take_until() { + fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> { + let mut i = 0; + future::poll_fn(move |_cx| { + i += 1; + if i <= stop_on { + Poll::Pending + } else { + Poll::Ready(()) + } + }) + } + + block_on(async { + // Verify stopping works: + let stream = stream::iter(1u32..=10); + let stop_fut = make_stop_fut(5); + + let stream = stream.take_until(stop_fut); + let last = stream.fold(0, |_, i| async move { i }).await; + assert_eq!(last, 5); + + // Verify take_future() works: + let stream = stream::iter(1..=10); + let stop_fut = make_stop_fut(5); + + let mut stream = stream.take_until(stop_fut); + + assert_eq!(stream.next().await, Some(1)); + assert_eq!(stream.next().await, Some(2)); + + stream.take_future(); + + let last = stream.fold(0, |_, i| async move { i }).await; + assert_eq!(last, 10); + + // Verify take_future() returns None if stream is stopped: + let stream = stream::iter(1u32..=10); + let stop_fut = make_stop_fut(1); + let mut stream = stream.take_until(stop_fut); + assert_eq!(stream.next().await, Some(1)); + assert_eq!(stream.next().await, None); + assert!(stream.take_future().is_none()); + + // Verify TakeUntil is fused: + let mut i = 0; + let stream = stream::poll_fn(move |_cx| { + i += 1; + match i { + 1 => Poll::Ready(Some(1)), + 2 => Poll::Ready(None), + _ => panic!("TakeUntil not fused"), + } + }); + + let stop_fut = make_stop_fut(1); + let mut stream = stream.take_until(stop_fut); + assert_eq!(stream.next().await, Some(1)); + assert_eq!(stream.next().await, None); + assert_eq!(stream.next().await, None); + }); +} + +#[test] +#[should_panic] +fn chunks_panic_on_cap_zero() { + let (_, rx1) = mpsc::channel::<()>(1); + + let _ = rx1.chunks(0); +} + +#[test] +#[should_panic] +fn ready_chunks_panic_on_cap_zero() { + let (_, rx1) = mpsc::channel::<()>(1); + + let _ = rx1.ready_chunks(0); +} + +#[test] +fn ready_chunks() { + let (mut tx, rx1) = mpsc::channel::<i32>(16); + + let mut s = rx1.ready_chunks(2); + + let mut cx = noop_context(); + assert!(s.next().poll_unpin(&mut cx).is_pending()); + + block_on(async { + tx.send(1).await.unwrap(); + + assert_eq!(s.next().await.unwrap(), vec![1]); + tx.send(2).await.unwrap(); + tx.send(3).await.unwrap(); + tx.send(4).await.unwrap(); + assert_eq!(s.next().await.unwrap(), vec![2, 3]); + assert_eq!(s.next().await.unwrap(), vec![4]); + }); +} + +struct SlowStream { + times_should_poll: usize, + times_polled: Rc<Cell<usize>>, +} +impl Stream for SlowStream { + type Item = usize; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + self.times_polled.set(self.times_polled.get() + 1); + if self.times_polled.get() % 2 == 0 { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + if self.times_polled.get() >= self.times_should_poll { + return Poll::Ready(None); + } + Poll::Ready(Some(self.times_polled.get())) + } +} + +#[test] +fn select_with_strategy_doesnt_terminate_early() { + for side in [stream::PollNext::Left, stream::PollNext::Right] { + let times_should_poll = 10; + let count = Rc::new(Cell::new(0)); + let b = stream::iter([10, 20]); + + let mut selected = stream::select_with_strategy( + SlowStream { times_should_poll, times_polled: count.clone() }, + b, + |_: &mut ()| side, + ); + block_on(async move { while selected.next().await.is_some() {} }); + assert_eq!(count.get(), times_should_poll + 1); + } +} diff --git a/third_party/rust/futures/tests/stream_abortable.rs b/third_party/rust/futures/tests/stream_abortable.rs new file mode 100644 index 0000000000..2339dd0522 --- /dev/null +++ b/third_party/rust/futures/tests/stream_abortable.rs @@ -0,0 +1,46 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::stream::{abortable, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use futures::SinkExt; +use futures_test::task::new_count_waker; +use std::pin::Pin; + +#[test] +fn abortable_works() { + let (_tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + abort_handle.abort(); + assert!(abortable_rx.is_aborted()); + assert_eq!(None, block_on(abortable_rx.next())); +} + +#[test] +fn abortable_awakens() { + let (_tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, abort_handle) = abortable(a_rx); + + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + assert_eq!(counter, 0); + assert_eq!(Poll::Pending, Pin::new(&mut abortable_rx).poll_next(&mut cx)); + assert_eq!(counter, 0); + + abort_handle.abort(); + assert_eq!(counter, 1); + assert!(abortable_rx.is_aborted()); + assert_eq!(Poll::Ready(None), Pin::new(&mut abortable_rx).poll_next(&mut cx)); +} + +#[test] +fn abortable_resolves() { + let (mut tx, a_rx) = mpsc::channel::<()>(1); + let (mut abortable_rx, _abort_handle) = abortable(a_rx); + + block_on(tx.send(())).unwrap(); + + assert!(!abortable_rx.is_aborted()); + assert_eq!(Some(()), block_on(abortable_rx.next())); +} diff --git a/third_party/rust/futures/tests/stream_buffer_unordered.rs b/third_party/rust/futures/tests/stream_buffer_unordered.rs new file mode 100644 index 0000000000..9a2ee174ed --- /dev/null +++ b/third_party/rust/futures/tests/stream_buffer_unordered.rs @@ -0,0 +1,73 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::{block_on, block_on_stream}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std::sync::mpsc as std_mpsc; +use std::thread; + +#[test] +#[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790 +fn works() { + const N: usize = 4; + + let (mut tx, rx) = mpsc::channel(1); + + let (tx2, rx2) = std_mpsc::channel(); + let (tx3, rx3) = std_mpsc::channel(); + let t1 = thread::spawn(move || { + for _ in 0..=N { + let (mytx, myrx) = oneshot::channel(); + block_on(tx.send(myrx)).unwrap(); + tx3.send(mytx).unwrap(); + } + rx2.recv().unwrap(); + for _ in 0..N { + let (mytx, myrx) = oneshot::channel(); + block_on(tx.send(myrx)).unwrap(); + tx3.send(mytx).unwrap(); + } + }); + + let (tx4, rx4) = std_mpsc::channel(); + let t2 = thread::spawn(move || { + for item in block_on_stream(rx.buffer_unordered(N)) { + tx4.send(item.unwrap()).unwrap(); + } + }); + + let o1 = rx3.recv().unwrap(); + let o2 = rx3.recv().unwrap(); + let o3 = rx3.recv().unwrap(); + let o4 = rx3.recv().unwrap(); + assert!(rx4.try_recv().is_err()); + + o1.send(1).unwrap(); + assert_eq!(rx4.recv(), Ok(1)); + o3.send(3).unwrap(); + assert_eq!(rx4.recv(), Ok(3)); + tx2.send(()).unwrap(); + o2.send(2).unwrap(); + assert_eq!(rx4.recv(), Ok(2)); + o4.send(4).unwrap(); + assert_eq!(rx4.recv(), Ok(4)); + + let o5 = rx3.recv().unwrap(); + let o6 = rx3.recv().unwrap(); + let o7 = rx3.recv().unwrap(); + let o8 = rx3.recv().unwrap(); + let o9 = rx3.recv().unwrap(); + + o5.send(5).unwrap(); + assert_eq!(rx4.recv(), Ok(5)); + o8.send(8).unwrap(); + assert_eq!(rx4.recv(), Ok(8)); + o9.send(9).unwrap(); + assert_eq!(rx4.recv(), Ok(9)); + o7.send(7).unwrap(); + assert_eq!(rx4.recv(), Ok(7)); + o6.send(6).unwrap(); + assert_eq!(rx4.recv(), Ok(6)); + + t1.join().unwrap(); + t2.join().unwrap(); +} diff --git a/third_party/rust/futures/tests/stream_catch_unwind.rs b/third_party/rust/futures/tests/stream_catch_unwind.rs new file mode 100644 index 0000000000..8b23a0a7ef --- /dev/null +++ b/third_party/rust/futures/tests/stream_catch_unwind.rs @@ -0,0 +1,27 @@ +use futures::executor::block_on_stream; +use futures::stream::{self, StreamExt}; + +#[test] +fn panic_in_the_middle_of_the_stream() { + let stream = stream::iter(vec![Some(10), None, Some(11)]); + + // panic on second element + let stream_panicking = stream.map(|o| o.unwrap()); + let mut iter = block_on_stream(stream_panicking.catch_unwind()); + + assert_eq!(10, iter.next().unwrap().ok().unwrap()); + assert!(iter.next().unwrap().is_err()); + assert!(iter.next().is_none()); +} + +#[test] +fn no_panic() { + let stream = stream::iter(vec![10, 11, 12]); + + let mut iter = block_on_stream(stream.catch_unwind()); + + assert_eq!(10, iter.next().unwrap().ok().unwrap()); + assert_eq!(11, iter.next().unwrap().ok().unwrap()); + assert_eq!(12, iter.next().unwrap().ok().unwrap()); + assert!(iter.next().is_none()); +} diff --git a/third_party/rust/futures/tests/stream_futures_ordered.rs b/third_party/rust/futures/tests/stream_futures_ordered.rs new file mode 100644 index 0000000000..5a4a3e22ee --- /dev/null +++ b/third_party/rust/futures/tests/stream_futures_ordered.rs @@ -0,0 +1,172 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt, TryFutureExt}; +use futures::stream::{FuturesOrdered, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; +use std::any::Any; + +#[test] +fn works_1() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>(); + + b_tx.send(99).unwrap(); + assert!(stream.poll_next_unpin(&mut noop_context()).is_pending()); + + a_tx.send(33).unwrap(); + c_tx.send(33).unwrap(); + + let mut iter = block_on_stream(stream); + assert_eq!(Some(Ok(33)), iter.next()); + assert_eq!(Some(Ok(99)), iter.next()); + assert_eq!(Some(Ok(33)), iter.next()); + assert_eq!(None, iter.next()); +} + +#[test] +fn works_2() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()] + .into_iter() + .collect::<FuturesOrdered<_>>(); + + let mut cx = noop_context(); + a_tx.send(33).unwrap(); + b_tx.send(33).unwrap(); + assert!(stream.poll_next_unpin(&mut cx).is_ready()); + assert!(stream.poll_next_unpin(&mut cx).is_pending()); + c_tx.send(33).unwrap(); + assert!(stream.poll_next_unpin(&mut cx).is_ready()); +} + +#[test] +fn test_push_front() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + let (d_tx, d_rx) = oneshot::channel::<i32>(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // 1 and 2 should be received in order + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_front(d_rx); + d_tx.send(4).unwrap(); + + // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next + // and then 3 after it + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); +} + +#[test] +fn test_push_back() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + let (d_tx, d_rx) = oneshot::channel::<i32>(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_back(a_rx); + stream.push_back(b_rx); + stream.push_back(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // All results should be received in order + + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + + stream.push_back(d_rx); + d_tx.send(4).unwrap(); + + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx)); +} + +#[test] +fn from_iterator() { + let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] + .into_iter() + .collect::<FuturesOrdered<_>>(); + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); +} + +#[test] +fn queue_never_unblocked() { + let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>(); + let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>(); + let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>(); + + let mut stream = vec![ + Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>, + Box::new( + future::try_select(b_rx, c_rx) + .map_err(|e| e.factor_first().0) + .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)), + ) as _, + ] + .into_iter() + .collect::<FuturesOrdered<_>>(); + + let cx = &mut noop_context(); + for _ in 0..10 { + assert!(stream.poll_next_unpin(cx).is_pending()); + } + + b_tx.send(Box::new(())).unwrap(); + assert!(stream.poll_next_unpin(cx).is_pending()); + c_tx.send(Box::new(())).unwrap(); + assert!(stream.poll_next_unpin(cx).is_pending()); + assert!(stream.poll_next_unpin(cx).is_pending()); +} + +#[test] +fn test_push_front_negative() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let mut stream = FuturesOrdered::new(); + + let mut cx = noop_context(); + + stream.push_front(a_rx); + stream.push_front(b_rx); + stream.push_front(c_rx); + + a_tx.send(1).unwrap(); + b_tx.send(2).unwrap(); + c_tx.send(3).unwrap(); + + // These should all be recieved in reverse order + assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx)); + assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx)); +} diff --git a/third_party/rust/futures/tests/stream_futures_unordered.rs b/third_party/rust/futures/tests/stream_futures_unordered.rs new file mode 100644 index 0000000000..b568280479 --- /dev/null +++ b/third_party/rust/futures/tests/stream_futures_unordered.rs @@ -0,0 +1,383 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt}; +use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use futures_test::task::noop_context; +use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; +use std::iter::FromIterator; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; + +#[test] +fn is_terminated() { + let mut cx = noop_context(); + let mut tasks = FuturesUnordered::new(); + + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); + assert_eq!(tasks.is_terminated(), true); + + // Test that the sentinel value doesn't leak + assert_eq!(tasks.is_empty(), true); + assert_eq!(tasks.len(), 0); + assert_eq!(tasks.iter_mut().len(), 0); + + tasks.push(future::ready(1)); + + assert_eq!(tasks.is_empty(), false); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks.iter_mut().len(), 1); + + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); + assert_eq!(tasks.is_terminated(), true); +} + +#[test] +fn works_1() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let mut iter = + block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>()); + + b_tx.send(99).unwrap(); + assert_eq!(Some(Ok(99)), iter.next()); + + a_tx.send(33).unwrap(); + c_tx.send(33).unwrap(); + assert_eq!(Some(Ok(33)), iter.next()); + assert_eq!(Some(Ok(33)), iter.next()); + assert_eq!(None, iter.next()); +} + +#[test] +fn works_2() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()] + .into_iter() + .collect::<FuturesUnordered<_>>(); + + a_tx.send(9).unwrap(); + b_tx.send(10).unwrap(); + + let mut cx = noop_context(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(9)))); + c_tx.send(20).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(30)))); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(None)); +} + +#[test] +fn from_iterator() { + let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)] + .into_iter() + .collect::<FuturesUnordered<_>>(); + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); +} + +#[test] +fn finished_future() { + let (_a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let mut stream = vec![ + Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>, + Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _, + ] + .into_iter() + .collect::<FuturesUnordered<_>>(); + + let cx = &mut noop_context(); + for _ in 0..10 { + assert!(stream.poll_next_unpin(cx).is_pending()); + } + + b_tx.send(12).unwrap(); + c_tx.send(3).unwrap(); + assert!(stream.poll_next_unpin(cx).is_ready()); + assert!(stream.poll_next_unpin(cx).is_pending()); + assert!(stream.poll_next_unpin(cx).is_pending()); +} + +#[test] +fn iter_mut_cancel() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>(); + + for rx in stream.iter_mut() { + rx.close(); + } + + let mut iter = block_on_stream(stream); + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), None); +} + +#[test] +fn iter_mut_len() { + let mut stream = + vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::<FuturesUnordered<_>>(); + + let mut iter_mut = stream.iter_mut(); + assert_eq!(iter_mut.len(), 3); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 2); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 1); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 0); + assert!(iter_mut.next().is_none()); +} + +#[test] +fn iter_cancel() { + struct AtomicCancel<F> { + future: F, + cancel: AtomicBool, + } + + impl<F: Future + Unpin> Future for AtomicCancel<F> { + type Output = Option<<F as Future>::Output>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if self.cancel.load(Ordering::Relaxed) { + Poll::Ready(None) + } else { + self.future.poll_unpin(cx).map(Some) + } + } + } + + impl<F: Future + Unpin> AtomicCancel<F> { + fn new(future: F) -> Self { + Self { future, cancel: AtomicBool::new(false) } + } + } + + let stream = vec![ + AtomicCancel::new(future::pending::<()>()), + AtomicCancel::new(future::pending::<()>()), + AtomicCancel::new(future::pending::<()>()), + ] + .into_iter() + .collect::<FuturesUnordered<_>>(); + + for f in stream.iter() { + f.cancel.store(true, Ordering::Relaxed); + } + + let mut iter = block_on_stream(stream); + + assert_eq!(iter.next(), Some(None)); + assert_eq!(iter.next(), Some(None)); + assert_eq!(iter.next(), Some(None)); + assert_eq!(iter.next(), None); +} + +#[test] +fn iter_len() { + let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::<FuturesUnordered<_>>(); + + let mut iter = stream.iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn into_iter_cancel() { + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + + let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>(); + + let stream = stream + .into_iter() + .map(|mut rx| { + rx.close(); + rx + }) + .collect::<FuturesUnordered<_>>(); + + let mut iter = block_on_stream(stream); + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); + assert_eq!(iter.next(), None); +} + +#[test] +fn into_iter_len() { + let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()] + .into_iter() + .collect::<FuturesUnordered<_>>(); + + let mut into_iter = stream.into_iter(); + assert_eq!(into_iter.len(), 3); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 2); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 0); + assert!(into_iter.next().is_none()); +} + +#[test] +fn into_iter_partial() { + let stream = vec![future::ready(1), future::ready(2), future::ready(3), future::ready(4)] + .into_iter() + .collect::<FuturesUnordered<_>>(); + + let mut into_iter = stream.into_iter(); + assert!(into_iter.next().is_some()); + assert!(into_iter.next().is_some()); + assert!(into_iter.next().is_some()); + assert_eq!(into_iter.len(), 1); + // don't panic when iterator is dropped before completing +} + +#[test] +fn futures_not_moved_after_poll() { + // Future that will be ready after being polled twice, + // asserting that it does not move. + let fut = future::ready(()).pending_once().assert_unmoved(); + let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>(); + assert_stream_pending!(stream); + assert_stream_next!(stream, ()); + assert_stream_next!(stream, ()); + assert_stream_next!(stream, ()); + assert_stream_done!(stream); +} + +#[test] +fn len_valid_during_out_of_order_completion() { + // Complete futures out-of-order and add new futures afterwards to ensure + // length values remain correct. + let (a_tx, a_rx) = oneshot::channel::<i32>(); + let (b_tx, b_rx) = oneshot::channel::<i32>(); + let (c_tx, c_rx) = oneshot::channel::<i32>(); + let (d_tx, d_rx) = oneshot::channel::<i32>(); + + let mut cx = noop_context(); + let mut stream = FuturesUnordered::new(); + assert_eq!(stream.len(), 0); + + stream.push(a_rx); + assert_eq!(stream.len(), 1); + stream.push(b_rx); + assert_eq!(stream.len(), 2); + stream.push(c_rx); + assert_eq!(stream.len(), 3); + + b_tx.send(4).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(4)))); + assert_eq!(stream.len(), 2); + + stream.push(d_rx); + assert_eq!(stream.len(), 3); + + c_tx.send(5).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(5)))); + assert_eq!(stream.len(), 2); + + d_tx.send(6).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(6)))); + assert_eq!(stream.len(), 1); + + a_tx.send(7).unwrap(); + assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7)))); + assert_eq!(stream.len(), 0); +} + +#[test] +fn polled_only_once_at_most_per_iteration() { + #[derive(Debug, Clone, Copy, Default)] + struct F { + polled: bool, + } + + impl Future for F { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> { + if self.polled { + panic!("polled twice") + } else { + self.polled = true; + Poll::Pending + } + } + } + + let cx = &mut noop_context(); + + let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 10]); + assert!(tasks.poll_next_unpin(cx).is_pending()); + assert_eq!(10, tasks.iter().filter(|f| f.polled).count()); + + let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); + assert!(tasks.poll_next_unpin(cx).is_pending()); + assert_eq!(33, tasks.iter().filter(|f| f.polled).count()); + + let mut tasks = FuturesUnordered::<F>::new(); + assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx)); +} + +#[test] +fn clear() { + let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(future::ready(3)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +} diff --git a/third_party/rust/futures/tests/stream_into_async_read.rs b/third_party/rust/futures/tests/stream_into_async_read.rs new file mode 100644 index 0000000000..60188d3e58 --- /dev/null +++ b/third_party/rust/futures/tests/stream_into_async_read.rs @@ -0,0 +1,94 @@ +use core::pin::Pin; +use futures::io::{AsyncBufRead, AsyncRead}; +use futures::stream::{self, TryStreamExt}; +use futures::task::Poll; +use futures_test::{stream::StreamTestExt, task::noop_context}; + +macro_rules! assert_read { + ($reader:expr, $buf:expr, $item:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $item); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; + } + } + } + }; +} + +macro_rules! assert_fill_buf { + ($reader:expr, $buf:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $buf); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; + } + } + } + }; +} + +#[test] +fn test_into_async_read() { + let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); + let mut reader = stream.interleave_pending().into_async_read(); + let mut buf = vec![0; 3]; + + assert_read!(reader, &mut buf, 3); + assert_eq!(&buf, &[1, 2, 3]); + + assert_read!(reader, &mut buf, 2); + assert_eq!(&buf[..2], &[4, 5]); + + assert_read!(reader, &mut buf, 3); + assert_eq!(&buf, &[1, 2, 3]); + + assert_read!(reader, &mut buf, 2); + assert_eq!(&buf[..2], &[4, 5]); + + assert_read!(reader, &mut buf, 3); + assert_eq!(&buf, &[1, 2, 3]); + + assert_read!(reader, &mut buf, 2); + assert_eq!(&buf[..2], &[4, 5]); + + assert_read!(reader, &mut buf, 0); +} + +#[test] +fn test_into_async_bufread() { + let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); + let mut reader = stream.interleave_pending().into_async_read(); + + let mut reader = Pin::new(&mut reader); + + assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]); + reader.as_mut().consume(3); + + assert_fill_buf!(reader, &[4, 5][..]); + reader.as_mut().consume(2); + + assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]); + reader.as_mut().consume(2); + + assert_fill_buf!(reader, &[3, 4, 5][..]); + reader.as_mut().consume(3); + + assert_fill_buf!(reader, &[][..]); +} diff --git a/third_party/rust/futures/tests/stream_peekable.rs b/third_party/rust/futures/tests/stream_peekable.rs new file mode 100644 index 0000000000..153fcc25b4 --- /dev/null +++ b/third_party/rust/futures/tests/stream_peekable.rs @@ -0,0 +1,58 @@ +use futures::executor::block_on; +use futures::pin_mut; +use futures::stream::{self, Peekable, StreamExt}; + +#[test] +fn peekable() { + block_on(async { + let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable(); + pin_mut!(peekable); + assert_eq!(peekable.as_mut().peek().await, Some(&1u8)); + assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]); + + let s = stream::once(async { 1 }).peekable(); + pin_mut!(s); + assert_eq!(s.as_mut().peek().await, Some(&1u8)); + assert_eq!(s.collect::<Vec<u8>>().await, vec![1]); + }); +} + +#[test] +fn peekable_mut() { + block_on(async { + let s = stream::iter(vec![1u8, 2, 3]).peekable(); + pin_mut!(s); + if let Some(p) = s.as_mut().peek_mut().await { + if *p == 1 { + *p = 5; + } + } + assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]); + }); +} + +#[test] +fn peekable_next_if_eq() { + block_on(async { + // first, try on references + let s = stream::iter(vec!["Heart", "of", "Gold"]).peekable(); + pin_mut!(s); + // try before `peek()` + assert_eq!(s.as_mut().next_if_eq(&"trillian").await, None); + assert_eq!(s.as_mut().next_if_eq(&"Heart").await, Some("Heart")); + // try after peek() + assert_eq!(s.as_mut().peek().await, Some(&"of")); + assert_eq!(s.as_mut().next_if_eq(&"of").await, Some("of")); + assert_eq!(s.as_mut().next_if_eq(&"zaphod").await, None); + // make sure `next()` still behaves + assert_eq!(s.next().await, Some("Gold")); + + // make sure comparison works for owned values + let s = stream::iter(vec![String::from("Ludicrous"), "speed".into()]).peekable(); + pin_mut!(s); + // make sure basic functionality works + assert_eq!(s.as_mut().next_if_eq("Ludicrous").await, Some("Ludicrous".into())); + assert_eq!(s.as_mut().next_if_eq("speed").await, Some("speed".into())); + assert_eq!(s.as_mut().next_if_eq("").await, None); + }); +} diff --git a/third_party/rust/futures/tests/stream_select_all.rs b/third_party/rust/futures/tests/stream_select_all.rs new file mode 100644 index 0000000000..4ae0735762 --- /dev/null +++ b/third_party/rust/futures/tests/stream_select_all.rs @@ -0,0 +1,197 @@ +use futures::channel::mpsc; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, FutureExt}; +use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; + +#[test] +fn is_terminated() { + let mut cx = noop_context(); + let mut tasks = SelectAll::new(); + + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); + assert_eq!(tasks.is_terminated(), true); + + // Test that the sentinel value doesn't leak + assert_eq!(tasks.is_empty(), true); + assert_eq!(tasks.len(), 0); + + tasks.push(future::ready(1).into_stream()); + + assert_eq!(tasks.is_empty(), false); + assert_eq!(tasks.len(), 1); + + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); + assert_eq!(tasks.is_terminated(), false); + assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); + assert_eq!(tasks.is_terminated(), true); +} + +#[test] +fn issue_1626() { + let a = stream::iter(0..=2); + let b = stream::iter(10..=14); + + let mut s = block_on_stream(stream::select_all(vec![a, b])); + + assert_eq!(s.next(), Some(0)); + assert_eq!(s.next(), Some(10)); + assert_eq!(s.next(), Some(1)); + assert_eq!(s.next(), Some(11)); + assert_eq!(s.next(), Some(2)); + assert_eq!(s.next(), Some(12)); + assert_eq!(s.next(), Some(13)); + assert_eq!(s.next(), Some(14)); + assert_eq!(s.next(), None); +} + +#[test] +fn works_1() { + let (a_tx, a_rx) = mpsc::unbounded::<u32>(); + let (b_tx, b_rx) = mpsc::unbounded::<u32>(); + let (c_tx, c_rx) = mpsc::unbounded::<u32>(); + + let streams = vec![a_rx, b_rx, c_rx]; + + let mut stream = block_on_stream(select_all(streams)); + + b_tx.unbounded_send(99).unwrap(); + a_tx.unbounded_send(33).unwrap(); + assert_eq!(Some(33), stream.next()); + assert_eq!(Some(99), stream.next()); + + b_tx.unbounded_send(99).unwrap(); + a_tx.unbounded_send(33).unwrap(); + assert_eq!(Some(33), stream.next()); + assert_eq!(Some(99), stream.next()); + + c_tx.unbounded_send(42).unwrap(); + assert_eq!(Some(42), stream.next()); + a_tx.unbounded_send(43).unwrap(); + assert_eq!(Some(43), stream.next()); + + drop((a_tx, b_tx, c_tx)); + assert_eq!(None, stream.next()); +} + +#[test] +fn clear() { + let mut tasks = + select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]); + + assert_eq!(block_on(tasks.next()), Some(1)); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + tasks.push(stream::iter(vec![3].into_iter())); + assert!(!tasks.is_empty()); + + tasks.clear(); + assert!(tasks.is_empty()); + + assert_eq!(block_on(tasks.next()), None); + assert!(tasks.is_terminated()); + tasks.clear(); + assert!(!tasks.is_terminated()); +} + +#[test] +fn iter_mut() { + let mut stream = + vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::<SelectAll<_>>(); + + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::<SelectAll<_>>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter_mut(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::<SelectAll<_>>(); + + let mut iter = stream.iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] + .into_iter() + .collect::<SelectAll<_>>(); + + assert_eq!(stream.len(), 3); + assert_eq!(block_on(stream.next()), Some(1)); + assert_eq!(stream.len(), 2); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); + + assert_eq!(block_on(stream.next()), Some(2)); + assert_eq!(stream.len(), 2); + assert_eq!(block_on(stream.next()), None); + let mut iter = stream.iter(); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} + +#[test] +fn into_iter() { + let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] + .into_iter() + .collect::<SelectAll<_>>(); + + let mut iter = stream.into_iter(); + assert_eq!(iter.len(), 3); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 2); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 1); + assert!(iter.next().is_some()); + assert_eq!(iter.len(), 0); + assert!(iter.next().is_none()); +} diff --git a/third_party/rust/futures/tests/stream_select_next_some.rs b/third_party/rust/futures/tests/stream_select_next_some.rs new file mode 100644 index 0000000000..8252ad7b54 --- /dev/null +++ b/third_party/rust/futures/tests/stream_select_next_some.rs @@ -0,0 +1,86 @@ +use futures::executor::block_on; +use futures::future::{self, FusedFuture, FutureExt}; +use futures::select; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use futures_test::task::new_count_waker; + +#[test] +fn is_terminated() { + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + let mut tasks = FuturesUnordered::new(); + + let mut select_next_some = tasks.select_next_some(); + assert_eq!(select_next_some.is_terminated(), false); + assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(counter, 1); + assert_eq!(select_next_some.is_terminated(), true); + drop(select_next_some); + + tasks.push(future::ready(1)); + + let mut select_next_some = tasks.select_next_some(); + assert_eq!(select_next_some.is_terminated(), false); + assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Ready(1)); + assert_eq!(select_next_some.is_terminated(), false); + assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(select_next_some.is_terminated(), true); +} + +#[test] +fn select() { + // Checks that even though `async_tasks` will yield a `None` and return + // `is_terminated() == true` during the first poll, it manages to toggle + // back to having items after a future is pushed into it during the second + // poll (after pending_once completes). + block_on(async { + let mut fut = future::ready(1).pending_once(); + let mut async_tasks = FuturesUnordered::new(); + let mut total = 0; + loop { + select! { + num = fut => { + total += num; + async_tasks.push(async { 5 }); + }, + num = async_tasks.select_next_some() => { + total += num; + } + complete => break, + } + } + assert_eq!(total, 6); + }); +} + +// Check that `select!` macro does not fail when importing from `futures_util`. +#[test] +fn futures_util_select() { + use futures_util::select; + + // Checks that even though `async_tasks` will yield a `None` and return + // `is_terminated() == true` during the first poll, it manages to toggle + // back to having items after a future is pushed into it during the second + // poll (after pending_once completes). + block_on(async { + let mut fut = future::ready(1).pending_once(); + let mut async_tasks = FuturesUnordered::new(); + let mut total = 0; + loop { + select! { + num = fut => { + total += num; + async_tasks.push(async { 5 }); + }, + num = async_tasks.select_next_some() => { + total += num; + } + complete => break, + } + } + assert_eq!(total, 6); + }); +} diff --git a/third_party/rust/futures/tests/stream_split.rs b/third_party/rust/futures/tests/stream_split.rs new file mode 100644 index 0000000000..694c151807 --- /dev/null +++ b/third_party/rust/futures/tests/stream_split.rs @@ -0,0 +1,57 @@ +use futures::executor::block_on; +use futures::sink::{Sink, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use pin_project::pin_project; +use std::pin::Pin; + +#[test] +fn test_split() { + #[pin_project] + struct Join<T, U> { + #[pin] + stream: T, + #[pin] + sink: U, + } + + impl<T: Stream, U> Stream for Join<T, U> { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> { + self.project().stream.poll_next(cx) + } + } + + impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> { + type Error = U::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.project().sink.poll_ready(cx) + } + + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { + self.project().sink.start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.project().sink.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.project().sink.poll_close(cx) + } + } + + let mut dest: Vec<i32> = Vec::new(); + { + let join = Join { stream: stream::iter(vec![10, 20, 30]), sink: &mut dest }; + + let (sink, stream) = join.split(); + let join = sink.reunite(stream).expect("test_split: reunite error"); + let (mut sink, stream) = join.split(); + let mut stream = stream.map(Ok); + block_on(sink.send_all(&mut stream)).unwrap(); + } + assert_eq!(dest, vec![10, 20, 30]); +} diff --git a/third_party/rust/futures/tests/stream_try_stream.rs b/third_party/rust/futures/tests/stream_try_stream.rs new file mode 100644 index 0000000000..194e74db74 --- /dev/null +++ b/third_party/rust/futures/tests/stream_try_stream.rs @@ -0,0 +1,38 @@ +use futures::{ + stream::{self, StreamExt, TryStreamExt}, + task::Poll, +}; +use futures_test::task::noop_context; + +#[test] +fn try_filter_map_after_err() { + let cx = &mut noop_context(); + let mut s = stream::iter(1..=3) + .map(Ok) + .try_filter_map(|v| async move { Err::<Option<()>, _>(v) }) + .filter_map(|r| async move { r.ok() }) + .boxed(); + assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); +} + +#[test] +fn try_skip_while_after_err() { + let cx = &mut noop_context(); + let mut s = stream::iter(1..=3) + .map(Ok) + .try_skip_while(|_| async move { Err::<_, ()>(()) }) + .filter_map(|r| async move { r.ok() }) + .boxed(); + assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); +} + +#[test] +fn try_take_while_after_err() { + let cx = &mut noop_context(); + let mut s = stream::iter(1..=3) + .map(Ok) + .try_take_while(|_| async move { Err::<_, ()>(()) }) + .filter_map(|r| async move { r.ok() }) + .boxed(); + assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx)); +} diff --git a/third_party/rust/futures/tests/stream_unfold.rs b/third_party/rust/futures/tests/stream_unfold.rs new file mode 100644 index 0000000000..16b10813b1 --- /dev/null +++ b/third_party/rust/futures/tests/stream_unfold.rs @@ -0,0 +1,32 @@ +use futures::future; +use futures::stream; +use futures_test::future::FutureTestExt; +use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; + +#[test] +fn unfold1() { + let mut stream = stream::unfold(0, |state| { + if state <= 2 { + future::ready(Some((state * 2, state + 1))).pending_once() + } else { + future::ready(None).pending_once() + } + }); + + // Creates the future with the closure + // Not ready (delayed future) + assert_stream_pending!(stream); + // Future is ready, yields the item + assert_stream_next!(stream, 0); + + // Repeat + assert_stream_pending!(stream); + assert_stream_next!(stream, 2); + + assert_stream_pending!(stream); + assert_stream_next!(stream, 4); + + // No more items + assert_stream_pending!(stream); + assert_stream_done!(stream); +} diff --git a/third_party/rust/futures/tests/task_arc_wake.rs b/third_party/rust/futures/tests/task_arc_wake.rs new file mode 100644 index 0000000000..aedc15bcb8 --- /dev/null +++ b/third_party/rust/futures/tests/task_arc_wake.rs @@ -0,0 +1,79 @@ +use futures::task::{self, ArcWake, Waker}; +use std::panic; +use std::sync::{Arc, Mutex}; + +struct CountingWaker { + nr_wake: Mutex<i32>, +} + +impl CountingWaker { + fn new() -> Self { + Self { nr_wake: Mutex::new(0) } + } + + fn wakes(&self) -> i32 { + *self.nr_wake.lock().unwrap() + } +} + +impl ArcWake for CountingWaker { + fn wake_by_ref(arc_self: &Arc<Self>) { + let mut lock = arc_self.nr_wake.lock().unwrap(); + *lock += 1; + } +} + +#[test] +fn create_from_arc() { + let some_w = Arc::new(CountingWaker::new()); + + let w1: Waker = task::waker(some_w.clone()); + assert_eq!(2, Arc::strong_count(&some_w)); + w1.wake_by_ref(); + assert_eq!(1, some_w.wakes()); + + let w2 = w1.clone(); + assert_eq!(3, Arc::strong_count(&some_w)); + + w2.wake_by_ref(); + assert_eq!(2, some_w.wakes()); + + drop(w2); + assert_eq!(2, Arc::strong_count(&some_w)); + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); +} + +#[test] +fn ref_wake_same() { + let some_w = Arc::new(CountingWaker::new()); + + let w1: Waker = task::waker(some_w.clone()); + let w2 = task::waker_ref(&some_w); + let w3 = w2.clone(); + + assert!(w1.will_wake(&w2)); + assert!(w2.will_wake(&w3)); +} + +#[test] +fn proper_refcount_on_wake_panic() { + struct PanicWaker; + + impl ArcWake for PanicWaker { + fn wake_by_ref(_arc_self: &Arc<Self>) { + panic!("WAKE UP"); + } + } + + let some_w = Arc::new(PanicWaker); + + let w1: Waker = task::waker(some_w.clone()); + assert_eq!( + "WAKE UP", + *panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap() + ); + assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1 + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); // some_w +} diff --git a/third_party/rust/futures/tests/task_atomic_waker.rs b/third_party/rust/futures/tests/task_atomic_waker.rs new file mode 100644 index 0000000000..cec3db2876 --- /dev/null +++ b/third_party/rust/futures/tests/task_atomic_waker.rs @@ -0,0 +1,48 @@ +use futures::executor::block_on; +use futures::future::poll_fn; +use futures::task::{AtomicWaker, Poll}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::thread; + +#[test] +fn basic() { + let atomic_waker = Arc::new(AtomicWaker::new()); + let atomic_waker_copy = atomic_waker.clone(); + + let returned_pending = Arc::new(AtomicUsize::new(0)); + let returned_pending_copy = returned_pending.clone(); + + let woken = Arc::new(AtomicUsize::new(0)); + let woken_copy = woken.clone(); + + let t = thread::spawn(move || { + let mut pending_count = 0; + + block_on(poll_fn(move |cx| { + if woken_copy.load(Ordering::Relaxed) == 1 { + Poll::Ready(()) + } else { + // Assert we return pending exactly once + assert_eq!(0, pending_count); + pending_count += 1; + atomic_waker_copy.register(cx.waker()); + + returned_pending_copy.store(1, Ordering::Relaxed); + + Poll::Pending + } + })) + }); + + while returned_pending.load(Ordering::Relaxed) == 0 {} + + // give spawned thread some time to sleep in `block_on` + thread::yield_now(); + + woken.store(1, Ordering::Relaxed); + atomic_waker.wake(); + + t.join().unwrap(); +} diff --git a/third_party/rust/futures/tests/test_macro.rs b/third_party/rust/futures/tests/test_macro.rs new file mode 100644 index 0000000000..6adf51d8bb --- /dev/null +++ b/third_party/rust/futures/tests/test_macro.rs @@ -0,0 +1,20 @@ +#[futures_test::test] +async fn it_works() { + let fut = async { true }; + assert!(fut.await); + + let fut = async { false }; + assert!(!fut.await); +} + +#[should_panic] +#[futures_test::test] +async fn it_is_being_run() { + let fut = async { false }; + assert!(fut.await); +} + +#[futures_test::test] +async fn return_ty() -> Result<(), ()> { + Ok(()) +} diff --git a/third_party/rust/futures/tests/try_join.rs b/third_party/rust/futures/tests/try_join.rs new file mode 100644 index 0000000000..0281ab897d --- /dev/null +++ b/third_party/rust/futures/tests/try_join.rs @@ -0,0 +1,35 @@ +#![deny(unreachable_code)] + +use futures::{executor::block_on, try_join}; + +// TODO: This abuses https://github.com/rust-lang/rust/issues/58733 in order to +// test behavior of the `try_join!` macro with the never type before it is +// stabilized. Once `!` is again stabilized this can be removed and replaced +// with direct use of `!` below where `Never` is used. +trait MyTrait { + type Output; +} +impl<T> MyTrait for fn() -> T { + type Output = T; +} +type Never = <fn() -> ! as MyTrait>::Output; + +#[test] +fn try_join_never_error() { + block_on(async { + let future1 = async { Ok::<(), Never>(()) }; + let future2 = async { Ok::<(), Never>(()) }; + try_join!(future1, future2) + }) + .unwrap(); +} + +#[test] +fn try_join_never_ok() { + block_on(async { + let future1 = async { Err::<Never, ()>(()) }; + let future2 = async { Err::<Never, ()>(()) }; + try_join!(future1, future2) + }) + .unwrap_err(); +} |