diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/futures-0.1.31/tests | |
parent | Initial commit. (diff) | |
download | firefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip |
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-0.1.31/tests')
28 files changed, 4606 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/tests/all.rs b/third_party/rust/futures-0.1.31/tests/all.rs new file mode 100644 index 0000000000..40e402f553 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/all.rs @@ -0,0 +1,377 @@ +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +use std::sync::mpsc::{channel, TryRecvError}; + +use futures::future::*; +use futures::future; +use futures::executor; +use futures::sync::oneshot::{self, Canceled}; + +mod support; +use support::*; + +fn unselect<T, U, E>(r: Result<(T, U), (E, U)>) -> Result<T, E> { + match r { + Ok((t, _)) => Ok(t), + Err((e, _)) => Err(e), + } +} + +#[test] +fn result_smoke() { + fn is_future_v<A, B, C>(_: C) + where A: Send + 'static, + B: Send + 'static, + C: Future<Item=A, Error=B> + {} + + is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1)); + is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1)); + is_future_v::<i32, u32, _>(f_ok(1).and_then(Ok)); + is_future_v::<i32, u32, _>(f_ok(1).or_else(Err)); + is_future_v::<(i32, i32), u32, _>(f_ok(1).join(Err(3))); + is_future_v::<i32, u32, _>(f_ok(1).map(f_ok).flatten()); + + assert_done(|| f_ok(1), r_ok(1)); + assert_done(|| f_err(1), r_err(1)); + assert_done(|| result(Ok(1)), r_ok(1)); + assert_done(|| result(Err(1)), r_err(1)); + assert_done(|| ok(1), r_ok(1)); + assert_done(|| err(1), r_err(1)); + assert_done(|| f_ok(1).map(|a| a + 2), r_ok(3)); + assert_done(|| f_err(1).map(|a| a + 2), r_err(1)); + assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1)); + assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3)); + assert_done(|| f_ok(1).and_then(|a| Ok(a + 2)), r_ok(3)); + assert_done(|| f_err(1).and_then(|a| Ok(a + 2)), r_err(1)); + assert_done(|| f_ok(1).and_then(|a| Err(a as u32 + 3)), r_err(4)); + assert_done(|| f_err(1).and_then(|a| Err(a as u32 + 4)), r_err(1)); + assert_done(|| f_ok(1).or_else(|a| Ok(a as i32 + 2)), r_ok(1)); + assert_done(|| f_err(1).or_else(|a| Ok(a as i32 + 2)), r_ok(3)); + assert_done(|| f_ok(1).or_else(|a| Err(a + 3)), r_ok(1)); + assert_done(|| f_err(1).or_else(|a| Err(a + 4)), r_err(5)); + assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1)); + assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1)); + assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1)); + assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1)); + assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1)); + assert_done(|| f_ok(1).join(f_err(1)), Err(1)); + assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2))); + assert_done(|| f_err(1).join(f_ok(1)), Err(1)); + assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2)); + assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2)); + assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2)); + assert_done(|| f_err(1).then(|_| Err(2)), r_err(2)); +} + +#[test] +fn test_empty() { + fn empty() -> Empty<i32, u32> { future::empty() } + + assert_empty(|| empty()); + assert_empty(|| empty().select(empty())); + assert_empty(|| empty().join(empty())); + assert_empty(|| empty().join(f_ok(1))); + assert_empty(|| f_ok(1).join(empty())); + assert_empty(|| empty().or_else(move |_| empty())); + assert_empty(|| empty().and_then(move |_| empty())); + assert_empty(|| f_err(1).or_else(move |_| empty())); + assert_empty(|| f_ok(1).and_then(move |_| empty())); + assert_empty(|| empty().map(|a| a + 1)); + assert_empty(|| empty().map_err(|a| a + 1)); + assert_empty(|| empty().then(|a| a)); +} + +#[test] +fn test_ok() { + assert_done(|| ok(1), r_ok(1)); + assert_done(|| err(1), r_err(1)); +} + +#[test] +fn flatten() { + fn ok<T: Send + 'static>(a: T) -> FutureResult<T, u32> { + future::ok(a) + } + fn err<E: Send + 'static>(b: E) -> FutureResult<i32, E> { + future::err(b) + } + + assert_done(|| ok(ok(1)).flatten(), r_ok(1)); + assert_done(|| ok(err(1)).flatten(), r_err(1)); + assert_done(|| err(1u32).map(ok).flatten(), r_err(1)); + assert_done(|| future::ok::<_, u8>(future::ok::<_, u32>(1)) + .flatten(), r_ok(1)); + assert_empty(|| ok(empty::<i32, u32>()).flatten()); + assert_empty(|| empty::<i32, u32>().map(ok).flatten()); +} + +#[test] +fn smoke_oneshot() { + assert_done(|| { + let (c, p) = oneshot::channel(); + c.send(1).unwrap(); + p + }, Ok(1)); + assert_done(|| { + let (c, p) = oneshot::channel::<i32>(); + drop(c); + p + }, Err(Canceled)); + let mut completes = Vec::new(); + assert_empty(|| { + let (a, b) = oneshot::channel::<i32>(); + completes.push(a); + b + }); + + let (c, p) = oneshot::channel::<i32>(); + drop(c); + let res = executor::spawn(p).poll_future_notify(¬ify_panic(), 0); + assert!(res.is_err()); + let (c, p) = oneshot::channel::<i32>(); + drop(c); + let (tx, rx) = channel(); + p.then(move |_| { + tx.send(()) + }).forget(); + rx.recv().unwrap(); +} + +#[test] +fn select_cancels() { + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { btx.send(b).unwrap(); b }); + let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + + let f = b.select(d).then(unselect); + // assert!(f.poll(&mut Task::new()).is_not_ready()); + assert!(brx.try_recv().is_err()); + assert!(drx.try_recv().is_err()); + a.send(1).unwrap(); + let res = executor::spawn(f).poll_future_notify(¬ify_panic(), 0); + assert!(res.ok().unwrap().is_ready()); + assert_eq!(brx.recv().unwrap(), 1); + drop(c); + assert!(drx.recv().is_err()); + + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { btx.send(b).unwrap(); b }); + let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + + let mut f = executor::spawn(b.select(d).then(unselect)); + assert!(f.poll_future_notify(¬ify_noop(), 0).ok().unwrap().is_not_ready()); + assert!(f.poll_future_notify(¬ify_noop(), 0).ok().unwrap().is_not_ready()); + a.send(1).unwrap(); + assert!(f.poll_future_notify(¬ify_panic(), 0).ok().unwrap().is_ready()); + drop((c, f)); + assert!(drx.recv().is_err()); +} + +#[test] +fn join_cancels() { + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { btx.send(b).unwrap(); b }); + let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + + let f = b.join(d); + drop(a); + let res = executor::spawn(f).poll_future_notify(¬ify_panic(), 0); + assert!(res.is_err()); + drop(c); + assert!(drx.recv().is_err()); + + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { btx.send(b).unwrap(); b }); + let d = d.map(move |d| { dtx.send(d).unwrap(); d }); + + let (tx, rx) = channel(); + let f = b.join(d); + f.then(move |_| { + tx.send(()).unwrap(); + let res: Result<(), ()> = Ok(()); + res + }).forget(); + assert!(rx.try_recv().is_err()); + drop(a); + rx.recv().unwrap(); + drop(c); + assert!(drx.recv().is_err()); +} + +#[test] +fn join_incomplete() { + let (a, b) = oneshot::channel::<i32>(); + let (tx, rx) = channel(); + let mut f = executor::spawn(ok(1).join(b).map(move |r| tx.send(r).unwrap())); + assert!(f.poll_future_notify(¬ify_noop(), 0).ok().unwrap().is_not_ready()); + assert!(rx.try_recv().is_err()); + a.send(2).unwrap(); + assert!(f.poll_future_notify(¬ify_noop(), 0).ok().unwrap().is_ready()); + assert_eq!(rx.recv().unwrap(), (1, 2)); + + let (a, b) = oneshot::channel::<i32>(); + let (tx, rx) = channel(); + let mut f = executor::spawn(b.join(Ok(2)).map(move |r| tx.send(r).unwrap())); + assert!(f.poll_future_notify(¬ify_noop(), 0).ok().unwrap().is_not_ready()); + assert!(rx.try_recv().is_err()); + a.send(1).unwrap(); + assert!(f.poll_future_notify(¬ify_noop(), 0).ok().unwrap().is_ready()); + assert_eq!(rx.recv().unwrap(), (1, 2)); + + let (a, b) = oneshot::channel::<i32>(); + let (tx, rx) = channel(); + let mut f = executor::spawn(ok(1).join(b).map_err(move |_r| tx.send(2).unwrap())); + assert!(f.poll_future_notify(¬ify_noop(), 0).ok().unwrap().is_not_ready()); + assert!(rx.try_recv().is_err()); + drop(a); + assert!(f.poll_future_notify(¬ify_noop(), 0).is_err()); + assert_eq!(rx.recv().unwrap(), 2); + + let (a, b) = oneshot::channel::<i32>(); + let (tx, rx) = channel(); + let mut f = executor::spawn(b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap())); + assert!(f.poll_future_notify(¬ify_noop(), 0).ok().unwrap().is_not_ready()); + assert!(rx.try_recv().is_err()); + drop(a); + assert!(f.poll_future_notify(¬ify_noop(), 0).is_err()); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn collect_collects() { + assert_done(|| join_all(vec![f_ok(1), f_ok(2)]), Ok(vec![1, 2])); + assert_done(|| join_all(vec![f_ok(1)]), Ok(vec![1])); + assert_done(|| join_all(Vec::<Result<i32, u32>>::new()), Ok(vec![])); + + // TODO: needs more tests +} + +#[test] +fn select2() { + fn d<T, U, E>(r: Result<(T, U), (E, U)>) -> Result<T, E> { + match r { + Ok((t, _u)) => Ok(t), + Err((e, _u)) => Err(e), + } + } + + assert_done(|| f_ok(2).select(empty()).then(d), Ok(2)); + assert_done(|| empty().select(f_ok(2)).then(d), Ok(2)); + assert_done(|| f_err(2).select(empty()).then(d), Err(2)); + assert_done(|| empty().select(f_err(2)).then(d), Err(2)); + + assert_done(|| { + f_ok(1).select(f_ok(2)) + .map_err(|_| 0) + .and_then(|(a, b)| b.map(move |b| a + b)) + }, Ok(3)); + + // Finish one half of a select and then fail the second, ensuring that we + // get the notification of the second one. + { + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let f = b.select(d); + let (tx, rx) = channel(); + f.map(move |r| tx.send(r).unwrap()).forget(); + a.send(1).unwrap(); + let (val, next) = rx.recv().unwrap(); + assert_eq!(val, 1); + let (tx, rx) = channel(); + next.map_err(move |_r| tx.send(2).unwrap()).forget(); + assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty); + drop(c); + assert_eq!(rx.recv().unwrap(), 2); + } + + // Fail the second half and ensure that we see the first one finish + { + let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let f = b.select(d); + let (tx, rx) = channel(); + f.map_err(move |r| tx.send((1, r.1)).unwrap()).forget(); + drop(c); + let (val, next) = rx.recv().unwrap(); + assert_eq!(val, 1); + let (tx, rx) = channel(); + next.map(move |r| tx.send(r).unwrap()).forget(); + assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty); + a.send(2).unwrap(); + assert_eq!(rx.recv().unwrap(), 2); + } + + // Cancelling the first half should cancel the second + { + let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |v| { btx.send(v).unwrap(); v }); + let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let f = b.select(d); + drop(f); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + } + + // Cancel after a schedule + { + let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |v| { btx.send(v).unwrap(); v }); + let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let f = b.select(d); + drop(executor::spawn(f).poll_future_notify(&support::notify_noop(), 0)); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + } + + // Cancel propagates + { + let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |v| { btx.send(v).unwrap(); v }); + let d = d.map(move |v| { dtx.send(v).unwrap(); v }); + let (tx, rx) = channel(); + b.select(d).map(move |_| tx.send(()).unwrap()).forget(); + drop(a); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + assert!(rx.recv().is_err()); + } + + // Cancel on early drop + { + let (tx, rx) = channel(); + let f = f_ok(1).select(empty().map(move |()| { + tx.send(()).unwrap(); + 1 + })); + drop(f); + assert!(rx.recv().is_err()); + } +} + +#[test] +fn option() { + assert_eq!(Ok(Some(())), Some(ok::<(), ()>(())).wait()); + assert_eq!(Ok(None), <Option<FutureResult<(), ()>> as Future>::wait(None)); +} + +#[test] +fn spawn_does_unsize() { + #[derive(Clone, Copy)] + struct EmptyNotify; + impl executor::Notify for EmptyNotify { + fn notify(&self, _: usize) { panic!("Cannot notify"); } + } + static EMPTY: &'static EmptyNotify = &EmptyNotify; + + let spawn: executor::Spawn<FutureResult<(), ()>> = executor::spawn(future::ok(())); + let mut spawn_box: Box<executor::Spawn<Future<Item = (), Error = ()>>> = Box::new(spawn); + spawn_box.poll_future_notify(&EMPTY, 0).unwrap(); +} diff --git a/third_party/rust/futures-0.1.31/tests/bilock.rs b/third_party/rust/futures-0.1.31/tests/bilock.rs new file mode 100644 index 0000000000..1658bdae27 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/bilock.rs @@ -0,0 +1,111 @@ +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +use std::thread; + +use futures::prelude::*; +use futures::executor; +use futures::stream; +use futures::future; +use futures::sync::BiLock; + +mod support; +use support::*; + +#[test] +fn smoke() { + let future = future::lazy(|| { + let (a, b) = BiLock::new(1); + + { + let mut lock = match a.poll_lock() { + Async::Ready(l) => l, + Async::NotReady => panic!("poll not ready"), + }; + assert_eq!(*lock, 1); + *lock = 2; + + assert!(b.poll_lock().is_not_ready()); + assert!(a.poll_lock().is_not_ready()); + } + + assert!(b.poll_lock().is_ready()); + assert!(a.poll_lock().is_ready()); + + { + let lock = match b.poll_lock() { + Async::Ready(l) => l, + Async::NotReady => panic!("poll not ready"), + }; + assert_eq!(*lock, 2); + } + + assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2); + + Ok::<(), ()>(()) + }); + + assert!(executor::spawn(future) + .poll_future_notify(¬ify_noop(), 0) + .expect("failure in poll") + .is_ready()); +} + +#[test] +fn concurrent() { + const N: usize = 10000; + let (a, b) = BiLock::new(0); + + let a = Increment { + a: Some(a), + remaining: N, + }; + let b = stream::iter_ok::<_, ()>((0..N)).fold(b, |b, _n| { + b.lock().map(|mut b| { + *b += 1; + b.unlock() + }) + }); + + let t1 = thread::spawn(move || a.wait()); + let b = b.wait().expect("b error"); + let a = t1.join().unwrap().expect("a error"); + + match a.poll_lock() { + Async::Ready(l) => assert_eq!(*l, 2 * N), + Async::NotReady => panic!("poll not ready"), + } + match b.poll_lock() { + Async::Ready(l) => assert_eq!(*l, 2 * N), + Async::NotReady => panic!("poll not ready"), + } + + assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N); + + struct Increment { + remaining: usize, + a: Option<BiLock<usize>>, + } + + impl Future for Increment { + type Item = BiLock<usize>; + type Error = (); + + fn poll(&mut self) -> Poll<BiLock<usize>, ()> { + loop { + if self.remaining == 0 { + return Ok(self.a.take().unwrap().into()) + } + + let a = self.a.as_ref().unwrap(); + let mut a = match a.poll_lock() { + Async::Ready(l) => l, + Async::NotReady => return Ok(Async::NotReady), + }; + self.remaining -= 1; + *a += 1; + } + } + } +} diff --git a/third_party/rust/futures-0.1.31/tests/buffer_unordered.rs b/third_party/rust/futures-0.1.31/tests/buffer_unordered.rs new file mode 100644 index 0000000000..005bbd9835 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/buffer_unordered.rs @@ -0,0 +1,74 @@ +extern crate futures; + +use std::sync::mpsc as std_mpsc; +use std::thread; + +use futures::prelude::*; +use futures::sync::oneshot; +use futures::sync::mpsc; + +#[test] +fn works() { + const N: usize = 4; + + let (mut tx, rx) = mpsc::channel(1); + + let (tx2, rx2) = std_mpsc::channel(); + let (tx3, rx3) = std_mpsc::channel(); + let t1 = thread::spawn(move || { + for _ in 0..N+1 { + let (mytx, myrx) = oneshot::channel(); + tx = tx.send(myrx).wait().unwrap(); + tx3.send(mytx).unwrap(); + } + rx2.recv().unwrap(); + for _ in 0..N { + let (mytx, myrx) = oneshot::channel(); + tx = tx.send(myrx).wait().unwrap(); + tx3.send(mytx).unwrap(); + } + }); + + let (tx4, rx4) = std_mpsc::channel(); + let t2 = thread::spawn(move || { + for item in rx.map_err(|_| panic!()).buffer_unordered(N).wait() { + tx4.send(item.unwrap()).unwrap(); + } + }); + + let o1 = rx3.recv().unwrap(); + let o2 = rx3.recv().unwrap(); + let o3 = rx3.recv().unwrap(); + let o4 = rx3.recv().unwrap(); + assert!(rx4.try_recv().is_err()); + + o1.send(1).unwrap(); + assert_eq!(rx4.recv(), Ok(1)); + o3.send(3).unwrap(); + assert_eq!(rx4.recv(), Ok(3)); + tx2.send(()).unwrap(); + o2.send(2).unwrap(); + assert_eq!(rx4.recv(), Ok(2)); + o4.send(4).unwrap(); + assert_eq!(rx4.recv(), Ok(4)); + + let o5 = rx3.recv().unwrap(); + let o6 = rx3.recv().unwrap(); + let o7 = rx3.recv().unwrap(); + let o8 = rx3.recv().unwrap(); + let o9 = rx3.recv().unwrap(); + + o5.send(5).unwrap(); + assert_eq!(rx4.recv(), Ok(5)); + o8.send(8).unwrap(); + assert_eq!(rx4.recv(), Ok(8)); + o9.send(9).unwrap(); + assert_eq!(rx4.recv(), Ok(9)); + o7.send(7).unwrap(); + assert_eq!(rx4.recv(), Ok(7)); + o6.send(6).unwrap(); + assert_eq!(rx4.recv(), Ok(6)); + + t1.join().unwrap(); + t2.join().unwrap(); +} diff --git a/third_party/rust/futures-0.1.31/tests/channel.rs b/third_party/rust/futures-0.1.31/tests/channel.rs new file mode 100644 index 0000000000..7940de4509 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/channel.rs @@ -0,0 +1,75 @@ +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +use std::sync::atomic::*; + +use futures::prelude::*; +use futures::future::result; +use futures::sync::mpsc; + +mod support; +use support::*; + +#[test] +fn sequence() { + let (tx, mut rx) = mpsc::channel(1); + + sassert_empty(&mut rx); + sassert_empty(&mut rx); + + let amt = 20; + send(amt, tx).forget(); + let mut rx = rx.wait(); + for i in (1..amt + 1).rev() { + assert_eq!(rx.next(), Some(Ok(i))); + } + assert_eq!(rx.next(), None); + + fn send(n: u32, sender: mpsc::Sender<u32>) + -> Box<Future<Item=(), Error=()> + Send> { + if n == 0 { + return Box::new(result(Ok(()))) + } + Box::new(sender.send(n).map_err(|_| ()).and_then(move |sender| { + send(n - 1, sender) + })) + } +} + +#[test] +fn drop_sender() { + let (tx, mut rx) = mpsc::channel::<u32>(1); + drop(tx); + sassert_done(&mut rx); +} + +#[test] +fn drop_rx() { + let (tx, rx) = mpsc::channel::<u32>(1); + let tx = tx.send(1).wait().ok().unwrap(); + drop(rx); + assert!(tx.send(1).wait().is_err()); +} + +#[test] +fn drop_order() { + #[allow(deprecated)] + static DROPS: AtomicUsize = ATOMIC_USIZE_INIT; + let (tx, rx) = mpsc::channel(1); + + struct A; + + impl Drop for A { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + let tx = tx.send(A).wait().unwrap(); + assert_eq!(DROPS.load(Ordering::SeqCst), 0); + drop(rx); + assert_eq!(DROPS.load(Ordering::SeqCst), 1); + assert!(tx.send(A).wait().is_err()); + assert_eq!(DROPS.load(Ordering::SeqCst), 2); +} diff --git a/third_party/rust/futures-0.1.31/tests/eager_drop.rs b/third_party/rust/futures-0.1.31/tests/eager_drop.rs new file mode 100644 index 0000000000..79f94d5ddc --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/eager_drop.rs @@ -0,0 +1,82 @@ +extern crate futures; + +use std::sync::mpsc::channel; + +use futures::prelude::*; +use futures::sync::oneshot; +use futures::future::{err, ok}; + +mod support; +use support::*; + +#[test] +fn map() { + // Whatever runs after a `map` should have dropped the closure by that + // point. + let (tx, rx) = channel::<()>(); + let (tx2, rx2) = channel(); + err::<i32, i32>(1).map(move |a| { drop(tx); a }).map_err(move |_| { + assert!(rx.recv().is_err()); + tx2.send(()).unwrap() + }).forget(); + rx2.recv().unwrap(); +} + +#[test] +fn map_err() { + // Whatever runs after a `map_err` should have dropped the closure by that + // point. + let (tx, rx) = channel::<()>(); + let (tx2, rx2) = channel(); + ok::<i32, i32>(1).map_err(move |a| { drop(tx); a }).map(move |_| { + assert!(rx.recv().is_err()); + tx2.send(()).unwrap() + }).forget(); + rx2.recv().unwrap(); +} + +struct FutureData<F, T> { + _data: T, + future: F, +} + +impl<F: Future, T: Send + 'static> Future for FutureData<F, T> { + type Item = F::Item; + type Error = F::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + self.future.poll() + } +} + +#[test] +fn and_then_drops_eagerly() { + let (c, p) = oneshot::channel::<()>(); + let (tx, rx) = channel::<()>(); + let (tx2, rx2) = channel(); + FutureData { _data: tx, future: p }.and_then(move |_| { + assert!(rx.recv().is_err()); + tx2.send(()).unwrap(); + ok(1) + }).forget(); + assert!(rx2.try_recv().is_err()); + c.send(()).unwrap(); + rx2.recv().unwrap(); +} + +// #[test] +// fn or_else_drops_eagerly() { +// let (p1, c1) = oneshot::<(), ()>(); +// let (p2, c2) = oneshot::<(), ()>(); +// let (tx, rx) = channel::<()>(); +// let (tx2, rx2) = channel(); +// p1.map(move |a| { drop(tx); a }).or_else(move |_| { +// assert!(rx.recv().is_err()); +// p2 +// }).map(move |_| tx2.send(()).unwrap()).forget(); +// assert!(rx2.try_recv().is_err()); +// c1.fail(()); +// assert!(rx2.try_recv().is_err()); +// c2.finish(()); +// rx2.recv().unwrap(); +// } diff --git a/third_party/rust/futures-0.1.31/tests/eventual.rs b/third_party/rust/futures-0.1.31/tests/eventual.rs new file mode 100644 index 0000000000..fc484aaad2 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/eventual.rs @@ -0,0 +1,320 @@ +extern crate futures; + +mod support; +use support::*; + +use std::sync::mpsc; +use std::thread; + +use futures::prelude::*; +use futures::future::{ok, err}; +use futures::sync::oneshot; + +#[test] +fn and_then1() { + let (tx, rx) = mpsc::channel(); + + let tx2 = tx.clone(); + let p1 = ok::<_, i32>("a").then(move |t| { tx2.send("first").unwrap(); t }); + let tx2 = tx.clone(); + let p2 = ok("b").then(move |t| { tx2.send("second").unwrap(); t }); + let f = p1.and_then(|_| p2); + + assert!(rx.try_recv().is_err()); + f.map(move |s| tx.send(s).unwrap()).forget(); + assert_eq!(rx.recv(), Ok("first")); + assert_eq!(rx.recv(), Ok("second")); + assert_eq!(rx.recv(), Ok("b")); + assert!(rx.recv().is_err()); +} + +#[test] +fn and_then2() { + let (tx, rx) = mpsc::channel(); + + let tx2 = tx.clone(); + let p1 = err::<i32, _>(2).then(move |t| { tx2.send("first").unwrap(); t }); + let tx2 = tx.clone(); + let p2 = ok("b").then(move |t| { tx2.send("second").unwrap(); t }); + let f = p1.and_then(|_| p2); + + assert!(rx.try_recv().is_err()); + f.map_err(|_| drop(tx)).forget(); + assert_eq!(rx.recv(), Ok("first")); + assert!(rx.recv().is_err()); +} + +#[test] +fn oneshot1() { + let (c, p) = oneshot::channel::<i32>(); + let t = thread::spawn(|| c.send(1).unwrap()); + + let (tx, rx) = mpsc::channel(); + p.map(move |e| tx.send(e).unwrap()).forget(); + assert_eq!(rx.recv(), Ok(1)); + t.join().unwrap(); +} + +#[test] +fn oneshot2() { + let (c, p) = oneshot::channel::<i32>(); + let t = thread::spawn(|| c.send(1).unwrap()); + t.join().unwrap(); + + let (tx, rx) = mpsc::channel(); + p.map(move |e| tx.send(e).unwrap()).forget(); + assert_eq!(rx.recv(), Ok(1)); +} + +#[test] +fn oneshot3() { + let (c, p) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + p.map(move |e| tx.send(e).unwrap()).forget(); + + let t = thread::spawn(|| c.send(1).unwrap()); + t.join().unwrap(); + + assert_eq!(rx.recv(), Ok(1)); +} + +#[test] +fn oneshot4() { + let (c, p) = oneshot::channel::<i32>(); + drop(c); + + let (tx, rx) = mpsc::channel(); + p.map(move |e| tx.send(e).unwrap()).forget(); + assert!(rx.recv().is_err()); +} + +#[test] +fn oneshot5() { + let (c, p) = oneshot::channel::<i32>(); + let t = thread::spawn(|| drop(c)); + let (tx, rx) = mpsc::channel(); + p.map(move |t| tx.send(t).unwrap()).forget(); + t.join().unwrap(); + assert!(rx.recv().is_err()); +} + +#[test] +fn oneshot6() { + let (c, p) = oneshot::channel::<i32>(); + drop(p); + c.send(2).unwrap_err(); +} + +#[test] +fn cancel1() { + let (c, p) = oneshot::channel::<i32>(); + drop(c); + p.map(|_| panic!()).forget(); +} + +#[test] +fn map_err1() { + ok::<i32, i32>(1).map_err(|_| panic!()).forget(); +} + +#[test] +fn map_err2() { + let (tx, rx) = mpsc::channel(); + err::<i32, i32>(1).map_err(move |v| tx.send(v).unwrap()).forget(); + assert_eq!(rx.recv(), Ok(1)); + assert!(rx.recv().is_err()); +} + +#[test] +fn map_err3() { + let (c, p) = oneshot::channel::<i32>(); + p.map_err(|_| {}).forget(); + drop(c); +} + +#[test] +fn or_else1() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + + let (tx, rx) = mpsc::channel(); + let tx2 = tx.clone(); + let p1 = p1.map_err(move |i| { tx2.send(2).unwrap(); i }); + let tx2 = tx.clone(); + let p2 = p2.map(move |i| { tx2.send(i).unwrap(); i }); + + assert!(rx.try_recv().is_err()); + drop(c1); + c2.send(3).unwrap(); + p1.or_else(|_| p2).map(move |v| tx.send(v).unwrap()).forget(); + + assert_eq!(rx.recv(), Ok(2)); + assert_eq!(rx.recv(), Ok(3)); + assert_eq!(rx.recv(), Ok(3)); + assert!(rx.recv().is_err()); +} + +#[test] +fn or_else2() { + let (c1, p1) = oneshot::channel::<i32>(); + + let (tx, rx) = mpsc::channel(); + + p1.or_else(move |_| { + tx.send(()).unwrap(); + ok::<i32, i32>(1) + }).forget(); + + c1.send(2).unwrap(); + assert!(rx.recv().is_err()); +} + +#[test] +fn join1() { + let (tx, rx) = mpsc::channel(); + ok::<i32, i32>(1).join(ok(2)) + .map(move |v| tx.send(v).unwrap()) + .forget(); + assert_eq!(rx.recv(), Ok((1, 2))); + assert!(rx.recv().is_err()); +} + +#[test] +fn join2() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + p1.join(p2).map(move |v| tx.send(v).unwrap()).forget(); + assert!(rx.try_recv().is_err()); + c1.send(1).unwrap(); + assert!(rx.try_recv().is_err()); + c2.send(2).unwrap(); + assert_eq!(rx.recv(), Ok((1, 2))); + assert!(rx.recv().is_err()); +} + +#[test] +fn join3() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + p1.join(p2).map_err(move |_v| tx.send(1).unwrap()).forget(); + assert!(rx.try_recv().is_err()); + drop(c1); + assert_eq!(rx.recv(), Ok(1)); + assert!(rx.recv().is_err()); + drop(c2); +} + +#[test] +fn join4() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + p1.join(p2).map_err(move |v| tx.send(v).unwrap()).forget(); + assert!(rx.try_recv().is_err()); + drop(c1); + assert!(rx.recv().is_ok()); + drop(c2); + assert!(rx.recv().is_err()); +} + +#[test] +fn join5() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (c3, p3) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + p1.join(p2).join(p3).map(move |v| tx.send(v).unwrap()).forget(); + assert!(rx.try_recv().is_err()); + c1.send(1).unwrap(); + assert!(rx.try_recv().is_err()); + c2.send(2).unwrap(); + assert!(rx.try_recv().is_err()); + c3.send(3).unwrap(); + assert_eq!(rx.recv(), Ok(((1, 2), 3))); + assert!(rx.recv().is_err()); +} + +#[test] +fn select1() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + p1.select(p2).map(move |v| tx.send(v).unwrap()).forget(); + assert!(rx.try_recv().is_err()); + c1.send(1).unwrap(); + let (v, p2) = rx.recv().unwrap(); + assert_eq!(v, 1); + assert!(rx.recv().is_err()); + + let (tx, rx) = mpsc::channel(); + p2.map(move |v| tx.send(v).unwrap()).forget(); + c2.send(2).unwrap(); + assert_eq!(rx.recv(), Ok(2)); + assert!(rx.recv().is_err()); +} + +#[test] +fn select2() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + p1.select(p2).map_err(move |v| tx.send((1, v.1)).unwrap()).forget(); + assert!(rx.try_recv().is_err()); + drop(c1); + let (v, p2) = rx.recv().unwrap(); + assert_eq!(v, 1); + assert!(rx.recv().is_err()); + + let (tx, rx) = mpsc::channel(); + p2.map(move |v| tx.send(v).unwrap()).forget(); + c2.send(2).unwrap(); + assert_eq!(rx.recv(), Ok(2)); + assert!(rx.recv().is_err()); +} + +#[test] +fn select3() { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + let (tx, rx) = mpsc::channel(); + p1.select(p2).map_err(move |v| tx.send((1, v.1)).unwrap()).forget(); + assert!(rx.try_recv().is_err()); + drop(c1); + let (v, p2) = rx.recv().unwrap(); + assert_eq!(v, 1); + assert!(rx.recv().is_err()); + + let (tx, rx) = mpsc::channel(); + p2.map_err(move |_v| tx.send(2).unwrap()).forget(); + drop(c2); + assert_eq!(rx.recv(), Ok(2)); + assert!(rx.recv().is_err()); +} + +#[test] +fn select4() { + let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>(); + + let t = thread::spawn(move || { + for c in rx { + c.send(1).unwrap(); + } + }); + + let (tx2, rx2) = mpsc::channel(); + for _ in 0..10000 { + let (c1, p1) = oneshot::channel::<i32>(); + let (c2, p2) = oneshot::channel::<i32>(); + + let tx3 = tx2.clone(); + p1.select(p2).map(move |_| tx3.send(()).unwrap()).forget(); + tx.send(c1).unwrap(); + rx2.recv().unwrap(); + drop(c2); + } + drop(tx); + + t.join().unwrap(); +} diff --git a/third_party/rust/futures-0.1.31/tests/fuse.rs b/third_party/rust/futures-0.1.31/tests/fuse.rs new file mode 100644 index 0000000000..177d914e19 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/fuse.rs @@ -0,0 +1,39 @@ +extern crate futures; + +use futures::prelude::*; +use futures::future::ok; +use futures::executor; + +mod support; +use support::*; + +#[test] +fn fuse() { + let mut future = executor::spawn(ok::<i32, u32>(2).fuse()); + assert!(future.poll_future_notify(¬ify_panic(), 0).unwrap().is_ready()); + assert!(future.poll_future_notify(¬ify_panic(), 0).unwrap().is_not_ready()); +} + +#[test] +fn fuse_is_done() { + use futures::future::{Fuse, FutureResult}; + + struct Wrapped(Fuse<FutureResult<i32, u32>>); + + impl Future for Wrapped { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + assert!(!self.0.is_done()); + assert_eq!(self.0.poll().unwrap(), Async::Ready(2)); + assert!(self.0.is_done()); + assert_eq!(self.0.poll().unwrap(), Async::NotReady); + assert!(self.0.is_done()); + + Ok(Async::Ready(())) + } + } + + assert!(Wrapped(ok::<i32, u32>(2).fuse()).wait().is_ok()); +}
\ No newline at end of file diff --git a/third_party/rust/futures-0.1.31/tests/future_flatten_stream.rs b/third_party/rust/futures-0.1.31/tests/future_flatten_stream.rs new file mode 100644 index 0000000000..442d381fd7 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/future_flatten_stream.rs @@ -0,0 +1,43 @@ +extern crate core; +extern crate futures; + +use core::marker; + +use futures::prelude::*; +use futures::future::{ok, err}; +use futures::stream; + +#[test] +fn successful_future() { + let stream_items = vec![17, 19]; + let future_of_a_stream = ok::<_, bool>(stream::iter_ok(stream_items)); + + let stream = future_of_a_stream.flatten_stream(); + + let mut iter = stream.wait(); + assert_eq!(Ok(17), iter.next().unwrap()); + assert_eq!(Ok(19), iter.next().unwrap()); + assert_eq!(None, iter.next()); +} + +struct PanickingStream<T, E> { + _marker: marker::PhantomData<(T, E)> +} + +impl<T, E> Stream for PanickingStream<T, E> { + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + panic!() + } +} + +#[test] +fn failed_future() { + let future_of_a_stream = err::<PanickingStream<bool, u32>, _>(10); + let stream = future_of_a_stream.flatten_stream(); + let mut iter = stream.wait(); + assert_eq!(Err(10), iter.next().unwrap()); + assert_eq!(None, iter.next()); +} diff --git a/third_party/rust/futures-0.1.31/tests/futures_ordered.rs b/third_party/rust/futures-0.1.31/tests/futures_ordered.rs new file mode 100644 index 0000000000..6054192e3b --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/futures_ordered.rs @@ -0,0 +1,88 @@ +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +use std::any::Any; + +use futures::sync::oneshot; +use futures::stream::futures_ordered; +use futures::prelude::*; + +mod support; + +#[test] +fn works_1() { + let (a_tx, a_rx) = oneshot::channel::<u32>(); + let (b_tx, b_rx) = oneshot::channel::<u32>(); + let (c_tx, c_rx) = oneshot::channel::<u32>(); + + let stream = futures_ordered(vec![a_rx, b_rx, c_rx]); + + let mut spawn = futures::executor::spawn(stream); + b_tx.send(99).unwrap(); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + + a_tx.send(33).unwrap(); + c_tx.send(33).unwrap(); + assert_eq!(Some(Ok(33)), spawn.wait_stream()); + assert_eq!(Some(Ok(99)), spawn.wait_stream()); + assert_eq!(Some(Ok(33)), spawn.wait_stream()); + assert_eq!(None, spawn.wait_stream()); +} + +#[test] +fn works_2() { + let (a_tx, a_rx) = oneshot::channel::<u32>(); + let (b_tx, b_rx) = oneshot::channel::<u32>(); + let (c_tx, c_rx) = oneshot::channel::<u32>(); + + let stream = futures_ordered(vec![ + Box::new(a_rx) as Box<Future<Item = _, Error = _>>, + Box::new(b_rx.join(c_rx).map(|(a, b)| a + b)), + ]); + + let mut spawn = futures::executor::spawn(stream); + a_tx.send(33).unwrap(); + b_tx.send(33).unwrap(); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready()); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + c_tx.send(33).unwrap(); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready()); +} + +#[test] +fn from_iterator() { + use futures::future::ok; + use futures::stream::FuturesOrdered; + + let stream = vec![ + ok::<u32, ()>(1), + ok::<u32, ()>(2), + ok::<u32, ()>(3) + ].into_iter().collect::<FuturesOrdered<_>>(); + assert_eq!(stream.len(), 3); + assert_eq!(stream.collect().wait(), Ok(vec![1,2,3])); +} + +#[test] +fn queue_never_unblocked() { + let (_a_tx, a_rx) = oneshot::channel::<Box<Any+Send>>(); + let (b_tx, b_rx) = oneshot::channel::<Box<Any+Send>>(); + let (c_tx, c_rx) = oneshot::channel::<Box<Any+Send>>(); + + let stream = futures_ordered(vec![ + Box::new(a_rx) as Box<Future<Item = _, Error = _>>, + Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box<Any+Send>))), + ]); + + let mut spawn = futures::executor::spawn(stream); + for _ in 0..10 { + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + } + + b_tx.send(Box::new(())).unwrap(); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + c_tx.send(Box::new(())).unwrap(); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); +} diff --git a/third_party/rust/futures-0.1.31/tests/futures_unordered.rs b/third_party/rust/futures-0.1.31/tests/futures_unordered.rs new file mode 100644 index 0000000000..325a6f3e48 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/futures_unordered.rs @@ -0,0 +1,167 @@ +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +use std::any::Any; + +use futures::sync::oneshot; +use std::iter::FromIterator; +use futures::stream::{futures_unordered, FuturesUnordered}; +use futures::prelude::*; + +mod support; + +#[test] +fn works_1() { + let (a_tx, a_rx) = oneshot::channel::<u32>(); + let (b_tx, b_rx) = oneshot::channel::<u32>(); + let (c_tx, c_rx) = oneshot::channel::<u32>(); + + let stream = futures_unordered(vec![a_rx, b_rx, c_rx]); + + let mut spawn = futures::executor::spawn(stream); + b_tx.send(99).unwrap(); + assert_eq!(Some(Ok(99)), spawn.wait_stream()); + + a_tx.send(33).unwrap(); + c_tx.send(33).unwrap(); + assert_eq!(Some(Ok(33)), spawn.wait_stream()); + assert_eq!(Some(Ok(33)), spawn.wait_stream()); + assert_eq!(None, spawn.wait_stream()); +} + +#[test] +fn works_2() { + let (a_tx, a_rx) = oneshot::channel::<u32>(); + let (b_tx, b_rx) = oneshot::channel::<u32>(); + let (c_tx, c_rx) = oneshot::channel::<u32>(); + + let stream = futures_unordered(vec![ + Box::new(a_rx) as Box<Future<Item = _, Error = _>>, + Box::new(b_rx.join(c_rx).map(|(a, b)| a + b)), + ]); + + let mut spawn = futures::executor::spawn(stream); + a_tx.send(33).unwrap(); + b_tx.send(33).unwrap(); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready()); + c_tx.send(33).unwrap(); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready()); +} + +#[test] +fn from_iterator() { + use futures::future::ok; + use futures::stream::FuturesUnordered; + + let stream = vec![ + ok::<u32, ()>(1), + ok::<u32, ()>(2), + ok::<u32, ()>(3) + ].into_iter().collect::<FuturesUnordered<_>>(); + assert_eq!(stream.len(), 3); + assert_eq!(stream.collect().wait(), Ok(vec![1,2,3])); +} + +#[test] +fn finished_future_ok() { + let (_a_tx, a_rx) = oneshot::channel::<Box<Any+Send>>(); + let (b_tx, b_rx) = oneshot::channel::<Box<Any+Send>>(); + let (c_tx, c_rx) = oneshot::channel::<Box<Any+Send>>(); + + let stream = futures_unordered(vec![ + Box::new(a_rx) as Box<Future<Item = _, Error = _>>, + Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box<Any+Send>))), + ]); + + let mut spawn = futures::executor::spawn(stream); + for _ in 0..10 { + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + } + + b_tx.send(Box::new(())).unwrap(); + let next = spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap(); + assert!(next.is_ready()); + c_tx.send(Box::new(())).unwrap(); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); +} + +#[test] +fn iter_mut_cancel() { + let (a_tx, a_rx) = oneshot::channel::<u32>(); + let (b_tx, b_rx) = oneshot::channel::<u32>(); + let (c_tx, c_rx) = oneshot::channel::<u32>(); + + let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]); + + for rx in stream.iter_mut() { + rx.close(); + } + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + let mut spawn = futures::executor::spawn(stream); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); + assert_eq!(None, spawn.wait_stream()); +} + +#[test] +fn iter_mut_len() { + let mut stream = futures_unordered(vec![ + futures::future::empty::<(),()>(), + futures::future::empty::<(),()>(), + futures::future::empty::<(),()>() + ]); + + let mut iter_mut = stream.iter_mut(); + assert_eq!(iter_mut.len(), 3); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 2); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 1); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 0); + assert!(iter_mut.next().is_none()); +} + +#[test] +fn polled_only_once_at_most_per_iteration() { + #[derive(Debug, Clone, Copy, Default)] + struct F { + polled: bool, + } + + impl Future for F { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> { + if self.polled { + panic!("polled twice") + } else { + self.polled = true; + Ok(Async::NotReady) + } + } + } + + + let tasks = FuturesUnordered::from_iter(vec![F::default(); 10]); + let mut tasks = futures::executor::spawn(tasks); + assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + assert_eq!(10, tasks.get_mut().iter_mut().filter(|f| f.polled).count()); + + let tasks = FuturesUnordered::from_iter(vec![F::default(); 33]); + let mut tasks = futures::executor::spawn(tasks); + assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); + assert_eq!(33, tasks.get_mut().iter_mut().filter(|f| f.polled).count()); + + let tasks = FuturesUnordered::<F>::new(); + let mut tasks = futures::executor::spawn(tasks); + assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready()); +} diff --git a/third_party/rust/futures-0.1.31/tests/inspect.rs b/third_party/rust/futures-0.1.31/tests/inspect.rs new file mode 100644 index 0000000000..c16372ed91 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/inspect.rs @@ -0,0 +1,23 @@ +extern crate futures; + +use futures::prelude::*; +use futures::future::{ok, err}; + +#[test] +fn smoke() { + let mut counter = 0; + + { + let work = ok::<u32, u32>(40).inspect(|val| { counter += *val; }); + assert_eq!(work.wait(), Ok(40)); + } + + assert_eq!(counter, 40); + + { + let work = err::<u32, u32>(4).inspect(|val| { counter += *val; }); + assert_eq!(work.wait(), Err(4)); + } + + assert_eq!(counter, 40); +} diff --git a/third_party/rust/futures-0.1.31/tests/mpsc-close.rs b/third_party/rust/futures-0.1.31/tests/mpsc-close.rs new file mode 100644 index 0000000000..061616ae06 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/mpsc-close.rs @@ -0,0 +1,152 @@ +extern crate futures; + +use std::sync::{Arc, Weak}; +use std::thread; +use std::time::{Duration, Instant}; + +use futures::prelude::*; +use futures::sync::mpsc::*; +use futures::task; + +#[test] +fn smoke() { + let (mut sender, receiver) = channel(1); + + let t = thread::spawn(move ||{ + while let Ok(s) = sender.send(42).wait() { + sender = s; + } + }); + + receiver.take(3).for_each(|_| Ok(())).wait().unwrap(); + + t.join().unwrap() +} + +// Stress test that `try_send()`s occurring concurrently with receiver +// close/drops don't appear as successful sends. +#[test] +fn stress_try_send_as_receiver_closes() { + const AMT: usize = 10000; + // To provide variable timing characteristics (in the hopes of + // reproducing the collision that leads to a race), we busy-re-poll + // the test MPSC receiver a variable number of times before actually + // stopping. We vary this countdown between 1 and the following + // value. + const MAX_COUNTDOWN: usize = 20; + // When we detect that a successfully sent item is still in the + // queue after a disconnect, we spin for up to 100ms to confirm that + // it is a persistent condition and not a concurrency illusion. + const SPIN_TIMEOUT_S: u64 = 10; + const SPIN_SLEEP_MS: u64 = 10; + struct TestRx { + rx: Receiver<Arc<()>>, + // The number of times to query `rx` before dropping it. + poll_count: usize + } + struct TestTask { + command_rx: Receiver<TestRx>, + test_rx: Option<Receiver<Arc<()>>>, + countdown: usize, + } + impl TestTask { + /// Create a new TestTask + fn new() -> (TestTask, Sender<TestRx>) { + let (command_tx, command_rx) = channel::<TestRx>(0); + ( + TestTask { + command_rx: command_rx, + test_rx: None, + countdown: 0, // 0 means no countdown is in progress. + }, + command_tx, + ) + } + } + impl Future for TestTask { + type Item = (); + type Error = (); + fn poll(&mut self) -> Poll<(), ()> { + // Poll the test channel, if one is present. + if let Some(ref mut rx) = self.test_rx { + if let Ok(Async::Ready(v)) = rx.poll() { + let _ = v.expect("test finished unexpectedly!"); + } + self.countdown -= 1; + // Busy-poll until the countdown is finished. + task::current().notify(); + } + // Accept any newly submitted MPSC channels for testing. + match self.command_rx.poll()? { + Async::Ready(Some(TestRx { rx, poll_count })) => { + self.test_rx = Some(rx); + self.countdown = poll_count; + task::current().notify(); + }, + Async::Ready(None) => return Ok(Async::Ready(())), + _ => {}, + } + if self.countdown == 0 { + // Countdown complete -- drop the Receiver. + self.test_rx = None; + } + Ok(Async::NotReady) + } + } + let (f, mut cmd_tx) = TestTask::new(); + let bg = thread::spawn(move || f.wait()); + for i in 0..AMT { + let (mut test_tx, rx) = channel(0); + let poll_count = i % MAX_COUNTDOWN; + cmd_tx.try_send(TestRx { rx: rx, poll_count: poll_count }).unwrap(); + let mut prev_weak: Option<Weak<()>> = None; + let mut attempted_sends = 0; + let mut successful_sends = 0; + loop { + // Create a test item. + let item = Arc::new(()); + let weak = Arc::downgrade(&item); + match test_tx.try_send(item) { + Ok(_) => { + prev_weak = Some(weak); + successful_sends += 1; + } + Err(ref e) if e.is_full() => {} + Err(ref e) if e.is_disconnected() => { + // Test for evidence of the race condition. + if let Some(prev_weak) = prev_weak { + if prev_weak.upgrade().is_some() { + // The previously sent item is still allocated. + // However, there appears to be some aspect of the + // concurrency that can legitimately cause the Arc + // to be momentarily valid. Spin for up to 100ms + // waiting for the previously sent item to be + // dropped. + let t0 = Instant::now(); + let mut spins = 0; + loop { + if prev_weak.upgrade().is_none() { + break; + } + assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), + "item not dropped on iteration {} after \ + {} sends ({} successful). spin=({})", + i, attempted_sends, successful_sends, spins + ); + spins += 1; + thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); + } + } + } + break; + } + Err(ref e) => panic!("unexpected error: {}", e), + } + attempted_sends += 1; + } + } + drop(cmd_tx); + bg.join() + .expect("background thread join") + .expect("background thread result"); +} diff --git a/third_party/rust/futures-0.1.31/tests/mpsc.rs b/third_party/rust/futures-0.1.31/tests/mpsc.rs new file mode 100644 index 0000000000..9cb83e5952 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/mpsc.rs @@ -0,0 +1,567 @@ +#![cfg(feature = "use_std")] +#![allow(bare_trait_objects, unknown_lints)] + +#[macro_use] +extern crate futures; + +use futures::prelude::*; +use futures::future::{lazy, ok}; +use futures::stream::unfold; +use futures::sync::mpsc; +use futures::sync::oneshot; + +use std::thread; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +mod support; +use support::*; + + +trait AssertSend: Send {} +impl AssertSend for mpsc::Sender<i32> {} +impl AssertSend for mpsc::Receiver<i32> {} + +#[test] +fn send_recv() { + let (tx, rx) = mpsc::channel::<i32>(16); + let mut rx = rx.wait(); + + tx.send(1).wait().unwrap(); + + assert_eq!(rx.next().unwrap(), Ok(1)); +} + +#[test] +fn send_recv_no_buffer() { + let (mut tx, mut rx) = mpsc::channel::<i32>(0); + + // Run on a task context + lazy(move || { + assert!(tx.poll_complete().unwrap().is_ready()); + assert!(tx.poll_ready().unwrap().is_ready()); + + // Send first message + let res = tx.start_send(1).unwrap(); + assert!(is_ready(&res)); + assert!(tx.poll_ready().unwrap().is_not_ready()); + + // Send second message + let res = tx.start_send(2).unwrap(); + assert!(!is_ready(&res)); + + // Take the value + assert_eq!(rx.poll().unwrap(), Async::Ready(Some(1))); + assert!(tx.poll_ready().unwrap().is_ready()); + + let res = tx.start_send(2).unwrap(); + assert!(is_ready(&res)); + assert!(tx.poll_ready().unwrap().is_not_ready()); + + // Take the value + assert_eq!(rx.poll().unwrap(), Async::Ready(Some(2))); + assert!(tx.poll_ready().unwrap().is_ready()); + + Ok::<(), ()>(()) + }).wait().unwrap(); +} + +#[test] +fn send_shared_recv() { + let (tx1, rx) = mpsc::channel::<i32>(16); + let tx2 = tx1.clone(); + let mut rx = rx.wait(); + + tx1.send(1).wait().unwrap(); + assert_eq!(rx.next().unwrap(), Ok(1)); + + tx2.send(2).wait().unwrap(); + assert_eq!(rx.next().unwrap(), Ok(2)); +} + +#[test] +fn send_recv_threads() { + let (tx, rx) = mpsc::channel::<i32>(16); + let mut rx = rx.wait(); + + thread::spawn(move|| { + tx.send(1).wait().unwrap(); + }); + + assert_eq!(rx.next().unwrap(), Ok(1)); +} + +#[test] +fn send_recv_threads_no_capacity() { + let (tx, rx) = mpsc::channel::<i32>(0); + let mut rx = rx.wait(); + + let (readytx, readyrx) = mpsc::channel::<()>(2); + let mut readyrx = readyrx.wait(); + let t = thread::spawn(move|| { + let readytx = readytx.sink_map_err(|_| panic!()); + let (a, b) = tx.send(1).join(readytx.send(())).wait().unwrap(); + a.send(2).join(b.send(())).wait().unwrap(); + }); + + drop(readyrx.next().unwrap()); + assert_eq!(rx.next().unwrap(), Ok(1)); + drop(readyrx.next().unwrap()); + assert_eq!(rx.next().unwrap(), Ok(2)); + + t.join().unwrap(); +} + +#[test] +fn recv_close_gets_none() { + let (mut tx, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + lazy(move || { + rx.close(); + + assert_eq!(rx.poll(), Ok(Async::Ready(None))); + assert!(tx.poll_ready().is_err()); + + drop(tx); + + Ok::<(), ()>(()) + }).wait().unwrap(); +} + + +#[test] +fn tx_close_gets_none() { + let (_, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + lazy(move || { + assert_eq!(rx.poll(), Ok(Async::Ready(None))); + assert_eq!(rx.poll(), Ok(Async::Ready(None))); + + Ok::<(), ()>(()) + }).wait().unwrap(); +} + +#[test] +fn spawn_sends_items() { + let core = local_executor::Core::new(); + let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1)))); + let rx = mpsc::spawn(stream, &core, 1); + assert_eq!(core.run(rx.take(4).collect()).unwrap(), + [0, 1, 2, 3]); +} + +#[test] +fn spawn_kill_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + use futures::sync::oneshot; + + // a stream which never returns anything (maybe a remote end isn't + // responding), but dropping it leads to observable side effects + // (like closing connections, releasing limited resources, ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Stream for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = local_executor::Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let stream = Dead{done: done_tx}; + let rx = mpsc::spawn(stream, &core, 1); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // now drop the spawned stream: maybe some timeout exceeded, + // or some connection on this end was closed by the remote + // end. + drop(rx); + // and wait for the spawned stream to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => (), + _ => { + panic!("dead stream wasn't canceled"); + }, + } +} + +#[test] +fn stress_shared_unbounded() { + const AMT: u32 = 10000; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::unbounded::<i32>(); + let mut rx = rx.wait(); + + let t = thread::spawn(move|| { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.next().unwrap(), Ok(1)); + } + + if rx.next().is_some() { + panic!(); + } + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + + thread::spawn(move|| { + for _ in 0..AMT { + tx.unbounded_send(1).unwrap(); + } + }); + } + + drop(tx); + + t.join().ok().unwrap(); +} + +#[test] +fn stress_shared_bounded_hard() { + const AMT: u32 = 10000; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::channel::<i32>(0); + let mut rx = rx.wait(); + + let t = thread::spawn(move|| { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.next().unwrap(), Ok(1)); + } + + if rx.next().is_some() { + panic!(); + } + }); + + for _ in 0..NTHREADS { + let mut tx = tx.clone(); + + thread::spawn(move|| { + for _ in 0..AMT { + tx = tx.send(1).wait().unwrap(); + } + }); + } + + drop(tx); + + t.join().ok().unwrap(); +} + +#[test] +fn stress_receiver_multi_task_bounded_hard() { + const AMT: usize = 10_000; + const NTHREADS: u32 = 2; + + let (mut tx, rx) = mpsc::channel::<usize>(0); + let rx = Arc::new(Mutex::new(Some(rx))); + let n = Arc::new(AtomicUsize::new(0)); + + let mut th = vec![]; + + for _ in 0..NTHREADS { + let rx = rx.clone(); + let n = n.clone(); + + let t = thread::spawn(move || { + let mut i = 0; + + loop { + i += 1; + let mut lock = rx.lock().ok().unwrap(); + + match lock.take() { + Some(mut rx) => { + if i % 5 == 0 { + let (item, rest) = rx.into_future().wait().ok().unwrap(); + + if item.is_none() { + break; + } + + n.fetch_add(1, Ordering::Relaxed); + *lock = Some(rest); + } else { + // Just poll + let n = n.clone(); + let r = lazy(move || { + let r = match rx.poll().unwrap() { + Async::Ready(Some(_)) => { + n.fetch_add(1, Ordering::Relaxed); + *lock = Some(rx); + false + } + Async::Ready(None) => { + true + } + Async::NotReady => { + *lock = Some(rx); + false + } + }; + + Ok::<bool, ()>(r) + }).wait().unwrap(); + + if r { + break; + } + } + } + None => break, + } + } + }); + + th.push(t); + } + + for i in 0..AMT { + tx = tx.send(i).wait().unwrap(); + } + + drop(tx); + + for t in th { + t.join().unwrap(); + } + + assert_eq!(AMT, n.load(Ordering::Relaxed)); +} + +/// Stress test that receiver properly receives all the messages +/// after sender dropped. +#[test] +fn stress_drop_sender() { + fn list() -> Box<Stream<Item=i32, Error=u32>> { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)) + .and_then(|tx| tx.send(Ok(2))) + .and_then(|tx| tx.send(Ok(3))) + .forget(); + Box::new(rx.then(|r| r.unwrap())) + } + + for _ in 0..10000 { + assert_eq!(list().wait().collect::<Result<Vec<_>, _>>(), + Ok(vec![1, 2, 3])); + } +} + +/// Stress test that after receiver dropped, +/// no messages are lost. +fn stress_close_receiver_iter() { + let (tx, rx) = mpsc::unbounded(); + let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel(); + let th = thread::spawn(move || { + for i in 1.. { + if let Err(_) = tx.unbounded_send(i) { + unwritten_tx.send(i).expect("unwritten_tx"); + return; + } + } + }); + + let mut rx = rx.wait(); + + // Read one message to make sure thread effectively started + assert_eq!(Some(Ok(1)), rx.next()); + + rx.get_mut().close(); + + for i in 2.. { + match rx.next() { + Some(Ok(r)) => assert!(i == r), + Some(Err(_)) => unreachable!(), + None => { + let unwritten = unwritten_rx.recv().expect("unwritten_rx"); + assert_eq!(unwritten, i); + th.join().unwrap(); + return; + } + } + } +} + +#[test] +fn stress_close_receiver() { + for _ in 0..10000 { + stress_close_receiver_iter(); + } +} + +/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting. +#[test] +fn stress_poll_ready() { + // A task which checks channel capacity using poll_ready, and pushes items onto the channel when + // ready. + struct SenderTask { + sender: mpsc::Sender<u32>, + count: u32, + } + impl Future for SenderTask { + type Item = (); + type Error = (); + fn poll(&mut self) -> Poll<(), ()> { + // In a loop, check if the channel is ready. If so, push an item onto the channel + // (asserting that it doesn't attempt to block). + while self.count > 0 { + try_ready!(self.sender.poll_ready().map_err(|_| ())); + assert!(self.sender.start_send(self.count).unwrap().is_ready()); + self.count -= 1; + } + Ok(Async::Ready(())) + } + } + + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + + /// Run a stress test using the specified channel capacity. + fn stress(capacity: usize) { + let (tx, rx) = mpsc::channel(capacity); + let mut threads = Vec::new(); + for _ in 0..NTHREADS { + let sender = tx.clone(); + threads.push(thread::spawn(move || { + SenderTask { + sender: sender, + count: AMT, + }.wait() + })); + } + drop(tx); + + let mut rx = rx.wait(); + for _ in 0..AMT * NTHREADS { + assert!(rx.next().is_some()); + } + + assert!(rx.next().is_none()); + + for thread in threads { + thread.join().unwrap().unwrap(); + } + } + + stress(0); + stress(1); + stress(8); + stress(16); +} + +fn is_ready<T>(res: &AsyncSink<T>) -> bool { + match *res { + AsyncSink::Ready => true, + _ => false, + } +} + +#[test] +fn try_send_1() { + const N: usize = 3000; + let (mut tx, rx) = mpsc::channel(0); + + let t = thread::spawn(move || { + for i in 0..N { + loop { + if tx.try_send(i).is_ok() { + break + } + } + } + }); + for (i, j) in rx.wait().enumerate() { + assert_eq!(i, j.unwrap()); + } + t.join().unwrap(); +} + +#[test] +fn try_send_2() { + let (mut tx, rx) = mpsc::channel(0); + + tx.try_send("hello").unwrap(); + + let (readytx, readyrx) = oneshot::channel::<()>(); + + let th = thread::spawn(|| { + lazy(|| { + assert!(tx.start_send("fail").unwrap().is_not_ready()); + Ok::<_, ()>(()) + }).wait().unwrap(); + + drop(readytx); + tx.send("goodbye").wait().unwrap(); + }); + + let mut rx = rx.wait(); + + drop(readyrx.wait()); + assert_eq!(rx.next(), Some(Ok("hello"))); + assert_eq!(rx.next(), Some(Ok("goodbye"))); + assert!(rx.next().is_none()); + + th.join().unwrap(); +} + +#[test] +fn try_send_fail() { + let (mut tx, rx) = mpsc::channel(0); + let mut rx = rx.wait(); + + tx.try_send("hello").unwrap(); + + // This should fail + assert!(tx.try_send("fail").is_err()); + + assert_eq!(rx.next(), Some(Ok("hello"))); + + tx.try_send("goodbye").unwrap(); + drop(tx); + + assert_eq!(rx.next(), Some(Ok("goodbye"))); + assert!(rx.next().is_none()); +} + +#[test] +fn bounded_is_really_bounded() { + use futures::Async::*; + let (mut tx, mut rx) = mpsc::channel(0); + lazy(|| { + assert!(tx.start_send(1).unwrap().is_ready()); + // Not ready until we receive + assert!(!tx.poll_complete().unwrap().is_ready()); + // Receive the value + assert_eq!(rx.poll().unwrap(), Ready(Some(1))); + // Now the sender is ready + assert!(tx.poll_complete().unwrap().is_ready()); + Ok::<_, ()>(()) + }).wait().unwrap(); +} diff --git a/third_party/rust/futures-0.1.31/tests/oneshot.rs b/third_party/rust/futures-0.1.31/tests/oneshot.rs new file mode 100644 index 0000000000..45c1996876 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/oneshot.rs @@ -0,0 +1,253 @@ +extern crate futures; + +use std::sync::mpsc; +use std::thread; + +use futures::prelude::*; +use futures::future::{lazy, ok}; +use futures::sync::oneshot::*; + +mod support; +use support::*; + +#[test] +fn smoke_poll() { + let (mut tx, rx) = channel::<u32>(); + let mut task = futures::executor::spawn(lazy(|| { + assert!(tx.poll_cancel().unwrap().is_not_ready()); + assert!(tx.poll_cancel().unwrap().is_not_ready()); + drop(rx); + assert!(tx.poll_cancel().unwrap().is_ready()); + assert!(tx.poll_cancel().unwrap().is_ready()); + ok::<(), ()>(()) + })); + assert!(task.poll_future_notify(¬ify_noop(), 0).unwrap().is_ready()); +} + +#[test] +fn cancel_notifies() { + let (tx, rx) = channel::<u32>(); + let (tx2, rx2) = mpsc::channel(); + + WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget(); + drop(rx); + rx2.recv().unwrap().unwrap(); +} + +struct WaitForCancel { + tx: Sender<u32>, +} + +impl Future for WaitForCancel { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + self.tx.poll_cancel() + } +} + +#[test] +fn cancel_lots() { + let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); + let t = thread::spawn(move || { + for (tx, tx2) in rx { + WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget(); + } + + }); + + for _ in 0..20000 { + let (otx, orx) = channel::<u32>(); + let (tx2, rx2) = mpsc::channel(); + tx.send((otx, tx2)).unwrap(); + drop(orx); + rx2.recv().unwrap().unwrap(); + } + drop(tx); + + t.join().unwrap(); +} + +#[test] +fn close() { + let (mut tx, mut rx) = channel::<u32>(); + rx.close(); + assert!(rx.poll().is_err()); + assert!(tx.poll_cancel().unwrap().is_ready()); +} + +#[test] +fn close_wakes() { + let (tx, mut rx) = channel::<u32>(); + let (tx2, rx2) = mpsc::channel(); + let t = thread::spawn(move || { + rx.close(); + rx2.recv().unwrap(); + }); + WaitForCancel { tx: tx }.wait().unwrap(); + tx2.send(()).unwrap(); + t.join().unwrap(); +} + +#[test] +fn is_canceled() { + let (tx, rx) = channel::<u32>(); + assert!(!tx.is_canceled()); + drop(rx); + assert!(tx.is_canceled()); +} + +#[test] +fn cancel_sends() { + let (tx, rx) = mpsc::channel::<Sender<_>>(); + let t = thread::spawn(move || { + for otx in rx { + let _ = otx.send(42); + } + }); + + for _ in 0..20000 { + let (otx, mut orx) = channel::<u32>(); + tx.send(otx).unwrap(); + + orx.close(); + // Not necessary to wrap in a task because the implementation of oneshot + // never calls `task::current()` if the channel has been closed already. + let _ = orx.poll(); + } + + drop(tx); + t.join().unwrap(); +} + +#[test] +fn spawn_sends_items() { + let core = local_executor::Core::new(); + let future = ok::<_, ()>(1); + let rx = spawn(future, &core); + assert_eq!(core.run(rx).unwrap(), 1); +} + +#[test] +fn spawn_kill_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + use futures::sync::oneshot; + + // a future which never returns anything (forever accepting incoming + // connections), but dropping it leads to observable side effects + // (like closing listening sockets, releasing limited resources, + // ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Future for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = local_executor::Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let future = Dead{done: done_tx}; + let rx = spawn(future, &core); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // now drop the spawned future: maybe some timeout exceeded, + // or some connection on this end was closed by the remote + // end. + drop(rx); + // and wait for the spawned future to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => (), + Ok(Either::B(((), _))) => { + panic!("dead future wasn't canceled (timeout)"); + }, + _ => { + panic!("dead future wasn't canceled (unexpected result)"); + }, + } +} + +#[test] +fn spawn_dont_kill_forgot_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + use futures::sync::oneshot; + + // a future which never returns anything (forever accepting incoming + // connections), but dropping it leads to observable side effects + // (like closing listening sockets, releasing limited resources, + // ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Future for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = local_executor::Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let future = Dead{done: done_tx}; + let rx = spawn(future, &core); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // forget the spawned future: should keep running, i.e. hit + // the timeout below. + rx.forget(); + // and wait for the spawned future to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => { + panic!("forgotten dead future was canceled"); + }, + Ok(Either::B(((), _))) => (), // reached timeout + _ => { + panic!("forgotten dead future was canceled (unexpected result)"); + }, + } +} diff --git a/third_party/rust/futures-0.1.31/tests/ready_queue.rs b/third_party/rust/futures-0.1.31/tests/ready_queue.rs new file mode 100644 index 0000000000..b0dc2375ba --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/ready_queue.rs @@ -0,0 +1,164 @@ +extern crate futures; + +use std::panic::{self, AssertUnwindSafe}; + +use futures::prelude::*; +use futures::Async::*; +use futures::future; +use futures::stream::FuturesUnordered; +use futures::sync::oneshot; + +trait AssertSendSync: Send + Sync {} +impl AssertSendSync for FuturesUnordered<()> {} + +#[test] +fn basic_usage() { + future::lazy(move || { + let mut queue = FuturesUnordered::new(); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + + queue.push(rx1); + queue.push(rx2); + queue.push(rx3); + + assert!(!queue.poll().unwrap().is_ready()); + + tx2.send("hello").unwrap(); + + assert_eq!(Ready(Some("hello")), queue.poll().unwrap()); + assert!(!queue.poll().unwrap().is_ready()); + + tx1.send("world").unwrap(); + tx3.send("world2").unwrap(); + + assert_eq!(Ready(Some("world")), queue.poll().unwrap()); + assert_eq!(Ready(Some("world2")), queue.poll().unwrap()); + assert_eq!(Ready(None), queue.poll().unwrap()); + + Ok::<_, ()>(()) + }).wait().unwrap(); +} + +#[test] +fn resolving_errors() { + future::lazy(move || { + let mut queue = FuturesUnordered::new(); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + + queue.push(rx1); + queue.push(rx2); + queue.push(rx3); + + assert!(!queue.poll().unwrap().is_ready()); + + drop(tx2); + + assert!(queue.poll().is_err()); + assert!(!queue.poll().unwrap().is_ready()); + + drop(tx1); + tx3.send("world2").unwrap(); + + assert!(queue.poll().is_err()); + assert_eq!(Ready(Some("world2")), queue.poll().unwrap()); + assert_eq!(Ready(None), queue.poll().unwrap()); + + Ok::<_, ()>(()) + }).wait().unwrap(); +} + +#[test] +fn dropping_ready_queue() { + future::lazy(move || { + let mut queue = FuturesUnordered::new(); + let (mut tx1, rx1) = oneshot::channel::<()>(); + let (mut tx2, rx2) = oneshot::channel::<()>(); + let (mut tx3, rx3) = oneshot::channel::<()>(); + + queue.push(rx1); + queue.push(rx2); + queue.push(rx3); + + assert!(!tx1.poll_cancel().unwrap().is_ready()); + assert!(!tx2.poll_cancel().unwrap().is_ready()); + assert!(!tx3.poll_cancel().unwrap().is_ready()); + + drop(queue); + + assert!(tx1.poll_cancel().unwrap().is_ready()); + assert!(tx2.poll_cancel().unwrap().is_ready()); + assert!(tx3.poll_cancel().unwrap().is_ready()); + + Ok::<_, ()>(()) + }).wait().unwrap(); +} + +#[test] +fn stress() { + const ITER: usize = 300; + + use std::sync::{Arc, Barrier}; + use std::thread; + + for i in 0..ITER { + let n = (i % 10) + 1; + + let mut queue = FuturesUnordered::new(); + + for _ in 0..5 { + let barrier = Arc::new(Barrier::new(n + 1)); + + for num in 0..n { + let barrier = barrier.clone(); + let (tx, rx) = oneshot::channel(); + + queue.push(rx); + + thread::spawn(move || { + barrier.wait(); + tx.send(num).unwrap(); + }); + } + + barrier.wait(); + + let mut sync = queue.wait(); + + let mut rx: Vec<_> = (&mut sync) + .take(n) + .map(|res| res.unwrap()) + .collect(); + + assert_eq!(rx.len(), n); + + rx.sort(); + + for num in 0..n { + assert_eq!(rx[num], num); + } + + queue = sync.into_inner(); + } + } +} + +#[test] +fn panicking_future_dropped() { + future::lazy(move || { + let mut queue = FuturesUnordered::new(); + queue.push(future::poll_fn(|| -> Poll<i32, i32> { + panic!() + })); + + let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll())); + assert!(r.is_err()); + assert!(queue.is_empty()); + assert_eq!(Ready(None), queue.poll().unwrap()); + + Ok::<_, ()>(()) + }).wait().unwrap(); +} diff --git a/third_party/rust/futures-0.1.31/tests/recurse.rs b/third_party/rust/futures-0.1.31/tests/recurse.rs new file mode 100644 index 0000000000..a521ed13b7 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/recurse.rs @@ -0,0 +1,25 @@ +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +use std::sync::mpsc::channel; + +use futures::future::ok; +use futures::prelude::*; + +#[test] +fn lots() { + fn doit(n: usize) -> Box<Future<Item=(), Error=()> + Send> { + if n == 0 { + Box::new(ok(())) + } else { + Box::new(ok(n - 1).and_then(doit)) + } + } + + let (tx, rx) = channel(); + ::std::thread::spawn(|| { + doit(1_000).map(move |_| tx.send(()).unwrap()).wait() + }); + rx.recv().unwrap(); +} diff --git a/third_party/rust/futures-0.1.31/tests/select_all.rs b/third_party/rust/futures-0.1.31/tests/select_all.rs new file mode 100644 index 0000000000..7780aa306d --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/select_all.rs @@ -0,0 +1,27 @@ +extern crate futures; + +use futures::prelude::*; +use futures::future::{ok, select_all, err}; + +#[test] +fn smoke() { + let v = vec![ + ok(1), + err(2), + ok(3), + ]; + + let (i, idx, v) = select_all(v).wait().ok().unwrap(); + assert_eq!(i, 1); + assert_eq!(idx, 0); + + let (i, idx, v) = select_all(v).wait().err().unwrap(); + assert_eq!(i, 2); + assert_eq!(idx, 0); + + let (i, idx, v) = select_all(v).wait().ok().unwrap(); + assert_eq!(i, 3); + assert_eq!(idx, 0); + + assert!(v.is_empty()); +} diff --git a/third_party/rust/futures-0.1.31/tests/select_ok.rs b/third_party/rust/futures-0.1.31/tests/select_ok.rs new file mode 100644 index 0000000000..85f39e2d39 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/select_ok.rs @@ -0,0 +1,40 @@ +extern crate futures; + +use futures::future::*; + +#[test] +fn ignore_err() { + let v = vec![ + err(1), + err(2), + ok(3), + ok(4), + ]; + + let (i, v) = select_ok(v).wait().ok().unwrap(); + assert_eq!(i, 3); + + assert_eq!(v.len(), 1); + + let (i, v) = select_ok(v).wait().ok().unwrap(); + assert_eq!(i, 4); + + assert!(v.is_empty()); +} + +#[test] +fn last_err() { + let v = vec![ + ok(1), + err(2), + err(3), + ]; + + let (i, v) = select_ok(v).wait().ok().unwrap(); + assert_eq!(i, 1); + + assert_eq!(v.len(), 2); + + let i = select_ok(v).wait().err().unwrap(); + assert_eq!(i, 3); +} diff --git a/third_party/rust/futures-0.1.31/tests/shared.rs b/third_party/rust/futures-0.1.31/tests/shared.rs new file mode 100644 index 0000000000..97989fe2cb --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/shared.rs @@ -0,0 +1,236 @@ +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +mod support; + +use std::cell::RefCell; +use std::rc::Rc; +use std::thread; + +use futures::sync::oneshot; +use futures::prelude::*; +use futures::future; + +fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { + let (tx, rx) = oneshot::channel::<u32>(); + let f = rx.shared(); + let threads = (0..threads_number).map(|_| { + let cloned_future = f.clone(); + thread::spawn(move || { + assert_eq!(*cloned_future.wait().unwrap(), 6); + }) + }).collect::<Vec<_>>(); + tx.send(6).unwrap(); + assert_eq!(*f.wait().unwrap(), 6); + for f in threads { + f.join().unwrap(); + } +} + +#[test] +fn one_thread() { + send_shared_oneshot_and_wait_on_multiple_threads(1); +} + +#[test] +fn two_threads() { + send_shared_oneshot_and_wait_on_multiple_threads(2); +} + +#[test] +fn many_threads() { + send_shared_oneshot_and_wait_on_multiple_threads(1000); +} + +#[test] +fn drop_on_one_task_ok() { + let (tx, rx) = oneshot::channel::<u32>(); + let f1 = rx.shared(); + let f2 = f1.clone(); + + let (tx2, rx2) = oneshot::channel::<u32>(); + + let t1 = thread::spawn(|| { + let f = f1.map_err(|_| ()).map(|x| *x).select(rx2.map_err(|_| ())); + drop(f.wait()); + }); + + let (tx3, rx3) = oneshot::channel::<u32>(); + + let t2 = thread::spawn(|| { + let _ = f2.map(|x| tx3.send(*x).unwrap()).map_err(|_| ()).wait(); + }); + + tx2.send(11).unwrap(); // cancel `f1` + t1.join().unwrap(); + + tx.send(42).unwrap(); // Should cause `f2` and then `rx3` to get resolved. + let result = rx3.wait().unwrap(); + assert_eq!(result, 42); + t2.join().unwrap(); +} + +#[test] +fn drop_in_poll() { + let slot = Rc::new(RefCell::new(None)); + let slot2 = slot.clone(); + let future = future::poll_fn(move || { + drop(slot2.borrow_mut().take().unwrap()); + Ok::<_, u32>(1.into()) + }).shared(); + let future2 = Box::new(future.clone()) as Box<Future<Item=_, Error=_>>; + *slot.borrow_mut() = Some(future2); + assert_eq!(*future.wait().unwrap(), 1); +} + +#[test] +fn peek() { + let core = ::support::local_executor::Core::new(); + + let (tx0, rx0) = oneshot::channel::<u32>(); + let f1 = rx0.shared(); + let f2 = f1.clone(); + + // Repeated calls on the original or clone do not change the outcome. + for _ in 0..2 { + assert!(f1.peek().is_none()); + assert!(f2.peek().is_none()); + } + + // Completing the underlying future has no effect, because the value has not been `poll`ed in. + tx0.send(42).unwrap(); + for _ in 0..2 { + assert!(f1.peek().is_none()); + assert!(f2.peek().is_none()); + } + + // Once the Shared has been polled, the value is peekable on the clone. + core.spawn(f1.map(|_|()).map_err(|_|())); + core.run(future::ok::<(),()>(())).unwrap(); + for _ in 0..2 { + assert_eq!(42, *f2.peek().unwrap().unwrap()); + } +} + +#[test] +fn polled_then_ignored() { + let core = ::support::local_executor::Core::new(); + + let (tx0, rx0) = oneshot::channel::<u32>(); + let f1 = rx0.shared(); + let f2 = f1.clone(); + + let (tx1, rx1) = oneshot::channel::<u32>(); + let (tx2, rx2) = oneshot::channel::<u32>(); + let (tx3, rx3) = oneshot::channel::<u32>(); + + core.spawn(f1.map(|n| tx3.send(*n).unwrap()).map_err(|_|())); + + core.run(future::ok::<(),()>(())).unwrap(); // Allow f1 to be polled. + + core.spawn(f2.map_err(|_| ()).map(|x| *x).select(rx2.map_err(|_| ())).map_err(|_| ()) + .and_then(|(_, f2)| rx3.map_err(|_| ()).map(move |n| {drop(f2); tx1.send(n).unwrap()}))); + + core.run(future::ok::<(),()>(())).unwrap(); // Allow f2 to be polled. + + tx2.send(11).unwrap(); // Resolve rx2, causing f2 to no longer get polled. + + core.run(future::ok::<(),()>(())).unwrap(); // Let the send() propagate. + + tx0.send(42).unwrap(); // Should cause f1, then rx3, and then rx1 to resolve. + + assert_eq!(core.run(rx1).unwrap(), 42); +} + +#[test] +fn recursive_poll() { + use futures::sync::mpsc; + use futures::Stream; + + let core = ::support::local_executor::Core::new(); + let (tx0, rx0) = mpsc::unbounded::<Box<Future<Item=(),Error=()>>>(); + let run_stream = rx0.for_each(|f| f); + + let (tx1, rx1) = oneshot::channel::<()>(); + + let f1 = run_stream.shared(); + let f2 = f1.clone(); + let f3 = f1.clone(); + tx0.unbounded_send(Box::new( + f1.map(|_|()).map_err(|_|()) + .select(rx1.map_err(|_|())) + .map(|_| ()).map_err(|_|()))).unwrap(); + + core.spawn(f2.map(|_|()).map_err(|_|())); + + // Call poll() on the spawned future. We want to be sure that this does not trigger a + // deadlock or panic due to a recursive lock() on a mutex. + core.run(future::ok::<(),()>(())).unwrap(); + + tx1.send(()).unwrap(); // Break the cycle. + drop(tx0); + core.run(f3).unwrap(); +} + +#[test] +fn recursive_poll_with_unpark() { + use futures::sync::mpsc; + use futures::{Stream, task}; + + let core = ::support::local_executor::Core::new(); + let (tx0, rx0) = mpsc::unbounded::<Box<Future<Item=(),Error=()>>>(); + let run_stream = rx0.for_each(|f| f); + + let (tx1, rx1) = oneshot::channel::<()>(); + + let f1 = run_stream.shared(); + let f2 = f1.clone(); + let f3 = f1.clone(); + tx0.unbounded_send(Box::new(future::lazy(move || { + task::current().notify(); + f1.map(|_|()).map_err(|_|()) + .select(rx1.map_err(|_|())) + .map(|_| ()).map_err(|_|()) + }))).unwrap(); + + core.spawn(f2.map(|_|()).map_err(|_|())); + + // Call poll() on the spawned future. We want to be sure that this does not trigger a + // deadlock or panic due to a recursive lock() on a mutex. + core.run(future::ok::<(),()>(())).unwrap(); + + tx1.send(()).unwrap(); // Break the cycle. + drop(tx0); + core.run(f3).unwrap(); +} + +#[test] +fn shared_future_that_wakes_itself_until_pending_is_returned() { + use futures::Async; + use std::cell::Cell; + + let core = ::support::local_executor::Core::new(); + + let proceed = Cell::new(false); + let fut = futures::future::poll_fn(|| { + Ok::<_, ()>(if proceed.get() { + Async::Ready(()) + } else { + futures::task::current().notify(); + Async::NotReady + }) + }) + .shared() + .map(|_| ()) + .map_err(|_| ()); + + // The join future can only complete if the second future gets a chance to run after the first + // has returned pending + let second = futures::future::lazy(|| { + proceed.set(true); + Ok::<_, ()>(()) + }); + + core.run(fut.join(second)).unwrap(); +} diff --git a/third_party/rust/futures-0.1.31/tests/sink.rs b/third_party/rust/futures-0.1.31/tests/sink.rs new file mode 100644 index 0000000000..460dbdf20c --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/sink.rs @@ -0,0 +1,446 @@ +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +use std::mem; +use std::sync::Arc; +use std::rc::Rc; +use std::cell::{Cell, RefCell}; +use std::sync::atomic::{Ordering, AtomicBool}; + +use futures::prelude::*; +use futures::future::ok; +use futures::stream; +use futures::sync::{oneshot, mpsc}; +use futures::task::{self, Task}; +use futures::executor::{self, Notify}; +use futures::sink::SinkFromErr; + +mod support; +use support::*; + +#[test] +fn vec_sink() { + let mut v = Vec::new(); + assert_eq!(v.start_send(0), Ok(AsyncSink::Ready)); + assert_eq!(v.start_send(1), Ok(AsyncSink::Ready)); + assert_eq!(v, vec![0, 1]); + assert_done(move || v.flush(), Ok(vec![0, 1])); +} + +#[test] +fn send() { + let v = Vec::new(); + + let v = v.send(0).wait().unwrap(); + assert_eq!(v, vec![0]); + + let v = v.send(1).wait().unwrap(); + assert_eq!(v, vec![0, 1]); + + assert_done(move || v.send(2), + Ok(vec![0, 1, 2])); +} + +#[test] +fn send_all() { + let v = Vec::new(); + + let (v, _) = v.send_all(stream::iter_ok(vec![0, 1])).wait().unwrap(); + assert_eq!(v, vec![0, 1]); + + let (v, _) = v.send_all(stream::iter_ok(vec![2, 3])).wait().unwrap(); + assert_eq!(v, vec![0, 1, 2, 3]); + + assert_done( + move || v.send_all(stream::iter_ok(vec![4, 5])).map(|(v, _)| v), + Ok(vec![0, 1, 2, 3, 4, 5])); +} + +// An Unpark struct that records unpark events for inspection +struct Flag(pub AtomicBool); + +impl Flag { + fn new() -> Arc<Flag> { + Arc::new(Flag(AtomicBool::new(false))) + } + + fn get(&self) -> bool { + self.0.load(Ordering::SeqCst) + } + + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) + } +} + +impl Notify for Flag { + fn notify(&self, _id: usize) { + self.set(true) + } +} + +// Sends a value on an i32 channel sink +struct StartSendFut<S: Sink>(Option<S>, Option<S::SinkItem>); + +impl<S: Sink> StartSendFut<S> { + fn new(sink: S, item: S::SinkItem) -> StartSendFut<S> { + StartSendFut(Some(sink), Some(item)) + } +} + +impl<S: Sink> Future for StartSendFut<S> { + type Item = S; + type Error = S::SinkError; + + fn poll(&mut self) -> Poll<S, S::SinkError> { + match self.0.as_mut().unwrap().start_send(self.1.take().unwrap())? { + AsyncSink::Ready => Ok(Async::Ready(self.0.take().unwrap())), + AsyncSink::NotReady(item) => { + self.1 = Some(item); + Ok(Async::NotReady) + } + } + + } +} + +#[test] +// Test that `start_send` on an `mpsc` channel does indeed block when the +// channel is full +fn mpsc_blocking_start_send() { + let (mut tx, mut rx) = mpsc::channel::<i32>(0); + + futures::future::lazy(|| { + assert_eq!(tx.start_send(0).unwrap(), AsyncSink::Ready); + + let flag = Flag::new(); + let mut task = executor::spawn(StartSendFut::new(tx, 1)); + + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + assert!(!flag.get()); + sassert_next(&mut rx, 0); + assert!(flag.get()); + flag.set(false); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready()); + assert!(!flag.get()); + sassert_next(&mut rx, 1); + + Ok::<(), ()>(()) + }).wait().unwrap(); +} + +#[test] +// test `flush` by using `with` to make the first insertion into a sink block +// until a oneshot is completed +fn with_flush() { + let (tx, rx) = oneshot::channel(); + let mut block = Box::new(rx) as Box<Future<Item = _, Error = _>>; + let mut sink = Vec::new().with(|elem| { + mem::replace(&mut block, Box::new(ok(()))) + .map(move |_| elem + 1).map_err(|_| -> () { panic!() }) + }); + + assert_eq!(sink.start_send(0), Ok(AsyncSink::Ready)); + + let flag = Flag::new(); + let mut task = executor::spawn(sink.flush()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + tx.send(()).unwrap(); + assert!(flag.get()); + + let sink = match task.poll_future_notify(&flag, 0).unwrap() { + Async::Ready(sink) => sink, + _ => panic!() + }; + + assert_eq!(sink.send(1).wait().unwrap().get_ref(), &[1, 2]); +} + +#[test] +// test simple use of with to change data +fn with_as_map() { + let sink = Vec::new().with(|item| -> Result<i32, ()> { + Ok(item * 2) + }); + let sink = sink.send(0).wait().unwrap(); + let sink = sink.send(1).wait().unwrap(); + let sink = sink.send(2).wait().unwrap(); + assert_eq!(sink.get_ref(), &[0, 2, 4]); +} + +#[test] +// test simple use of with_flat_map +fn with_flat_map() { + let sink = Vec::new().with_flat_map(|item| { + stream::iter_ok(vec![item; item]) + }); + let sink = sink.send(0).wait().unwrap(); + let sink = sink.send(1).wait().unwrap(); + let sink = sink.send(2).wait().unwrap(); + let sink = sink.send(3).wait().unwrap(); + assert_eq!(sink.get_ref(), &[1,2,2,3,3,3]); +} + +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush<T> { + data: Vec<T>, + waiting_tasks: Vec<Task>, +} + +impl<T> Sink for ManualFlush<T> { + type SinkItem = Option<T>; // Pass None to flush + type SinkError = (); + + fn start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()> { + if let Some(item) = op { + self.data.push(item); + } else { + self.force_flush(); + } + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), ()> { + if self.data.is_empty() { + Ok(Async::Ready(())) + } else { + self.waiting_tasks.push(task::current()); + Ok(Async::NotReady) + } + } + + fn close(&mut self) -> Poll<(), ()> { + Ok(().into()) + } +} + +impl<T> ManualFlush<T> { + fn new() -> ManualFlush<T> { + ManualFlush { + data: Vec::new(), + waiting_tasks: Vec::new() + } + } + + fn force_flush(&mut self) -> Vec<T> { + for task in self.waiting_tasks.drain(..) { + task.notify() + } + mem::replace(&mut self.data, Vec::new()) + } +} + +#[test] +// test that the `with` sink doesn't require the underlying sink to flush, +// but doesn't claim to be flushed until the underlying sink is +fn with_flush_propagate() { + let mut sink = ManualFlush::new().with(|x| -> Result<Option<i32>, ()> { Ok(x) }); + assert_eq!(sink.start_send(Some(0)).unwrap(), AsyncSink::Ready); + assert_eq!(sink.start_send(Some(1)).unwrap(), AsyncSink::Ready); + + let flag = Flag::new(); + let mut task = executor::spawn(sink.flush()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + assert!(!flag.get()); + assert_eq!(task.get_mut().get_mut().get_mut().force_flush(), vec![0, 1]); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready()); +} + +#[test] +// test that a buffer is a no-nop around a sink that always accepts sends +fn buffer_noop() { + let sink = Vec::new().buffer(0); + let sink = sink.send(0).wait().unwrap(); + let sink = sink.send(1).wait().unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); + + let sink = Vec::new().buffer(1); + let sink = sink.send(0).wait().unwrap(); + let sink = sink.send(1).wait().unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); +} + +struct ManualAllow<T> { + data: Vec<T>, + allow: Rc<Allow>, +} + +struct Allow { + flag: Cell<bool>, + tasks: RefCell<Vec<Task>>, +} + +impl Allow { + fn new() -> Allow { + Allow { + flag: Cell::new(false), + tasks: RefCell::new(Vec::new()), + } + } + + fn check(&self) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(task::current()); + false + } + } + + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.notify(); + } + } +} + +impl<T> Sink for ManualAllow<T> { + type SinkItem = T; + type SinkError = (); + + fn start_send(&mut self, item: T) -> StartSend<T, ()> { + if self.allow.check() { + self.data.push(item); + Ok(AsyncSink::Ready) + } else { + Ok(AsyncSink::NotReady(item)) + } + } + + fn poll_complete(&mut self) -> Poll<(), ()> { + Ok(Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), ()> { + Ok(().into()) + } +} + +fn manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { + data: Vec::new(), + allow: allow.clone(), + }; + (manual_allow, allow) +} + +#[test] +// test basic buffer functionality, including both filling up to capacity, +// and writing out when the underlying sink is ready +fn buffer() { + let (sink, allow) = manual_allow::<i32>(); + let sink = sink.buffer(2); + + let sink = StartSendFut::new(sink, 0).wait().unwrap(); + let sink = StartSendFut::new(sink, 1).wait().unwrap(); + + let flag = Flag::new(); + let mut task = executor::spawn(sink.send(2)); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + assert!(!flag.get()); + allow.start(); + assert!(flag.get()); + match task.poll_future_notify(&flag, 0).unwrap() { + Async::Ready(sink) => { + assert_eq!(sink.get_ref().data, vec![0, 1, 2]); + } + _ => panic!() + } +} + +#[test] +fn fanout_smoke() { + let sink1 = Vec::new(); + let sink2 = Vec::new(); + let sink = sink1.fanout(sink2); + let stream = futures::stream::iter_ok(vec![1,2,3]); + let (sink, _) = sink.send_all(stream).wait().unwrap(); + let (sink1, sink2) = sink.into_inner(); + assert_eq!(sink1, vec![1,2,3]); + assert_eq!(sink2, vec![1,2,3]); +} + +#[test] +fn fanout_backpressure() { + let (left_send, left_recv) = mpsc::channel(0); + let (right_send, right_recv) = mpsc::channel(0); + let sink = left_send.fanout(right_send); + + let sink = StartSendFut::new(sink, 0).wait().unwrap(); + let sink = StartSendFut::new(sink, 1).wait().unwrap(); + + let flag = Flag::new(); + let mut task = executor::spawn(sink.send(2)); + assert!(!flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, left_recv) = left_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(0)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, right_recv) = right_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(0)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, left_recv) = left_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(1)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, right_recv) = right_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(1)); + assert!(flag.get()); + let (item, left_recv) = left_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, right_recv) = right_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); + match task.poll_future_notify(&flag, 0).unwrap() { + Async::Ready(_) => { + }, + _ => panic!() + }; + // make sure receivers live until end of test to prevent send errors + drop(left_recv); + drop(right_recv); +} + +#[test] +fn map_err() { + { + let (tx, _rx) = mpsc::channel(1); + let mut tx = tx.sink_map_err(|_| ()); + assert_eq!(tx.start_send(()), Ok(AsyncSink::Ready)); + assert_eq!(tx.poll_complete(), Ok(Async::Ready(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!(tx.sink_map_err(|_| ()).start_send(()), Err(())); +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct FromErrTest; + +impl<T> From<mpsc::SendError<T>> for FromErrTest { + fn from(_: mpsc::SendError<T>) -> FromErrTest { + FromErrTest + } +} + +#[test] +fn from_err() { + { + let (tx, _rx) = mpsc::channel(1); + let mut tx: SinkFromErr<mpsc::Sender<()>, FromErrTest> = tx.sink_from_err(); + assert_eq!(tx.start_send(()), Ok(AsyncSink::Ready)); + assert_eq!(tx.poll_complete(), Ok(Async::Ready(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!(tx.sink_from_err().start_send(()), Err(FromErrTest)); +} diff --git a/third_party/rust/futures-0.1.31/tests/split.rs b/third_party/rust/futures-0.1.31/tests/split.rs new file mode 100644 index 0000000000..7a0667f135 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/split.rs @@ -0,0 +1,47 @@ +extern crate futures; + +use futures::prelude::*; +use futures::stream::iter_ok; + +struct Join<T, U>(T, U); + +impl<T: Stream, U> Stream for Join<T, U> { + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> { + self.0.poll() + } +} + +impl<T, U: Sink> Sink for Join<T, U> { + type SinkItem = U::SinkItem; + type SinkError = U::SinkError; + + fn start_send(&mut self, item: U::SinkItem) + -> StartSend<U::SinkItem, U::SinkError> + { + self.1.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), U::SinkError> { + self.1.poll_complete() + } + + fn close(&mut self) -> Poll<(), U::SinkError> { + self.1.close() + } +} + +#[test] +fn test_split() { + let mut dest = Vec::new(); + { + let j = Join(iter_ok(vec![10, 20, 30]), &mut dest); + let (sink, stream) = j.split(); + let j = sink.reunite(stream).expect("test_split: reunite error"); + let (sink, stream) = j.split(); + sink.send_all(stream).wait().unwrap(); + } + assert_eq!(dest, vec![10, 20, 30]); +} diff --git a/third_party/rust/futures-0.1.31/tests/stream.rs b/third_party/rust/futures-0.1.31/tests/stream.rs new file mode 100644 index 0000000000..2400a2abb1 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/stream.rs @@ -0,0 +1,416 @@ +#![allow(bare_trait_objects, unknown_lints)] + +#[macro_use] +extern crate futures; + +use futures::prelude::*; +use futures::executor; +use futures::future::{err, ok}; +use futures::stream::{empty, iter_ok, poll_fn, Peekable}; +use futures::sync::oneshot; +use futures::sync::mpsc; + +mod support; +use support::*; + +pub struct Iter<I> { + iter: I, +} + +pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter> + where J: IntoIterator<Item=Result<T, E>>, +{ + Iter { + iter: i.into_iter(), + } +} + +impl<I, T, E> Stream for Iter<I> + where I: Iterator<Item=Result<T, E>>, +{ + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<Option<T>, E> { + match self.iter.next() { + Some(Ok(e)) => Ok(Async::Ready(Some(e))), + Some(Err(e)) => Err(e), + None => Ok(Async::Ready(None)), + } + } +} + +fn list() -> Box<Stream<Item=i32, Error=u32> + Send> { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)) + .and_then(|tx| tx.send(Ok(2))) + .and_then(|tx| tx.send(Ok(3))) + .forget(); + Box::new(rx.then(|r| r.unwrap())) +} + +fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)) + .and_then(|tx| tx.send(Ok(2))) + .and_then(|tx| tx.send(Err(3))) + .forget(); + Box::new(rx.then(|r| r.unwrap())) +} + +#[test] +fn map() { + assert_done(|| list().map(|a| a + 1).collect(), Ok(vec![2, 3, 4])); +} + +#[test] +fn map_err() { + assert_done(|| err_list().map_err(|a| a + 1).collect(), Err(4)); +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct FromErrTest(u32); + +impl From<u32> for FromErrTest { + fn from(i: u32) -> FromErrTest { + FromErrTest(i) + } +} + +#[test] +fn from_err() { + assert_done(|| err_list().from_err().collect(), Err(FromErrTest(3))); +} + +#[test] +fn fold() { + assert_done(|| list().fold(0, |a, b| ok::<i32, u32>(a + b)), Ok(6)); + assert_done(|| err_list().fold(0, |a, b| ok::<i32, u32>(a + b)), Err(3)); +} + +#[test] +fn filter() { + assert_done(|| list().filter(|a| *a % 2 == 0).collect(), Ok(vec![2])); +} + +#[test] +fn filter_map() { + assert_done(|| list().filter_map(|x| { + if x % 2 == 0 { + Some(x + 10) + } else { + None + } + }).collect(), Ok(vec![12])); +} + +#[test] +fn and_then() { + assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4])); + assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect(), + Err(1)); +} + +#[test] +fn then() { + assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4])); + +} + +#[test] +fn or_else() { + assert_done(|| err_list().or_else(|a| { + ok::<i32, u32>(a as i32) + }).collect(), Ok(vec![1, 2, 3])); +} + +#[test] +fn flatten() { + assert_done(|| list().map(|_| list()).flatten().collect(), + Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); + +} + +#[test] +fn skip() { + assert_done(|| list().skip(2).collect(), Ok(vec![3])); +} + +#[test] +fn skip_passes_errors_through() { + let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]) + .skip(1) + .wait(); + assert_eq!(s.next(), Some(Err(1))); + assert_eq!(s.next(), Some(Err(2))); + assert_eq!(s.next(), Some(Ok(4))); + assert_eq!(s.next(), Some(Ok(5))); + assert_eq!(s.next(), None); +} + +#[test] +fn skip_while() { + assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), + Ok(vec![2, 3])); +} +#[test] +fn take() { + assert_done(|| list().take(2).collect(), Ok(vec![1, 2])); +} + +#[test] +fn take_while() { + assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), + Ok(vec![1, 2])); +} + +#[test] +fn take_passes_errors_through() { + let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]) + .take(1) + .wait(); + assert_eq!(s.next(), Some(Err(1))); + assert_eq!(s.next(), Some(Err(2))); + assert_eq!(s.next(), Some(Ok(3))); + assert_eq!(s.next(), None); + + let mut s = iter(vec![Ok(1), Err(2)]).take(1).wait(); + assert_eq!(s.next(), Some(Ok(1))); + assert_eq!(s.next(), None); +} + +#[test] +fn peekable() { + assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3])); +} + +#[test] +fn fuse() { + let mut stream = list().fuse().wait(); + assert_eq!(stream.next(), Some(Ok(1))); + assert_eq!(stream.next(), Some(Ok(2))); + assert_eq!(stream.next(), Some(Ok(3))); + assert_eq!(stream.next(), None); + assert_eq!(stream.next(), None); + assert_eq!(stream.next(), None); +} + +#[test] +fn buffered() { + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::<u32>(); + let (c, d) = oneshot::channel::<u32>(); + + tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| ())))) + .forget(); + + let mut rx = rx.buffered(2); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = rx.wait(); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); + + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::<u32>(); + let (c, d) = oneshot::channel::<u32>(); + + tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| ())))) + .forget(); + + let mut rx = rx.buffered(1); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = rx.wait(); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); +} + +#[test] +fn unordered() { + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::<u32>(); + let (c, d) = oneshot::channel::<u32>(); + + tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| ())))) + .forget(); + + let mut rx = rx.buffer_unordered(2); + sassert_empty(&mut rx); + let mut rx = rx.wait(); + c.send(3).unwrap(); + assert_eq!(rx.next(), Some(Ok(3))); + a.send(5).unwrap(); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), None); + + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::<u32>(); + let (c, d) = oneshot::channel::<u32>(); + + tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| ())))) + .forget(); + + // We don't even get to see `c` until `a` completes. + let mut rx = rx.buffer_unordered(1); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = rx.wait(); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); +} + +#[test] +fn zip() { + assert_done(|| list().zip(list()).collect(), + Ok(vec![(1, 1), (2, 2), (3, 3)])); + assert_done(|| list().zip(list().take(2)).collect(), + Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().take(2).zip(list()).collect(), + Ok(vec![(1, 1), (2, 2)])); + assert_done(|| err_list().zip(list()).collect(), Err(3)); + assert_done(|| list().zip(list().map(|x| x + 1)).collect(), + Ok(vec![(1, 2), (2, 3), (3, 4)])); +} + +#[test] +fn peek() { + struct Peek { + inner: Peekable<Box<Stream<Item = i32, Error =u32> + Send>> + } + + impl Future for Peek { + type Item = (); + type Error = u32; + + fn poll(&mut self) -> Poll<(), u32> { + { + let res = try_ready!(self.inner.peek()); + assert_eq!(res, Some(&1)); + } + assert_eq!(self.inner.peek().unwrap(), Some(&1).into()); + assert_eq!(self.inner.poll().unwrap(), Some(1).into()); + Ok(().into()) + } + } + + Peek { + inner: list().peekable(), + }.wait().unwrap() +} + +#[test] +fn wait() { + assert_eq!(list().wait().collect::<Result<Vec<_>, _>>(), + Ok(vec![1, 2, 3])); +} + +#[test] +fn chunks() { + assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]])); + assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); + assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]])); + let mut list = executor::spawn(err_list().chunks(3)); + let i = list.wait_stream().unwrap().unwrap(); + assert_eq!(i, vec![1, 2]); + let i = list.wait_stream().unwrap().unwrap_err(); + assert_eq!(i, 3); +} + +#[test] +#[should_panic] +fn chunks_panic_on_cap_zero() { + let _ = list().chunks(0); +} + +#[test] +fn select() { + let a = iter_ok::<_, u32>(vec![1, 2, 3]); + let b = iter_ok(vec![4, 5, 6]); + assert_done(|| a.select(b).collect(), Ok(vec![1, 4, 2, 5, 3, 6])); + + let a = iter_ok::<_, u32>(vec![1, 2, 3]); + let b = iter_ok(vec![1, 2]); + assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3])); + + let a = iter_ok(vec![1, 2]); + let b = iter_ok::<_, u32>(vec![1, 2, 3]); + assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3])); +} + +#[test] +fn forward() { + let v = Vec::new(); + let v = iter_ok::<_, ()>(vec![0, 1]).forward(v).wait().unwrap().1; + assert_eq!(v, vec![0, 1]); + + let v = iter_ok::<_, ()>(vec![2, 3]).forward(v).wait().unwrap().1; + assert_eq!(v, vec![0, 1, 2, 3]); + + assert_done(move || iter_ok(vec![4, 5]).forward(v).map(|(_, s)| s), + Ok::<_, ()>(vec![0, 1, 2, 3, 4, 5])); +} + +#[test] +#[allow(deprecated)] +fn concat() { + let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); + assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); + + let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); + assert_done(move || b.concat(), Err(())); +} + +#[test] +fn concat2() { + let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); + assert_done(move || a.concat2(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); + + let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); + assert_done(move || b.concat2(), Err(())); + + let c = empty::<Vec<()>, ()>(); + assert_done(move || c.concat2(), Ok(vec![])) +} + +#[test] +fn stream_poll_fn() { + let mut counter = 5usize; + + let read_stream = poll_fn(move || -> Poll<Option<usize>, std::io::Error> { + if counter == 0 { + return Ok(Async::Ready(None)); + } + counter -= 1; + Ok(Async::Ready(Some(counter))) + }); + + assert_eq!(read_stream.wait().count(), 5); +} + +#[test] +fn inspect() { + let mut seen = vec![]; + assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3])); + assert_eq!(seen, [1, 2, 3]); +} + +#[test] +fn inspect_err() { + let mut seen = vec![]; + assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect(), Err(3)); + assert_eq!(seen, [3]); +} diff --git a/third_party/rust/futures-0.1.31/tests/stream_catch_unwind.rs b/third_party/rust/futures-0.1.31/tests/stream_catch_unwind.rs new file mode 100644 index 0000000000..a06748d09a --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/stream_catch_unwind.rs @@ -0,0 +1,29 @@ +extern crate futures; + +use futures::stream; +use futures::prelude::*; + +#[test] +fn panic_in_the_middle_of_the_stream() { + let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]); + + // panic on second element + let stream_panicking = stream.map(|o| o.unwrap()); + let mut iter = stream_panicking.catch_unwind().wait(); + + assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap()); + assert!(iter.next().unwrap().is_err()); + assert!(iter.next().is_none()); +} + +#[test] +fn no_panic() { + let stream = stream::iter_ok::<_, bool>(vec![10, 11, 12]); + + let mut iter = stream.catch_unwind().wait(); + + assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap()); + assert_eq!(Ok(11), iter.next().unwrap().ok().unwrap()); + assert_eq!(Ok(12), iter.next().unwrap().ok().unwrap()); + assert!(iter.next().is_none()); +} diff --git a/third_party/rust/futures-0.1.31/tests/support/local_executor.rs b/third_party/rust/futures-0.1.31/tests/support/local_executor.rs new file mode 100644 index 0000000000..cf89e8152f --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/support/local_executor.rs @@ -0,0 +1,164 @@ +//! Execution of futures on a single thread +//! +//! This module has no special handling of any blocking operations other than +//! futures-aware inter-thread communications, and is not intended to be used to +//! manage I/O. For futures that do I/O you'll likely want to use `tokio-core`. + +#![allow(bare_trait_objects, unknown_lints)] + +use std::cell::{Cell, RefCell}; +use std::sync::{Arc, Mutex, mpsc}; + +use futures::executor::{self, Spawn, Notify}; +use futures::future::{Executor, ExecuteError}; +use futures::{Future, Async}; + +/// Main loop object +pub struct Core { + tx: mpsc::Sender<usize>, + rx: mpsc::Receiver<usize>, + notify: Arc<MyNotify>, + + // Slab of running futures used to track what's running and what slots are + // empty. Slot indexes are then sent along tx/rx above to indicate which + // future is ready to get polled. + tasks: RefCell<Vec<Slot>>, + next_vacant: Cell<usize>, +} + +enum Slot { + Vacant { next_vacant: usize }, + Running(Option<Spawn<Box<Future<Item = (), Error = ()>>>>), +} + +impl Core { + /// Create a new `Core`. + pub fn new() -> Self { + let (tx, rx) = mpsc::channel(); + Core { + notify: Arc::new(MyNotify { + tx: Mutex::new(tx.clone()), + }), + tx: tx, + rx: rx, + next_vacant: Cell::new(0), + tasks: RefCell::new(Vec::new()), + } + } + + /// Spawn a future to be executed by a future call to `run`. + /// + /// The future `f` provided will not be executed until `run` is called + /// below. While futures passed to `run` are executing, the future provided + /// here will be executed concurrently as well. + pub fn spawn<F>(&self, f: F) + where F: Future<Item=(), Error=()> + 'static + { + let idx = self.next_vacant.get(); + let mut tasks = self.tasks.borrow_mut(); + match tasks.get_mut(idx) { + Some(&mut Slot::Vacant { next_vacant }) => { + self.next_vacant.set(next_vacant); + } + Some(&mut Slot::Running (_)) => { + panic!("vacant points to running future") + } + None => { + assert_eq!(idx, tasks.len()); + tasks.push(Slot::Vacant { next_vacant: 0 }); + self.next_vacant.set(idx + 1); + } + } + tasks[idx] = Slot::Running(Some(executor::spawn(Box::new(f)))); + self.tx.send(idx).unwrap(); + } + + /// Run the loop until the future `f` completes. + /// + /// This method will block the current thread until the future `f` has + /// resolved. While waiting on `f` to finish it will also execute any + /// futures spawned via `spawn` above. + pub fn run<F>(&self, f: F) -> Result<F::Item, F::Error> + where F: Future, + { + let id = usize::max_value(); + self.tx.send(id).unwrap(); + let mut f = executor::spawn(f); + loop { + if self.turn() { + match f.poll_future_notify(&self.notify, id)? { + Async::Ready(e) => return Ok(e), + Async::NotReady => {} + } + } + } + } + + /// "Turns" this event loop one tick. + /// + /// This'll block the current thread until something happens, and once an + /// event happens this will act on that event. + /// + /// # Return value + /// + /// Returns `true` if the future passed to `run` should be polled or `false` + /// otherwise. + fn turn(&self) -> bool { + let task_id = self.rx.recv().unwrap(); + if task_id == usize::max_value() { + return true + } + + // This may be a spurious wakeup so we're not guaranteed to have a + // future associated with `task_id`, so do a fallible lookup. + // + // Note that we don't want to borrow `self.tasks` for too long so we + // try to extract the future here and leave behind a tombstone future + // which'll get replaced or removed later. This is how we support + // spawn-in-run. + let mut future = match self.tasks.borrow_mut().get_mut(task_id) { + Some(&mut Slot::Running(ref mut future)) => future.take().unwrap(), + Some(&mut Slot::Vacant { .. }) => return false, + None => return false, + }; + + // Drive this future forward. If it's done we remove it and if it's not + // done then we put it back in the tasks array. + let done = match future.poll_future_notify(&self.notify, task_id) { + Ok(Async::Ready(())) | Err(()) => true, + Ok(Async::NotReady) => false, + }; + let mut tasks = self.tasks.borrow_mut(); + if done { + tasks[task_id] = Slot::Vacant { next_vacant: self.next_vacant.get() }; + self.next_vacant.set(task_id); + } else { + tasks[task_id] = Slot::Running(Some(future)); + } + + return false + } +} + +impl<F> Executor<F> for Core + where F: Future<Item = (), Error = ()> + 'static, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { + self.spawn(future); + Ok(()) + } +} + +struct MyNotify { + // TODO: it's pretty unfortunate to use a `Mutex` here where the `Sender` + // itself is basically `Sync` as-is. Ideally this'd use something like + // an off-the-shelf mpsc queue as well as `thread::park` and + // `Thread::unpark`. + tx: Mutex<mpsc::Sender<usize>>, +} + +impl Notify for MyNotify { + fn notify(&self, id: usize) { + drop(self.tx.lock().unwrap().send(id)); + } +} diff --git a/third_party/rust/futures-0.1.31/tests/support/mod.rs b/third_party/rust/futures-0.1.31/tests/support/mod.rs new file mode 100644 index 0000000000..297749777a --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/support/mod.rs @@ -0,0 +1,134 @@ +#![allow(dead_code)] + +use std::fmt; +use std::sync::Arc; +use std::thread; + +use futures::{Future, IntoFuture, Async, Poll}; +use futures::future::FutureResult; +use futures::stream::Stream; +use futures::executor::{self, NotifyHandle, Notify}; +use futures::task; + +pub mod local_executor; + +pub fn f_ok(a: i32) -> FutureResult<i32, u32> { Ok(a).into_future() } +pub fn f_err(a: u32) -> FutureResult<i32, u32> { Err(a).into_future() } +pub fn r_ok(a: i32) -> Result<i32, u32> { Ok(a) } +pub fn r_err(a: u32) -> Result<i32, u32> { Err(a) } + +pub fn assert_done<T, F>(f: F, result: Result<T::Item, T::Error>) + where T: Future, + T::Item: Eq + fmt::Debug, + T::Error: Eq + fmt::Debug, + F: FnOnce() -> T, +{ + assert_eq!(f().wait(), result); +} + +pub fn assert_empty<T: Future, F: FnMut() -> T>(mut f: F) { + assert!(executor::spawn(f()).poll_future_notify(¬ify_panic(), 0).ok().unwrap().is_not_ready()); +} + +pub fn sassert_done<S: Stream>(s: &mut S) { + match executor::spawn(s).poll_stream_notify(¬ify_panic(), 0) { + Ok(Async::Ready(None)) => {} + Ok(Async::Ready(Some(_))) => panic!("stream had more elements"), + Ok(Async::NotReady) => panic!("stream wasn't ready"), + Err(_) => panic!("stream had an error"), + } +} + +pub fn sassert_empty<S: Stream>(s: &mut S) { + match executor::spawn(s).poll_stream_notify(¬ify_noop(), 0) { + Ok(Async::Ready(None)) => panic!("stream is at its end"), + Ok(Async::Ready(Some(_))) => panic!("stream had more elements"), + Ok(Async::NotReady) => {} + Err(_) => panic!("stream had an error"), + } +} + +pub fn sassert_next<S: Stream>(s: &mut S, item: S::Item) + where S::Item: Eq + fmt::Debug +{ + match executor::spawn(s).poll_stream_notify(¬ify_panic(), 0) { + Ok(Async::Ready(None)) => panic!("stream is at its end"), + Ok(Async::Ready(Some(e))) => assert_eq!(e, item), + Ok(Async::NotReady) => panic!("stream wasn't ready"), + Err(_) => panic!("stream had an error"), + } +} + +pub fn sassert_err<S: Stream>(s: &mut S, err: S::Error) + where S::Error: Eq + fmt::Debug +{ + match executor::spawn(s).poll_stream_notify(¬ify_panic(), 0) { + Ok(Async::Ready(None)) => panic!("stream is at its end"), + Ok(Async::Ready(Some(_))) => panic!("stream had more elements"), + Ok(Async::NotReady) => panic!("stream wasn't ready"), + Err(e) => assert_eq!(e, err), + } +} + +pub fn notify_panic() -> NotifyHandle { + struct Foo; + + impl Notify for Foo { + fn notify(&self, _id: usize) { + panic!("should not be notified"); + } + } + + NotifyHandle::from(Arc::new(Foo)) +} + +pub fn notify_noop() -> NotifyHandle { + struct Noop; + + impl Notify for Noop { + fn notify(&self, _id: usize) {} + } + + const NOOP : &'static Noop = &Noop; + + NotifyHandle::from(NOOP) +} + +pub trait ForgetExt { + fn forget(self); +} + +impl<F> ForgetExt for F + where F: Future + Sized + Send + 'static, + F::Item: Send, + F::Error: Send +{ + fn forget(self) { + thread::spawn(|| self.wait()); + } +} + +pub struct DelayFuture<F>(F,bool); + +impl<F: Future> Future for DelayFuture<F> { + type Item = F::Item; + type Error = F::Error; + + fn poll(&mut self) -> Poll<F::Item,F::Error> { + if self.1 { + self.0.poll() + } else { + self.1 = true; + task::current().notify(); + Ok(Async::NotReady) + } + } +} + +/// Introduces one `Ok(Async::NotReady)` before polling the given future +pub fn delay_future<F>(f: F) -> DelayFuture<F::Future> + where F: IntoFuture, +{ + DelayFuture(f.into_future(), false) +} + diff --git a/third_party/rust/futures-0.1.31/tests/unfold.rs b/third_party/rust/futures-0.1.31/tests/unfold.rs new file mode 100644 index 0000000000..1669a18aa5 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/unfold.rs @@ -0,0 +1,52 @@ +extern crate futures; + +mod support; + +use futures::stream; + +use support::*; + +#[test] +fn unfold1() { + let mut stream = stream::unfold(0, |state| { + if state <= 2 { + let res: Result<_,()> = Ok((state * 2, state + 1)); + Some(delay_future(res)) + } else { + None + } + }); + // Creates the future with the closure + // Not ready (delayed future) + sassert_empty(&mut stream); + // future is ready, yields the item + sassert_next(&mut stream, 0); + + // Repeat + sassert_empty(&mut stream); + sassert_next(&mut stream, 2); + + sassert_empty(&mut stream); + sassert_next(&mut stream, 4); + + // no more items + sassert_done(&mut stream); +} + +#[test] +fn unfold_err1() { + let mut stream = stream::unfold(0, |state| { + if state <= 2 { + Some(Ok((state * 2, state + 1))) + } else { + Some(Err(-1)) + } + }); + sassert_next(&mut stream, 0); + sassert_next(&mut stream, 2); + sassert_next(&mut stream, 4); + sassert_err(&mut stream, -1); + + // An error was generated by the stream, it will then finish + sassert_done(&mut stream); +} diff --git a/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs b/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs new file mode 100644 index 0000000000..55b0ca5ac2 --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs @@ -0,0 +1,189 @@ +extern crate futures; + +use futures::prelude::*; +use futures::future; +use futures::unsync::oneshot::{channel, Canceled, spawn}; + +mod support; +use support::local_executor; + +#[test] +fn smoke() { + let (tx, rx) = channel(); + tx.send(33).unwrap(); + assert_eq!(rx.wait().unwrap(), 33); +} + +#[test] +fn canceled() { + let (_, rx) = channel::<()>(); + assert_eq!(rx.wait().unwrap_err(), Canceled); +} + +#[test] +fn poll_cancel() { + let (mut tx, _) = channel::<()>(); + assert!(tx.poll_cancel().unwrap().is_ready()); +} + +#[test] +fn tx_complete_rx_unparked() { + let (tx, rx) = channel(); + + let res = rx.join(future::lazy(move || { + tx.send(55).unwrap(); + Ok(11) + })); + assert_eq!(res.wait().unwrap(), (55, 11)); +} + +#[test] +fn tx_dropped_rx_unparked() { + let (tx, rx) = channel::<i32>(); + + let res = rx.join(future::lazy(move || { + let _tx = tx; + Ok(11) + })); + assert_eq!(res.wait().unwrap_err(), Canceled); +} + + +#[test] +fn is_canceled() { + let (tx, rx) = channel::<u32>(); + assert!(!tx.is_canceled()); + drop(rx); + assert!(tx.is_canceled()); +} + +#[test] +fn spawn_sends_items() { + let core = local_executor::Core::new(); + let future = future::ok::<_, ()>(1); + let rx = spawn(future, &core); + assert_eq!(core.run(rx).unwrap(), 1); +} + +#[test] +fn spawn_kill_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + use futures::sync::oneshot; + + // a future which never returns anything (forever accepting incoming + // connections), but dropping it leads to observable side effects + // (like closing listening sockets, releasing limited resources, + // ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Future for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = local_executor::Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let future = Dead{done: done_tx}; + let rx = spawn(future, &core); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // now drop the spawned future: maybe some timeout exceeded, + // or some connection on this end was closed by the remote + // end. + drop(rx); + // and wait for the spawned future to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => (), + Ok(Either::B(((), _))) => { + panic!("dead future wasn't canceled (timeout)"); + }, + _ => { + panic!("dead future wasn't canceled (unexpected result)"); + }, + } +} + +#[test] +fn spawn_dont_kill_forgot_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + use futures::sync::oneshot; + + // a future which never returns anything (forever accepting incoming + // connections), but dropping it leads to observable side effects + // (like closing listening sockets, releasing limited resources, + // ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Future for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = local_executor::Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let future = Dead{done: done_tx}; + let rx = spawn(future, &core); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // forget the spawned future: should keep running, i.e. hit + // the timeout below. + rx.forget(); + // and wait for the spawned future to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => { + panic!("forgotten dead future was canceled"); + }, + Ok(Either::B(((), _))) => (), // reached timeout + _ => { + panic!("forgotten dead future was canceled (unexpected result)"); + }, + } +} diff --git a/third_party/rust/futures-0.1.31/tests/unsync.rs b/third_party/rust/futures-0.1.31/tests/unsync.rs new file mode 100644 index 0000000000..490db0af1c --- /dev/null +++ b/third_party/rust/futures-0.1.31/tests/unsync.rs @@ -0,0 +1,266 @@ +#![cfg(feature = "use_std")] +#![allow(bare_trait_objects, unknown_lints)] + +extern crate futures; + +mod support; + +use futures::prelude::*; +use futures::unsync::oneshot; +use futures::unsync::mpsc::{self, SendError}; +use futures::future::lazy; +use futures::stream::{iter_ok, unfold}; + +use support::local_executor::Core; + +#[test] +fn mpsc_send_recv() { + let (tx, rx) = mpsc::channel::<i32>(1); + let mut rx = rx.wait(); + + tx.send(42).wait().unwrap(); + + assert_eq!(rx.next(), Some(Ok(42))); + assert_eq!(rx.next(), None); +} + +#[test] +fn mpsc_rx_notready() { + let (_tx, mut rx) = mpsc::channel::<i32>(1); + + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::NotReady); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_rx_end() { + let (_, mut rx) = mpsc::channel::<i32>(1); + + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::Ready(None)); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_tx_clone_weak_rc() { + let (tx, mut rx) = mpsc::channel::<i32>(1); // rc = 1 + + let tx_clone = tx.clone(); // rc = 2 + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::NotReady); + Ok(()) as Result<(), ()> + }).wait().unwrap(); + + drop(tx); // rc = 1 + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::NotReady); + Ok(()) as Result<(), ()> + }).wait().unwrap(); + + drop(tx_clone); // rc = 0 + lazy(|| { + assert_eq!(rx.poll().unwrap(), Async::Ready(None)); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_tx_notready() { + let (tx, _rx) = mpsc::channel::<i32>(1); + let tx = tx.send(1).wait().unwrap(); + lazy(move || { + assert!(tx.send(2).poll().unwrap().is_not_ready()); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_tx_err() { + let (tx, _) = mpsc::channel::<i32>(1); + lazy(move || { + assert!(tx.send(2).poll().is_err()); + Ok(()) as Result<(), ()> + }).wait().unwrap(); +} + +#[test] +fn mpsc_backpressure() { + let (tx, rx) = mpsc::channel::<i32>(1); + lazy(move || { + iter_ok(vec![1, 2, 3]) + .forward(tx) + .map_err(|e: SendError<i32>| panic!("{}", e)) + .join(rx.take(3).collect().map(|xs| { + assert_eq!(xs, [1, 2, 3]); + })) + }).wait().unwrap(); +} + +#[test] +fn mpsc_unbounded() { + let (tx, rx) = mpsc::unbounded::<i32>(); + lazy(move || { + iter_ok(vec![1, 2, 3]) + .forward(tx) + .map_err(|e: SendError<i32>| panic!("{}", e)) + .join(rx.take(3).collect().map(|xs| { + assert_eq!(xs, [1, 2, 3]); + })) + }).wait().unwrap(); +} + +#[test] +fn mpsc_recv_unpark() { + let core = Core::new(); + let (tx, rx) = mpsc::channel::<i32>(1); + let tx2 = tx.clone(); + core.spawn(rx.collect().map(|xs| assert_eq!(xs, [1, 2]))); + core.spawn(lazy(move || tx.send(1).map(|_| ()).map_err(|e| panic!("{}", e)))); + core.run(lazy(move || tx2.send(2))).unwrap(); +} + +#[test] +fn mpsc_send_unpark() { + let core = Core::new(); + let (tx, rx) = mpsc::channel::<i32>(1); + let (donetx, donerx) = oneshot::channel(); + core.spawn(iter_ok(vec![1, 2]).forward(tx) + .then(|x: Result<_, SendError<i32>>| { + assert!(x.is_err()); + donetx.send(()).unwrap(); + Ok(()) + })); + core.spawn(lazy(move || { let _ = rx; Ok(()) })); + core.run(donerx).unwrap(); +} + +#[test] +fn spawn_sends_items() { + let core = Core::new(); + let stream = unfold(0, |i| Some(Ok::<_,u8>((i, i + 1)))); + let rx = mpsc::spawn(stream, &core, 1); + assert_eq!(core.run(rx.take(4).collect()).unwrap(), + [0, 1, 2, 3]); +} + +#[test] +fn spawn_kill_dead_stream() { + use std::thread; + use std::time::Duration; + use futures::future::Either; + + // a stream which never returns anything (maybe a remote end isn't + // responding), but dropping it leads to observable side effects + // (like closing connections, releasing limited resources, ...) + #[derive(Debug)] + struct Dead { + // when dropped you should get Err(oneshot::Canceled) on the + // receiving end + done: oneshot::Sender<()>, + } + impl Stream for Dead { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + Ok(Async::NotReady) + } + } + + // need to implement a timeout for the test, as it would hang + // forever right now + let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(1000)); + let _ = timeout_tx.send(()); + }); + + let core = Core::new(); + let (done_tx, done_rx) = oneshot::channel(); + let stream = Dead{done: done_tx}; + let rx = mpsc::spawn(stream, &core, 1); + let res = core.run( + Ok::<_, ()>(()) + .into_future() + .then(move |_| { + // now drop the spawned stream: maybe some timeout exceeded, + // or some connection on this end was closed by the remote + // end. + drop(rx); + // and wait for the spawned stream to release its resources + done_rx + }) + .select2(timeout_rx) + ); + match res { + Err(Either::A((oneshot::Canceled, _))) => (), + _ => { + panic!("dead stream wasn't canceled"); + }, + } +} + + +/// Test case for PR #768 (issue #766). +/// The issue was: +/// Given that an empty channel is polled by the Receiver, and the only Sender +/// gets dropped without sending anything, then the Receiver would get stuck. + +#[test] +fn dropped_sender_of_unused_channel_notifies_receiver() { + let core = Core::new(); + type FUTURE = Box<futures::Future<Item=u8, Error=()>>; + + // Constructs the channel which we want to test, and two futures which + // act on that channel. + let pair = |reverse| -> Vec<FUTURE> { + // This is the channel which we want to test. + let (tx, rx) = mpsc::channel::<u8>(1); + let mut futures: Vec<FUTURE> = vec![ + Box::new(futures::stream::iter_ok(vec![]) + .forward(tx) + .map_err(|_: mpsc::SendError<u8>| ()) + .map(|_| 42) + ), + Box::new(rx.fold((), |_, _| Ok(())) + .map(|_| 24) + ), + ]; + if reverse { + futures.reverse(); + } + futures + }; + + let make_test_future = |reverse| -> Box<Future<Item=Vec<u8>, Error=()>> { + let f = futures::future::join_all(pair(reverse)); + + // Use a timeout. This is not meant to test the `sync::oneshot` but + // merely uses it to implement this timeout. + let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel::<Vec<u8>>(); + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(1000)); + let x = timeout_tx.send(vec![0]); + assert!(x.is_err(), "Test timed out."); + }); + + Box::new(f.select(timeout_rx.map_err(|_|())) + .map_err(|x| x.0) + .map(|x| x.0) + ) + }; + + // The order of the tested futures is important to test fix of PR #768. + // We want future_2 to poll on the Receiver before the Sender is dropped. + let result = core.run(make_test_future(false)); + assert!(result.is_ok()); + assert_eq!(vec![42, 24], result.unwrap()); + + // Test also the other ordering: + let result = core.run(make_test_future(true)); + assert!(result.is_ok()); + assert_eq!(vec![24, 42], result.unwrap()); +} |