diff options
Diffstat (limited to 'third_party/rust/futures/tests_disabled')
-rw-r--r-- | third_party/rust/futures/tests_disabled/all.rs | 400 | ||||
-rw-r--r-- | third_party/rust/futures/tests_disabled/bilock.rs | 102 | ||||
-rw-r--r-- | third_party/rust/futures/tests_disabled/stream.rs | 368 |
3 files changed, 870 insertions, 0 deletions
diff --git a/third_party/rust/futures/tests_disabled/all.rs b/third_party/rust/futures/tests_disabled/all.rs new file mode 100644 index 0000000000..a7a571040a --- /dev/null +++ b/third_party/rust/futures/tests_disabled/all.rs @@ -0,0 +1,400 @@ +use futures::channel::oneshot::{self, Canceled}; +use futures::executor::block_on; +use futures::future; +use std::sync::mpsc::{channel, TryRecvError}; + +// mod support; +// use support::*; + +fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> { + match r { + Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t), + Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e), + } +} + +#[test] +fn result_smoke() { + fn is_future_v<A, B, C>(_: C) + where + A: Send + 'static, + B: Send + 'static, + C: Future<Item = A, Error = B>, + { + } + + is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1)); + is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1)); + is_future_v::<i32, u32, _>(f_ok(1).and_then(Ok)); + is_future_v::<i32, u32, _>(f_ok(1).or_else(Err)); + is_future_v::<(i32, i32), u32, _>(f_ok(1).join(Err(3))); + is_future_v::<i32, u32, _>(f_ok(1).map(f_ok).flatten()); + + assert_done(|| f_ok(1), r_ok(1)); + assert_done(|| f_err(1), r_err(1)); + assert_done(|| result(Ok(1)), r_ok(1)); + assert_done(|| result(Err(1)), r_err(1)); + assert_done(|| ok(1), r_ok(1)); + assert_done(|| err(1), r_err(1)); + assert_done(|| f_ok(1).map(|a| a + 2), r_ok(3)); + assert_done(|| f_err(1).map(|a| a + 2), r_err(1)); + assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1)); + assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3)); + assert_done(|| f_ok(1).and_then(|a| Ok(a + 2)), r_ok(3)); + assert_done(|| f_err(1).and_then(|a| Ok(a + 2)), r_err(1)); + assert_done(|| f_ok(1).and_then(|a| Err(a as u32 + 3)), r_err(4)); + assert_done(|| f_err(1).and_then(|a| Err(a as u32 + 4)), r_err(1)); + assert_done(|| f_ok(1).or_else(|a| Ok(a as i32 + 2)), r_ok(1)); + assert_done(|| f_err(1).or_else(|a| Ok(a as i32 + 2)), r_ok(3)); + assert_done(|| f_ok(1).or_else(|a| Err(a + 3)), r_ok(1)); + assert_done(|| f_err(1).or_else(|a| Err(a + 4)), r_err(5)); + assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1)); + assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1)); + assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1)); + assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1)); + assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1)); + assert_done(|| f_ok(1).join(f_err(1)), Err(1)); + assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2))); + assert_done(|| f_err(1).join(f_ok(1)), Err(1)); + assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2)); + assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2)); + assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2)); + assert_done(|| f_err(1).then(|_| Err(2)), r_err(2)); +} + +#[test] +fn test_empty() { + fn empty() -> Empty<i32, u32> { + future::empty() + } + + assert_empty(|| empty()); + assert_empty(|| empty().select(empty())); + assert_empty(|| empty().join(empty())); + assert_empty(|| empty().join(f_ok(1))); + assert_empty(|| f_ok(1).join(empty())); + assert_empty(|| empty().or_else(move |_| empty())); + assert_empty(|| empty().and_then(move |_| empty())); + assert_empty(|| f_err(1).or_else(move |_| empty())); + assert_empty(|| f_ok(1).and_then(move |_| empty())); + assert_empty(|| empty().map(|a| a + 1)); + assert_empty(|| empty().map_err(|a| a + 1)); + assert_empty(|| empty().then(|a| a)); +} + +#[test] +fn test_ok() { + assert_done(|| ok(1), r_ok(1)); + assert_done(|| err(1), r_err(1)); +} + +#[test] +fn flatten() { + fn ok<T: Send + 'static>(a: T) -> FutureResult<T, u32> { + future::ok(a) + } + fn err<E: Send + 'static>(b: E) -> FutureResult<i32, E> { + future::err(b) + } + + assert_done(|| ok(ok(1)).flatten(), r_ok(1)); + assert_done(|| ok(err(1)).flatten(), r_err(1)); + assert_done(|| err(1u32).map(ok).flatten(), r_err(1)); + assert_done(|| future::ok(future::ok(1)).flatten(), r_ok(1)); + assert_empty(|| ok(empty::<i32, u32>()).flatten()); + assert_empty(|| empty::<i32, u32>().map(ok).flatten()); +} + +#[test] +fn smoke_oneshot() { + assert_done( + || { + let (c, p) = oneshot::channel(); + c.send(1).unwrap(); + p + }, + Ok(1), + ); + assert_done( + || { + let (c, p) = oneshot::channel::<i32>(); + drop(c); + p + }, + Err(Canceled), + ); + let mut completes = Vec::new(); + assert_empty(|| { + let (a, b) = oneshot::channel::<i32>(); + completes.push(a); + b + }); + + let (c, mut p) = oneshot::channel::<i32>(); + drop(c); + let res = panic_waker_lw(|lw| p.poll(lw)); + assert!(res.is_err()); + let (c, p) = oneshot::channel::<i32>(); + drop(c); + let (tx, rx) = channel(); + p.then(move |_| tx.send(())).forget(); + rx.recv().unwrap(); +} + +#[test] +fn select_cancels() { + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); + + let mut f = b.select(d).then(unselect); + // assert!(f.poll(&mut Task::new()).is_pending()); + assert!(brx.try_recv().is_err()); + assert!(drx.try_recv().is_err()); + a.send(1).unwrap(); + noop_waker_lw(|lw| { + let res = f.poll(lw); + assert!(res.ok().unwrap().is_ready()); + assert_eq!(brx.recv().unwrap(), 1); + drop(c); + assert!(drx.recv().is_err()); + + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); + + let mut f = b.select(d).then(unselect); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + a.send(1).unwrap(); + assert!(f.poll(lw).ok().unwrap().is_ready()); + drop((c, f)); + assert!(drx.recv().is_err()); + }) +} + +#[test] +fn join_cancels() { + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); + + let mut f = b.join(d); + drop(a); + let res = panic_waker_lw(|lw| f.poll(lw)); + assert!(res.is_err()); + drop(c); + assert!(drx.recv().is_err()); + + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); + + let (tx, rx) = channel(); + let f = b.join(d); + f.then(move |_| { + tx.send(()).unwrap(); + let res: Result<(), ()> = Ok(()); + res + }) + .forget(); + assert!(rx.try_recv().is_err()); + drop(a); + rx.recv().unwrap(); + drop(c); + assert!(drx.recv().is_err()); +} + +#[test] +fn join_incomplete() { + let (a, b) = oneshot::channel::<i32>(); + let (tx, rx) = channel(); + noop_waker_lw(|lw| { + let mut f = ok(1).join(b).map(move |r| tx.send(r).unwrap()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(rx.try_recv().is_err()); + a.send(2).unwrap(); + assert!(f.poll(lw).ok().unwrap().is_ready()); + assert_eq!(rx.recv().unwrap(), (1, 2)); + + let (a, b) = oneshot::channel::<i32>(); + let (tx, rx) = channel(); + let mut f = b.join(Ok(2)).map(move |r| tx.send(r).unwrap()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(rx.try_recv().is_err()); + a.send(1).unwrap(); + assert!(f.poll(lw).ok().unwrap().is_ready()); + assert_eq!(rx.recv().unwrap(), (1, 2)); + + let (a, b) = oneshot::channel::<i32>(); + let (tx, rx) = channel(); + let mut f = ok(1).join(b).map_err(move |_r| tx.send(2).unwrap()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(rx.try_recv().is_err()); + drop(a); + assert!(f.poll(lw).is_err()); + assert_eq!(rx.recv().unwrap(), 2); + + let (a, b) = oneshot::channel::<i32>(); + let (tx, rx) = channel(); + let mut f = b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap()); + assert!(f.poll(lw).ok().unwrap().is_pending()); + assert!(rx.try_recv().is_err()); + drop(a); + assert!(f.poll(lw).is_err()); + assert_eq!(rx.recv().unwrap(), 1); + }) +} + +#[test] +fn select2() { + assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2)); + assert_done(|| empty().select(f_ok(2)).then(unselect), Ok(2)); + assert_done(|| f_err(2).select(empty()).then(unselect), Err(2)); + assert_done(|| empty().select(f_err(2)).then(unselect), Err(2)); + + assert_done( + || { + f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| { + let (a, b) = either_tup.into_inner(); + b.map(move |b| a + b) + }) + }, + Ok(3), + ); + + // Finish one half of a select and then fail the second, ensuring that we + // get the notification of the second one. + { + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let f = b.select(d); + let (tx, rx) = channel(); + f.map(move |r| tx.send(r).unwrap()).forget(); + a.send(1).unwrap(); + let (val, next) = rx.recv().unwrap().into_inner(); + assert_eq!(val, 1); + let (tx, rx) = channel(); + next.map_err(move |_r| tx.send(2).unwrap()).forget(); + assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty); + drop(c); + assert_eq!(rx.recv().unwrap(), 2); + } + + // Fail the second half and ensure that we see the first one finish + { + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let f = b.select(d); + let (tx, rx) = channel(); + f.map_err(move |r| tx.send((1, r.into_inner().1)).unwrap()).forget(); + drop(c); + let (val, next) = rx.recv().unwrap(); + assert_eq!(val, 1); + let (tx, rx) = channel(); + next.map(move |r| tx.send(r).unwrap()).forget(); + assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty); + a.send(2).unwrap(); + assert_eq!(rx.recv().unwrap(), 2); + } + + // Cancelling the first half should cancel the second + { + let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); + let f = b.select(d); + drop(f); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + } + + // Cancel after a schedule + { + let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); + let mut f = b.select(d); + let _res = noop_waker_lw(|lw| f.poll(lw)); + drop(f); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + } + + // Cancel propagates + { + let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map(move |v| { + dtx.send(v).unwrap(); + v + }); + let (tx, rx) = channel(); + b.select(d).map(move |_| tx.send(()).unwrap()).forget(); + drop(a); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + assert!(rx.recv().is_err()); + } + + // Cancel on early drop + { + let (tx, rx) = channel(); + let f = f_ok(1).select(empty::<_, ()>().map(move |()| { + tx.send(()).unwrap(); + 1 + })); + drop(f); + assert!(rx.recv().is_err()); + } +} + +#[test] +fn option() { + assert_eq!(Ok(Some(())), block_on(Some(ok::<(), ()>(())).into_future())); + assert_eq!(Ok::<_, ()>(None::<()>), block_on(None::<FutureResult<(), ()>>.into_future())); +} diff --git a/third_party/rust/futures/tests_disabled/bilock.rs b/third_party/rust/futures/tests_disabled/bilock.rs new file mode 100644 index 0000000000..0166ca48ba --- /dev/null +++ b/third_party/rust/futures/tests_disabled/bilock.rs @@ -0,0 +1,102 @@ +use futures::future; +use futures::stream; +use futures::task; +use futures_util::lock::BiLock; +use std::thread; + +// mod support; +// use support::*; + +#[test] +fn smoke() { + let future = future::lazy(|_| { + let (a, b) = BiLock::new(1); + + { + let mut lock = match a.poll_lock() { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 1); + *lock = 2; + + assert!(b.poll_lock().is_pending()); + assert!(a.poll_lock().is_pending()); + } + + assert!(b.poll_lock().is_ready()); + assert!(a.poll_lock().is_ready()); + + { + let lock = match b.poll_lock() { + Poll::Ready(l) => l, + Poll::Pending => panic!("poll not ready"), + }; + assert_eq!(*lock, 2); + } + + assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2); + + Ok::<(), ()>(()) + }); + + assert!(task::spawn(future) + .poll_future_notify(¬ify_noop(), 0) + .expect("failure in poll") + .is_ready()); +} + +#[test] +fn concurrent() { + const N: usize = 10000; + let (a, b) = BiLock::new(0); + + let a = Increment { a: Some(a), remaining: N }; + let b = stream::iter_ok(0..N).fold(b, |b, _n| { + b.lock().map(|mut b| { + *b += 1; + b.unlock() + }) + }); + + let t1 = thread::spawn(move || a.wait()); + let b = b.wait().expect("b error"); + let a = t1.join().unwrap().expect("a error"); + + match a.poll_lock() { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + match b.poll_lock() { + Poll::Ready(l) => assert_eq!(*l, 2 * N), + Poll::Pending => panic!("poll not ready"), + } + + assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N); + + struct Increment { + remaining: usize, + a: Option<BiLock<usize>>, + } + + impl Future for Increment { + type Item = BiLock<usize>; + type Error = (); + + fn poll(&mut self) -> Poll<BiLock<usize>, ()> { + loop { + if self.remaining == 0 { + return Ok(self.a.take().unwrap().into()); + } + + let a = self.a.as_ref().unwrap(); + let mut a = match a.poll_lock() { + Poll::Ready(l) => l, + Poll::Pending => return Ok(Poll::Pending), + }; + self.remaining -= 1; + *a += 1; + } + } + } +} diff --git a/third_party/rust/futures/tests_disabled/stream.rs b/third_party/rust/futures/tests_disabled/stream.rs new file mode 100644 index 0000000000..a4eec2c7aa --- /dev/null +++ b/third_party/rust/futures/tests_disabled/stream.rs @@ -0,0 +1,368 @@ +use futures::channel::mpsc; +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{err, ok}; +use futures::stream::{empty, iter_ok, poll_fn, Peekable}; + +// mod support; +// use support::*; + +pub struct Iter<I> { + iter: I, +} + +pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter> +where + J: IntoIterator<Item = Result<T, E>>, +{ + Iter { iter: i.into_iter() } +} + +impl<I, T, E> Stream for Iter<I> +where + I: Iterator<Item = Result<T, E>>, +{ + type Item = T; + type Error = E; + + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<T>, E> { + match self.iter.next() { + Some(Ok(e)) => Ok(Poll::Ready(Some(e))), + Some(Err(e)) => Err(e), + None => Ok(Poll::Ready(None)), + } + } +} + +fn list() -> Box<Stream<Item = i32, Error = u32> + Send> { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget(); + Box::new(rx.then(|r| r.unwrap())) +} + +fn err_list() -> Box<Stream<Item = i32, Error = u32> + Send> { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget(); + Box::new(rx.then(|r| r.unwrap())) +} + +#[test] +fn map() { + assert_done(|| list().map(|a| a + 1).collect(), Ok(vec![2, 3, 4])); +} + +#[test] +fn map_err() { + assert_done(|| err_list().map_err(|a| a + 1).collect::<Vec<_>>(), Err(4)); +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct FromErrTest(u32); + +impl From<u32> for FromErrTest { + fn from(i: u32) -> Self { + Self(i) + } +} + +#[test] +fn from_err() { + assert_done(|| err_list().err_into().collect::<Vec<_>>(), Err(FromErrTest(3))); +} + +#[test] +fn fold() { + assert_done(|| list().fold(0, |a, b| ok::<i32, u32>(a + b)), Ok(6)); + assert_done(|| err_list().fold(0, |a, b| ok::<i32, u32>(a + b)), Err(3)); +} + +#[test] +fn filter() { + assert_done(|| list().filter(|a| ok(*a % 2 == 0)).collect(), Ok(vec![2])); +} + +#[test] +fn filter_map() { + assert_done( + || list().filter_map(|x| ok(if x % 2 == 0 { Some(x + 10) } else { None })).collect(), + Ok(vec![12]), + ); +} + +#[test] +fn and_then() { + assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4])); + assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(), Err(1)); +} + +#[test] +fn then() { + assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4])); +} + +#[test] +fn or_else() { + assert_done(|| err_list().or_else(|a| ok::<i32, u32>(a as i32)).collect(), Ok(vec![1, 2, 3])); +} + +#[test] +fn flatten() { + assert_done(|| list().map(|_| list()).flatten().collect(), Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); +} + +#[test] +fn skip() { + assert_done(|| list().skip(2).collect(), Ok(vec![3])); +} + +#[test] +fn skip_passes_errors_through() { + let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1)); + assert_eq!(s.next(), Some(Err(1))); + assert_eq!(s.next(), Some(Err(2))); + assert_eq!(s.next(), Some(Ok(4))); + assert_eq!(s.next(), Some(Ok(5))); + assert_eq!(s.next(), None); +} + +#[test] +fn skip_while() { + assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), Ok(vec![2, 3])); +} +#[test] +fn take() { + assert_done(|| list().take(2).collect(), Ok(vec![1, 2])); +} + +#[test] +fn take_while() { + assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), Ok(vec![1, 2])); +} + +#[test] +fn take_passes_errors_through() { + let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]).take(1)); + assert_eq!(s.next(), Some(Err(1))); + assert_eq!(s.next(), Some(Err(2))); + assert_eq!(s.next(), Some(Ok(3))); + assert_eq!(s.next(), None); + + let mut s = block_on_stream(iter(vec![Ok(1), Err(2)]).take(1)); + assert_eq!(s.next(), Some(Ok(1))); + assert_eq!(s.next(), None); +} + +#[test] +fn peekable() { + assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3])); +} + +#[test] +fn fuse() { + let mut stream = block_on_stream(list().fuse()); + assert_eq!(stream.next(), Some(Ok(1))); + assert_eq!(stream.next(), Some(Ok(2))); + assert_eq!(stream.next(), Some(Ok(3))); + assert_eq!(stream.next(), None); + assert_eq!(stream.next(), None); + assert_eq!(stream.next(), None); +} + +#[test] +fn buffered() { + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::<u32>(); + let (c, d) = oneshot::channel::<u32>(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); + + let mut rx = rx.buffered(2); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = block_on_stream(rx); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); + + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::<u32>(); + let (c, d) = oneshot::channel::<u32>(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); + + let mut rx = rx.buffered(1); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = block_on_stream(rx); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); +} + +#[test] +fn unordered() { + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::<u32>(); + let (c, d) = oneshot::channel::<u32>(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); + + let mut rx = rx.buffer_unordered(2); + sassert_empty(&mut rx); + let mut rx = block_on_stream(rx); + c.send(3).unwrap(); + assert_eq!(rx.next(), Some(Ok(3))); + a.send(5).unwrap(); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), None); + + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::<u32>(); + let (c, d) = oneshot::channel::<u32>(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); + + // We don't even get to see `c` until `a` completes. + let mut rx = rx.buffer_unordered(1); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = block_on_stream(rx); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); +} + +#[test] +fn zip() { + assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)])); + assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)])); + assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3)); + assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)])); +} + +#[test] +fn peek() { + struct Peek { + inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>, + } + + impl Future for Peek { + type Item = (); + type Error = u32; + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(), u32> { + { + let res = ready!(self.inner.peek(cx))?; + assert_eq!(res, Some(&1)); + } + assert_eq!(self.inner.peek(cx).unwrap(), Some(&1).into()); + assert_eq!(self.inner.poll_next(cx).unwrap(), Some(1).into()); + Ok(Poll::Ready(())) + } + } + + block_on(Peek { inner: list().peekable() }).unwrap() +} + +#[test] +fn wait() { + assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(), Ok(vec![1, 2, 3])); +} + +#[test] +fn chunks() { + assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]])); + assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); + assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]])); + let mut list = block_on_stream(err_list().chunks(3)); + let i = list.next().unwrap().unwrap(); + assert_eq!(i, vec![1, 2]); + let i = list.next().unwrap().unwrap_err(); + assert_eq!(i, 3); +} + +#[test] +#[should_panic] +fn chunks_panic_on_cap_zero() { + let _ = list().chunks(0); +} + +#[test] +fn forward() { + let v = Vec::new(); + let v = block_on(iter_ok::<_, Never>(vec![0, 1]).forward(v)).unwrap().1; + assert_eq!(v, vec![0, 1]); + + let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1; + assert_eq!(v, vec![0, 1, 2, 3]); + + assert_done( + move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), + Ok(vec![0, 1, 2, 3, 4, 5]), + ); +} + +#[test] +fn concat() { + let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); + assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); + + let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); + assert_done(move || b.concat(), Err(())); +} + +#[test] +fn concat2() { + let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); + assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); + + let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); + assert_done(move || b.concat(), Err(())); + + let c = empty::<Vec<()>, ()>(); + assert_done(move || c.concat(), Ok(vec![])) +} + +#[test] +fn stream_poll_fn() { + let mut counter = 5usize; + + let read_stream = poll_fn(move |_| -> Poll<Option<usize>, std::io::Error> { + if counter == 0 { + return Ok(Poll::Ready(None)); + } + counter -= 1; + Ok(Poll::Ready(Some(counter))) + }); + + assert_eq!(block_on_stream(read_stream).count(), 5); +} + +#[test] +fn inspect() { + let mut seen = vec![]; + assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3])); + assert_eq!(seen, [1, 2, 3]); +} + +#[test] +fn inspect_err() { + let mut seen = vec![]; + assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect::<Vec<_>>(), Err(3)); + assert_eq!(seen, [3]); +} |