summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures/tests_disabled
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures/tests_disabled')
-rw-r--r--third_party/rust/futures/tests_disabled/all.rs400
-rw-r--r--third_party/rust/futures/tests_disabled/bilock.rs102
-rw-r--r--third_party/rust/futures/tests_disabled/stream.rs368
3 files changed, 870 insertions, 0 deletions
diff --git a/third_party/rust/futures/tests_disabled/all.rs b/third_party/rust/futures/tests_disabled/all.rs
new file mode 100644
index 0000000000..a7a571040a
--- /dev/null
+++ b/third_party/rust/futures/tests_disabled/all.rs
@@ -0,0 +1,400 @@
+use futures::channel::oneshot::{self, Canceled};
+use futures::executor::block_on;
+use futures::future;
+use std::sync::mpsc::{channel, TryRecvError};
+
+// mod support;
+// use support::*;
+
+fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> {
+ match r {
+ Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t),
+ Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e),
+ }
+}
+
+#[test]
+fn result_smoke() {
+ fn is_future_v<A, B, C>(_: C)
+ where
+ A: Send + 'static,
+ B: Send + 'static,
+ C: Future<Item = A, Error = B>,
+ {
+ }
+
+ is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1));
+ is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1));
+ is_future_v::<i32, u32, _>(f_ok(1).and_then(Ok));
+ is_future_v::<i32, u32, _>(f_ok(1).or_else(Err));
+ is_future_v::<(i32, i32), u32, _>(f_ok(1).join(Err(3)));
+ is_future_v::<i32, u32, _>(f_ok(1).map(f_ok).flatten());
+
+ assert_done(|| f_ok(1), r_ok(1));
+ assert_done(|| f_err(1), r_err(1));
+ assert_done(|| result(Ok(1)), r_ok(1));
+ assert_done(|| result(Err(1)), r_err(1));
+ assert_done(|| ok(1), r_ok(1));
+ assert_done(|| err(1), r_err(1));
+ assert_done(|| f_ok(1).map(|a| a + 2), r_ok(3));
+ assert_done(|| f_err(1).map(|a| a + 2), r_err(1));
+ assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1));
+ assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3));
+ assert_done(|| f_ok(1).and_then(|a| Ok(a + 2)), r_ok(3));
+ assert_done(|| f_err(1).and_then(|a| Ok(a + 2)), r_err(1));
+ assert_done(|| f_ok(1).and_then(|a| Err(a as u32 + 3)), r_err(4));
+ assert_done(|| f_err(1).and_then(|a| Err(a as u32 + 4)), r_err(1));
+ assert_done(|| f_ok(1).or_else(|a| Ok(a as i32 + 2)), r_ok(1));
+ assert_done(|| f_err(1).or_else(|a| Ok(a as i32 + 2)), r_ok(3));
+ assert_done(|| f_ok(1).or_else(|a| Err(a + 3)), r_ok(1));
+ assert_done(|| f_err(1).or_else(|a| Err(a + 4)), r_err(5));
+ assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1));
+ assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1));
+ assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1));
+ assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1));
+ assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1));
+ assert_done(|| f_ok(1).join(f_err(1)), Err(1));
+ assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2)));
+ assert_done(|| f_err(1).join(f_ok(1)), Err(1));
+ assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2));
+ assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2));
+ assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2));
+ assert_done(|| f_err(1).then(|_| Err(2)), r_err(2));
+}
+
+#[test]
+fn test_empty() {
+ fn empty() -> Empty<i32, u32> {
+ future::empty()
+ }
+
+ assert_empty(|| empty());
+ assert_empty(|| empty().select(empty()));
+ assert_empty(|| empty().join(empty()));
+ assert_empty(|| empty().join(f_ok(1)));
+ assert_empty(|| f_ok(1).join(empty()));
+ assert_empty(|| empty().or_else(move |_| empty()));
+ assert_empty(|| empty().and_then(move |_| empty()));
+ assert_empty(|| f_err(1).or_else(move |_| empty()));
+ assert_empty(|| f_ok(1).and_then(move |_| empty()));
+ assert_empty(|| empty().map(|a| a + 1));
+ assert_empty(|| empty().map_err(|a| a + 1));
+ assert_empty(|| empty().then(|a| a));
+}
+
+#[test]
+fn test_ok() {
+ assert_done(|| ok(1), r_ok(1));
+ assert_done(|| err(1), r_err(1));
+}
+
+#[test]
+fn flatten() {
+ fn ok<T: Send + 'static>(a: T) -> FutureResult<T, u32> {
+ future::ok(a)
+ }
+ fn err<E: Send + 'static>(b: E) -> FutureResult<i32, E> {
+ future::err(b)
+ }
+
+ assert_done(|| ok(ok(1)).flatten(), r_ok(1));
+ assert_done(|| ok(err(1)).flatten(), r_err(1));
+ assert_done(|| err(1u32).map(ok).flatten(), r_err(1));
+ assert_done(|| future::ok(future::ok(1)).flatten(), r_ok(1));
+ assert_empty(|| ok(empty::<i32, u32>()).flatten());
+ assert_empty(|| empty::<i32, u32>().map(ok).flatten());
+}
+
+#[test]
+fn smoke_oneshot() {
+ assert_done(
+ || {
+ let (c, p) = oneshot::channel();
+ c.send(1).unwrap();
+ p
+ },
+ Ok(1),
+ );
+ assert_done(
+ || {
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+ p
+ },
+ Err(Canceled),
+ );
+ let mut completes = Vec::new();
+ assert_empty(|| {
+ let (a, b) = oneshot::channel::<i32>();
+ completes.push(a);
+ b
+ });
+
+ let (c, mut p) = oneshot::channel::<i32>();
+ drop(c);
+ let res = panic_waker_lw(|lw| p.poll(lw));
+ assert!(res.is_err());
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+ let (tx, rx) = channel();
+ p.then(move |_| tx.send(())).forget();
+ rx.recv().unwrap();
+}
+
+#[test]
+fn select_cancels() {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| {
+ btx.send(b).unwrap();
+ b
+ });
+ let d = d.map(move |d| {
+ dtx.send(d).unwrap();
+ d
+ });
+
+ let mut f = b.select(d).then(unselect);
+ // assert!(f.poll(&mut Task::new()).is_pending());
+ assert!(brx.try_recv().is_err());
+ assert!(drx.try_recv().is_err());
+ a.send(1).unwrap();
+ noop_waker_lw(|lw| {
+ let res = f.poll(lw);
+ assert!(res.ok().unwrap().is_ready());
+ assert_eq!(brx.recv().unwrap(), 1);
+ drop(c);
+ assert!(drx.recv().is_err());
+
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, _brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| {
+ btx.send(b).unwrap();
+ b
+ });
+ let d = d.map(move |d| {
+ dtx.send(d).unwrap();
+ d
+ });
+
+ let mut f = b.select(d).then(unselect);
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ a.send(1).unwrap();
+ assert!(f.poll(lw).ok().unwrap().is_ready());
+ drop((c, f));
+ assert!(drx.recv().is_err());
+ })
+}
+
+#[test]
+fn join_cancels() {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, _brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| {
+ btx.send(b).unwrap();
+ b
+ });
+ let d = d.map(move |d| {
+ dtx.send(d).unwrap();
+ d
+ });
+
+ let mut f = b.join(d);
+ drop(a);
+ let res = panic_waker_lw(|lw| f.poll(lw));
+ assert!(res.is_err());
+ drop(c);
+ assert!(drx.recv().is_err());
+
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, _brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| {
+ btx.send(b).unwrap();
+ b
+ });
+ let d = d.map(move |d| {
+ dtx.send(d).unwrap();
+ d
+ });
+
+ let (tx, rx) = channel();
+ let f = b.join(d);
+ f.then(move |_| {
+ tx.send(()).unwrap();
+ let res: Result<(), ()> = Ok(());
+ res
+ })
+ .forget();
+ assert!(rx.try_recv().is_err());
+ drop(a);
+ rx.recv().unwrap();
+ drop(c);
+ assert!(drx.recv().is_err());
+}
+
+#[test]
+fn join_incomplete() {
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ noop_waker_lw(|lw| {
+ let mut f = ok(1).join(b).map(move |r| tx.send(r).unwrap());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(rx.try_recv().is_err());
+ a.send(2).unwrap();
+ assert!(f.poll(lw).ok().unwrap().is_ready());
+ assert_eq!(rx.recv().unwrap(), (1, 2));
+
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = b.join(Ok(2)).map(move |r| tx.send(r).unwrap());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(rx.try_recv().is_err());
+ a.send(1).unwrap();
+ assert!(f.poll(lw).ok().unwrap().is_ready());
+ assert_eq!(rx.recv().unwrap(), (1, 2));
+
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = ok(1).join(b).map_err(move |_r| tx.send(2).unwrap());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(rx.try_recv().is_err());
+ drop(a);
+ assert!(f.poll(lw).is_err());
+ assert_eq!(rx.recv().unwrap(), 2);
+
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap());
+ assert!(f.poll(lw).ok().unwrap().is_pending());
+ assert!(rx.try_recv().is_err());
+ drop(a);
+ assert!(f.poll(lw).is_err());
+ assert_eq!(rx.recv().unwrap(), 1);
+ })
+}
+
+#[test]
+fn select2() {
+ assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2));
+ assert_done(|| empty().select(f_ok(2)).then(unselect), Ok(2));
+ assert_done(|| f_err(2).select(empty()).then(unselect), Err(2));
+ assert_done(|| empty().select(f_err(2)).then(unselect), Err(2));
+
+ assert_done(
+ || {
+ f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| {
+ let (a, b) = either_tup.into_inner();
+ b.map(move |b| a + b)
+ })
+ },
+ Ok(3),
+ );
+
+ // Finish one half of a select and then fail the second, ensuring that we
+ // get the notification of the second one.
+ {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let f = b.select(d);
+ let (tx, rx) = channel();
+ f.map(move |r| tx.send(r).unwrap()).forget();
+ a.send(1).unwrap();
+ let (val, next) = rx.recv().unwrap().into_inner();
+ assert_eq!(val, 1);
+ let (tx, rx) = channel();
+ next.map_err(move |_r| tx.send(2).unwrap()).forget();
+ assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
+ drop(c);
+ assert_eq!(rx.recv().unwrap(), 2);
+ }
+
+ // Fail the second half and ensure that we see the first one finish
+ {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let f = b.select(d);
+ let (tx, rx) = channel();
+ f.map_err(move |r| tx.send((1, r.into_inner().1)).unwrap()).forget();
+ drop(c);
+ let (val, next) = rx.recv().unwrap();
+ assert_eq!(val, 1);
+ let (tx, rx) = channel();
+ next.map(move |r| tx.send(r).unwrap()).forget();
+ assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
+ a.send(2).unwrap();
+ assert_eq!(rx.recv().unwrap(), 2);
+ }
+
+ // Cancelling the first half should cancel the second
+ {
+ let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |v| {
+ btx.send(v).unwrap();
+ v
+ });
+ let d = d.map(move |v| {
+ dtx.send(v).unwrap();
+ v
+ });
+ let f = b.select(d);
+ drop(f);
+ assert!(drx.recv().is_err());
+ assert!(brx.recv().is_err());
+ }
+
+ // Cancel after a schedule
+ {
+ let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |v| {
+ btx.send(v).unwrap();
+ v
+ });
+ let d = d.map(move |v| {
+ dtx.send(v).unwrap();
+ v
+ });
+ let mut f = b.select(d);
+ let _res = noop_waker_lw(|lw| f.poll(lw));
+ drop(f);
+ assert!(drx.recv().is_err());
+ assert!(brx.recv().is_err());
+ }
+
+ // Cancel propagates
+ {
+ let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |v| {
+ btx.send(v).unwrap();
+ v
+ });
+ let d = d.map(move |v| {
+ dtx.send(v).unwrap();
+ v
+ });
+ let (tx, rx) = channel();
+ b.select(d).map(move |_| tx.send(()).unwrap()).forget();
+ drop(a);
+ assert!(drx.recv().is_err());
+ assert!(brx.recv().is_err());
+ assert!(rx.recv().is_err());
+ }
+
+ // Cancel on early drop
+ {
+ let (tx, rx) = channel();
+ let f = f_ok(1).select(empty::<_, ()>().map(move |()| {
+ tx.send(()).unwrap();
+ 1
+ }));
+ drop(f);
+ assert!(rx.recv().is_err());
+ }
+}
+
+#[test]
+fn option() {
+ assert_eq!(Ok(Some(())), block_on(Some(ok::<(), ()>(())).into_future()));
+ assert_eq!(Ok::<_, ()>(None::<()>), block_on(None::<FutureResult<(), ()>>.into_future()));
+}
diff --git a/third_party/rust/futures/tests_disabled/bilock.rs b/third_party/rust/futures/tests_disabled/bilock.rs
new file mode 100644
index 0000000000..0166ca48ba
--- /dev/null
+++ b/third_party/rust/futures/tests_disabled/bilock.rs
@@ -0,0 +1,102 @@
+use futures::future;
+use futures::stream;
+use futures::task;
+use futures_util::lock::BiLock;
+use std::thread;
+
+// mod support;
+// use support::*;
+
+#[test]
+fn smoke() {
+ let future = future::lazy(|_| {
+ let (a, b) = BiLock::new(1);
+
+ {
+ let mut lock = match a.poll_lock() {
+ Poll::Ready(l) => l,
+ Poll::Pending => panic!("poll not ready"),
+ };
+ assert_eq!(*lock, 1);
+ *lock = 2;
+
+ assert!(b.poll_lock().is_pending());
+ assert!(a.poll_lock().is_pending());
+ }
+
+ assert!(b.poll_lock().is_ready());
+ assert!(a.poll_lock().is_ready());
+
+ {
+ let lock = match b.poll_lock() {
+ Poll::Ready(l) => l,
+ Poll::Pending => panic!("poll not ready"),
+ };
+ assert_eq!(*lock, 2);
+ }
+
+ assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
+
+ Ok::<(), ()>(())
+ });
+
+ assert!(task::spawn(future)
+ .poll_future_notify(&notify_noop(), 0)
+ .expect("failure in poll")
+ .is_ready());
+}
+
+#[test]
+fn concurrent() {
+ const N: usize = 10000;
+ let (a, b) = BiLock::new(0);
+
+ let a = Increment { a: Some(a), remaining: N };
+ let b = stream::iter_ok(0..N).fold(b, |b, _n| {
+ b.lock().map(|mut b| {
+ *b += 1;
+ b.unlock()
+ })
+ });
+
+ let t1 = thread::spawn(move || a.wait());
+ let b = b.wait().expect("b error");
+ let a = t1.join().unwrap().expect("a error");
+
+ match a.poll_lock() {
+ Poll::Ready(l) => assert_eq!(*l, 2 * N),
+ Poll::Pending => panic!("poll not ready"),
+ }
+ match b.poll_lock() {
+ Poll::Ready(l) => assert_eq!(*l, 2 * N),
+ Poll::Pending => panic!("poll not ready"),
+ }
+
+ assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
+
+ struct Increment {
+ remaining: usize,
+ a: Option<BiLock<usize>>,
+ }
+
+ impl Future for Increment {
+ type Item = BiLock<usize>;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
+ loop {
+ if self.remaining == 0 {
+ return Ok(self.a.take().unwrap().into());
+ }
+
+ let a = self.a.as_ref().unwrap();
+ let mut a = match a.poll_lock() {
+ Poll::Ready(l) => l,
+ Poll::Pending => return Ok(Poll::Pending),
+ };
+ self.remaining -= 1;
+ *a += 1;
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures/tests_disabled/stream.rs b/third_party/rust/futures/tests_disabled/stream.rs
new file mode 100644
index 0000000000..a4eec2c7aa
--- /dev/null
+++ b/third_party/rust/futures/tests_disabled/stream.rs
@@ -0,0 +1,368 @@
+use futures::channel::mpsc;
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{err, ok};
+use futures::stream::{empty, iter_ok, poll_fn, Peekable};
+
+// mod support;
+// use support::*;
+
+pub struct Iter<I> {
+ iter: I,
+}
+
+pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
+where
+ J: IntoIterator<Item = Result<T, E>>,
+{
+ Iter { iter: i.into_iter() }
+}
+
+impl<I, T, E> Stream for Iter<I>
+where
+ I: Iterator<Item = Result<T, E>>,
+{
+ type Item = T;
+ type Error = E;
+
+ fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<T>, E> {
+ match self.iter.next() {
+ Some(Ok(e)) => Ok(Poll::Ready(Some(e))),
+ Some(Err(e)) => Err(e),
+ None => Ok(Poll::Ready(None)),
+ }
+ }
+}
+
+fn list() -> Box<Stream<Item = i32, Error = u32> + Send> {
+ let (tx, rx) = mpsc::channel(1);
+ tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget();
+ Box::new(rx.then(|r| r.unwrap()))
+}
+
+fn err_list() -> Box<Stream<Item = i32, Error = u32> + Send> {
+ let (tx, rx) = mpsc::channel(1);
+ tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget();
+ Box::new(rx.then(|r| r.unwrap()))
+}
+
+#[test]
+fn map() {
+ assert_done(|| list().map(|a| a + 1).collect(), Ok(vec![2, 3, 4]));
+}
+
+#[test]
+fn map_err() {
+ assert_done(|| err_list().map_err(|a| a + 1).collect::<Vec<_>>(), Err(4));
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+struct FromErrTest(u32);
+
+impl From<u32> for FromErrTest {
+ fn from(i: u32) -> Self {
+ Self(i)
+ }
+}
+
+#[test]
+fn from_err() {
+ assert_done(|| err_list().err_into().collect::<Vec<_>>(), Err(FromErrTest(3)));
+}
+
+#[test]
+fn fold() {
+ assert_done(|| list().fold(0, |a, b| ok::<i32, u32>(a + b)), Ok(6));
+ assert_done(|| err_list().fold(0, |a, b| ok::<i32, u32>(a + b)), Err(3));
+}
+
+#[test]
+fn filter() {
+ assert_done(|| list().filter(|a| ok(*a % 2 == 0)).collect(), Ok(vec![2]));
+}
+
+#[test]
+fn filter_map() {
+ assert_done(
+ || list().filter_map(|x| ok(if x % 2 == 0 { Some(x + 10) } else { None })).collect(),
+ Ok(vec![12]),
+ );
+}
+
+#[test]
+fn and_then() {
+ assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4]));
+ assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(), Err(1));
+}
+
+#[test]
+fn then() {
+ assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4]));
+}
+
+#[test]
+fn or_else() {
+ assert_done(|| err_list().or_else(|a| ok::<i32, u32>(a as i32)).collect(), Ok(vec![1, 2, 3]));
+}
+
+#[test]
+fn flatten() {
+ assert_done(|| list().map(|_| list()).flatten().collect(), Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
+}
+
+#[test]
+fn skip() {
+ assert_done(|| list().skip(2).collect(), Ok(vec![3]));
+}
+
+#[test]
+fn skip_passes_errors_through() {
+ let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1));
+ assert_eq!(s.next(), Some(Err(1)));
+ assert_eq!(s.next(), Some(Err(2)));
+ assert_eq!(s.next(), Some(Ok(4)));
+ assert_eq!(s.next(), Some(Ok(5)));
+ assert_eq!(s.next(), None);
+}
+
+#[test]
+fn skip_while() {
+ assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), Ok(vec![2, 3]));
+}
+#[test]
+fn take() {
+ assert_done(|| list().take(2).collect(), Ok(vec![1, 2]));
+}
+
+#[test]
+fn take_while() {
+ assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), Ok(vec![1, 2]));
+}
+
+#[test]
+fn take_passes_errors_through() {
+ let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]).take(1));
+ assert_eq!(s.next(), Some(Err(1)));
+ assert_eq!(s.next(), Some(Err(2)));
+ assert_eq!(s.next(), Some(Ok(3)));
+ assert_eq!(s.next(), None);
+
+ let mut s = block_on_stream(iter(vec![Ok(1), Err(2)]).take(1));
+ assert_eq!(s.next(), Some(Ok(1)));
+ assert_eq!(s.next(), None);
+}
+
+#[test]
+fn peekable() {
+ assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3]));
+}
+
+#[test]
+fn fuse() {
+ let mut stream = block_on_stream(list().fuse());
+ assert_eq!(stream.next(), Some(Ok(1)));
+ assert_eq!(stream.next(), Some(Ok(2)));
+ assert_eq!(stream.next(), Some(Ok(3)));
+ assert_eq!(stream.next(), None);
+ assert_eq!(stream.next(), None);
+ assert_eq!(stream.next(), None);
+}
+
+#[test]
+fn buffered() {
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
+ .forget();
+
+ let mut rx = rx.buffered(2);
+ sassert_empty(&mut rx);
+ c.send(3).unwrap();
+ sassert_empty(&mut rx);
+ a.send(5).unwrap();
+ let mut rx = block_on_stream(rx);
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), Some(Ok(3)));
+ assert_eq!(rx.next(), None);
+
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
+ .forget();
+
+ let mut rx = rx.buffered(1);
+ sassert_empty(&mut rx);
+ c.send(3).unwrap();
+ sassert_empty(&mut rx);
+ a.send(5).unwrap();
+ let mut rx = block_on_stream(rx);
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), Some(Ok(3)));
+ assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn unordered() {
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
+ .forget();
+
+ let mut rx = rx.buffer_unordered(2);
+ sassert_empty(&mut rx);
+ let mut rx = block_on_stream(rx);
+ c.send(3).unwrap();
+ assert_eq!(rx.next(), Some(Ok(3)));
+ a.send(5).unwrap();
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), None);
+
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
+ .forget();
+
+ // We don't even get to see `c` until `a` completes.
+ let mut rx = rx.buffer_unordered(1);
+ sassert_empty(&mut rx);
+ c.send(3).unwrap();
+ sassert_empty(&mut rx);
+ a.send(5).unwrap();
+ let mut rx = block_on_stream(rx);
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), Some(Ok(3)));
+ assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn zip() {
+ assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)]));
+ assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)]));
+ assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)]));
+ assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3));
+ assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)]));
+}
+
+#[test]
+fn peek() {
+ struct Peek {
+ inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>,
+ }
+
+ impl Future for Peek {
+ type Item = ();
+ type Error = u32;
+
+ fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(), u32> {
+ {
+ let res = ready!(self.inner.peek(cx))?;
+ assert_eq!(res, Some(&1));
+ }
+ assert_eq!(self.inner.peek(cx).unwrap(), Some(&1).into());
+ assert_eq!(self.inner.poll_next(cx).unwrap(), Some(1).into());
+ Ok(Poll::Ready(()))
+ }
+ }
+
+ block_on(Peek { inner: list().peekable() }).unwrap()
+}
+
+#[test]
+fn wait() {
+ assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(), Ok(vec![1, 2, 3]));
+}
+
+#[test]
+fn chunks() {
+ assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]]));
+ assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]]));
+ assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]]));
+ let mut list = block_on_stream(err_list().chunks(3));
+ let i = list.next().unwrap().unwrap();
+ assert_eq!(i, vec![1, 2]);
+ let i = list.next().unwrap().unwrap_err();
+ assert_eq!(i, 3);
+}
+
+#[test]
+#[should_panic]
+fn chunks_panic_on_cap_zero() {
+ let _ = list().chunks(0);
+}
+
+#[test]
+fn forward() {
+ let v = Vec::new();
+ let v = block_on(iter_ok::<_, Never>(vec![0, 1]).forward(v)).unwrap().1;
+ assert_eq!(v, vec![0, 1]);
+
+ let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1;
+ assert_eq!(v, vec![0, 1, 2, 3]);
+
+ assert_done(
+ move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
+ Ok(vec![0, 1, 2, 3, 4, 5]),
+ );
+}
+
+#[test]
+fn concat() {
+ let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
+ assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
+
+ let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
+ assert_done(move || b.concat(), Err(()));
+}
+
+#[test]
+fn concat2() {
+ let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
+ assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
+
+ let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
+ assert_done(move || b.concat(), Err(()));
+
+ let c = empty::<Vec<()>, ()>();
+ assert_done(move || c.concat(), Ok(vec![]))
+}
+
+#[test]
+fn stream_poll_fn() {
+ let mut counter = 5usize;
+
+ let read_stream = poll_fn(move |_| -> Poll<Option<usize>, std::io::Error> {
+ if counter == 0 {
+ return Ok(Poll::Ready(None));
+ }
+ counter -= 1;
+ Ok(Poll::Ready(Some(counter)))
+ });
+
+ assert_eq!(block_on_stream(read_stream).count(), 5);
+}
+
+#[test]
+fn inspect() {
+ let mut seen = vec![];
+ assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3]));
+ assert_eq!(seen, [1, 2, 3]);
+}
+
+#[test]
+fn inspect_err() {
+ let mut seen = vec![];
+ assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect::<Vec<_>>(), Err(3));
+ assert_eq!(seen, [3]);
+}