summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/futures/tests
parentInitial commit. (diff)
downloadfirefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz
firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures/tests')
-rw-r--r--third_party/rust/futures/tests/abortable.rs39
-rw-r--r--third_party/rust/futures/tests/arc_wake.rs77
-rw-r--r--third_party/rust/futures/tests/async_await_macros.rs360
-rw-r--r--third_party/rust/futures/tests/atomic_waker.rs49
-rw-r--r--third_party/rust/futures/tests/basic_combinators.rs98
-rw-r--r--third_party/rust/futures/tests/buffer_unordered.rs73
-rw-r--r--third_party/rust/futures/tests/compat.rs17
-rw-r--r--third_party/rust/futures/tests/eager_drop.rs117
-rw-r--r--third_party/rust/futures/tests/eventual.rs159
-rw-r--r--third_party/rust/futures/tests/fuse.rs12
-rw-r--r--third_party/rust/futures/tests/future_obj.rs33
-rw-r--r--third_party/rust/futures/tests/future_try_flatten_stream.rs82
-rw-r--r--third_party/rust/futures/tests/futures_ordered.rs83
-rw-r--r--third_party/rust/futures/tests/futures_unordered.rs287
-rw-r--r--third_party/rust/futures/tests/inspect.rs14
-rw-r--r--third_party/rust/futures/tests/io_buf_reader.rs321
-rw-r--r--third_party/rust/futures/tests/io_buf_writer.rs236
-rw-r--r--third_party/rust/futures/tests/io_cursor.rs29
-rw-r--r--third_party/rust/futures/tests/io_lines.rs62
-rw-r--r--third_party/rust/futures/tests/io_read.rs65
-rw-r--r--third_party/rust/futures/tests/io_read_exact.rs17
-rw-r--r--third_party/rust/futures/tests/io_read_line.rs60
-rw-r--r--third_party/rust/futures/tests/io_read_to_string.rs45
-rw-r--r--third_party/rust/futures/tests/io_read_until.rs60
-rw-r--r--third_party/rust/futures/tests/io_window.rs26
-rw-r--r--third_party/rust/futures/tests/io_write.rs70
-rw-r--r--third_party/rust/futures/tests/join_all.rs43
-rw-r--r--third_party/rust/futures/tests/macro_comma_support.rs42
-rw-r--r--third_party/rust/futures/tests/mutex.rs69
-rw-r--r--third_party/rust/futures/tests/object_safety.rs49
-rw-r--r--third_party/rust/futures/tests/oneshot.rs66
-rw-r--r--third_party/rust/futures/tests/ready_queue.rs151
-rw-r--r--third_party/rust/futures/tests/recurse.rs22
-rw-r--r--third_party/rust/futures/tests/select_all.rs29
-rw-r--r--third_party/rust/futures/tests/select_ok.rs39
-rw-r--r--third_party/rust/futures/tests/shared.rs151
-rw-r--r--third_party/rust/futures/tests/sink.rs516
-rw-r--r--third_party/rust/futures/tests/sink_fanout.rs24
-rw-r--r--third_party/rust/futures/tests/split.rs77
-rw-r--r--third_party/rust/futures/tests/stream.rs32
-rw-r--r--third_party/rust/futures/tests/stream_catch_unwind.rs27
-rw-r--r--third_party/rust/futures/tests/stream_into_async_read.rs96
-rw-r--r--third_party/rust/futures/tests/stream_peekable.rs13
-rw-r--r--third_party/rust/futures/tests/stream_select_all.rs78
-rw-r--r--third_party/rust/futures/tests/stream_select_next_some.rs85
-rw-r--r--third_party/rust/futures/tests/try_join.rs36
-rw-r--r--third_party/rust/futures/tests/try_join_all.rs44
-rw-r--r--third_party/rust/futures/tests/unfold.rs35
48 files changed, 4215 insertions, 0 deletions
diff --git a/third_party/rust/futures/tests/abortable.rs b/third_party/rust/futures/tests/abortable.rs
new file mode 100644
index 0000000000..5925c9a27b
--- /dev/null
+++ b/third_party/rust/futures/tests/abortable.rs
@@ -0,0 +1,39 @@
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future::{abortable, Aborted, FutureExt};
+use futures::task::{Context, Poll};
+use futures_test::task::new_count_waker;
+
+#[test]
+fn abortable_works() {
+ let (_tx, a_rx) = oneshot::channel::<()>();
+ let (abortable_rx, abort_handle) = abortable(a_rx);
+
+ abort_handle.abort();
+ assert_eq!(Err(Aborted), block_on(abortable_rx));
+}
+
+#[test]
+fn abortable_awakens() {
+ let (_tx, a_rx) = oneshot::channel::<()>();
+ let (mut abortable_rx, abort_handle) = abortable(a_rx);
+
+ let (waker, counter) = new_count_waker();
+ let mut cx = Context::from_waker(&waker);
+ assert_eq!(counter, 0);
+ assert_eq!(Poll::Pending, abortable_rx.poll_unpin(&mut cx));
+ assert_eq!(counter, 0);
+ abort_handle.abort();
+ assert_eq!(counter, 1);
+ assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx));
+}
+
+#[test]
+fn abortable_resolves() {
+ let (tx, a_rx) = oneshot::channel::<()>();
+ let (abortable_rx, _abort_handle) = abortable(a_rx);
+
+ tx.send(()).unwrap();
+
+ assert_eq!(Ok(Ok(())), block_on(abortable_rx));
+}
diff --git a/third_party/rust/futures/tests/arc_wake.rs b/third_party/rust/futures/tests/arc_wake.rs
new file mode 100644
index 0000000000..1940e4f98b
--- /dev/null
+++ b/third_party/rust/futures/tests/arc_wake.rs
@@ -0,0 +1,77 @@
+use futures::task::{self, ArcWake, Waker};
+use std::sync::{Arc, Mutex};
+
+struct CountingWaker {
+ nr_wake: Mutex<i32>,
+}
+
+impl CountingWaker {
+ fn new() -> CountingWaker {
+ CountingWaker {
+ nr_wake: Mutex::new(0),
+ }
+ }
+
+ fn wakes(&self) -> i32 {
+ *self.nr_wake.lock().unwrap()
+ }
+}
+
+impl ArcWake for CountingWaker {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ let mut lock = arc_self.nr_wake.lock().unwrap();
+ *lock += 1;
+ }
+}
+
+#[test]
+fn create_waker_from_arc() {
+ let some_w = Arc::new(CountingWaker::new());
+
+ let w1: Waker = task::waker(some_w.clone());
+ assert_eq!(2, Arc::strong_count(&some_w));
+ w1.wake_by_ref();
+ assert_eq!(1, some_w.wakes());
+
+ let w2 = w1.clone();
+ assert_eq!(3, Arc::strong_count(&some_w));
+
+ w2.wake_by_ref();
+ assert_eq!(2, some_w.wakes());
+
+ drop(w2);
+ assert_eq!(2, Arc::strong_count(&some_w));
+ drop(w1);
+ assert_eq!(1, Arc::strong_count(&some_w));
+}
+
+struct PanicWaker;
+
+impl ArcWake for PanicWaker {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {
+ panic!("WAKE UP");
+ }
+}
+
+#[test]
+fn proper_refcount_on_wake_panic() {
+ let some_w = Arc::new(PanicWaker);
+
+ let w1: Waker = task::waker(some_w.clone());
+ assert_eq!("WAKE UP", *std::panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap());
+ assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1
+ drop(w1);
+ assert_eq!(1, Arc::strong_count(&some_w)); // some_w
+}
+
+#[test]
+fn waker_ref_wake_same() {
+ let some_w = Arc::new(CountingWaker::new());
+
+ let w1: Waker = task::waker(some_w.clone());
+ let w2 = task::waker_ref(&some_w);
+ let w3 = w2.clone();
+
+ assert!(w1.will_wake(&w2));
+ assert!(w2.will_wake(&w3));
+}
diff --git a/third_party/rust/futures/tests/async_await_macros.rs b/third_party/rust/futures/tests/async_await_macros.rs
new file mode 100644
index 0000000000..bc717df535
--- /dev/null
+++ b/third_party/rust/futures/tests/async_await_macros.rs
@@ -0,0 +1,360 @@
+#![recursion_limit="128"]
+
+use futures::{pending, pin_mut, poll, join, try_join, select};
+use futures::channel::{mpsc, oneshot};
+use futures::executor::block_on;
+use futures::future::{self, FutureExt, poll_fn};
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
+
+#[test]
+fn poll_and_pending() {
+ let pending_once = async { pending!() };
+ block_on(async {
+ pin_mut!(pending_once);
+ assert_eq!(Poll::Pending, poll!(&mut pending_once));
+ assert_eq!(Poll::Ready(()), poll!(&mut pending_once));
+ });
+}
+
+#[test]
+fn join() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = oneshot::channel::<i32>();
+
+ let fut = async {
+ let res = join!(rx1, rx2);
+ assert_eq!((Ok(1), Ok(2)), res);
+ };
+
+ block_on(async {
+ pin_mut!(fut);
+ assert_eq!(Poll::Pending, poll!(&mut fut));
+ tx1.send(1).unwrap();
+ assert_eq!(Poll::Pending, poll!(&mut fut));
+ tx2.send(2).unwrap();
+ assert_eq!(Poll::Ready(()), poll!(&mut fut));
+ });
+}
+
+#[test]
+fn select() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (_tx2, rx2) = oneshot::channel::<i32>();
+ tx1.send(1).unwrap();
+ let mut ran = false;
+ block_on(async {
+ select! {
+ res = rx1.fuse() => {
+ assert_eq!(Ok(1), res);
+ ran = true;
+ },
+ _ = rx2.fuse() => unreachable!(),
+ }
+ });
+ assert!(ran);
+}
+
+#[test]
+fn select_biased() {
+ use futures::select_biased;
+
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (_tx2, rx2) = oneshot::channel::<i32>();
+ tx1.send(1).unwrap();
+ let mut ran = false;
+ block_on(async {
+ select_biased! {
+ res = rx1.fuse() => {
+ assert_eq!(Ok(1), res);
+ ran = true;
+ },
+ _ = rx2.fuse() => unreachable!(),
+ }
+ });
+ assert!(ran);
+}
+
+#[test]
+fn select_streams() {
+ let (mut tx1, rx1) = mpsc::channel::<i32>(1);
+ let (mut tx2, rx2) = mpsc::channel::<i32>(1);
+ let mut rx1 = rx1.fuse();
+ let mut rx2 = rx2.fuse();
+ let mut ran = false;
+ let mut total = 0;
+ block_on(async {
+ let mut tx1_opt;
+ let mut tx2_opt;
+ select! {
+ _ = rx1.next() => panic!(),
+ _ = rx2.next() => panic!(),
+ default => {
+ tx1.send(2).await.unwrap();
+ tx2.send(3).await.unwrap();
+ tx1_opt = Some(tx1);
+ tx2_opt = Some(tx2);
+ }
+ complete => panic!(),
+ }
+ loop {
+ select! {
+ // runs first and again after default
+ x = rx1.next() => if let Some(x) = x { total += x; },
+ // runs second and again after default
+ x = rx2.next() => if let Some(x) = x { total += x; },
+ // runs third
+ default => {
+ assert_eq!(total, 5);
+ ran = true;
+ drop(tx1_opt.take().unwrap());
+ drop(tx2_opt.take().unwrap());
+ },
+ // runs last
+ complete => break,
+ };
+ }
+ });
+ assert!(ran);
+}
+
+#[test]
+fn select_can_move_uncompleted_futures() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = oneshot::channel::<i32>();
+ tx1.send(1).unwrap();
+ tx2.send(2).unwrap();
+ let mut ran = false;
+ let mut rx1 = rx1.fuse();
+ let mut rx2 = rx2.fuse();
+ block_on(async {
+ select! {
+ res = rx1 => {
+ assert_eq!(Ok(1), res);
+ assert_eq!(Ok(2), rx2.await);
+ ran = true;
+ },
+ res = rx2 => {
+ assert_eq!(Ok(2), res);
+ assert_eq!(Ok(1), rx1.await);
+ ran = true;
+ },
+ }
+ });
+ assert!(ran);
+}
+
+#[test]
+fn select_nested() {
+ let mut outer_fut = future::ready(1);
+ let mut inner_fut = future::ready(2);
+ let res = block_on(async {
+ select! {
+ x = outer_fut => {
+ select! {
+ y = inner_fut => x + y,
+ }
+ }
+ }
+ });
+ assert_eq!(res, 3);
+}
+
+#[test]
+fn select_size() {
+ let fut = async {
+ let mut ready = future::ready(0i32);
+ select! {
+ _ = ready => {},
+ }
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 24);
+
+ let fut = async {
+ let mut ready1 = future::ready(0i32);
+ let mut ready2 = future::ready(0i32);
+ select! {
+ _ = ready1 => {},
+ _ = ready2 => {},
+ }
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 40);
+}
+
+#[test]
+fn select_on_non_unpin_expressions() {
+ // The returned Future is !Unpin
+ let make_non_unpin_fut = || { async {
+ 5
+ }};
+
+ let res = block_on(async {
+ let select_res;
+ select! {
+ value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 },
+ value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 },
+ };
+ select_res
+ });
+ assert_eq!(res, 5);
+}
+
+#[test]
+fn select_on_non_unpin_expressions_with_default() {
+ // The returned Future is !Unpin
+ let make_non_unpin_fut = || { async {
+ 5
+ }};
+
+ let res = block_on(async {
+ let select_res;
+ select! {
+ value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 },
+ value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 },
+ default => { select_res = 7 },
+ };
+ select_res
+ });
+ assert_eq!(res, 5);
+}
+
+#[test]
+fn select_on_non_unpin_size() {
+ // The returned Future is !Unpin
+ let make_non_unpin_fut = || { async {
+ 5
+ }};
+
+ let fut = async {
+ let select_res;
+ select! {
+ value_1 = make_non_unpin_fut().fuse() => { select_res = value_1 },
+ value_2 = make_non_unpin_fut().fuse() => { select_res = value_2 },
+ };
+ select_res
+ };
+
+ assert_eq!(48, std::mem::size_of_val(&fut));
+}
+
+#[test]
+fn select_can_be_used_as_expression() {
+ block_on(async {
+ let res = select! {
+ x = future::ready(7) => { x },
+ y = future::ready(3) => { y + 1 },
+ };
+ assert!(res == 7 || res == 4);
+ });
+}
+
+#[test]
+fn select_with_default_can_be_used_as_expression() {
+ fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> {
+ Poll::Pending
+ }
+
+ block_on(async {
+ let res = select! {
+ x = poll_fn(poll_always_pending::<i32>).fuse() => x,
+ y = poll_fn(poll_always_pending::<i32>).fuse() => { y + 1 },
+ default => 99,
+ };
+ assert_eq!(res, 99);
+ });
+}
+
+#[test]
+fn select_with_complete_can_be_used_as_expression() {
+ block_on(async {
+ let res = select! {
+ x = future::pending::<i32>() => { x },
+ y = future::pending::<i32>() => { y + 1 },
+ default => 99,
+ complete => 237,
+ };
+ assert_eq!(res, 237);
+ });
+}
+
+async fn require_mutable(_: &mut i32) {}
+async fn async_noop() {}
+
+#[test]
+fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
+ block_on(async {
+ let mut value = 234;
+ select! {
+ x = require_mutable(&mut value).fuse() => { },
+ y = async_noop().fuse() => {
+ value += 5;
+ },
+ }
+ });
+}
+
+#[test]
+fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
+ block_on(async {
+ let mut value = 234;
+ select! {
+ x = require_mutable(&mut value).fuse() => { },
+ y = async_noop().fuse() => {
+ value += 5;
+ },
+ default => {
+ value += 27;
+ },
+ }
+ });
+}
+
+#[test]
+fn join_size() {
+ let fut = async {
+ let ready = future::ready(0i32);
+ join!(ready)
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 16);
+
+ let fut = async {
+ let ready1 = future::ready(0i32);
+ let ready2 = future::ready(0i32);
+ join!(ready1, ready2)
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 28);
+}
+
+#[test]
+fn try_join_size() {
+ let fut = async {
+ let ready = future::ready(Ok::<i32, i32>(0));
+ try_join!(ready)
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 16);
+
+ let fut = async {
+ let ready1 = future::ready(Ok::<i32, i32>(0));
+ let ready2 = future::ready(Ok::<i32, i32>(0));
+ try_join!(ready1, ready2)
+ };
+ assert_eq!(::std::mem::size_of_val(&fut), 28);
+}
+
+#[test]
+fn join_doesnt_require_unpin() {
+ let _ = async {
+ join!(async {}, async {})
+ };
+}
+
+#[test]
+fn try_join_doesnt_require_unpin() {
+ let _ = async {
+ try_join!(
+ async { Ok::<(), ()>(()) },
+ async { Ok::<(), ()>(()) },
+ )
+ };
+}
diff --git a/third_party/rust/futures/tests/atomic_waker.rs b/third_party/rust/futures/tests/atomic_waker.rs
new file mode 100644
index 0000000000..d9ce753701
--- /dev/null
+++ b/third_party/rust/futures/tests/atomic_waker.rs
@@ -0,0 +1,49 @@
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::thread;
+
+use futures::executor::block_on;
+use futures::future::poll_fn;
+use futures::task::{AtomicWaker, Poll};
+
+#[test]
+fn basic() {
+ let atomic_waker = Arc::new(AtomicWaker::new());
+ let atomic_waker_copy = atomic_waker.clone();
+
+ let returned_pending = Arc::new(AtomicUsize::new(0));
+ let returned_pending_copy = returned_pending.clone();
+
+ let woken = Arc::new(AtomicUsize::new(0));
+ let woken_copy = woken.clone();
+
+ let t = thread::spawn(move || {
+ let mut pending_count = 0;
+
+ block_on(poll_fn(move |cx| {
+ if woken_copy.load(Ordering::Relaxed) == 1 {
+ Poll::Ready(())
+ } else {
+ // Assert we return pending exactly once
+ assert_eq!(0, pending_count);
+ pending_count += 1;
+ atomic_waker_copy.register(cx.waker());
+
+ returned_pending_copy.store(1, Ordering::Relaxed);
+
+ Poll::Pending
+ }
+ }))
+ });
+
+ while returned_pending.load(Ordering::Relaxed) == 0 {}
+
+ // give spawned thread some time to sleep in `block_on`
+ thread::yield_now();
+
+ woken.store(1, Ordering::Relaxed);
+ atomic_waker.wake();
+
+ t.join().unwrap();
+}
diff --git a/third_party/rust/futures/tests/basic_combinators.rs b/third_party/rust/futures/tests/basic_combinators.rs
new file mode 100644
index 0000000000..fa65b6f5f1
--- /dev/null
+++ b/third_party/rust/futures/tests/basic_combinators.rs
@@ -0,0 +1,98 @@
+use futures::future::{self, FutureExt, TryFutureExt};
+use futures_test::future::FutureTestExt;
+use std::sync::mpsc;
+
+#[test]
+fn basic_future_combinators() {
+ let (tx1, rx) = mpsc::channel();
+ let tx2 = tx1.clone();
+ let tx3 = tx1.clone();
+
+ let fut = future::ready(1)
+ .then(move |x| {
+ tx1.send(x).unwrap(); // Send 1
+ tx1.send(2).unwrap(); // Send 2
+ future::ready(3)
+ }).map(move |x| {
+ tx2.send(x).unwrap(); // Send 3
+ tx2.send(4).unwrap(); // Send 4
+ 5
+ }).map(move |x| {
+ tx3.send(x).unwrap(); // Send 5
+ });
+
+ assert!(rx.try_recv().is_err()); // Not started yet
+ fut.run_in_background(); // Start it
+ for i in 1..=5 { assert_eq!(rx.recv(), Ok(i)); } // Check it
+ assert!(rx.recv().is_err()); // Should be done
+}
+
+#[test]
+fn basic_try_future_combinators() {
+ let (tx1, rx) = mpsc::channel();
+ let tx2 = tx1.clone();
+ let tx3 = tx1.clone();
+ let tx4 = tx1.clone();
+ let tx5 = tx1.clone();
+ let tx6 = tx1.clone();
+ let tx7 = tx1.clone();
+ let tx8 = tx1.clone();
+ let tx9 = tx1.clone();
+ let tx10 = tx1.clone();
+
+ let fut = future::ready(Ok(1))
+ .and_then(move |x: i32| {
+ tx1.send(x).unwrap(); // Send 1
+ tx1.send(2).unwrap(); // Send 2
+ future::ready(Ok(3))
+ })
+ .or_else(move |x: i32| {
+ tx2.send(x).unwrap(); // Should not run
+ tx2.send(-1).unwrap();
+ future::ready(Ok(-1))
+ })
+ .map_ok(move |x: i32| {
+ tx3.send(x).unwrap(); // Send 3
+ tx3.send(4).unwrap(); // Send 4
+ 5
+ })
+ .map_err(move |x: i32| {
+ tx4.send(x).unwrap(); // Should not run
+ tx4.send(-1).unwrap();
+ -1
+ })
+ .map(move |x: Result<i32, i32>| {
+ tx5.send(x.unwrap()).unwrap(); // Send 5
+ tx5.send(6).unwrap(); // Send 6
+ Err(7) // Now return errors!
+ })
+ .and_then(move |x: i32| {
+ tx6.send(x).unwrap(); // Should not run
+ tx6.send(-1).unwrap();
+ future::ready(Err(-1))
+ })
+ .or_else(move |x: i32| {
+ tx7.send(x).unwrap(); // Send 7
+ tx7.send(8).unwrap(); // Send 8
+ future::ready(Err(9))
+ })
+ .map_ok(move |x: i32| {
+ tx8.send(x).unwrap(); // Should not run
+ tx8.send(-1).unwrap();
+ -1
+ })
+ .map_err(move |x: i32| {
+ tx9.send(x).unwrap(); // Send 9
+ tx9.send(10).unwrap(); // Send 10
+ 11
+ })
+ .map(move |x: Result<i32, i32>| {
+ tx10.send(x.err().unwrap()).unwrap(); // Send 11
+ tx10.send(12).unwrap(); // Send 12
+ });
+
+ assert!(rx.try_recv().is_err()); // Not started yet
+ fut.run_in_background(); // Start it
+ for i in 1..=12 { assert_eq!(rx.recv(), Ok(i)); } // Check it
+ assert!(rx.recv().is_err()); // Should be done
+}
diff --git a/third_party/rust/futures/tests/buffer_unordered.rs b/third_party/rust/futures/tests/buffer_unordered.rs
new file mode 100644
index 0000000000..1c559c8544
--- /dev/null
+++ b/third_party/rust/futures/tests/buffer_unordered.rs
@@ -0,0 +1,73 @@
+use futures::channel::{oneshot, mpsc};
+use futures::executor::{block_on, block_on_stream};
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use std::sync::mpsc as std_mpsc;
+use std::thread;
+
+#[test]
+#[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790
+fn works() {
+ const N: usize = 4;
+
+ let (mut tx, rx) = mpsc::channel(1);
+
+ let (tx2, rx2) = std_mpsc::channel();
+ let (tx3, rx3) = std_mpsc::channel();
+ let t1 = thread::spawn(move || {
+ for _ in 0..=N {
+ let (mytx, myrx) = oneshot::channel();
+ block_on(tx.send(myrx)).unwrap();
+ tx3.send(mytx).unwrap();
+ }
+ rx2.recv().unwrap();
+ for _ in 0..N {
+ let (mytx, myrx) = oneshot::channel();
+ block_on(tx.send(myrx)).unwrap();
+ tx3.send(mytx).unwrap();
+ }
+ });
+
+ let (tx4, rx4) = std_mpsc::channel();
+ let t2 = thread::spawn(move || {
+ for item in block_on_stream(rx.buffer_unordered(N)) {
+ tx4.send(item.unwrap()).unwrap();
+ }
+ });
+
+ let o1 = rx3.recv().unwrap();
+ let o2 = rx3.recv().unwrap();
+ let o3 = rx3.recv().unwrap();
+ let o4 = rx3.recv().unwrap();
+ assert!(rx4.try_recv().is_err());
+
+ o1.send(1).unwrap();
+ assert_eq!(rx4.recv(), Ok(1));
+ o3.send(3).unwrap();
+ assert_eq!(rx4.recv(), Ok(3));
+ tx2.send(()).unwrap();
+ o2.send(2).unwrap();
+ assert_eq!(rx4.recv(), Ok(2));
+ o4.send(4).unwrap();
+ assert_eq!(rx4.recv(), Ok(4));
+
+ let o5 = rx3.recv().unwrap();
+ let o6 = rx3.recv().unwrap();
+ let o7 = rx3.recv().unwrap();
+ let o8 = rx3.recv().unwrap();
+ let o9 = rx3.recv().unwrap();
+
+ o5.send(5).unwrap();
+ assert_eq!(rx4.recv(), Ok(5));
+ o8.send(8).unwrap();
+ assert_eq!(rx4.recv(), Ok(8));
+ o9.send(9).unwrap();
+ assert_eq!(rx4.recv(), Ok(9));
+ o7.send(7).unwrap();
+ assert_eq!(rx4.recv(), Ok(7));
+ o6.send(6).unwrap();
+ assert_eq!(rx4.recv(), Ok(6));
+
+ t1.join().unwrap();
+ t2.join().unwrap();
+}
diff --git a/third_party/rust/futures/tests/compat.rs b/third_party/rust/futures/tests/compat.rs
new file mode 100644
index 0000000000..39adc7cba4
--- /dev/null
+++ b/third_party/rust/futures/tests/compat.rs
@@ -0,0 +1,17 @@
+#![cfg(feature = "compat")]
+
+use tokio::timer::Delay;
+use tokio::runtime::Runtime;
+use std::time::Instant;
+use futures::prelude::*;
+use futures::compat::Future01CompatExt;
+
+#[test]
+fn can_use_01_futures_in_a_03_future_running_on_a_01_executor() {
+ let f = async {
+ Delay::new(Instant::now()).compat().await
+ };
+
+ let mut runtime = Runtime::new().unwrap();
+ runtime.block_on(f.boxed().compat()).unwrap();
+}
diff --git a/third_party/rust/futures/tests/eager_drop.rs b/third_party/rust/futures/tests/eager_drop.rs
new file mode 100644
index 0000000000..674e40121d
--- /dev/null
+++ b/third_party/rust/futures/tests/eager_drop.rs
@@ -0,0 +1,117 @@
+use futures::channel::oneshot;
+use futures::future::{self, Future, FutureExt, TryFutureExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use pin_utils::unsafe_pinned;
+use std::pin::Pin;
+use std::sync::mpsc;
+
+#[test]
+fn map_ok() {
+ // The closure given to `map_ok` should have been dropped by the time `map`
+ // runs.
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ future::ready::<Result<i32, i32>>(Err(1))
+ .map_ok(move |_| { let _tx1 = tx1; panic!("should not run"); })
+ .map(move |_| {
+ assert!(rx1.recv().is_err());
+ tx2.send(()).unwrap()
+ })
+ .run_in_background();
+
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn map_err() {
+ // The closure given to `map_err` should have been dropped by the time `map`
+ // runs.
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ future::ready::<Result<i32, i32>>(Ok(1))
+ .map_err(move |_| { let _tx1 = tx1; panic!("should not run"); })
+ .map(move |_| {
+ assert!(rx1.recv().is_err());
+ tx2.send(()).unwrap()
+ })
+ .run_in_background();
+
+ rx2.recv().unwrap();
+}
+
+struct FutureData<F, T> {
+ _data: T,
+ future: F,
+}
+
+impl<F, T> FutureData<F, T> {
+ unsafe_pinned!(future: F);
+}
+
+impl<F: Future, T: Send + 'static> Future for FutureData<F, T> {
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
+ self.future().poll(cx)
+ }
+}
+
+#[test]
+fn then_drops_eagerly() {
+ let (tx0, rx0) = oneshot::channel::<()>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
+ .then(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready(())
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(()).unwrap();
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn and_then_drops_eagerly() {
+ let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
+ .and_then(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready(Ok(()))
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(Ok(())).unwrap();
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn or_else_drops_eagerly() {
+ let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+ let (tx1, rx1) = mpsc::channel::<()>();
+ let (tx2, rx2) = mpsc::channel::<()>();
+
+ FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
+ .or_else(move |_| {
+ assert!(rx1.recv().is_err()); // tx1 should have been dropped
+ tx2.send(()).unwrap();
+ future::ready::<Result<(), ()>>(Ok(()))
+ })
+ .run_in_background();
+
+ assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+ tx0.send(Err(())).unwrap();
+ rx2.recv().unwrap();
+}
diff --git a/third_party/rust/futures/tests/eventual.rs b/third_party/rust/futures/tests/eventual.rs
new file mode 100644
index 0000000000..bff000dd09
--- /dev/null
+++ b/third_party/rust/futures/tests/eventual.rs
@@ -0,0 +1,159 @@
+use futures::channel::oneshot;
+use futures::executor::ThreadPool;
+use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
+use futures::task::SpawnExt;
+use std::sync::mpsc;
+use std::thread;
+
+fn run<F: Future + Send + 'static>(future: F) {
+ let tp = ThreadPool::new().unwrap();
+ tp.spawn(future.map(drop)).unwrap();
+}
+
+#[test]
+fn join1() {
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
+ assert_eq!(rx.recv(), Ok((1, 2)));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join2() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
+ assert!(rx.try_recv().is_err());
+ c1.send(1).unwrap();
+ assert!(rx.try_recv().is_err());
+ c2.send(2).unwrap();
+ assert_eq!(rx.recv(), Ok((1, 2)));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join3() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap()));
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ assert_eq!(rx.recv(), Ok(1));
+ assert!(rx.recv().is_err());
+ drop(c2);
+}
+
+#[test]
+fn join4() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap()));
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ assert!(rx.recv().is_ok());
+ drop(c2);
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join5() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (c3, p3) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap()));
+ assert!(rx.try_recv().is_err());
+ c1.send(1).unwrap();
+ assert!(rx.try_recv().is_err());
+ c2.send(2).unwrap();
+ assert!(rx.try_recv().is_err());
+ c3.send(3).unwrap();
+ assert_eq!(rx.recv(), Ok(((1, 2), 3)));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select1() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
+ assert!(rx.try_recv().is_err());
+ c1.send(1).unwrap();
+ let (v, p2) = rx.recv().unwrap().into_inner();
+ assert_eq!(v, 1);
+ assert!(rx.recv().is_err());
+
+ let (tx, rx) = mpsc::channel();
+ run(p2.map_ok(move |v| tx.send(v).unwrap()));
+ c2.send(2).unwrap();
+ assert_eq!(rx.recv(), Ok(2));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select2() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ let (v, p2) = rx.recv().unwrap();
+ assert_eq!(v, 1);
+ assert!(rx.recv().is_err());
+
+ let (tx, rx) = mpsc::channel();
+ run(p2.map_ok(move |v| tx.send(v).unwrap()));
+ c2.send(2).unwrap();
+ assert_eq!(rx.recv(), Ok(2));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select3() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ let (v, p2) = rx.recv().unwrap();
+ assert_eq!(v, 1);
+ assert!(rx.recv().is_err());
+
+ let (tx, rx) = mpsc::channel();
+ run(p2.map_err(move |_v| tx.send(2).unwrap()));
+ drop(c2);
+ assert_eq!(rx.recv(), Ok(2));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select4() {
+ let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
+
+ let t = thread::spawn(move || {
+ for c in rx {
+ c.send(1).unwrap();
+ }
+ });
+
+ let (tx2, rx2) = mpsc::channel();
+ for _ in 0..10000 {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+
+ let tx3 = tx2.clone();
+ run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap()));
+ tx.send(c1).unwrap();
+ rx2.recv().unwrap();
+ drop(c2);
+ }
+ drop(tx);
+
+ t.join().unwrap();
+}
diff --git a/third_party/rust/futures/tests/fuse.rs b/third_party/rust/futures/tests/fuse.rs
new file mode 100644
index 0000000000..83f2c1ce9e
--- /dev/null
+++ b/third_party/rust/futures/tests/fuse.rs
@@ -0,0 +1,12 @@
+use futures::future::{self, FutureExt};
+use futures::task::Context;
+use futures_test::task::panic_waker;
+
+#[test]
+fn fuse() {
+ let mut future = future::ready::<i32>(2).fuse();
+ let waker = panic_waker();
+ let mut cx = Context::from_waker(&waker);
+ assert!(future.poll_unpin(&mut cx).is_ready());
+ assert!(future.poll_unpin(&mut cx).is_pending());
+}
diff --git a/third_party/rust/futures/tests/future_obj.rs b/third_party/rust/futures/tests/future_obj.rs
new file mode 100644
index 0000000000..c6b18fc85c
--- /dev/null
+++ b/third_party/rust/futures/tests/future_obj.rs
@@ -0,0 +1,33 @@
+use futures::future::{Future, FutureObj, FutureExt};
+use std::pin::Pin;
+use futures::task::{Context, Poll};
+
+#[test]
+fn dropping_does_not_segfault() {
+ FutureObj::new(async { String::new() }.boxed());
+}
+
+#[test]
+fn dropping_drops_the_future() {
+ let mut times_dropped = 0;
+
+ struct Inc<'a>(&'a mut u32);
+
+ impl Future for Inc<'_> {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<()> {
+ unimplemented!()
+ }
+ }
+
+ impl Drop for Inc<'_> {
+ fn drop(&mut self) {
+ *self.0 += 1;
+ }
+ }
+
+ FutureObj::new(Inc(&mut times_dropped).boxed());
+
+ assert_eq!(times_dropped, 1);
+}
diff --git a/third_party/rust/futures/tests/future_try_flatten_stream.rs b/third_party/rust/futures/tests/future_try_flatten_stream.rs
new file mode 100644
index 0000000000..082c5efa9a
--- /dev/null
+++ b/third_party/rust/futures/tests/future_try_flatten_stream.rs
@@ -0,0 +1,82 @@
+use core::marker::PhantomData;
+use core::pin::Pin;
+use futures::executor::block_on_stream;
+use futures::future::{ok, err, TryFutureExt};
+use futures::sink::Sink;
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{Context, Poll};
+
+#[test]
+fn successful_future() {
+ let stream_items = vec![17, 19];
+ let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok));
+
+ let stream = future_of_a_stream.try_flatten_stream();
+
+ let mut iter = block_on_stream(stream);
+ assert_eq!(Ok(17), iter.next().unwrap());
+ assert_eq!(Ok(19), iter.next().unwrap());
+ assert_eq!(None, iter.next());
+}
+
+struct PanickingStream<T, E> {
+ _marker: PhantomData<(T, E)>
+}
+
+impl<T, E> Stream for PanickingStream<T, E> {
+ type Item = Result<T, E>;
+
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ panic!()
+ }
+}
+
+#[test]
+fn failed_future() {
+ let future_of_a_stream = err::<PanickingStream<bool, u32>, _>(10);
+ let stream = future_of_a_stream.try_flatten_stream();
+ let mut iter = block_on_stream(stream);
+ assert_eq!(Err(10), iter.next().unwrap());
+ assert_eq!(None, iter.next());
+}
+
+struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>);
+
+impl<T, E, Item> Stream for StreamSink<T, E, Item> {
+ type Item = Result<T, E>;
+ fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ panic!()
+ }
+}
+
+impl<T, E, Item> Sink<Item> for StreamSink<T, E, Item> {
+ type Error = E;
+ fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ panic!()
+ }
+ fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> {
+ panic!()
+ }
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ panic!()
+ }
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ panic!()
+ }
+}
+
+fn assert_stream<S: Stream>(_: &S) {}
+fn assert_sink<S: Sink<Item>, Item>(_: &S) {}
+fn assert_stream_sink<S: Stream + Sink<Item>, Item>(_: &S) {}
+
+#[test]
+fn assert_impls() {
+ let s = ok(StreamSink::<(), (), ()>(PhantomData)).try_flatten_stream();
+ assert_stream(&s);
+ assert_sink(&s);
+ assert_stream_sink(&s);
+ let s = ok(StreamSink::<(), (), ()>(PhantomData)).flatten_sink();
+ assert_stream(&s);
+ assert_sink(&s);
+ assert_stream_sink(&s);
+}
diff --git a/third_party/rust/futures/tests/futures_ordered.rs b/third_party/rust/futures/tests/futures_ordered.rs
new file mode 100644
index 0000000000..d06b62f76c
--- /dev/null
+++ b/third_party/rust/futures/tests/futures_ordered.rs
@@ -0,0 +1,83 @@
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, join, Future, FutureExt, TryFutureExt};
+use futures::stream::{StreamExt, FuturesOrdered};
+use futures_test::task::noop_context;
+use std::any::Any;
+
+#[test]
+fn works_1() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>();
+
+ b_tx.send(99).unwrap();
+ assert!(stream.poll_next_unpin(&mut noop_context()).is_pending());
+
+ a_tx.send(33).unwrap();
+ c_tx.send(33).unwrap();
+
+ let mut iter = block_on_stream(stream);
+ assert_eq!(Some(Ok(33)), iter.next());
+ assert_eq!(Some(Ok(99)), iter.next());
+ assert_eq!(Some(Ok(33)), iter.next());
+ assert_eq!(None, iter.next());
+}
+
+#[test]
+fn works_2() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![
+ a_rx.boxed(),
+ join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
+ ].into_iter().collect::<FuturesOrdered<_>>();
+
+ let mut cx = noop_context();
+ a_tx.send(33).unwrap();
+ b_tx.send(33).unwrap();
+ assert!(stream.poll_next_unpin(&mut cx).is_ready());
+ assert!(stream.poll_next_unpin(&mut cx).is_pending());
+ c_tx.send(33).unwrap();
+ assert!(stream.poll_next_unpin(&mut cx).is_ready());
+}
+
+#[test]
+fn from_iterator() {
+ let stream = vec![
+ future::ready::<i32>(1),
+ future::ready::<i32>(2),
+ future::ready::<i32>(3)
+ ].into_iter().collect::<FuturesOrdered<_>>();
+ assert_eq!(stream.len(), 3);
+ assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
+}
+
+#[test]
+fn queue_never_unblocked() {
+ let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
+ let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
+ let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();
+
+ let mut stream = vec![
+ Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>,
+ Box::new(future::try_select(b_rx, c_rx)
+ .map_err(|e| e.factor_first().0)
+ .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>))) as _,
+ ].into_iter().collect::<FuturesOrdered<_>>();
+
+ let cx = &mut noop_context();
+ for _ in 0..10 {
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ }
+
+ b_tx.send(Box::new(())).unwrap();
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ c_tx.send(Box::new(())).unwrap();
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ assert!(stream.poll_next_unpin(cx).is_pending());
+}
diff --git a/third_party/rust/futures/tests/futures_unordered.rs b/third_party/rust/futures/tests/futures_unordered.rs
new file mode 100644
index 0000000000..57eb98fd1b
--- /dev/null
+++ b/third_party/rust/futures/tests/futures_unordered.rs
@@ -0,0 +1,287 @@
+use std::marker::Unpin;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, join, Future, FutureExt};
+use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use futures_test::task::noop_context;
+use futures_test::{assert_stream_done, assert_stream_next};
+
+#[test]
+fn is_terminated() {
+ let mut cx = noop_context();
+ let mut tasks = FuturesUnordered::new();
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+
+ // Test that the sentinel value doesn't leak
+ assert_eq!(tasks.is_empty(), true);
+ assert_eq!(tasks.len(), 0);
+ assert_eq!(tasks.iter_mut().len(), 0);
+
+ tasks.push(future::ready(1));
+
+ assert_eq!(tasks.is_empty(), false);
+ assert_eq!(tasks.len(), 1);
+ assert_eq!(tasks.iter_mut().len(), 1);
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+}
+
+#[test]
+fn works_1() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut iter = block_on_stream(
+ vec![a_rx, b_rx, c_rx]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>(),
+ );
+
+ b_tx.send(99).unwrap();
+ assert_eq!(Some(Ok(99)), iter.next());
+
+ a_tx.send(33).unwrap();
+ c_tx.send(33).unwrap();
+ assert_eq!(Some(Ok(33)), iter.next());
+ assert_eq!(Some(Ok(33)), iter.next());
+ assert_eq!(None, iter.next());
+}
+
+#[test]
+fn works_2() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![
+ a_rx.boxed(),
+ join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ a_tx.send(9).unwrap();
+ b_tx.send(10).unwrap();
+
+ let mut cx = noop_context();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(9))));
+ c_tx.send(20).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(30))));
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(None));
+}
+
+#[test]
+fn from_iterator() {
+ let stream = vec![
+ future::ready::<i32>(1),
+ future::ready::<i32>(2),
+ future::ready::<i32>(3),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+ assert_eq!(stream.len(), 3);
+ assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
+}
+
+#[test]
+fn finished_future() {
+ let (_a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![
+ Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>,
+ Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _,
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ let cx = &mut noop_context();
+ for _ in 0..10 {
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ }
+
+ b_tx.send(12).unwrap();
+ c_tx.send(3).unwrap();
+ assert!(stream.poll_next_unpin(cx).is_ready());
+ assert!(stream.poll_next_unpin(cx).is_pending());
+ assert!(stream.poll_next_unpin(cx).is_pending());
+}
+
+#[test]
+fn iter_mut_cancel() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = vec![a_rx, b_rx, c_rx]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ for rx in stream.iter_mut() {
+ rx.close();
+ }
+
+ let mut iter = block_on_stream(stream);
+
+ assert!(a_tx.is_canceled());
+ assert!(b_tx.is_canceled());
+ assert!(c_tx.is_canceled());
+
+ assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+ assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+ assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+ assert_eq!(iter.next(), None);
+}
+
+#[test]
+fn iter_mut_len() {
+ let mut stream = vec![
+ future::pending::<()>(),
+ future::pending::<()>(),
+ future::pending::<()>(),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ let mut iter_mut = stream.iter_mut();
+ assert_eq!(iter_mut.len(), 3);
+ assert!(iter_mut.next().is_some());
+ assert_eq!(iter_mut.len(), 2);
+ assert!(iter_mut.next().is_some());
+ assert_eq!(iter_mut.len(), 1);
+ assert!(iter_mut.next().is_some());
+ assert_eq!(iter_mut.len(), 0);
+ assert!(iter_mut.next().is_none());
+}
+
+#[test]
+fn iter_cancel() {
+ struct AtomicCancel<F> {
+ future: F,
+ cancel: AtomicBool,
+ }
+
+ impl<F: Future + Unpin> Future for AtomicCancel<F> {
+ type Output = Option<<F as Future>::Output>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ if self.cancel.load(Ordering::Relaxed) {
+ Poll::Ready(None)
+ } else {
+ self.future.poll_unpin(cx).map(Some)
+ }
+ }
+ }
+
+ impl<F: Future + Unpin> AtomicCancel<F> {
+ fn new(future: F) -> Self {
+ Self { future, cancel: AtomicBool::new(false) }
+ }
+ }
+
+ let stream = vec![
+ AtomicCancel::new(future::pending::<()>()),
+ AtomicCancel::new(future::pending::<()>()),
+ AtomicCancel::new(future::pending::<()>()),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ for f in stream.iter() {
+ f.cancel.store(true, Ordering::Relaxed);
+ }
+
+ let mut iter = block_on_stream(stream);
+
+ assert_eq!(iter.next(), Some(None));
+ assert_eq!(iter.next(), Some(None));
+ assert_eq!(iter.next(), Some(None));
+ assert_eq!(iter.next(), None);
+}
+
+#[test]
+fn iter_len() {
+ let stream = vec![
+ future::pending::<()>(),
+ future::pending::<()>(),
+ future::pending::<()>(),
+ ]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ let mut iter = stream.iter();
+ assert_eq!(iter.len(), 3);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 2);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 1);
+ assert!(iter.next().is_some());
+ assert_eq!(iter.len(), 0);
+ assert!(iter.next().is_none());
+}
+
+#[test]
+fn futures_not_moved_after_poll() {
+ // Future that will be ready after being polled twice,
+ // asserting that it does not move.
+ let fut = future::ready(()).pending_once().assert_unmoved();
+ let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>();
+ assert_stream_next!(stream, ());
+ assert_stream_next!(stream, ());
+ assert_stream_next!(stream, ());
+ assert_stream_done!(stream);
+}
+
+#[test]
+fn len_valid_during_out_of_order_completion() {
+ // Complete futures out-of-order and add new futures afterwards to ensure
+ // length values remain correct.
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut cx = noop_context();
+ let mut stream = FuturesUnordered::new();
+ assert_eq!(stream.len(), 0);
+
+ stream.push(a_rx);
+ assert_eq!(stream.len(), 1);
+ stream.push(b_rx);
+ assert_eq!(stream.len(), 2);
+ stream.push(c_rx);
+ assert_eq!(stream.len(), 3);
+
+ b_tx.send(4).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(4))));
+ assert_eq!(stream.len(), 2);
+
+ stream.push(d_rx);
+ assert_eq!(stream.len(), 3);
+
+ c_tx.send(5).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(5))));
+ assert_eq!(stream.len(), 2);
+
+ d_tx.send(6).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(6))));
+ assert_eq!(stream.len(), 1);
+
+ a_tx.send(7).unwrap();
+ assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7))));
+ assert_eq!(stream.len(), 0);
+}
diff --git a/third_party/rust/futures/tests/inspect.rs b/third_party/rust/futures/tests/inspect.rs
new file mode 100644
index 0000000000..42f6f73634
--- /dev/null
+++ b/third_party/rust/futures/tests/inspect.rs
@@ -0,0 +1,14 @@
+use futures::executor::block_on;
+use futures::future::{self, FutureExt};
+
+#[test]
+fn smoke() {
+ let mut counter = 0;
+
+ {
+ let work = future::ready::<i32>(40).inspect(|val| { counter += *val; });
+ assert_eq!(block_on(work), 40);
+ }
+
+ assert_eq!(counter, 40);
+}
diff --git a/third_party/rust/futures/tests/io_buf_reader.rs b/third_party/rust/futures/tests/io_buf_reader.rs
new file mode 100644
index 0000000000..a3d723a691
--- /dev/null
+++ b/third_party/rust/futures/tests/io_buf_reader.rs
@@ -0,0 +1,321 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{
+ AsyncSeek, AsyncSeekExt, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt,
+ AllowStdIo, BufReader, Cursor, SeekFrom,
+};
+use futures::task::{Context, Poll};
+use futures_test::task::noop_context;
+use std::cmp;
+use std::io;
+use std::pin::Pin;
+
+/// A dummy reader intended at testing short-reads propagation.
+struct ShortReader {
+ lengths: Vec<usize>,
+}
+
+impl io::Read for ShortReader {
+ fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
+ if self.lengths.is_empty() {
+ Ok(0)
+ } else {
+ Ok(self.lengths.remove(0))
+ }
+ }
+}
+
+macro_rules! run_fill_buf {
+ ($reader:expr) => {{
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
+ break x;
+ }
+ }
+ }};
+}
+
+#[test]
+fn test_buffered_reader() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, inner);
+
+ let mut buf = [0, 0, 0];
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 3);
+ assert_eq!(buf, [5, 6, 7]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0, 0];
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 2);
+ assert_eq!(buf, [0, 1]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0];
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [2]);
+ assert_eq!(reader.buffer(), [3]);
+
+ let mut buf = [0, 0, 0];
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [3, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ let nread = block_on(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [4, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+}
+
+#[test]
+fn test_buffered_reader_seek() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
+
+ assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
+ Pin::new(&mut reader).consume(1);
+ assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
+}
+
+#[test]
+fn test_buffered_reader_seek_underflow() {
+ // gimmick reader that yields its position modulo 256 for each byte
+ struct PositionReader {
+ pos: u64
+ }
+ impl io::Read for PositionReader {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ let len = buf.len();
+ for x in buf {
+ *x = self.pos as u8;
+ self.pos = self.pos.wrapping_add(1);
+ }
+ Ok(len)
+ }
+ }
+ impl io::Seek for PositionReader {
+ fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+ match pos {
+ SeekFrom::Start(n) => {
+ self.pos = n;
+ }
+ SeekFrom::Current(n) => {
+ self.pos = self.pos.wrapping_add(n as u64);
+ }
+ SeekFrom::End(n) => {
+ self.pos = u64::max_value().wrapping_add(n as u64);
+ }
+ }
+ Ok(self.pos)
+ }
+ }
+
+ let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
+ assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5));
+ assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
+ // the following seek will require two underlying seeks
+ let expected = 9_223_372_036_854_775_802;
+ assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
+ assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
+ // seeking to 0 should empty the buffer.
+ assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
+ assert_eq!(reader.get_ref().get_ref().pos, expected);
+}
+
+#[test]
+fn test_short_reads() {
+ let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
+ let mut reader = BufReader::new(AllowStdIo::new(inner));
+ let mut buf = [0, 0];
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+ assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+}
+
+struct MaybePending<'a> {
+ inner: &'a [u8],
+ ready_read: bool,
+ ready_fill_buf: bool,
+}
+
+impl<'a> MaybePending<'a> {
+ fn new(inner: &'a [u8]) -> Self {
+ Self { inner, ready_read: false, ready_fill_buf: false }
+ }
+}
+
+impl AsyncRead for MaybePending<'_> {
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+ -> Poll<io::Result<usize>>
+ {
+ if self.ready_read {
+ self.ready_read = false;
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ } else {
+ self.ready_read = true;
+ Poll::Pending
+ }
+ }
+}
+
+impl AsyncBufRead for MaybePending<'_> {
+ fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>)
+ -> Poll<io::Result<&[u8]>>
+ {
+ if self.ready_fill_buf {
+ self.ready_fill_buf = false;
+ if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
+ let len = cmp::min(2, self.inner.len());
+ Poll::Ready(Ok(&self.inner[0..len]))
+ } else {
+ self.ready_fill_buf = true;
+ Poll::Pending
+ }
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ self.inner = &self.inner[amt..];
+ }
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn maybe_pending() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
+
+ let mut buf = [0, 0, 0];
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 3);
+ assert_eq!(buf, [5, 6, 7]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0, 0];
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 2);
+ assert_eq!(buf, [0, 1]);
+ assert_eq!(reader.buffer(), []);
+
+ let mut buf = [0];
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [2]);
+ assert_eq!(reader.buffer(), [3]);
+
+ let mut buf = [0, 0, 0];
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [3, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ let nread = run(reader.read(&mut buf));
+ assert_eq!(nread.unwrap(), 1);
+ assert_eq!(buf, [4, 0, 0]);
+ assert_eq!(reader.buffer(), []);
+
+ assert_eq!(run(reader.read(&mut buf)).unwrap(), 0);
+}
+
+#[test]
+fn maybe_pending_buf_read() {
+ let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
+ let mut reader = BufReader::with_capacity(2, inner);
+ let mut v = Vec::new();
+ run(reader.read_until(3, &mut v)).unwrap();
+ assert_eq!(v, [0, 1, 2, 3]);
+ v.clear();
+ run(reader.read_until(1, &mut v)).unwrap();
+ assert_eq!(v, [1]);
+ v.clear();
+ run(reader.read_until(8, &mut v)).unwrap();
+ assert_eq!(v, [0]);
+ v.clear();
+ run(reader.read_until(9, &mut v)).unwrap();
+ assert_eq!(v, []);
+}
+
+struct MaybePendingSeek<'a> {
+ inner: Cursor<&'a [u8]>,
+ ready: bool,
+}
+
+impl<'a> MaybePendingSeek<'a> {
+ fn new(inner: &'a [u8]) -> Self {
+ Self { inner: Cursor::new(inner), ready: true }
+ }
+}
+
+impl AsyncRead for MaybePendingSeek<'_> {
+ fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+ -> Poll<io::Result<usize>>
+ {
+ Pin::new(&mut self.inner).poll_read(cx, buf)
+ }
+}
+
+impl AsyncBufRead for MaybePendingSeek<'_> {
+ fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
+ -> Poll<io::Result<&[u8]>>
+ {
+ let this: *mut Self = &mut *self as *mut _;
+ Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ Pin::new(&mut self.inner).consume(amt)
+ }
+}
+
+impl AsyncSeek for MaybePendingSeek<'_> {
+ fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
+ -> Poll<io::Result<u64>>
+ {
+ if self.ready {
+ self.ready = false;
+ Pin::new(&mut self.inner).poll_seek(cx, pos)
+ } else {
+ self.ready = true;
+ Poll::Pending
+ }
+ }
+}
+
+// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
+#[test]
+fn maybe_pending_seek() {
+ let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+ let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+
+ assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+ assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
+ assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
+ Pin::new(&mut reader).consume(1);
+ assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
+}
diff --git a/third_party/rust/futures/tests/io_buf_writer.rs b/third_party/rust/futures/tests/io_buf_writer.rs
new file mode 100644
index 0000000000..7bdcd16df0
--- /dev/null
+++ b/third_party/rust/futures/tests/io_buf_writer.rs
@@ -0,0 +1,236 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
+use futures::task::{Context, Poll};
+use futures_test::task::noop_context;
+use std::io;
+use std::pin::Pin;
+
+#[test]
+fn buf_writer() {
+ let mut writer = BufWriter::with_capacity(2, Vec::new());
+
+ block_on(writer.write(&[0, 1])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ block_on(writer.write(&[2])).unwrap();
+ assert_eq!(writer.buffer(), [2]);
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ block_on(writer.write(&[3])).unwrap();
+ assert_eq!(writer.buffer(), [2, 3]);
+ assert_eq!(*writer.get_ref(), [0, 1]);
+
+ block_on(writer.flush()).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3]);
+
+ block_on(writer.write(&[4])).unwrap();
+ block_on(writer.write(&[5])).unwrap();
+ assert_eq!(writer.buffer(), [4, 5]);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3]);
+
+ block_on(writer.write(&[6])).unwrap();
+ assert_eq!(writer.buffer(), [6]);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]);
+
+ block_on(writer.write(&[7, 8])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]);
+
+ block_on(writer.write(&[9, 10, 11])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+
+ block_on(writer.flush()).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+}
+
+#[test]
+fn buf_writer_inner_flushes() {
+ let mut w = BufWriter::with_capacity(3, Vec::new());
+ block_on(w.write(&[0, 1])).unwrap();
+ assert_eq!(*w.get_ref(), []);
+ block_on(w.flush()).unwrap();
+ let w = w.into_inner();
+ assert_eq!(w, [0, 1]);
+}
+
+#[test]
+fn buf_writer_seek() {
+ // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
+ // use `Vec::new` instead of `vec![0; 8]`.
+ let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8]));
+ block_on(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap();
+ block_on(w.write_all(&[6, 7])).unwrap();
+ assert_eq!(block_on(w.seek(SeekFrom::Current(0))).ok(), Some(8));
+ assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]);
+ assert_eq!(block_on(w.seek(SeekFrom::Start(2))).ok(), Some(2));
+ block_on(w.write_all(&[8, 9])).unwrap();
+ block_on(w.flush()).unwrap();
+ assert_eq!(&w.into_inner().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]);
+}
+
+struct MaybePending {
+ inner: Vec<u8>,
+ ready: bool,
+}
+
+impl MaybePending {
+ fn new(inner: Vec<u8>) -> Self {
+ Self { inner, ready: false }
+ }
+}
+
+impl AsyncWrite for MaybePending {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready {
+ self.ready = false;
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ } else {
+ self.ready = true;
+ Poll::Pending
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_close(cx)
+ }
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn maybe_pending_buf_writer() {
+ let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new()));
+
+ run(writer.write(&[0, 1])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(&writer.get_ref().inner, &[0, 1]);
+
+ run(writer.write(&[2])).unwrap();
+ assert_eq!(writer.buffer(), [2]);
+ assert_eq!(&writer.get_ref().inner, &[0, 1]);
+
+ run(writer.write(&[3])).unwrap();
+ assert_eq!(writer.buffer(), [2, 3]);
+ assert_eq!(&writer.get_ref().inner, &[0, 1]);
+
+ run(writer.flush()).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]);
+
+ run(writer.write(&[4])).unwrap();
+ run(writer.write(&[5])).unwrap();
+ assert_eq!(writer.buffer(), [4, 5]);
+ assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3]);
+
+ run(writer.write(&[6])).unwrap();
+ assert_eq!(writer.buffer(), [6]);
+ assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5]);
+
+ run(writer.write(&[7, 8])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8]);
+
+ run(writer.write(&[9, 10, 11])).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+
+ run(writer.flush()).unwrap();
+ assert_eq!(writer.buffer(), []);
+ assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+}
+
+#[test]
+fn maybe_pending_buf_writer_inner_flushes() {
+ let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new()));
+ run(w.write(&[0, 1])).unwrap();
+ assert_eq!(&w.get_ref().inner, &[]);
+ run(w.flush()).unwrap();
+ let w = w.into_inner().inner;
+ assert_eq!(w, [0, 1]);
+}
+
+
+struct MaybePendingSeek {
+ inner: Cursor<Vec<u8>>,
+ ready_write: bool,
+ ready_seek: bool,
+}
+
+impl MaybePendingSeek {
+ fn new(inner: Vec<u8>) -> Self {
+ Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false }
+ }
+}
+
+impl AsyncWrite for MaybePendingSeek {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.ready_write {
+ self.ready_write = false;
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ } else {
+ self.ready_write = true;
+ Poll::Pending
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Pin::new(&mut self.inner).poll_close(cx)
+ }
+}
+
+impl AsyncSeek for MaybePendingSeek {
+ fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
+ -> Poll<io::Result<u64>>
+ {
+ if self.ready_seek {
+ self.ready_seek = false;
+ Pin::new(&mut self.inner).poll_seek(cx, pos)
+ } else {
+ self.ready_seek = true;
+ Poll::Pending
+ }
+ }
+}
+
+#[test]
+fn maybe_pending_buf_writer_seek() {
+ // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
+ // use `Vec::new` instead of `vec![0; 8]`.
+ let mut w = BufWriter::with_capacity(3, MaybePendingSeek::new(vec![0; 8]));
+ run(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap();
+ run(w.write_all(&[6, 7])).unwrap();
+ assert_eq!(run(w.seek(SeekFrom::Current(0))).ok(), Some(8));
+ assert_eq!(&w.get_ref().inner.get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]);
+ assert_eq!(run(w.seek(SeekFrom::Start(2))).ok(), Some(2));
+ run(w.write_all(&[8, 9])).unwrap();
+ run(w.flush()).unwrap();
+ assert_eq!(&w.into_inner().inner.into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]);
+}
diff --git a/third_party/rust/futures/tests/io_cursor.rs b/third_party/rust/futures/tests/io_cursor.rs
new file mode 100644
index 0000000000..4f80a75a37
--- /dev/null
+++ b/third_party/rust/futures/tests/io_cursor.rs
@@ -0,0 +1,29 @@
+use assert_matches::assert_matches;
+use futures::future::lazy;
+use futures::io::{AsyncWrite, Cursor};
+use futures::task::Poll;
+use std::pin::Pin;
+
+#[test]
+fn cursor_asyncwrite_vec() {
+ let mut cursor = Cursor::new(vec![0; 5]);
+ futures::executor::block_on(lazy(|cx| {
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(2)));
+ }));
+ assert_eq!(cursor.into_inner(), [1, 2, 3, 4, 5, 6, 6, 7]);
+}
+
+#[test]
+fn cursor_asyncwrite_box() {
+ let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice());
+ futures::executor::block_on(lazy(|cx| {
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(1)));
+ assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(0)));
+ }));
+ assert_eq!(&*cursor.into_inner(), [1, 2, 3, 4, 5]);
+}
diff --git a/third_party/rust/futures/tests/io_lines.rs b/third_party/rust/futures/tests/io_lines.rs
new file mode 100644
index 0000000000..39eafa9a66
--- /dev/null
+++ b/third_party/rust/futures/tests/io_lines.rs
@@ -0,0 +1,62 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+macro_rules! block_on_next {
+ ($expr:expr) => {
+ block_on($expr.next()).unwrap().unwrap()
+ };
+}
+
+#[test]
+fn lines() {
+ let buf = Cursor::new(&b"12\r"[..]);
+ let mut s = buf.lines();
+ assert_eq!(block_on_next!(s), "12\r".to_string());
+ assert!(block_on(s.next()).is_none());
+
+ let buf = Cursor::new(&b"12\r\n\n"[..]);
+ let mut s = buf.lines();
+ assert_eq!(block_on_next!(s), "12".to_string());
+ assert_eq!(block_on_next!(s), "".to_string());
+ assert!(block_on(s.next()).is_none());
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+macro_rules! run_next {
+ ($expr:expr) => {
+ run($expr.next()).unwrap().unwrap()
+ };
+}
+
+#[test]
+fn maybe_pending() {
+ let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+ let mut s = buf.lines();
+ assert_eq!(run_next!(s), "12\r".to_string());
+ assert!(run(s.next()).is_none());
+
+ let buf = stream::iter(vec![&b"12"[..], &b"\r\n"[..], &b"\n"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+ let mut s = buf.lines();
+ assert_eq!(run_next!(s), "12".to_string());
+ assert_eq!(run_next!(s), "".to_string());
+ assert!(run(s.next()).is_none());
+}
diff --git a/third_party/rust/futures/tests/io_read.rs b/third_party/rust/futures/tests/io_read.rs
new file mode 100644
index 0000000000..f99c4edbb6
--- /dev/null
+++ b/third_party/rust/futures/tests/io_read.rs
@@ -0,0 +1,65 @@
+use futures::io::AsyncRead;
+use futures_test::task::panic_context;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+struct MockReader {
+ fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
+}
+
+impl MockReader {
+ pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+ MockReader { fun: Box::new(fun) }
+ }
+}
+
+impl AsyncRead for MockReader {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8]
+ ) -> Poll<io::Result<usize>> {
+ (self.get_mut().fun)(buf)
+ }
+}
+
+/// Verifies that the default implementation of `poll_read_vectored`
+/// calls `poll_read` with an empty slice if no buffers are provided.
+#[test]
+fn read_vectored_no_buffers() {
+ let mut reader = MockReader::new(|buf| {
+ assert_eq!(buf, b"");
+ Err(io::ErrorKind::BrokenPipe.into()).into()
+ });
+ let cx = &mut panic_context();
+ let bufs = &mut [];
+
+ let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs);
+ let res = res.map_err(|e| e.kind());
+ assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe)))
+}
+
+/// Verifies that the default implementation of `poll_read_vectored`
+/// calls `poll_read` with the first non-empty buffer.
+#[test]
+fn read_vectored_first_non_empty() {
+ let mut reader = MockReader::new(|buf| {
+ assert_eq!(buf.len(), 4);
+ buf.copy_from_slice(b"four");
+ Poll::Ready(Ok(4))
+ });
+ let cx = &mut panic_context();
+ let mut buf = [0; 4];
+ let bufs = &mut [
+ io::IoSliceMut::new(&mut []),
+ io::IoSliceMut::new(&mut []),
+ io::IoSliceMut::new(&mut buf),
+ ];
+
+ let res = Pin::new(&mut reader).poll_read_vectored(cx, bufs);
+ let res = res.map_err(|e| e.kind());
+ assert_eq!(res, Poll::Ready(Ok(4)));
+ assert_eq!(buf, b"four"[..]);
+}
+
diff --git a/third_party/rust/futures/tests/io_read_exact.rs b/third_party/rust/futures/tests/io_read_exact.rs
new file mode 100644
index 0000000000..4941773cb5
--- /dev/null
+++ b/third_party/rust/futures/tests/io_read_exact.rs
@@ -0,0 +1,17 @@
+use futures::executor::block_on;
+use futures::io::AsyncReadExt;
+
+#[test]
+fn read_exact() {
+ let mut reader: &[u8] = &[1, 2, 3, 4, 5];
+ let mut out = [0u8; 3];
+
+ let res = block_on(reader.read_exact(&mut out)); // read 3 bytes out
+ assert!(res.is_ok());
+ assert_eq!(out, [1,2,3]);
+ assert_eq!(reader.len(), 2);
+
+ let res = block_on(reader.read_exact(&mut out)); // read another 3 bytes, but only 2 bytes left
+ assert!(res.is_err());
+ assert_eq!(reader.len(), 0);
+}
diff --git a/third_party/rust/futures/tests/io_read_line.rs b/third_party/rust/futures/tests/io_read_line.rs
new file mode 100644
index 0000000000..d1dba5e6d2
--- /dev/null
+++ b/third_party/rust/futures/tests/io_read_line.rs
@@ -0,0 +1,60 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+#[test]
+fn read_line() {
+ let mut buf = Cursor::new(b"12");
+ let mut v = String::new();
+ assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
+ assert_eq!(v, "12");
+
+ let mut buf = Cursor::new(b"12\n\n");
+ let mut v = String::new();
+ assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3);
+ assert_eq!(v, "12\n");
+ v.clear();
+ assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1);
+ assert_eq!(v, "\n");
+ v.clear();
+ assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0);
+ assert_eq!(v, "");
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn maybe_pending() {
+ let mut buf = b"12".interleave_pending();
+ let mut v = String::new();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2);
+ assert_eq!(v, "12");
+
+ let mut buf = stream::iter(vec![&b"12"[..], &b"\n\n"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+ let mut v = String::new();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3);
+ assert_eq!(v, "12\n");
+ v.clear();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1);
+ assert_eq!(v, "\n");
+ v.clear();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
+ assert_eq!(v, "");
+ v.clear();
+ assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
+ assert_eq!(v, "");
+}
diff --git a/third_party/rust/futures/tests/io_read_to_string.rs b/third_party/rust/futures/tests/io_read_to_string.rs
new file mode 100644
index 0000000000..db825af21c
--- /dev/null
+++ b/third_party/rust/futures/tests/io_read_to_string.rs
@@ -0,0 +1,45 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::io::{AsyncReadExt, Cursor};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+#[test]
+fn read_to_string() {
+ let mut c = Cursor::new(&b""[..]);
+ let mut v = String::new();
+ assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0);
+ assert_eq!(v, "");
+
+ let mut c = Cursor::new(&b"1"[..]);
+ let mut v = String::new();
+ assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 1);
+ assert_eq!(v, "1");
+
+ let mut c = Cursor::new(&b"\xff"[..]);
+ let mut v = String::new();
+ assert!(block_on(c.read_to_string(&mut v)).is_err());
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn interleave_pending() {
+ let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+
+ let mut v = String::new();
+ assert_eq!(run(buf.read_to_string(&mut v)).unwrap(), 5);
+ assert_eq!(v, "12333");
+}
diff --git a/third_party/rust/futures/tests/io_read_until.rs b/third_party/rust/futures/tests/io_read_until.rs
new file mode 100644
index 0000000000..5152281795
--- /dev/null
+++ b/third_party/rust/futures/tests/io_read_until.rs
@@ -0,0 +1,60 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+#[test]
+fn read_until() {
+ let mut buf = Cursor::new(b"12");
+ let mut v = Vec::new();
+ assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2);
+ assert_eq!(v, b"12");
+
+ let mut buf = Cursor::new(b"1233");
+ let mut v = Vec::new();
+ assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 3);
+ assert_eq!(v, b"123");
+ v.truncate(0);
+ assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 1);
+ assert_eq!(v, b"3");
+ v.truncate(0);
+ assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 0);
+ assert_eq!(v, []);
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+ let mut cx = noop_context();
+ loop {
+ if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+ return x;
+ }
+ }
+}
+
+#[test]
+fn maybe_pending() {
+ let mut buf = b"12".interleave_pending();
+ let mut v = Vec::new();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2);
+ assert_eq!(v, b"12");
+
+ let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]])
+ .map(Ok)
+ .into_async_read()
+ .interleave_pending();
+ let mut v = Vec::new();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 3);
+ assert_eq!(v, b"123");
+ v.clear();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1);
+ assert_eq!(v, b"3");
+ v.clear();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 1);
+ assert_eq!(v, b"3");
+ v.clear();
+ assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 0);
+ assert_eq!(v, []);
+}
diff --git a/third_party/rust/futures/tests/io_window.rs b/third_party/rust/futures/tests/io_window.rs
new file mode 100644
index 0000000000..98df69c83b
--- /dev/null
+++ b/third_party/rust/futures/tests/io_window.rs
@@ -0,0 +1,26 @@
+use futures::io::Window;
+
+#[test]
+fn set() {
+ let mut buffer = Window::new(&[1, 2, 3]);
+ buffer.set(..3);
+ buffer.set(3..3);
+ buffer.set(3..=2); // == 3..3
+ buffer.set(0..2);
+
+ assert_eq!(buffer.as_ref(), &[1, 2]);
+}
+
+#[test]
+#[should_panic]
+fn set_panic_out_of_bounds() {
+ let mut buffer = Window::new(&[1, 2, 3]);
+ buffer.set(2..4);
+}
+
+#[test]
+#[should_panic]
+fn set_panic_start_is_greater_than_end() {
+ let mut buffer = Window::new(&[1, 2, 3]);
+ buffer.set(3..2);
+}
diff --git a/third_party/rust/futures/tests/io_write.rs b/third_party/rust/futures/tests/io_write.rs
new file mode 100644
index 0000000000..b96344446c
--- /dev/null
+++ b/third_party/rust/futures/tests/io_write.rs
@@ -0,0 +1,70 @@
+use futures::io::AsyncWrite;
+use futures_test::task::panic_context;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+struct MockWriter {
+ fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
+}
+
+impl MockWriter {
+ pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+ MockWriter { fun: Box::new(fun) }
+ }
+}
+
+impl AsyncWrite for MockWriter {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ (self.get_mut().fun)(buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ panic!()
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ panic!()
+ }
+}
+
+/// Verifies that the default implementation of `poll_write_vectored`
+/// calls `poll_write` with an empty slice if no buffers are provided.
+#[test]
+fn write_vectored_no_buffers() {
+ let mut writer = MockWriter::new(|buf| {
+ assert_eq!(buf, b"");
+ Err(io::ErrorKind::BrokenPipe.into()).into()
+ });
+ let cx = &mut panic_context();
+ let bufs = &mut [];
+
+ let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
+ let res = res.map_err(|e| e.kind());
+ assert_eq!(res, Poll::Ready(Err(io::ErrorKind::BrokenPipe)))
+}
+
+/// Verifies that the default implementation of `poll_write_vectored`
+/// calls `poll_write` with the first non-empty buffer.
+#[test]
+fn write_vectored_first_non_empty() {
+ let mut writer = MockWriter::new(|buf| {
+ assert_eq!(buf, b"four");
+ Poll::Ready(Ok(4))
+ });
+ let cx = &mut panic_context();
+ let bufs = &mut [
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(&[]),
+ io::IoSlice::new(b"four")
+ ];
+
+ let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
+ let res = res.map_err(|e| e.kind());
+ assert_eq!(res, Poll::Ready(Ok(4)));
+}
+
diff --git a/third_party/rust/futures/tests/join_all.rs b/third_party/rust/futures/tests/join_all.rs
new file mode 100644
index 0000000000..63967bf987
--- /dev/null
+++ b/third_party/rust/futures/tests/join_all.rs
@@ -0,0 +1,43 @@
+use futures_util::future::*;
+use std::future::Future;
+use futures::executor::block_on;
+use std::fmt::Debug;
+
+fn assert_done<T, F>(actual_fut: F, expected: T)
+where
+ T: PartialEq + Debug,
+ F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
+{
+ let output = block_on(actual_fut());
+ assert_eq!(output, expected);
+}
+
+#[test]
+fn collect_collects() {
+ assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
+ assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
+ // REVIEW: should this be implemented?
+ // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
+
+ // TODO: needs more tests
+}
+
+#[test]
+fn join_all_iter_lifetime() {
+ // In futures-rs version 0.1, this function would fail to typecheck due to an overly
+ // conservative type parameterization of `JoinAll`.
+ fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> {
+ let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
+ Box::new(join_all(iter))
+ }
+
+ assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3 as usize, 0, 1]);
+}
+
+#[test]
+fn join_all_from_iter() {
+ assert_done(
+ || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
+ vec![1, 2],
+ )
+}
diff --git a/third_party/rust/futures/tests/macro_comma_support.rs b/third_party/rust/futures/tests/macro_comma_support.rs
new file mode 100644
index 0000000000..111f65af4e
--- /dev/null
+++ b/third_party/rust/futures/tests/macro_comma_support.rs
@@ -0,0 +1,42 @@
+#[macro_use]
+extern crate futures;
+
+use futures::{
+ executor::block_on,
+ future::{self, FutureExt},
+ task::Poll,
+};
+
+#[test]
+fn ready() {
+ block_on(future::poll_fn(|_| {
+ ready!(Poll::Ready(()),);
+ Poll::Ready(())
+ }))
+}
+
+#[test]
+fn poll() {
+ block_on(async {
+ let _ = poll!(async {}.boxed(),);
+ })
+}
+
+#[test]
+fn join() {
+ block_on(async {
+ let future1 = async { 1 };
+ let future2 = async { 2 };
+ join!(future1, future2,);
+ })
+}
+
+#[test]
+fn try_join() {
+ block_on(async {
+ let future1 = async { 1 }.never_error();
+ let future2 = async { 2 }.never_error();
+ try_join!(future1, future2,)
+ })
+ .unwrap();
+}
diff --git a/third_party/rust/futures/tests/mutex.rs b/third_party/rust/futures/tests/mutex.rs
new file mode 100644
index 0000000000..bad53a9b8f
--- /dev/null
+++ b/third_party/rust/futures/tests/mutex.rs
@@ -0,0 +1,69 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::{ready, FutureExt};
+use futures::lock::Mutex;
+use futures::stream::StreamExt;
+use futures::task::{Context, SpawnExt};
+use futures_test::future::FutureTestExt;
+use futures_test::task::{new_count_waker, panic_context};
+use std::sync::Arc;
+
+#[test]
+fn mutex_acquire_uncontested() {
+ let mutex = Mutex::new(());
+ for _ in 0..10 {
+ assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready());
+ }
+}
+
+#[test]
+fn mutex_wakes_waiters() {
+ let mutex = Mutex::new(());
+ let (waker, counter) = new_count_waker();
+ let lock = mutex.lock().poll_unpin(&mut panic_context());
+ assert!(lock.is_ready());
+
+ let mut cx = Context::from_waker(&waker);
+ let mut waiter = mutex.lock();
+ assert!(waiter.poll_unpin(&mut cx).is_pending());
+ assert_eq!(counter, 0);
+
+ drop(lock);
+
+ assert_eq!(counter, 1);
+ assert!(waiter.poll_unpin(&mut panic_context()).is_ready());
+}
+
+#[test]
+fn mutex_contested() {
+ let (tx, mut rx) = mpsc::unbounded();
+ let pool = futures::executor::ThreadPool::builder()
+ .pool_size(16)
+ .create()
+ .unwrap();
+
+ let tx = Arc::new(tx);
+ let mutex = Arc::new(Mutex::new(0));
+
+ let num_tasks = 1000;
+ for _ in 0..num_tasks {
+ let tx = tx.clone();
+ let mutex = mutex.clone();
+ pool.spawn(async move {
+ let mut lock = mutex.lock().await;
+ ready(()).pending_once().await;
+ *lock += 1;
+ tx.unbounded_send(()).unwrap();
+ drop(lock);
+ })
+ .unwrap();
+ }
+
+ block_on(async {
+ for _ in 0..num_tasks {
+ rx.next().await.unwrap();
+ }
+ let lock = mutex.lock().await;
+ assert_eq!(num_tasks, *lock);
+ })
+}
diff --git a/third_party/rust/futures/tests/object_safety.rs b/third_party/rust/futures/tests/object_safety.rs
new file mode 100644
index 0000000000..30c892f5e6
--- /dev/null
+++ b/third_party/rust/futures/tests/object_safety.rs
@@ -0,0 +1,49 @@
+fn assert_is_object_safe<T>() {}
+
+#[test]
+fn future() {
+ // `FutureExt`, `TryFutureExt` and `UnsafeFutureObj` are not object safe.
+ use futures::future::{FusedFuture, Future, TryFuture};
+
+ assert_is_object_safe::<&dyn Future<Output = ()>>();
+ assert_is_object_safe::<&dyn FusedFuture<Output = ()>>();
+ assert_is_object_safe::<&dyn TryFuture<Ok = (), Error = (), Output = Result<(), ()>>>();
+}
+
+#[test]
+fn stream() {
+ // `StreamExt` and `TryStreamExt` are not object safe.
+ use futures::stream::{FusedStream, Stream, TryStream};
+
+ assert_is_object_safe::<&dyn Stream<Item = ()>>();
+ assert_is_object_safe::<&dyn FusedStream<Item = ()>>();
+ assert_is_object_safe::<&dyn TryStream<Ok = (), Error = (), Item = Result<(), ()>>>();
+}
+
+#[test]
+fn sink() {
+ // `SinkExt` is not object safe.
+ use futures::sink::Sink;
+
+ assert_is_object_safe::<&dyn Sink<(), Error = ()>>();
+}
+
+#[test]
+fn io() {
+ // `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt` and `AsyncBufReadExt` are not object safe.
+ use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
+
+ assert_is_object_safe::<&dyn AsyncRead>();
+ assert_is_object_safe::<&dyn AsyncWrite>();
+ assert_is_object_safe::<&dyn AsyncSeek>();
+ assert_is_object_safe::<&dyn AsyncBufRead>();
+}
+
+#[test]
+fn task() {
+ // `ArcWake`, `SpawnExt` and `LocalSpawnExt` are not object safe.
+ use futures::task::{LocalSpawn, Spawn};
+
+ assert_is_object_safe::<&dyn Spawn>();
+ assert_is_object_safe::<&dyn LocalSpawn>();
+}
diff --git a/third_party/rust/futures/tests/oneshot.rs b/third_party/rust/futures/tests/oneshot.rs
new file mode 100644
index 0000000000..58951ec581
--- /dev/null
+++ b/third_party/rust/futures/tests/oneshot.rs
@@ -0,0 +1,66 @@
+use futures::channel::oneshot;
+use futures::future::{FutureExt, TryFutureExt};
+use futures_test::future::FutureTestExt;
+use std::sync::mpsc;
+use std::thread;
+
+#[test]
+fn oneshot_send1() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ let t = thread::spawn(|| tx1.send(1).unwrap());
+ rx1.map_ok(move |x| tx2.send(x)).run_in_background();
+ assert_eq!(1, rx2.recv().unwrap());
+ t.join().unwrap();
+}
+
+#[test]
+fn oneshot_send2() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ thread::spawn(|| tx1.send(1).unwrap()).join().unwrap();
+ rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background();
+ assert_eq!(1, rx2.recv().unwrap());
+}
+
+#[test]
+fn oneshot_send3() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background();
+ thread::spawn(|| tx1.send(1).unwrap()).join().unwrap();
+ assert_eq!(1, rx2.recv().unwrap());
+}
+
+#[test]
+fn oneshot_drop_tx1() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ drop(tx1);
+ rx1.map(move |result| tx2.send(result).unwrap()).run_in_background();
+
+ assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap());
+}
+
+#[test]
+fn oneshot_drop_tx2() {
+ let (tx1, rx1) = oneshot::channel::<i32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ let t = thread::spawn(|| drop(tx1));
+ rx1.map(move |result| tx2.send(result).unwrap()).run_in_background();
+ t.join().unwrap();
+
+ assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap());
+}
+
+#[test]
+fn oneshot_drop_rx() {
+ let (tx, rx) = oneshot::channel::<i32>();
+ drop(rx);
+ assert_eq!(Err(2), tx.send(2));
+}
diff --git a/third_party/rust/futures/tests/ready_queue.rs b/third_party/rust/futures/tests/ready_queue.rs
new file mode 100644
index 0000000000..15a0befda8
--- /dev/null
+++ b/third_party/rust/futures/tests/ready_queue.rs
@@ -0,0 +1,151 @@
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future;
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+use std::panic::{self, AssertUnwindSafe};
+use std::sync::{Arc, Barrier};
+use std::thread;
+
+trait AssertSendSync: Send + Sync {}
+impl AssertSendSync for FuturesUnordered<()> {}
+
+#[test]
+fn basic_usage() {
+ block_on(future::lazy(move |cx| {
+ let mut queue = FuturesUnordered::new();
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = oneshot::channel();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ tx2.send("hello").unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx));
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ tx1.send("world").unwrap();
+ tx3.send("world2").unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
+ }));
+}
+
+#[test]
+fn resolving_errors() {
+ block_on(future::lazy(move |cx| {
+ let mut queue = FuturesUnordered::new();
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = oneshot::channel();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ drop(tx2);
+
+ assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
+ assert!(!queue.poll_next_unpin(cx).is_ready());
+
+ drop(tx1);
+ tx3.send("world2").unwrap();
+
+ assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx));
+ assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
+ }));
+}
+
+#[test]
+fn dropping_ready_queue() {
+ block_on(future::lazy(move |_| {
+ let queue = FuturesUnordered::new();
+ let (mut tx1, rx1) = oneshot::channel::<()>();
+ let (mut tx2, rx2) = oneshot::channel::<()>();
+ let (mut tx3, rx3) = oneshot::channel::<()>();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ {
+ let cx = &mut noop_context();
+ assert!(!tx1.poll_canceled(cx).is_ready());
+ assert!(!tx2.poll_canceled(cx).is_ready());
+ assert!(!tx3.poll_canceled(cx).is_ready());
+
+ drop(queue);
+
+ assert!(tx1.poll_canceled(cx).is_ready());
+ assert!(tx2.poll_canceled(cx).is_ready());
+ assert!(tx3.poll_canceled(cx).is_ready());
+ }
+ }));
+}
+
+#[test]
+fn stress() {
+ const ITER: usize = 300;
+
+ for i in 0..ITER {
+ let n = (i % 10) + 1;
+
+ let mut queue = FuturesUnordered::new();
+
+ for _ in 0..5 {
+ let barrier = Arc::new(Barrier::new(n + 1));
+
+ for num in 0..n {
+ let barrier = barrier.clone();
+ let (tx, rx) = oneshot::channel();
+
+ queue.push(rx);
+
+ thread::spawn(move || {
+ barrier.wait();
+ tx.send(num).unwrap();
+ });
+ }
+
+ barrier.wait();
+
+ let mut sync = block_on_stream(queue);
+
+ let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect();
+
+ assert_eq!(rx.len(), n);
+
+ rx.sort();
+
+ for (i, x) in rx.into_iter().enumerate() {
+ assert_eq!(i, x);
+ }
+
+ queue = sync.into_inner();
+ }
+ }
+}
+
+#[test]
+fn panicking_future_dropped() {
+ block_on(future::lazy(move |cx| {
+ let mut queue = FuturesUnordered::new();
+ queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));
+
+ let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx)));
+ assert!(r.is_err());
+ assert!(queue.is_empty());
+ assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx));
+ }));
+}
diff --git a/third_party/rust/futures/tests/recurse.rs b/third_party/rust/futures/tests/recurse.rs
new file mode 100644
index 0000000000..2920a41a59
--- /dev/null
+++ b/third_party/rust/futures/tests/recurse.rs
@@ -0,0 +1,22 @@
+use futures::executor::block_on;
+use futures::future::{self, FutureExt, BoxFuture};
+use std::sync::mpsc;
+use std::thread;
+
+#[test]
+fn lots() {
+ fn do_it(input: (i32, i32)) -> BoxFuture<'static, i32> {
+ let (n, x) = input;
+ if n == 0 {
+ future::ready(x).boxed()
+ } else {
+ future::ready((n - 1, x + n)).then(do_it).boxed()
+ }
+ }
+
+ let (tx, rx) = mpsc::channel();
+ thread::spawn(|| {
+ block_on(do_it((1_000, 0)).map(move |x| tx.send(x).unwrap()))
+ });
+ assert_eq!(500_500, rx.recv().unwrap());
+}
diff --git a/third_party/rust/futures/tests/select_all.rs b/third_party/rust/futures/tests/select_all.rs
new file mode 100644
index 0000000000..aad977d351
--- /dev/null
+++ b/third_party/rust/futures/tests/select_all.rs
@@ -0,0 +1,29 @@
+use futures::executor::block_on;
+use futures::future::{ready, select_all};
+use std::collections::HashSet;
+
+#[test]
+fn smoke() {
+ let v = vec![
+ ready(1),
+ ready(2),
+ ready(3),
+ ];
+
+ let mut c = vec![1, 2, 3].into_iter().collect::<HashSet<_>>();
+
+ let (i, idx, v) = block_on(select_all(v));
+ assert!(c.remove(&i));
+ assert_eq!(idx, 0);
+
+ let (i, idx, v) = block_on(select_all(v));
+ assert!(c.remove(&i));
+ assert_eq!(idx, 0);
+
+ let (i, idx, v) = block_on(select_all(v));
+ assert!(c.remove(&i));
+ assert_eq!(idx, 0);
+
+ assert!(c.is_empty());
+ assert!(v.is_empty());
+}
diff --git a/third_party/rust/futures/tests/select_ok.rs b/third_party/rust/futures/tests/select_ok.rs
new file mode 100644
index 0000000000..db88a95c7a
--- /dev/null
+++ b/third_party/rust/futures/tests/select_ok.rs
@@ -0,0 +1,39 @@
+use futures::executor::block_on;
+use futures::future::{err, ok, select_ok};
+
+#[test]
+fn ignore_err() {
+ let v = vec![
+ err(1),
+ err(2),
+ ok(3),
+ ok(4),
+ ];
+
+ let (i, v) = block_on(select_ok(v)).ok().unwrap();
+ assert_eq!(i, 3);
+
+ assert_eq!(v.len(), 1);
+
+ let (i, v) = block_on(select_ok(v)).ok().unwrap();
+ assert_eq!(i, 4);
+
+ assert!(v.is_empty());
+}
+
+#[test]
+fn last_err() {
+ let v = vec![
+ ok(1),
+ err(2),
+ err(3),
+ ];
+
+ let (i, v) = block_on(select_ok(v)).ok().unwrap();
+ assert_eq!(i, 1);
+
+ assert_eq!(v.len(), 2);
+
+ let i = block_on(select_ok(v)).err().unwrap();
+ assert_eq!(i, 3);
+}
diff --git a/third_party/rust/futures/tests/shared.rs b/third_party/rust/futures/tests/shared.rs
new file mode 100644
index 0000000000..8402bfe10b
--- /dev/null
+++ b/third_party/rust/futures/tests/shared.rs
@@ -0,0 +1,151 @@
+use futures::channel::oneshot;
+use futures::executor::{block_on, LocalPool};
+use futures::future::{self, FutureExt, TryFutureExt, LocalFutureObj};
+use futures::task::LocalSpawn;
+use std::cell::{Cell, RefCell};
+use std::rc::Rc;
+use std::thread;
+
+fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
+ let (tx, rx) = oneshot::channel::<i32>();
+ let f = rx.shared();
+ let join_handles = (0..threads_number)
+ .map(|_| {
+ let cloned_future = f.clone();
+ thread::spawn(move || {
+ assert_eq!(block_on(cloned_future).unwrap(), 6);
+ })
+ })
+ .collect::<Vec<_>>();
+
+ tx.send(6).unwrap();
+
+ assert_eq!(block_on(f).unwrap(), 6);
+ for join_handle in join_handles {
+ join_handle.join().unwrap();
+ }
+}
+
+#[test]
+fn one_thread() {
+ send_shared_oneshot_and_wait_on_multiple_threads(1);
+}
+
+#[test]
+fn two_threads() {
+ send_shared_oneshot_and_wait_on_multiple_threads(2);
+}
+
+#[test]
+fn many_threads() {
+ send_shared_oneshot_and_wait_on_multiple_threads(1000);
+}
+
+#[test]
+fn drop_on_one_task_ok() {
+ let (tx, rx) = oneshot::channel::<u32>();
+ let f1 = rx.shared();
+ let f2 = f1.clone();
+
+ let (tx2, rx2) = oneshot::channel::<u32>();
+
+ let t1 = thread::spawn(|| {
+ let f = future::try_select(f1.map_err(|_| ()), rx2.map_err(|_| ()));
+ drop(block_on(f));
+ });
+
+ let (tx3, rx3) = oneshot::channel::<u32>();
+
+ let t2 = thread::spawn(|| {
+ let _ = block_on(f2.map_ok(|x| tx3.send(x).unwrap()).map_err(|_| ()));
+ });
+
+ tx2.send(11).unwrap(); // cancel `f1`
+ t1.join().unwrap();
+
+ tx.send(42).unwrap(); // Should cause `f2` and then `rx3` to get resolved.
+ let result = block_on(rx3).unwrap();
+ assert_eq!(result, 42);
+ t2.join().unwrap();
+}
+
+#[test]
+fn drop_in_poll() {
+ let slot1 = Rc::new(RefCell::new(None));
+ let slot2 = slot1.clone();
+
+ let future1 = future::lazy(move |_| {
+ slot2.replace(None); // Drop future
+ 1
+ }).shared();
+
+ let future2 = LocalFutureObj::new(Box::new(future1.clone()));
+ slot1.replace(Some(future2));
+
+ assert_eq!(block_on(future1), 1);
+}
+
+#[test]
+fn peek() {
+ let mut local_pool = LocalPool::new();
+ let spawn = &mut local_pool.spawner();
+
+ let (tx0, rx0) = oneshot::channel::<i32>();
+ let f1 = rx0.shared();
+ let f2 = f1.clone();
+
+ // Repeated calls on the original or clone do not change the outcome.
+ for _ in 0..2 {
+ assert!(f1.peek().is_none());
+ assert!(f2.peek().is_none());
+ }
+
+ // Completing the underlying future has no effect, because the value has not been `poll`ed in.
+ tx0.send(42).unwrap();
+ for _ in 0..2 {
+ assert!(f1.peek().is_none());
+ assert!(f2.peek().is_none());
+ }
+
+ // Once the Shared has been polled, the value is peekable on the clone.
+ spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap();
+ local_pool.run();
+ for _ in 0..2 {
+ assert_eq!(*f2.peek().unwrap(), Ok(42));
+ }
+}
+
+struct CountClone(Rc<Cell<i32>>);
+
+impl Clone for CountClone {
+ fn clone(&self) -> Self {
+ self.0.set(self.0.get() + 1);
+ CountClone(self.0.clone())
+ }
+}
+
+#[test]
+fn dont_clone_in_single_owner_shared_future() {
+ let counter = CountClone(Rc::new(Cell::new(0)));
+ let (tx, rx) = oneshot::channel();
+
+ let rx = rx.shared();
+
+ tx.send(counter).ok().unwrap();
+
+ assert_eq!(block_on(rx).unwrap().0.get(), 0);
+}
+
+#[test]
+fn dont_do_unnecessary_clones_on_output() {
+ let counter = CountClone(Rc::new(Cell::new(0)));
+ let (tx, rx) = oneshot::channel();
+
+ let rx = rx.shared();
+
+ tx.send(counter).ok().unwrap();
+
+ assert_eq!(block_on(rx.clone()).unwrap().0.get(), 1);
+ assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2);
+ assert_eq!(block_on(rx).unwrap().0.get(), 2);
+}
diff --git a/third_party/rust/futures/tests/sink.rs b/third_party/rust/futures/tests/sink.rs
new file mode 100644
index 0000000000..f967e1b643
--- /dev/null
+++ b/third_party/rust/futures/tests/sink.rs
@@ -0,0 +1,516 @@
+use futures::channel::{mpsc, oneshot};
+use futures::executor::block_on;
+use futures::future::{self, Future, FutureExt, TryFutureExt};
+use futures::never::Never;
+use futures::ready;
+use futures::sink::{Sink, SinkErrInto, SinkExt};
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{self, ArcWake, Context, Poll, Waker};
+use futures_test::task::panic_context;
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
+use std::fmt;
+use std::mem;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+fn sassert_next<S>(s: &mut S, item: S::Item)
+where
+ S: Stream + Unpin,
+ S::Item: Eq + fmt::Debug,
+{
+ match s.poll_next_unpin(&mut panic_context()) {
+ Poll::Ready(None) => panic!("stream is at its end"),
+ Poll::Ready(Some(e)) => assert_eq!(e, item),
+ Poll::Pending => panic!("stream wasn't ready"),
+ }
+}
+
+fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
+ match x {
+ Poll::Ready(Ok(x)) => x,
+ Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
+ Poll::Pending => panic!("Poll::Pending"),
+ }
+}
+
+#[test]
+fn either_sink() {
+ let mut s = if true {
+ Vec::<i32>::new().left_sink()
+ } else {
+ VecDeque::<i32>::new().right_sink()
+ };
+
+ Pin::new(&mut s).start_send(0).unwrap();
+}
+
+#[test]
+fn vec_sink() {
+ let mut v = Vec::new();
+ Pin::new(&mut v).start_send(0).unwrap();
+ Pin::new(&mut v).start_send(1).unwrap();
+ assert_eq!(v, vec![0, 1]);
+ block_on(v.flush()).unwrap();
+ assert_eq!(v, vec![0, 1]);
+}
+
+#[test]
+fn vecdeque_sink() {
+ let mut deque = VecDeque::new();
+ Pin::new(&mut deque).start_send(2).unwrap();
+ Pin::new(&mut deque).start_send(3).unwrap();
+
+ assert_eq!(deque.pop_front(), Some(2));
+ assert_eq!(deque.pop_front(), Some(3));
+ assert_eq!(deque.pop_front(), None);
+}
+
+#[test]
+fn send() {
+ let mut v = Vec::new();
+
+ block_on(v.send(0)).unwrap();
+ assert_eq!(v, vec![0]);
+
+ block_on(v.send(1)).unwrap();
+ assert_eq!(v, vec![0, 1]);
+
+ block_on(v.send(2)).unwrap();
+ assert_eq!(v, vec![0, 1, 2]);
+}
+
+#[test]
+fn send_all() {
+ let mut v = Vec::new();
+
+ block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap();
+ assert_eq!(v, vec![0, 1]);
+
+ block_on(v.send_all(&mut stream::iter(vec![2, 3]).map(Ok))).unwrap();
+ assert_eq!(v, vec![0, 1, 2, 3]);
+
+ block_on(v.send_all(&mut stream::iter(vec![4, 5]).map(Ok))).unwrap();
+ assert_eq!(v, vec![0, 1, 2, 3, 4, 5]);
+}
+
+// An Unpark struct that records unpark events for inspection
+struct Flag(AtomicBool);
+
+impl Flag {
+ fn new() -> Arc<Self> {
+ Arc::new(Self(AtomicBool::new(false)))
+ }
+
+ fn take(&self) -> bool {
+ self.0.swap(false, Ordering::SeqCst)
+ }
+
+ fn set(&self, v: bool) {
+ self.0.store(v, Ordering::SeqCst)
+ }
+}
+
+impl ArcWake for Flag {
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.set(true)
+ }
+}
+
+fn flag_cx<F, R>(f: F) -> R
+where
+ F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
+{
+ let flag = Flag::new();
+ let waker = task::waker_ref(&flag);
+ let cx = &mut Context::from_waker(&waker);
+ f(flag.clone(), cx)
+}
+
+// Sends a value on an i32 channel sink
+struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
+
+impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
+ fn new(sink: S, item: Item) -> Self {
+ Self(Some(sink), Some(item))
+ }
+}
+
+impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
+ type Output = Result<S, S::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self(inner, item) = self.get_mut();
+ {
+ let mut inner = inner.as_mut().unwrap();
+ ready!(Pin::new(&mut inner).poll_ready(cx))?;
+ Pin::new(&mut inner).start_send(item.take().unwrap())?;
+ }
+ Poll::Ready(Ok(inner.take().unwrap()))
+ }
+}
+
+// Test that `start_send` on an `mpsc` channel does indeed block when the
+// channel is full
+#[test]
+fn mpsc_blocking_start_send() {
+ let (mut tx, mut rx) = mpsc::channel::<i32>(0);
+
+ block_on(future::lazy(|_| {
+ tx.start_send(0).unwrap();
+
+ flag_cx(|flag, cx| {
+ let mut task = StartSendFut::new(tx, 1);
+
+ assert!(task.poll_unpin(cx).is_pending());
+ assert!(!flag.take());
+ sassert_next(&mut rx, 0);
+ assert!(flag.take());
+ unwrap(task.poll_unpin(cx));
+ assert!(!flag.take());
+ sassert_next(&mut rx, 1);
+ })
+ }));
+}
+
+// test `flush` by using `with` to make the first insertion into a sink block
+// until a oneshot is completed
+#[test]
+fn with_flush() {
+ let (tx, rx) = oneshot::channel();
+ let mut block = rx.boxed();
+ let mut sink = Vec::new().with(|elem| {
+ mem::replace(&mut block, future::ok(()).boxed())
+ .map_ok(move |()| elem + 1)
+ .map_err(|_| -> Never { panic!() })
+ });
+
+ assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(()));
+
+ flag_cx(|flag, cx| {
+ let mut task = sink.flush();
+ assert!(task.poll_unpin(cx).is_pending());
+ tx.send(()).unwrap();
+ assert!(flag.take());
+
+ unwrap(task.poll_unpin(cx));
+
+ block_on(sink.send(1)).unwrap();
+ assert_eq!(sink.get_ref(), &[1, 2]);
+ })
+}
+
+// test simple use of with to change data
+#[test]
+fn with_as_map() {
+ let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));
+ block_on(sink.send(0)).unwrap();
+ block_on(sink.send(1)).unwrap();
+ block_on(sink.send(2)).unwrap();
+ assert_eq!(sink.get_ref(), &[0, 2, 4]);
+}
+
+// test simple use of with_flat_map
+#[test]
+fn with_flat_map() {
+ let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));
+ block_on(sink.send(0)).unwrap();
+ block_on(sink.send(1)).unwrap();
+ block_on(sink.send(2)).unwrap();
+ block_on(sink.send(3)).unwrap();
+ assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]);
+}
+
+// Check that `with` propagates `poll_ready` to the inner sink.
+// Regression test for the issue #1834.
+#[test]
+fn with_propagates_poll_ready() {
+ let (tx, mut rx) = mpsc::channel::<i32>(0);
+ let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));
+
+ block_on(future::lazy(|_| {
+ flag_cx(|flag, cx| {
+ let mut tx = Pin::new(&mut tx);
+
+ // Should be ready for the first item.
+ assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
+ assert_eq!(tx.as_mut().start_send(0), Ok(()));
+
+ // Should be ready for the second item only after the first one is received.
+ assert_eq!(tx.as_mut().poll_ready(cx), Poll::Pending);
+ assert!(!flag.take());
+ sassert_next(&mut rx, 10);
+ assert!(flag.take());
+ assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
+ assert_eq!(tx.as_mut().start_send(1), Ok(()));
+ })
+ }));
+}
+
+// Immediately accepts all requests to start pushing, but completion is managed
+// by manually flushing
+struct ManualFlush<T: Unpin> {
+ data: Vec<T>,
+ waiting_tasks: Vec<Waker>,
+}
+
+impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
+ type Error = ();
+
+ fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
+ if let Some(item) = item {
+ self.data.push(item);
+ } else {
+ self.force_flush();
+ }
+ Ok(())
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.data.is_empty() {
+ Poll::Ready(Ok(()))
+ } else {
+ self.waiting_tasks.push(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.poll_flush(cx)
+ }
+}
+
+impl<T: Unpin> ManualFlush<T> {
+ fn new() -> Self {
+ Self {
+ data: Vec::new(),
+ waiting_tasks: Vec::new(),
+ }
+ }
+
+ fn force_flush(&mut self) -> Vec<T> {
+ for task in self.waiting_tasks.drain(..) {
+ task.wake()
+ }
+ mem::replace(&mut self.data, Vec::new())
+ }
+}
+
+// test that the `with` sink doesn't require the underlying sink to flush,
+// but doesn't claim to be flushed until the underlying sink is
+#[test]
+fn with_flush_propagate() {
+ let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
+ flag_cx(|flag, cx| {
+ unwrap(Pin::new(&mut sink).poll_ready(cx));
+ Pin::new(&mut sink).start_send(Some(0)).unwrap();
+ unwrap(Pin::new(&mut sink).poll_ready(cx));
+ Pin::new(&mut sink).start_send(Some(1)).unwrap();
+
+ {
+ let mut task = sink.flush();
+ assert!(task.poll_unpin(cx).is_pending());
+ assert!(!flag.take());
+ }
+ assert_eq!(sink.get_mut().force_flush(), vec![0, 1]);
+ assert!(flag.take());
+ unwrap(sink.flush().poll_unpin(cx));
+ })
+}
+
+// test that a buffer is a no-nop around a sink that always accepts sends
+#[test]
+fn buffer_noop() {
+ let mut sink = Vec::new().buffer(0);
+ block_on(sink.send(0)).unwrap();
+ block_on(sink.send(1)).unwrap();
+ assert_eq!(sink.get_ref(), &[0, 1]);
+
+ let mut sink = Vec::new().buffer(1);
+ block_on(sink.send(0)).unwrap();
+ block_on(sink.send(1)).unwrap();
+ assert_eq!(sink.get_ref(), &[0, 1]);
+}
+
+struct ManualAllow<T: Unpin> {
+ data: Vec<T>,
+ allow: Rc<Allow>,
+}
+
+struct Allow {
+ flag: Cell<bool>,
+ tasks: RefCell<Vec<Waker>>,
+}
+
+impl Allow {
+ fn new() -> Self {
+ Self {
+ flag: Cell::new(false),
+ tasks: RefCell::new(Vec::new()),
+ }
+ }
+
+ fn check(&self, cx: &mut Context<'_>) -> bool {
+ if self.flag.get() {
+ true
+ } else {
+ self.tasks.borrow_mut().push(cx.waker().clone());
+ false
+ }
+ }
+
+ fn start(&self) {
+ self.flag.set(true);
+ let mut tasks = self.tasks.borrow_mut();
+ for task in tasks.drain(..) {
+ task.wake();
+ }
+ }
+}
+
+impl<T: Unpin> Sink<T> for ManualAllow<T> {
+ type Error = ();
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.allow.check(cx) {
+ Poll::Ready(Ok(()))
+ } else {
+ Poll::Pending
+ }
+ }
+
+ fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+ self.data.push(item);
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
+ let allow = Rc::new(Allow::new());
+ let manual_allow = ManualAllow {
+ data: Vec::new(),
+ allow: allow.clone(),
+ };
+ (manual_allow, allow)
+}
+
+// test basic buffer functionality, including both filling up to capacity,
+// and writing out when the underlying sink is ready
+#[test]
+fn buffer() {
+ let (sink, allow) = manual_allow::<i32>();
+ let sink = sink.buffer(2);
+
+ let sink = block_on(StartSendFut::new(sink, 0)).unwrap();
+ let mut sink = block_on(StartSendFut::new(sink, 1)).unwrap();
+
+ flag_cx(|flag, cx| {
+ let mut task = sink.send(2);
+ assert!(task.poll_unpin(cx).is_pending());
+ assert!(!flag.take());
+ allow.start();
+ assert!(flag.take());
+ unwrap(task.poll_unpin(cx));
+ assert_eq!(sink.get_ref().data, vec![0, 1, 2]);
+ })
+}
+
+#[test]
+fn fanout_smoke() {
+ let sink1 = Vec::new();
+ let sink2 = Vec::new();
+ let mut sink = sink1.fanout(sink2);
+ block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap();
+ let (sink1, sink2) = sink.into_inner();
+ assert_eq!(sink1, vec![1, 2, 3]);
+ assert_eq!(sink2, vec![1, 2, 3]);
+}
+
+#[test]
+fn fanout_backpressure() {
+ let (left_send, mut left_recv) = mpsc::channel(0);
+ let (right_send, mut right_recv) = mpsc::channel(0);
+ let sink = left_send.fanout(right_send);
+
+ let mut sink = block_on(StartSendFut::new(sink, 0)).unwrap();
+
+ flag_cx(|flag, cx| {
+ let mut task = sink.send(2);
+ assert!(!flag.take());
+ assert!(task.poll_unpin(cx).is_pending());
+ assert_eq!(block_on(left_recv.next()), Some(0));
+ assert!(flag.take());
+ assert!(task.poll_unpin(cx).is_pending());
+ assert_eq!(block_on(right_recv.next()), Some(0));
+ assert!(flag.take());
+
+ assert!(task.poll_unpin(cx).is_pending());
+ assert_eq!(block_on(left_recv.next()), Some(2));
+ assert!(flag.take());
+ assert!(task.poll_unpin(cx).is_pending());
+ assert_eq!(block_on(right_recv.next()), Some(2));
+ assert!(flag.take());
+
+ unwrap(task.poll_unpin(cx));
+ // make sure receivers live until end of test to prevent send errors
+ drop(left_recv);
+ drop(right_recv);
+ })
+}
+
+#[test]
+fn sink_map_err() {
+ {
+ let cx = &mut panic_context();
+ let (tx, _rx) = mpsc::channel(1);
+ let mut tx = tx.sink_map_err(|_| ());
+ assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
+ assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(())));
+ }
+
+ let tx = mpsc::channel(0).0;
+ assert_eq!(
+ Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()),
+ Err(())
+ );
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+struct ErrIntoTest;
+
+impl From<mpsc::SendError> for ErrIntoTest {
+ fn from(_: mpsc::SendError) -> Self {
+ Self
+ }
+}
+
+#[test]
+fn err_into() {
+ {
+ let cx = &mut panic_context();
+ let (tx, _rx) = mpsc::channel(1);
+ let mut tx: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = tx.sink_err_into();
+ assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
+ assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(())));
+ }
+
+ let tx = mpsc::channel(0).0;
+ assert_eq!(
+ Pin::new(&mut tx.sink_err_into()).start_send(()),
+ Err(ErrIntoTest)
+ );
+}
diff --git a/third_party/rust/futures/tests/sink_fanout.rs b/third_party/rust/futures/tests/sink_fanout.rs
new file mode 100644
index 0000000000..e57b2d8c7b
--- /dev/null
+++ b/third_party/rust/futures/tests/sink_fanout.rs
@@ -0,0 +1,24 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::join3;
+use futures::sink::SinkExt;
+use futures::stream::{self, StreamExt};
+
+#[test]
+fn it_works() {
+ let (tx1, rx1) = mpsc::channel(1);
+ let (tx2, rx2) = mpsc::channel(2);
+ let tx = tx1.fanout(tx2).sink_map_err(|_| ());
+
+ let src = stream::iter((0..10).map(Ok));
+ let fwd = src.forward(tx);
+
+ let collect_fut1 = rx1.collect::<Vec<_>>();
+ let collect_fut2 = rx2.collect::<Vec<_>>();
+ let (_, vec1, vec2) = block_on(join3(fwd, collect_fut1, collect_fut2));
+
+ let expected = (0..10).collect::<Vec<_>>();
+
+ assert_eq!(vec1, expected);
+ assert_eq!(vec2, expected);
+}
diff --git a/third_party/rust/futures/tests/split.rs b/third_party/rust/futures/tests/split.rs
new file mode 100644
index 0000000000..9f4f1a07e2
--- /dev/null
+++ b/third_party/rust/futures/tests/split.rs
@@ -0,0 +1,77 @@
+use futures::executor::block_on;
+use futures::sink::{Sink, SinkExt};
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{Context, Poll};
+use pin_utils::unsafe_pinned;
+use std::pin::Pin;
+
+struct Join<T, U> {
+ stream: T,
+ sink: U
+}
+
+impl<T, U> Join<T, U> {
+ unsafe_pinned!(stream: T);
+ unsafe_pinned!(sink: U);
+}
+
+impl<T: Stream, U> Stream for Join<T, U> {
+ type Item = T::Item;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<T::Item>> {
+ self.stream().poll_next(cx)
+ }
+}
+
+impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> {
+ type Error = U::Error;
+
+ fn poll_ready(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ self.sink().poll_ready(cx)
+ }
+
+ fn start_send(
+ self: Pin<&mut Self>,
+ item: Item,
+ ) -> Result<(), Self::Error> {
+ self.sink().start_send(item)
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ self.sink().poll_flush(cx)
+ }
+
+ fn poll_close(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ self.sink().poll_close(cx)
+ }
+}
+
+#[test]
+fn test_split() {
+ let mut dest: Vec<i32> = Vec::new();
+ {
+ let join = Join {
+ stream: stream::iter(vec![10, 20, 30]),
+ sink: &mut dest
+ };
+
+ let (sink, stream) = join.split();
+ let join = sink.reunite(stream).expect("test_split: reunite error");
+ let (mut sink, stream) = join.split();
+ let mut stream = stream.map(Ok);
+ block_on(sink.send_all(&mut stream)).unwrap();
+ }
+ assert_eq!(dest, vec![10, 20, 30]);
+}
diff --git a/third_party/rust/futures/tests/stream.rs b/third_party/rust/futures/tests/stream.rs
new file mode 100644
index 0000000000..fd6a8b6da7
--- /dev/null
+++ b/third_party/rust/futures/tests/stream.rs
@@ -0,0 +1,32 @@
+use futures::executor::block_on;
+use futures::stream::{self, StreamExt};
+
+#[test]
+fn select() {
+ fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
+ let a = stream::iter(a);
+ let b = stream::iter(b);
+ let vec = block_on(stream::select(a, b).collect::<Vec<_>>());
+ assert_eq!(vec, expected);
+ }
+
+ select_and_compare(vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]);
+ select_and_compare(vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]);
+ select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]);
+}
+
+#[test]
+fn scan() {
+ futures::executor::block_on(async {
+ assert_eq!(
+ stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
+ .scan(1, |acc, e| {
+ *acc += 1;
+ futures::future::ready(if e < *acc { Some(e) } else { None })
+ })
+ .collect::<Vec<_>>()
+ .await,
+ vec![1u8, 2, 3, 4]
+ );
+ });
+}
diff --git a/third_party/rust/futures/tests/stream_catch_unwind.rs b/third_party/rust/futures/tests/stream_catch_unwind.rs
new file mode 100644
index 0000000000..8b23a0a7ef
--- /dev/null
+++ b/third_party/rust/futures/tests/stream_catch_unwind.rs
@@ -0,0 +1,27 @@
+use futures::executor::block_on_stream;
+use futures::stream::{self, StreamExt};
+
+#[test]
+fn panic_in_the_middle_of_the_stream() {
+ let stream = stream::iter(vec![Some(10), None, Some(11)]);
+
+ // panic on second element
+ let stream_panicking = stream.map(|o| o.unwrap());
+ let mut iter = block_on_stream(stream_panicking.catch_unwind());
+
+ assert_eq!(10, iter.next().unwrap().ok().unwrap());
+ assert!(iter.next().unwrap().is_err());
+ assert!(iter.next().is_none());
+}
+
+#[test]
+fn no_panic() {
+ let stream = stream::iter(vec![10, 11, 12]);
+
+ let mut iter = block_on_stream(stream.catch_unwind());
+
+ assert_eq!(10, iter.next().unwrap().ok().unwrap());
+ assert_eq!(11, iter.next().unwrap().ok().unwrap());
+ assert_eq!(12, iter.next().unwrap().ok().unwrap());
+ assert!(iter.next().is_none());
+}
diff --git a/third_party/rust/futures/tests/stream_into_async_read.rs b/third_party/rust/futures/tests/stream_into_async_read.rs
new file mode 100644
index 0000000000..c528af03f0
--- /dev/null
+++ b/third_party/rust/futures/tests/stream_into_async_read.rs
@@ -0,0 +1,96 @@
+use core::pin::Pin;
+use futures::io::{AsyncRead, AsyncBufRead};
+use futures::stream::{self, TryStreamExt};
+use futures::task::Poll;
+use futures_test::{task::noop_context, stream::StreamTestExt};
+
+macro_rules! assert_read {
+ ($reader:expr, $buf:expr, $item:expr) => {
+ let mut cx = noop_context();
+ loop {
+ match Pin::new(&mut $reader).poll_read(&mut cx, $buf) {
+ Poll::Ready(Ok(x)) => {
+ assert_eq!(x, $item);
+ break;
+ }
+ Poll::Ready(Err(err)) => {
+ panic!("assertion failed: expected value but got {}", err);
+ }
+ Poll::Pending => {
+ continue;
+ }
+ }
+ }
+ };
+}
+
+macro_rules! assert_fill_buf {
+ ($reader:expr, $buf:expr) => {
+ let mut cx = noop_context();
+ loop {
+ match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
+ Poll::Ready(Ok(x)) => {
+ assert_eq!(x, $buf);
+ break;
+ }
+ Poll::Ready(Err(err)) => {
+ panic!("assertion failed: expected value but got {}", err);
+ }
+ Poll::Pending => {
+ continue;
+ }
+ }
+ }
+ };
+}
+
+#[test]
+fn test_into_async_read() {
+ let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
+ let mut reader = stream.interleave_pending().into_async_read();
+ let mut buf = vec![0; 3];
+
+ assert_read!(reader, &mut buf, 3);
+ assert_eq!(&buf, &[1, 2, 3]);
+
+ assert_read!(reader, &mut buf, 2);
+ assert_eq!(&buf[..2], &[4, 5]);
+
+ assert_read!(reader, &mut buf, 3);
+ assert_eq!(&buf, &[1, 2, 3]);
+
+ assert_read!(reader, &mut buf, 2);
+ assert_eq!(&buf[..2], &[4, 5]);
+
+ assert_read!(reader, &mut buf, 3);
+ assert_eq!(&buf, &[1, 2, 3]);
+
+ assert_read!(reader, &mut buf, 2);
+ assert_eq!(&buf[..2], &[4, 5]);
+
+ assert_read!(reader, &mut buf, 0);
+}
+
+#[test]
+fn test_into_async_bufread() -> std::io::Result<()> {
+ let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
+ let mut reader = stream.interleave_pending().into_async_read();
+
+ let mut reader = Pin::new(&mut reader);
+
+ assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]);
+ reader.as_mut().consume(3);
+
+ assert_fill_buf!(reader, &[4, 5][..]);
+ reader.as_mut().consume(2);
+
+ assert_fill_buf!(reader, &[1, 2, 3, 4, 5][..]);
+ reader.as_mut().consume(2);
+
+ assert_fill_buf!(reader, &[3, 4, 5][..]);
+ reader.as_mut().consume(3);
+
+ assert_fill_buf!(reader, &[][..]);
+
+ Ok(())
+}
diff --git a/third_party/rust/futures/tests/stream_peekable.rs b/third_party/rust/futures/tests/stream_peekable.rs
new file mode 100644
index 0000000000..b65a0572cb
--- /dev/null
+++ b/third_party/rust/futures/tests/stream_peekable.rs
@@ -0,0 +1,13 @@
+use futures::executor::block_on;
+use futures::pin_mut;
+use futures::stream::{self, Peekable, StreamExt};
+
+#[test]
+fn peekable() {
+ block_on(async {
+ let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable();
+ pin_mut!(peekable);
+ assert_eq!(peekable.as_mut().peek().await, Some(&1u8));
+ assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
+ });
+}
diff --git a/third_party/rust/futures/tests/stream_select_all.rs b/third_party/rust/futures/tests/stream_select_all.rs
new file mode 100644
index 0000000000..eb711dda0c
--- /dev/null
+++ b/third_party/rust/futures/tests/stream_select_all.rs
@@ -0,0 +1,78 @@
+use futures::channel::mpsc;
+use futures::executor::block_on_stream;
+use futures::future::{self, FutureExt};
+use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+
+#[test]
+fn is_terminated() {
+ let mut cx = noop_context();
+ let mut tasks = SelectAll::new();
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+
+ // Test that the sentinel value doesn't leak
+ assert_eq!(tasks.is_empty(), true);
+ assert_eq!(tasks.len(), 0);
+
+ tasks.push(future::ready(1).into_stream());
+
+ assert_eq!(tasks.is_empty(), false);
+ assert_eq!(tasks.len(), 1);
+
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
+ assert_eq!(tasks.is_terminated(), false);
+ assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
+ assert_eq!(tasks.is_terminated(), true);
+}
+
+#[test]
+fn issue_1626() {
+ let a = stream::iter(0..=2);
+ let b = stream::iter(10..=14);
+
+ let mut s = block_on_stream(stream::select_all(vec![a, b]));
+
+ assert_eq!(s.next(), Some(0));
+ assert_eq!(s.next(), Some(10));
+ assert_eq!(s.next(), Some(1));
+ assert_eq!(s.next(), Some(11));
+ assert_eq!(s.next(), Some(2));
+ assert_eq!(s.next(), Some(12));
+ assert_eq!(s.next(), Some(13));
+ assert_eq!(s.next(), Some(14));
+ assert_eq!(s.next(), None);
+}
+
+#[test]
+fn works_1() {
+ let (a_tx, a_rx) = mpsc::unbounded::<u32>();
+ let (b_tx, b_rx) = mpsc::unbounded::<u32>();
+ let (c_tx, c_rx) = mpsc::unbounded::<u32>();
+
+ let streams = vec![a_rx, b_rx, c_rx];
+
+ let mut stream = block_on_stream(select_all(streams));
+
+ b_tx.unbounded_send(99).unwrap();
+ a_tx.unbounded_send(33).unwrap();
+ assert_eq!(Some(33), stream.next());
+ assert_eq!(Some(99), stream.next());
+
+ b_tx.unbounded_send(99).unwrap();
+ a_tx.unbounded_send(33).unwrap();
+ assert_eq!(Some(33), stream.next());
+ assert_eq!(Some(99), stream.next());
+
+ c_tx.unbounded_send(42).unwrap();
+ assert_eq!(Some(42), stream.next());
+ a_tx.unbounded_send(43).unwrap();
+ assert_eq!(Some(43), stream.next());
+
+ drop((a_tx, b_tx, c_tx));
+ assert_eq!(None, stream.next());
+}
diff --git a/third_party/rust/futures/tests/stream_select_next_some.rs b/third_party/rust/futures/tests/stream_select_next_some.rs
new file mode 100644
index 0000000000..09d7e895c1
--- /dev/null
+++ b/third_party/rust/futures/tests/stream_select_next_some.rs
@@ -0,0 +1,85 @@
+use futures::{future, select};
+use futures::future::{FusedFuture, FutureExt};
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use futures_test::task::new_count_waker;
+
+#[test]
+fn is_terminated() {
+ let (waker, counter) = new_count_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ let mut tasks = FuturesUnordered::new();
+
+ let mut select_next_some = tasks.select_next_some();
+ assert_eq!(select_next_some.is_terminated(), false);
+ assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Pending);
+ assert_eq!(counter, 1);
+ assert_eq!(select_next_some.is_terminated(), true);
+ drop(select_next_some);
+
+ tasks.push(future::ready(1));
+
+ let mut select_next_some = tasks.select_next_some();
+ assert_eq!(select_next_some.is_terminated(), false);
+ assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Ready(1));
+ assert_eq!(select_next_some.is_terminated(), false);
+ assert_eq!(select_next_some.poll_unpin(&mut cx), Poll::Pending);
+ assert_eq!(select_next_some.is_terminated(), true);
+}
+
+#[test]
+fn select() {
+ // Checks that even though `async_tasks` will yield a `None` and return
+ // `is_terminated() == true` during the first poll, it manages to toggle
+ // back to having items after a future is pushed into it during the second
+ // poll (after pending_once completes).
+ futures::executor::block_on(async {
+ let mut fut = future::ready(1).pending_once();
+ let mut async_tasks = FuturesUnordered::new();
+ let mut total = 0;
+ loop {
+ select! {
+ num = fut => {
+ total += num;
+ async_tasks.push(async { 5 });
+ },
+ num = async_tasks.select_next_some() => {
+ total += num;
+ }
+ complete => break,
+ }
+ }
+ assert_eq!(total, 6);
+ });
+}
+
+// Check that `select!` macro does not fail when importing from `futures_util`.
+#[test]
+fn futures_util_select() {
+ use futures_util::select;
+
+ // Checks that even though `async_tasks` will yield a `None` and return
+ // `is_terminated() == true` during the first poll, it manages to toggle
+ // back to having items after a future is pushed into it during the second
+ // poll (after pending_once completes).
+ futures::executor::block_on(async {
+ let mut fut = future::ready(1).pending_once();
+ let mut async_tasks = FuturesUnordered::new();
+ let mut total = 0;
+ loop {
+ select! {
+ num = fut => {
+ total += num;
+ async_tasks.push(async { 5 });
+ },
+ num = async_tasks.select_next_some() => {
+ total += num;
+ }
+ complete => break,
+ }
+ }
+ assert_eq!(total, 6);
+ });
+}
diff --git a/third_party/rust/futures/tests/try_join.rs b/third_party/rust/futures/tests/try_join.rs
new file mode 100644
index 0000000000..6c6d0843d5
--- /dev/null
+++ b/third_party/rust/futures/tests/try_join.rs
@@ -0,0 +1,36 @@
+#![deny(unreachable_code)]
+
+use futures::{try_join, executor::block_on};
+
+// TODO: This abuses https://github.com/rust-lang/rust/issues/58733 in order to
+// test behaviour of the `try_join!` macro with the never type before it is
+// stabilized. Once `!` is again stabilized this can be removed and replaced
+// with direct use of `!` below where `Never` is used.
+trait MyTrait {
+ type Output;
+}
+impl<T> MyTrait for fn() -> T {
+ type Output = T;
+}
+type Never = <fn() -> ! as MyTrait>::Output;
+
+
+#[test]
+fn try_join_never_error() {
+ block_on(async {
+ let future1 = async { Ok::<(), Never>(()) };
+ let future2 = async { Ok::<(), Never>(()) };
+ try_join!(future1, future2)
+ })
+ .unwrap();
+}
+
+#[test]
+fn try_join_never_ok() {
+ block_on(async {
+ let future1 = async { Err::<Never, ()>(()) };
+ let future2 = async { Err::<Never, ()>(()) };
+ try_join!(future1, future2)
+ })
+ .unwrap_err();
+}
diff --git a/third_party/rust/futures/tests/try_join_all.rs b/third_party/rust/futures/tests/try_join_all.rs
new file mode 100644
index 0000000000..662b866ec8
--- /dev/null
+++ b/third_party/rust/futures/tests/try_join_all.rs
@@ -0,0 +1,44 @@
+use futures_util::future::*;
+use std::future::Future;
+use futures::executor::block_on;
+use std::fmt::Debug;
+
+fn assert_done<T, F>(actual_fut: F, expected: T)
+where
+ T: PartialEq + Debug,
+ F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
+{
+ let output = block_on(actual_fut());
+ assert_eq!(output, expected);
+}
+
+#[test]
+fn collect_collects() {
+ assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2]));
+ assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2));
+ assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1]));
+ // REVIEW: should this be implemented?
+ // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![]));
+
+ // TODO: needs more tests
+}
+
+#[test]
+fn try_join_all_iter_lifetime() {
+ // In futures-rs version 0.1, this function would fail to typecheck due to an overly
+ // conservative type parameterization of `TryJoinAll`.
+ fn sizes<'a>(bufs: Vec<&'a [u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> {
+ let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
+ Box::new(try_join_all(iter))
+ }
+
+ assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3 as usize, 0, 1]));
+}
+
+#[test]
+fn try_join_all_from_iter() {
+ assert_done(
+ || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()),
+ Ok::<_, usize>(vec![1, 2]),
+ )
+}
diff --git a/third_party/rust/futures/tests/unfold.rs b/third_party/rust/futures/tests/unfold.rs
new file mode 100644
index 0000000000..95722cf8a6
--- /dev/null
+++ b/third_party/rust/futures/tests/unfold.rs
@@ -0,0 +1,35 @@
+use futures::future;
+use futures::stream;
+
+use futures_test::future::FutureTestExt;
+use futures_test::{
+ assert_stream_done, assert_stream_next, assert_stream_pending,
+};
+
+#[test]
+fn unfold1() {
+ let mut stream = stream::unfold(0, |state| {
+ if state <= 2 {
+ future::ready(Some((state * 2, state + 1))).pending_once()
+ } else {
+ future::ready(None).pending_once()
+ }
+ });
+
+ // Creates the future with the closure
+ // Not ready (delayed future)
+ assert_stream_pending!(stream);
+ // Future is ready, yields the item
+ assert_stream_next!(stream, 0);
+
+ // Repeat
+ assert_stream_pending!(stream);
+ assert_stream_next!(stream, 2);
+
+ assert_stream_pending!(stream);
+ assert_stream_next!(stream, 4);
+
+ // No more items
+ assert_stream_pending!(stream);
+ assert_stream_done!(stream);
+}