summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/tests
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/tests')
-rw-r--r--third_party/rust/futures-0.1.31/tests/all.rs377
-rw-r--r--third_party/rust/futures-0.1.31/tests/bilock.rs111
-rw-r--r--third_party/rust/futures-0.1.31/tests/buffer_unordered.rs74
-rw-r--r--third_party/rust/futures-0.1.31/tests/channel.rs75
-rw-r--r--third_party/rust/futures-0.1.31/tests/eager_drop.rs82
-rw-r--r--third_party/rust/futures-0.1.31/tests/eventual.rs320
-rw-r--r--third_party/rust/futures-0.1.31/tests/fuse.rs39
-rw-r--r--third_party/rust/futures-0.1.31/tests/future_flatten_stream.rs43
-rw-r--r--third_party/rust/futures-0.1.31/tests/futures_ordered.rs88
-rw-r--r--third_party/rust/futures-0.1.31/tests/futures_unordered.rs167
-rw-r--r--third_party/rust/futures-0.1.31/tests/inspect.rs23
-rw-r--r--third_party/rust/futures-0.1.31/tests/mpsc-close.rs152
-rw-r--r--third_party/rust/futures-0.1.31/tests/mpsc.rs567
-rw-r--r--third_party/rust/futures-0.1.31/tests/oneshot.rs253
-rw-r--r--third_party/rust/futures-0.1.31/tests/ready_queue.rs164
-rw-r--r--third_party/rust/futures-0.1.31/tests/recurse.rs25
-rw-r--r--third_party/rust/futures-0.1.31/tests/select_all.rs27
-rw-r--r--third_party/rust/futures-0.1.31/tests/select_ok.rs40
-rw-r--r--third_party/rust/futures-0.1.31/tests/shared.rs236
-rw-r--r--third_party/rust/futures-0.1.31/tests/sink.rs446
-rw-r--r--third_party/rust/futures-0.1.31/tests/split.rs47
-rw-r--r--third_party/rust/futures-0.1.31/tests/stream.rs416
-rw-r--r--third_party/rust/futures-0.1.31/tests/stream_catch_unwind.rs29
-rw-r--r--third_party/rust/futures-0.1.31/tests/support/local_executor.rs164
-rw-r--r--third_party/rust/futures-0.1.31/tests/support/mod.rs134
-rw-r--r--third_party/rust/futures-0.1.31/tests/unfold.rs52
-rw-r--r--third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs189
-rw-r--r--third_party/rust/futures-0.1.31/tests/unsync.rs266
28 files changed, 4606 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/tests/all.rs b/third_party/rust/futures-0.1.31/tests/all.rs
new file mode 100644
index 0000000000..40e402f553
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/all.rs
@@ -0,0 +1,377 @@
+#![allow(bare_trait_objects, unknown_lints)]
+
+extern crate futures;
+
+use std::sync::mpsc::{channel, TryRecvError};
+
+use futures::future::*;
+use futures::future;
+use futures::executor;
+use futures::sync::oneshot::{self, Canceled};
+
+mod support;
+use support::*;
+
+fn unselect<T, U, E>(r: Result<(T, U), (E, U)>) -> Result<T, E> {
+ match r {
+ Ok((t, _)) => Ok(t),
+ Err((e, _)) => Err(e),
+ }
+}
+
+#[test]
+fn result_smoke() {
+ fn is_future_v<A, B, C>(_: C)
+ where A: Send + 'static,
+ B: Send + 'static,
+ C: Future<Item=A, Error=B>
+ {}
+
+ is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1));
+ is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1));
+ is_future_v::<i32, u32, _>(f_ok(1).and_then(Ok));
+ is_future_v::<i32, u32, _>(f_ok(1).or_else(Err));
+ is_future_v::<(i32, i32), u32, _>(f_ok(1).join(Err(3)));
+ is_future_v::<i32, u32, _>(f_ok(1).map(f_ok).flatten());
+
+ assert_done(|| f_ok(1), r_ok(1));
+ assert_done(|| f_err(1), r_err(1));
+ assert_done(|| result(Ok(1)), r_ok(1));
+ assert_done(|| result(Err(1)), r_err(1));
+ assert_done(|| ok(1), r_ok(1));
+ assert_done(|| err(1), r_err(1));
+ assert_done(|| f_ok(1).map(|a| a + 2), r_ok(3));
+ assert_done(|| f_err(1).map(|a| a + 2), r_err(1));
+ assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1));
+ assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3));
+ assert_done(|| f_ok(1).and_then(|a| Ok(a + 2)), r_ok(3));
+ assert_done(|| f_err(1).and_then(|a| Ok(a + 2)), r_err(1));
+ assert_done(|| f_ok(1).and_then(|a| Err(a as u32 + 3)), r_err(4));
+ assert_done(|| f_err(1).and_then(|a| Err(a as u32 + 4)), r_err(1));
+ assert_done(|| f_ok(1).or_else(|a| Ok(a as i32 + 2)), r_ok(1));
+ assert_done(|| f_err(1).or_else(|a| Ok(a as i32 + 2)), r_ok(3));
+ assert_done(|| f_ok(1).or_else(|a| Err(a + 3)), r_ok(1));
+ assert_done(|| f_err(1).or_else(|a| Err(a + 4)), r_err(5));
+ assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1));
+ assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1));
+ assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1));
+ assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1));
+ assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1));
+ assert_done(|| f_ok(1).join(f_err(1)), Err(1));
+ assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2)));
+ assert_done(|| f_err(1).join(f_ok(1)), Err(1));
+ assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2));
+ assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2));
+ assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2));
+ assert_done(|| f_err(1).then(|_| Err(2)), r_err(2));
+}
+
+#[test]
+fn test_empty() {
+ fn empty() -> Empty<i32, u32> { future::empty() }
+
+ assert_empty(|| empty());
+ assert_empty(|| empty().select(empty()));
+ assert_empty(|| empty().join(empty()));
+ assert_empty(|| empty().join(f_ok(1)));
+ assert_empty(|| f_ok(1).join(empty()));
+ assert_empty(|| empty().or_else(move |_| empty()));
+ assert_empty(|| empty().and_then(move |_| empty()));
+ assert_empty(|| f_err(1).or_else(move |_| empty()));
+ assert_empty(|| f_ok(1).and_then(move |_| empty()));
+ assert_empty(|| empty().map(|a| a + 1));
+ assert_empty(|| empty().map_err(|a| a + 1));
+ assert_empty(|| empty().then(|a| a));
+}
+
+#[test]
+fn test_ok() {
+ assert_done(|| ok(1), r_ok(1));
+ assert_done(|| err(1), r_err(1));
+}
+
+#[test]
+fn flatten() {
+ fn ok<T: Send + 'static>(a: T) -> FutureResult<T, u32> {
+ future::ok(a)
+ }
+ fn err<E: Send + 'static>(b: E) -> FutureResult<i32, E> {
+ future::err(b)
+ }
+
+ assert_done(|| ok(ok(1)).flatten(), r_ok(1));
+ assert_done(|| ok(err(1)).flatten(), r_err(1));
+ assert_done(|| err(1u32).map(ok).flatten(), r_err(1));
+ assert_done(|| future::ok::<_, u8>(future::ok::<_, u32>(1))
+ .flatten(), r_ok(1));
+ assert_empty(|| ok(empty::<i32, u32>()).flatten());
+ assert_empty(|| empty::<i32, u32>().map(ok).flatten());
+}
+
+#[test]
+fn smoke_oneshot() {
+ assert_done(|| {
+ let (c, p) = oneshot::channel();
+ c.send(1).unwrap();
+ p
+ }, Ok(1));
+ assert_done(|| {
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+ p
+ }, Err(Canceled));
+ let mut completes = Vec::new();
+ assert_empty(|| {
+ let (a, b) = oneshot::channel::<i32>();
+ completes.push(a);
+ b
+ });
+
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+ let res = executor::spawn(p).poll_future_notify(&notify_panic(), 0);
+ assert!(res.is_err());
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+ let (tx, rx) = channel();
+ p.then(move |_| {
+ tx.send(())
+ }).forget();
+ rx.recv().unwrap();
+}
+
+#[test]
+fn select_cancels() {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| { btx.send(b).unwrap(); b });
+ let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+
+ let f = b.select(d).then(unselect);
+ // assert!(f.poll(&mut Task::new()).is_not_ready());
+ assert!(brx.try_recv().is_err());
+ assert!(drx.try_recv().is_err());
+ a.send(1).unwrap();
+ let res = executor::spawn(f).poll_future_notify(&notify_panic(), 0);
+ assert!(res.ok().unwrap().is_ready());
+ assert_eq!(brx.recv().unwrap(), 1);
+ drop(c);
+ assert!(drx.recv().is_err());
+
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, _brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| { btx.send(b).unwrap(); b });
+ let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+
+ let mut f = executor::spawn(b.select(d).then(unselect));
+ assert!(f.poll_future_notify(&notify_noop(), 0).ok().unwrap().is_not_ready());
+ assert!(f.poll_future_notify(&notify_noop(), 0).ok().unwrap().is_not_ready());
+ a.send(1).unwrap();
+ assert!(f.poll_future_notify(&notify_panic(), 0).ok().unwrap().is_ready());
+ drop((c, f));
+ assert!(drx.recv().is_err());
+}
+
+#[test]
+fn join_cancels() {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, _brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| { btx.send(b).unwrap(); b });
+ let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+
+ let f = b.join(d);
+ drop(a);
+ let res = executor::spawn(f).poll_future_notify(&notify_panic(), 0);
+ assert!(res.is_err());
+ drop(c);
+ assert!(drx.recv().is_err());
+
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, _brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |b| { btx.send(b).unwrap(); b });
+ let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+
+ let (tx, rx) = channel();
+ let f = b.join(d);
+ f.then(move |_| {
+ tx.send(()).unwrap();
+ let res: Result<(), ()> = Ok(());
+ res
+ }).forget();
+ assert!(rx.try_recv().is_err());
+ drop(a);
+ rx.recv().unwrap();
+ drop(c);
+ assert!(drx.recv().is_err());
+}
+
+#[test]
+fn join_incomplete() {
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = executor::spawn(ok(1).join(b).map(move |r| tx.send(r).unwrap()));
+ assert!(f.poll_future_notify(&notify_noop(), 0).ok().unwrap().is_not_ready());
+ assert!(rx.try_recv().is_err());
+ a.send(2).unwrap();
+ assert!(f.poll_future_notify(&notify_noop(), 0).ok().unwrap().is_ready());
+ assert_eq!(rx.recv().unwrap(), (1, 2));
+
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = executor::spawn(b.join(Ok(2)).map(move |r| tx.send(r).unwrap()));
+ assert!(f.poll_future_notify(&notify_noop(), 0).ok().unwrap().is_not_ready());
+ assert!(rx.try_recv().is_err());
+ a.send(1).unwrap();
+ assert!(f.poll_future_notify(&notify_noop(), 0).ok().unwrap().is_ready());
+ assert_eq!(rx.recv().unwrap(), (1, 2));
+
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = executor::spawn(ok(1).join(b).map_err(move |_r| tx.send(2).unwrap()));
+ assert!(f.poll_future_notify(&notify_noop(), 0).ok().unwrap().is_not_ready());
+ assert!(rx.try_recv().is_err());
+ drop(a);
+ assert!(f.poll_future_notify(&notify_noop(), 0).is_err());
+ assert_eq!(rx.recv().unwrap(), 2);
+
+ let (a, b) = oneshot::channel::<i32>();
+ let (tx, rx) = channel();
+ let mut f = executor::spawn(b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap()));
+ assert!(f.poll_future_notify(&notify_noop(), 0).ok().unwrap().is_not_ready());
+ assert!(rx.try_recv().is_err());
+ drop(a);
+ assert!(f.poll_future_notify(&notify_noop(), 0).is_err());
+ assert_eq!(rx.recv().unwrap(), 1);
+}
+
+#[test]
+fn collect_collects() {
+ assert_done(|| join_all(vec![f_ok(1), f_ok(2)]), Ok(vec![1, 2]));
+ assert_done(|| join_all(vec![f_ok(1)]), Ok(vec![1]));
+ assert_done(|| join_all(Vec::<Result<i32, u32>>::new()), Ok(vec![]));
+
+ // TODO: needs more tests
+}
+
+#[test]
+fn select2() {
+ fn d<T, U, E>(r: Result<(T, U), (E, U)>) -> Result<T, E> {
+ match r {
+ Ok((t, _u)) => Ok(t),
+ Err((e, _u)) => Err(e),
+ }
+ }
+
+ assert_done(|| f_ok(2).select(empty()).then(d), Ok(2));
+ assert_done(|| empty().select(f_ok(2)).then(d), Ok(2));
+ assert_done(|| f_err(2).select(empty()).then(d), Err(2));
+ assert_done(|| empty().select(f_err(2)).then(d), Err(2));
+
+ assert_done(|| {
+ f_ok(1).select(f_ok(2))
+ .map_err(|_| 0)
+ .and_then(|(a, b)| b.map(move |b| a + b))
+ }, Ok(3));
+
+ // Finish one half of a select and then fail the second, ensuring that we
+ // get the notification of the second one.
+ {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let f = b.select(d);
+ let (tx, rx) = channel();
+ f.map(move |r| tx.send(r).unwrap()).forget();
+ a.send(1).unwrap();
+ let (val, next) = rx.recv().unwrap();
+ assert_eq!(val, 1);
+ let (tx, rx) = channel();
+ next.map_err(move |_r| tx.send(2).unwrap()).forget();
+ assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
+ drop(c);
+ assert_eq!(rx.recv().unwrap(), 2);
+ }
+
+ // Fail the second half and ensure that we see the first one finish
+ {
+ let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let f = b.select(d);
+ let (tx, rx) = channel();
+ f.map_err(move |r| tx.send((1, r.1)).unwrap()).forget();
+ drop(c);
+ let (val, next) = rx.recv().unwrap();
+ assert_eq!(val, 1);
+ let (tx, rx) = channel();
+ next.map(move |r| tx.send(r).unwrap()).forget();
+ assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
+ a.send(2).unwrap();
+ assert_eq!(rx.recv().unwrap(), 2);
+ }
+
+ // Cancelling the first half should cancel the second
+ {
+ let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |v| { btx.send(v).unwrap(); v });
+ let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+ let f = b.select(d);
+ drop(f);
+ assert!(drx.recv().is_err());
+ assert!(brx.recv().is_err());
+ }
+
+ // Cancel after a schedule
+ {
+ let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |v| { btx.send(v).unwrap(); v });
+ let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+ let f = b.select(d);
+ drop(executor::spawn(f).poll_future_notify(&support::notify_noop(), 0));
+ assert!(drx.recv().is_err());
+ assert!(brx.recv().is_err());
+ }
+
+ // Cancel propagates
+ {
+ let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
+ let ((btx, brx), (dtx, drx)) = (channel(), channel());
+ let b = b.map(move |v| { btx.send(v).unwrap(); v });
+ let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+ let (tx, rx) = channel();
+ b.select(d).map(move |_| tx.send(()).unwrap()).forget();
+ drop(a);
+ assert!(drx.recv().is_err());
+ assert!(brx.recv().is_err());
+ assert!(rx.recv().is_err());
+ }
+
+ // Cancel on early drop
+ {
+ let (tx, rx) = channel();
+ let f = f_ok(1).select(empty().map(move |()| {
+ tx.send(()).unwrap();
+ 1
+ }));
+ drop(f);
+ assert!(rx.recv().is_err());
+ }
+}
+
+#[test]
+fn option() {
+ assert_eq!(Ok(Some(())), Some(ok::<(), ()>(())).wait());
+ assert_eq!(Ok(None), <Option<FutureResult<(), ()>> as Future>::wait(None));
+}
+
+#[test]
+fn spawn_does_unsize() {
+ #[derive(Clone, Copy)]
+ struct EmptyNotify;
+ impl executor::Notify for EmptyNotify {
+ fn notify(&self, _: usize) { panic!("Cannot notify"); }
+ }
+ static EMPTY: &'static EmptyNotify = &EmptyNotify;
+
+ let spawn: executor::Spawn<FutureResult<(), ()>> = executor::spawn(future::ok(()));
+ let mut spawn_box: Box<executor::Spawn<Future<Item = (), Error = ()>>> = Box::new(spawn);
+ spawn_box.poll_future_notify(&EMPTY, 0).unwrap();
+}
diff --git a/third_party/rust/futures-0.1.31/tests/bilock.rs b/third_party/rust/futures-0.1.31/tests/bilock.rs
new file mode 100644
index 0000000000..1658bdae27
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/bilock.rs
@@ -0,0 +1,111 @@
+#![allow(bare_trait_objects, unknown_lints)]
+
+extern crate futures;
+
+use std::thread;
+
+use futures::prelude::*;
+use futures::executor;
+use futures::stream;
+use futures::future;
+use futures::sync::BiLock;
+
+mod support;
+use support::*;
+
+#[test]
+fn smoke() {
+ let future = future::lazy(|| {
+ let (a, b) = BiLock::new(1);
+
+ {
+ let mut lock = match a.poll_lock() {
+ Async::Ready(l) => l,
+ Async::NotReady => panic!("poll not ready"),
+ };
+ assert_eq!(*lock, 1);
+ *lock = 2;
+
+ assert!(b.poll_lock().is_not_ready());
+ assert!(a.poll_lock().is_not_ready());
+ }
+
+ assert!(b.poll_lock().is_ready());
+ assert!(a.poll_lock().is_ready());
+
+ {
+ let lock = match b.poll_lock() {
+ Async::Ready(l) => l,
+ Async::NotReady => panic!("poll not ready"),
+ };
+ assert_eq!(*lock, 2);
+ }
+
+ assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
+
+ Ok::<(), ()>(())
+ });
+
+ assert!(executor::spawn(future)
+ .poll_future_notify(&notify_noop(), 0)
+ .expect("failure in poll")
+ .is_ready());
+}
+
+#[test]
+fn concurrent() {
+ const N: usize = 10000;
+ let (a, b) = BiLock::new(0);
+
+ let a = Increment {
+ a: Some(a),
+ remaining: N,
+ };
+ let b = stream::iter_ok::<_, ()>((0..N)).fold(b, |b, _n| {
+ b.lock().map(|mut b| {
+ *b += 1;
+ b.unlock()
+ })
+ });
+
+ let t1 = thread::spawn(move || a.wait());
+ let b = b.wait().expect("b error");
+ let a = t1.join().unwrap().expect("a error");
+
+ match a.poll_lock() {
+ Async::Ready(l) => assert_eq!(*l, 2 * N),
+ Async::NotReady => panic!("poll not ready"),
+ }
+ match b.poll_lock() {
+ Async::Ready(l) => assert_eq!(*l, 2 * N),
+ Async::NotReady => panic!("poll not ready"),
+ }
+
+ assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
+
+ struct Increment {
+ remaining: usize,
+ a: Option<BiLock<usize>>,
+ }
+
+ impl Future for Increment {
+ type Item = BiLock<usize>;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
+ loop {
+ if self.remaining == 0 {
+ return Ok(self.a.take().unwrap().into())
+ }
+
+ let a = self.a.as_ref().unwrap();
+ let mut a = match a.poll_lock() {
+ Async::Ready(l) => l,
+ Async::NotReady => return Ok(Async::NotReady),
+ };
+ self.remaining -= 1;
+ *a += 1;
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/tests/buffer_unordered.rs b/third_party/rust/futures-0.1.31/tests/buffer_unordered.rs
new file mode 100644
index 0000000000..005bbd9835
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/buffer_unordered.rs
@@ -0,0 +1,74 @@
+extern crate futures;
+
+use std::sync::mpsc as std_mpsc;
+use std::thread;
+
+use futures::prelude::*;
+use futures::sync::oneshot;
+use futures::sync::mpsc;
+
+#[test]
+fn works() {
+ const N: usize = 4;
+
+ let (mut tx, rx) = mpsc::channel(1);
+
+ let (tx2, rx2) = std_mpsc::channel();
+ let (tx3, rx3) = std_mpsc::channel();
+ let t1 = thread::spawn(move || {
+ for _ in 0..N+1 {
+ let (mytx, myrx) = oneshot::channel();
+ tx = tx.send(myrx).wait().unwrap();
+ tx3.send(mytx).unwrap();
+ }
+ rx2.recv().unwrap();
+ for _ in 0..N {
+ let (mytx, myrx) = oneshot::channel();
+ tx = tx.send(myrx).wait().unwrap();
+ tx3.send(mytx).unwrap();
+ }
+ });
+
+ let (tx4, rx4) = std_mpsc::channel();
+ let t2 = thread::spawn(move || {
+ for item in rx.map_err(|_| panic!()).buffer_unordered(N).wait() {
+ tx4.send(item.unwrap()).unwrap();
+ }
+ });
+
+ let o1 = rx3.recv().unwrap();
+ let o2 = rx3.recv().unwrap();
+ let o3 = rx3.recv().unwrap();
+ let o4 = rx3.recv().unwrap();
+ assert!(rx4.try_recv().is_err());
+
+ o1.send(1).unwrap();
+ assert_eq!(rx4.recv(), Ok(1));
+ o3.send(3).unwrap();
+ assert_eq!(rx4.recv(), Ok(3));
+ tx2.send(()).unwrap();
+ o2.send(2).unwrap();
+ assert_eq!(rx4.recv(), Ok(2));
+ o4.send(4).unwrap();
+ assert_eq!(rx4.recv(), Ok(4));
+
+ let o5 = rx3.recv().unwrap();
+ let o6 = rx3.recv().unwrap();
+ let o7 = rx3.recv().unwrap();
+ let o8 = rx3.recv().unwrap();
+ let o9 = rx3.recv().unwrap();
+
+ o5.send(5).unwrap();
+ assert_eq!(rx4.recv(), Ok(5));
+ o8.send(8).unwrap();
+ assert_eq!(rx4.recv(), Ok(8));
+ o9.send(9).unwrap();
+ assert_eq!(rx4.recv(), Ok(9));
+ o7.send(7).unwrap();
+ assert_eq!(rx4.recv(), Ok(7));
+ o6.send(6).unwrap();
+ assert_eq!(rx4.recv(), Ok(6));
+
+ t1.join().unwrap();
+ t2.join().unwrap();
+}
diff --git a/third_party/rust/futures-0.1.31/tests/channel.rs b/third_party/rust/futures-0.1.31/tests/channel.rs
new file mode 100644
index 0000000000..7940de4509
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/channel.rs
@@ -0,0 +1,75 @@
+#![allow(bare_trait_objects, unknown_lints)]
+
+extern crate futures;
+
+use std::sync::atomic::*;
+
+use futures::prelude::*;
+use futures::future::result;
+use futures::sync::mpsc;
+
+mod support;
+use support::*;
+
+#[test]
+fn sequence() {
+ let (tx, mut rx) = mpsc::channel(1);
+
+ sassert_empty(&mut rx);
+ sassert_empty(&mut rx);
+
+ let amt = 20;
+ send(amt, tx).forget();
+ let mut rx = rx.wait();
+ for i in (1..amt + 1).rev() {
+ assert_eq!(rx.next(), Some(Ok(i)));
+ }
+ assert_eq!(rx.next(), None);
+
+ fn send(n: u32, sender: mpsc::Sender<u32>)
+ -> Box<Future<Item=(), Error=()> + Send> {
+ if n == 0 {
+ return Box::new(result(Ok(())))
+ }
+ Box::new(sender.send(n).map_err(|_| ()).and_then(move |sender| {
+ send(n - 1, sender)
+ }))
+ }
+}
+
+#[test]
+fn drop_sender() {
+ let (tx, mut rx) = mpsc::channel::<u32>(1);
+ drop(tx);
+ sassert_done(&mut rx);
+}
+
+#[test]
+fn drop_rx() {
+ let (tx, rx) = mpsc::channel::<u32>(1);
+ let tx = tx.send(1).wait().ok().unwrap();
+ drop(rx);
+ assert!(tx.send(1).wait().is_err());
+}
+
+#[test]
+fn drop_order() {
+ #[allow(deprecated)]
+ static DROPS: AtomicUsize = ATOMIC_USIZE_INIT;
+ let (tx, rx) = mpsc::channel(1);
+
+ struct A;
+
+ impl Drop for A {
+ fn drop(&mut self) {
+ DROPS.fetch_add(1, Ordering::SeqCst);
+ }
+ }
+
+ let tx = tx.send(A).wait().unwrap();
+ assert_eq!(DROPS.load(Ordering::SeqCst), 0);
+ drop(rx);
+ assert_eq!(DROPS.load(Ordering::SeqCst), 1);
+ assert!(tx.send(A).wait().is_err());
+ assert_eq!(DROPS.load(Ordering::SeqCst), 2);
+}
diff --git a/third_party/rust/futures-0.1.31/tests/eager_drop.rs b/third_party/rust/futures-0.1.31/tests/eager_drop.rs
new file mode 100644
index 0000000000..79f94d5ddc
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/eager_drop.rs
@@ -0,0 +1,82 @@
+extern crate futures;
+
+use std::sync::mpsc::channel;
+
+use futures::prelude::*;
+use futures::sync::oneshot;
+use futures::future::{err, ok};
+
+mod support;
+use support::*;
+
+#[test]
+fn map() {
+ // Whatever runs after a `map` should have dropped the closure by that
+ // point.
+ let (tx, rx) = channel::<()>();
+ let (tx2, rx2) = channel();
+ err::<i32, i32>(1).map(move |a| { drop(tx); a }).map_err(move |_| {
+ assert!(rx.recv().is_err());
+ tx2.send(()).unwrap()
+ }).forget();
+ rx2.recv().unwrap();
+}
+
+#[test]
+fn map_err() {
+ // Whatever runs after a `map_err` should have dropped the closure by that
+ // point.
+ let (tx, rx) = channel::<()>();
+ let (tx2, rx2) = channel();
+ ok::<i32, i32>(1).map_err(move |a| { drop(tx); a }).map(move |_| {
+ assert!(rx.recv().is_err());
+ tx2.send(()).unwrap()
+ }).forget();
+ rx2.recv().unwrap();
+}
+
+struct FutureData<F, T> {
+ _data: T,
+ future: F,
+}
+
+impl<F: Future, T: Send + 'static> Future for FutureData<F, T> {
+ type Item = F::Item;
+ type Error = F::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ self.future.poll()
+ }
+}
+
+#[test]
+fn and_then_drops_eagerly() {
+ let (c, p) = oneshot::channel::<()>();
+ let (tx, rx) = channel::<()>();
+ let (tx2, rx2) = channel();
+ FutureData { _data: tx, future: p }.and_then(move |_| {
+ assert!(rx.recv().is_err());
+ tx2.send(()).unwrap();
+ ok(1)
+ }).forget();
+ assert!(rx2.try_recv().is_err());
+ c.send(()).unwrap();
+ rx2.recv().unwrap();
+}
+
+// #[test]
+// fn or_else_drops_eagerly() {
+// let (p1, c1) = oneshot::<(), ()>();
+// let (p2, c2) = oneshot::<(), ()>();
+// let (tx, rx) = channel::<()>();
+// let (tx2, rx2) = channel();
+// p1.map(move |a| { drop(tx); a }).or_else(move |_| {
+// assert!(rx.recv().is_err());
+// p2
+// }).map(move |_| tx2.send(()).unwrap()).forget();
+// assert!(rx2.try_recv().is_err());
+// c1.fail(());
+// assert!(rx2.try_recv().is_err());
+// c2.finish(());
+// rx2.recv().unwrap();
+// }
diff --git a/third_party/rust/futures-0.1.31/tests/eventual.rs b/third_party/rust/futures-0.1.31/tests/eventual.rs
new file mode 100644
index 0000000000..fc484aaad2
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/eventual.rs
@@ -0,0 +1,320 @@
+extern crate futures;
+
+mod support;
+use support::*;
+
+use std::sync::mpsc;
+use std::thread;
+
+use futures::prelude::*;
+use futures::future::{ok, err};
+use futures::sync::oneshot;
+
+#[test]
+fn and_then1() {
+ let (tx, rx) = mpsc::channel();
+
+ let tx2 = tx.clone();
+ let p1 = ok::<_, i32>("a").then(move |t| { tx2.send("first").unwrap(); t });
+ let tx2 = tx.clone();
+ let p2 = ok("b").then(move |t| { tx2.send("second").unwrap(); t });
+ let f = p1.and_then(|_| p2);
+
+ assert!(rx.try_recv().is_err());
+ f.map(move |s| tx.send(s).unwrap()).forget();
+ assert_eq!(rx.recv(), Ok("first"));
+ assert_eq!(rx.recv(), Ok("second"));
+ assert_eq!(rx.recv(), Ok("b"));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn and_then2() {
+ let (tx, rx) = mpsc::channel();
+
+ let tx2 = tx.clone();
+ let p1 = err::<i32, _>(2).then(move |t| { tx2.send("first").unwrap(); t });
+ let tx2 = tx.clone();
+ let p2 = ok("b").then(move |t| { tx2.send("second").unwrap(); t });
+ let f = p1.and_then(|_| p2);
+
+ assert!(rx.try_recv().is_err());
+ f.map_err(|_| drop(tx)).forget();
+ assert_eq!(rx.recv(), Ok("first"));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn oneshot1() {
+ let (c, p) = oneshot::channel::<i32>();
+ let t = thread::spawn(|| c.send(1).unwrap());
+
+ let (tx, rx) = mpsc::channel();
+ p.map(move |e| tx.send(e).unwrap()).forget();
+ assert_eq!(rx.recv(), Ok(1));
+ t.join().unwrap();
+}
+
+#[test]
+fn oneshot2() {
+ let (c, p) = oneshot::channel::<i32>();
+ let t = thread::spawn(|| c.send(1).unwrap());
+ t.join().unwrap();
+
+ let (tx, rx) = mpsc::channel();
+ p.map(move |e| tx.send(e).unwrap()).forget();
+ assert_eq!(rx.recv(), Ok(1));
+}
+
+#[test]
+fn oneshot3() {
+ let (c, p) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ p.map(move |e| tx.send(e).unwrap()).forget();
+
+ let t = thread::spawn(|| c.send(1).unwrap());
+ t.join().unwrap();
+
+ assert_eq!(rx.recv(), Ok(1));
+}
+
+#[test]
+fn oneshot4() {
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+
+ let (tx, rx) = mpsc::channel();
+ p.map(move |e| tx.send(e).unwrap()).forget();
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn oneshot5() {
+ let (c, p) = oneshot::channel::<i32>();
+ let t = thread::spawn(|| drop(c));
+ let (tx, rx) = mpsc::channel();
+ p.map(move |t| tx.send(t).unwrap()).forget();
+ t.join().unwrap();
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn oneshot6() {
+ let (c, p) = oneshot::channel::<i32>();
+ drop(p);
+ c.send(2).unwrap_err();
+}
+
+#[test]
+fn cancel1() {
+ let (c, p) = oneshot::channel::<i32>();
+ drop(c);
+ p.map(|_| panic!()).forget();
+}
+
+#[test]
+fn map_err1() {
+ ok::<i32, i32>(1).map_err(|_| panic!()).forget();
+}
+
+#[test]
+fn map_err2() {
+ let (tx, rx) = mpsc::channel();
+ err::<i32, i32>(1).map_err(move |v| tx.send(v).unwrap()).forget();
+ assert_eq!(rx.recv(), Ok(1));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn map_err3() {
+ let (c, p) = oneshot::channel::<i32>();
+ p.map_err(|_| {}).forget();
+ drop(c);
+}
+
+#[test]
+fn or_else1() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+
+ let (tx, rx) = mpsc::channel();
+ let tx2 = tx.clone();
+ let p1 = p1.map_err(move |i| { tx2.send(2).unwrap(); i });
+ let tx2 = tx.clone();
+ let p2 = p2.map(move |i| { tx2.send(i).unwrap(); i });
+
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ c2.send(3).unwrap();
+ p1.or_else(|_| p2).map(move |v| tx.send(v).unwrap()).forget();
+
+ assert_eq!(rx.recv(), Ok(2));
+ assert_eq!(rx.recv(), Ok(3));
+ assert_eq!(rx.recv(), Ok(3));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn or_else2() {
+ let (c1, p1) = oneshot::channel::<i32>();
+
+ let (tx, rx) = mpsc::channel();
+
+ p1.or_else(move |_| {
+ tx.send(()).unwrap();
+ ok::<i32, i32>(1)
+ }).forget();
+
+ c1.send(2).unwrap();
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join1() {
+ let (tx, rx) = mpsc::channel();
+ ok::<i32, i32>(1).join(ok(2))
+ .map(move |v| tx.send(v).unwrap())
+ .forget();
+ assert_eq!(rx.recv(), Ok((1, 2)));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join2() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ p1.join(p2).map(move |v| tx.send(v).unwrap()).forget();
+ assert!(rx.try_recv().is_err());
+ c1.send(1).unwrap();
+ assert!(rx.try_recv().is_err());
+ c2.send(2).unwrap();
+ assert_eq!(rx.recv(), Ok((1, 2)));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join3() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ p1.join(p2).map_err(move |_v| tx.send(1).unwrap()).forget();
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ assert_eq!(rx.recv(), Ok(1));
+ assert!(rx.recv().is_err());
+ drop(c2);
+}
+
+#[test]
+fn join4() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ p1.join(p2).map_err(move |v| tx.send(v).unwrap()).forget();
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ assert!(rx.recv().is_ok());
+ drop(c2);
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn join5() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (c3, p3) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ p1.join(p2).join(p3).map(move |v| tx.send(v).unwrap()).forget();
+ assert!(rx.try_recv().is_err());
+ c1.send(1).unwrap();
+ assert!(rx.try_recv().is_err());
+ c2.send(2).unwrap();
+ assert!(rx.try_recv().is_err());
+ c3.send(3).unwrap();
+ assert_eq!(rx.recv(), Ok(((1, 2), 3)));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select1() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ p1.select(p2).map(move |v| tx.send(v).unwrap()).forget();
+ assert!(rx.try_recv().is_err());
+ c1.send(1).unwrap();
+ let (v, p2) = rx.recv().unwrap();
+ assert_eq!(v, 1);
+ assert!(rx.recv().is_err());
+
+ let (tx, rx) = mpsc::channel();
+ p2.map(move |v| tx.send(v).unwrap()).forget();
+ c2.send(2).unwrap();
+ assert_eq!(rx.recv(), Ok(2));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select2() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ p1.select(p2).map_err(move |v| tx.send((1, v.1)).unwrap()).forget();
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ let (v, p2) = rx.recv().unwrap();
+ assert_eq!(v, 1);
+ assert!(rx.recv().is_err());
+
+ let (tx, rx) = mpsc::channel();
+ p2.map(move |v| tx.send(v).unwrap()).forget();
+ c2.send(2).unwrap();
+ assert_eq!(rx.recv(), Ok(2));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select3() {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+ let (tx, rx) = mpsc::channel();
+ p1.select(p2).map_err(move |v| tx.send((1, v.1)).unwrap()).forget();
+ assert!(rx.try_recv().is_err());
+ drop(c1);
+ let (v, p2) = rx.recv().unwrap();
+ assert_eq!(v, 1);
+ assert!(rx.recv().is_err());
+
+ let (tx, rx) = mpsc::channel();
+ p2.map_err(move |_v| tx.send(2).unwrap()).forget();
+ drop(c2);
+ assert_eq!(rx.recv(), Ok(2));
+ assert!(rx.recv().is_err());
+}
+
+#[test]
+fn select4() {
+ let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
+
+ let t = thread::spawn(move || {
+ for c in rx {
+ c.send(1).unwrap();
+ }
+ });
+
+ let (tx2, rx2) = mpsc::channel();
+ for _ in 0..10000 {
+ let (c1, p1) = oneshot::channel::<i32>();
+ let (c2, p2) = oneshot::channel::<i32>();
+
+ let tx3 = tx2.clone();
+ p1.select(p2).map(move |_| tx3.send(()).unwrap()).forget();
+ tx.send(c1).unwrap();
+ rx2.recv().unwrap();
+ drop(c2);
+ }
+ drop(tx);
+
+ t.join().unwrap();
+}
diff --git a/third_party/rust/futures-0.1.31/tests/fuse.rs b/third_party/rust/futures-0.1.31/tests/fuse.rs
new file mode 100644
index 0000000000..177d914e19
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/fuse.rs
@@ -0,0 +1,39 @@
+extern crate futures;
+
+use futures::prelude::*;
+use futures::future::ok;
+use futures::executor;
+
+mod support;
+use support::*;
+
+#[test]
+fn fuse() {
+ let mut future = executor::spawn(ok::<i32, u32>(2).fuse());
+ assert!(future.poll_future_notify(&notify_panic(), 0).unwrap().is_ready());
+ assert!(future.poll_future_notify(&notify_panic(), 0).unwrap().is_not_ready());
+}
+
+#[test]
+fn fuse_is_done() {
+ use futures::future::{Fuse, FutureResult};
+
+ struct Wrapped(Fuse<FutureResult<i32, u32>>);
+
+ impl Future for Wrapped {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ assert!(!self.0.is_done());
+ assert_eq!(self.0.poll().unwrap(), Async::Ready(2));
+ assert!(self.0.is_done());
+ assert_eq!(self.0.poll().unwrap(), Async::NotReady);
+ assert!(self.0.is_done());
+
+ Ok(Async::Ready(()))
+ }
+ }
+
+ assert!(Wrapped(ok::<i32, u32>(2).fuse()).wait().is_ok());
+} \ No newline at end of file
diff --git a/third_party/rust/futures-0.1.31/tests/future_flatten_stream.rs b/third_party/rust/futures-0.1.31/tests/future_flatten_stream.rs
new file mode 100644
index 0000000000..442d381fd7
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/future_flatten_stream.rs
@@ -0,0 +1,43 @@
+extern crate core;
+extern crate futures;
+
+use core::marker;
+
+use futures::prelude::*;
+use futures::future::{ok, err};
+use futures::stream;
+
+#[test]
+fn successful_future() {
+ let stream_items = vec![17, 19];
+ let future_of_a_stream = ok::<_, bool>(stream::iter_ok(stream_items));
+
+ let stream = future_of_a_stream.flatten_stream();
+
+ let mut iter = stream.wait();
+ assert_eq!(Ok(17), iter.next().unwrap());
+ assert_eq!(Ok(19), iter.next().unwrap());
+ assert_eq!(None, iter.next());
+}
+
+struct PanickingStream<T, E> {
+ _marker: marker::PhantomData<(T, E)>
+}
+
+impl<T, E> Stream for PanickingStream<T, E> {
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ panic!()
+ }
+}
+
+#[test]
+fn failed_future() {
+ let future_of_a_stream = err::<PanickingStream<bool, u32>, _>(10);
+ let stream = future_of_a_stream.flatten_stream();
+ let mut iter = stream.wait();
+ assert_eq!(Err(10), iter.next().unwrap());
+ assert_eq!(None, iter.next());
+}
diff --git a/third_party/rust/futures-0.1.31/tests/futures_ordered.rs b/third_party/rust/futures-0.1.31/tests/futures_ordered.rs
new file mode 100644
index 0000000000..6054192e3b
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/futures_ordered.rs
@@ -0,0 +1,88 @@
+#![allow(bare_trait_objects, unknown_lints)]
+
+extern crate futures;
+
+use std::any::Any;
+
+use futures::sync::oneshot;
+use futures::stream::futures_ordered;
+use futures::prelude::*;
+
+mod support;
+
+#[test]
+fn works_1() {
+ let (a_tx, a_rx) = oneshot::channel::<u32>();
+ let (b_tx, b_rx) = oneshot::channel::<u32>();
+ let (c_tx, c_rx) = oneshot::channel::<u32>();
+
+ let stream = futures_ordered(vec![a_rx, b_rx, c_rx]);
+
+ let mut spawn = futures::executor::spawn(stream);
+ b_tx.send(99).unwrap();
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+
+ a_tx.send(33).unwrap();
+ c_tx.send(33).unwrap();
+ assert_eq!(Some(Ok(33)), spawn.wait_stream());
+ assert_eq!(Some(Ok(99)), spawn.wait_stream());
+ assert_eq!(Some(Ok(33)), spawn.wait_stream());
+ assert_eq!(None, spawn.wait_stream());
+}
+
+#[test]
+fn works_2() {
+ let (a_tx, a_rx) = oneshot::channel::<u32>();
+ let (b_tx, b_rx) = oneshot::channel::<u32>();
+ let (c_tx, c_rx) = oneshot::channel::<u32>();
+
+ let stream = futures_ordered(vec![
+ Box::new(a_rx) as Box<Future<Item = _, Error = _>>,
+ Box::new(b_rx.join(c_rx).map(|(a, b)| a + b)),
+ ]);
+
+ let mut spawn = futures::executor::spawn(stream);
+ a_tx.send(33).unwrap();
+ b_tx.send(33).unwrap();
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+ c_tx.send(33).unwrap();
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
+}
+
+#[test]
+fn from_iterator() {
+ use futures::future::ok;
+ use futures::stream::FuturesOrdered;
+
+ let stream = vec![
+ ok::<u32, ()>(1),
+ ok::<u32, ()>(2),
+ ok::<u32, ()>(3)
+ ].into_iter().collect::<FuturesOrdered<_>>();
+ assert_eq!(stream.len(), 3);
+ assert_eq!(stream.collect().wait(), Ok(vec![1,2,3]));
+}
+
+#[test]
+fn queue_never_unblocked() {
+ let (_a_tx, a_rx) = oneshot::channel::<Box<Any+Send>>();
+ let (b_tx, b_rx) = oneshot::channel::<Box<Any+Send>>();
+ let (c_tx, c_rx) = oneshot::channel::<Box<Any+Send>>();
+
+ let stream = futures_ordered(vec![
+ Box::new(a_rx) as Box<Future<Item = _, Error = _>>,
+ Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box<Any+Send>))),
+ ]);
+
+ let mut spawn = futures::executor::spawn(stream);
+ for _ in 0..10 {
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+ }
+
+ b_tx.send(Box::new(())).unwrap();
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+ c_tx.send(Box::new(())).unwrap();
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+}
diff --git a/third_party/rust/futures-0.1.31/tests/futures_unordered.rs b/third_party/rust/futures-0.1.31/tests/futures_unordered.rs
new file mode 100644
index 0000000000..325a6f3e48
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/futures_unordered.rs
@@ -0,0 +1,167 @@
+#![allow(bare_trait_objects, unknown_lints)]
+
+extern crate futures;
+
+use std::any::Any;
+
+use futures::sync::oneshot;
+use std::iter::FromIterator;
+use futures::stream::{futures_unordered, FuturesUnordered};
+use futures::prelude::*;
+
+mod support;
+
+#[test]
+fn works_1() {
+ let (a_tx, a_rx) = oneshot::channel::<u32>();
+ let (b_tx, b_rx) = oneshot::channel::<u32>();
+ let (c_tx, c_rx) = oneshot::channel::<u32>();
+
+ let stream = futures_unordered(vec![a_rx, b_rx, c_rx]);
+
+ let mut spawn = futures::executor::spawn(stream);
+ b_tx.send(99).unwrap();
+ assert_eq!(Some(Ok(99)), spawn.wait_stream());
+
+ a_tx.send(33).unwrap();
+ c_tx.send(33).unwrap();
+ assert_eq!(Some(Ok(33)), spawn.wait_stream());
+ assert_eq!(Some(Ok(33)), spawn.wait_stream());
+ assert_eq!(None, spawn.wait_stream());
+}
+
+#[test]
+fn works_2() {
+ let (a_tx, a_rx) = oneshot::channel::<u32>();
+ let (b_tx, b_rx) = oneshot::channel::<u32>();
+ let (c_tx, c_rx) = oneshot::channel::<u32>();
+
+ let stream = futures_unordered(vec![
+ Box::new(a_rx) as Box<Future<Item = _, Error = _>>,
+ Box::new(b_rx.join(c_rx).map(|(a, b)| a + b)),
+ ]);
+
+ let mut spawn = futures::executor::spawn(stream);
+ a_tx.send(33).unwrap();
+ b_tx.send(33).unwrap();
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
+ c_tx.send(33).unwrap();
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
+}
+
+#[test]
+fn from_iterator() {
+ use futures::future::ok;
+ use futures::stream::FuturesUnordered;
+
+ let stream = vec![
+ ok::<u32, ()>(1),
+ ok::<u32, ()>(2),
+ ok::<u32, ()>(3)
+ ].into_iter().collect::<FuturesUnordered<_>>();
+ assert_eq!(stream.len(), 3);
+ assert_eq!(stream.collect().wait(), Ok(vec![1,2,3]));
+}
+
+#[test]
+fn finished_future_ok() {
+ let (_a_tx, a_rx) = oneshot::channel::<Box<Any+Send>>();
+ let (b_tx, b_rx) = oneshot::channel::<Box<Any+Send>>();
+ let (c_tx, c_rx) = oneshot::channel::<Box<Any+Send>>();
+
+ let stream = futures_unordered(vec![
+ Box::new(a_rx) as Box<Future<Item = _, Error = _>>,
+ Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box<Any+Send>))),
+ ]);
+
+ let mut spawn = futures::executor::spawn(stream);
+ for _ in 0..10 {
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+ }
+
+ b_tx.send(Box::new(())).unwrap();
+ let next = spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap();
+ assert!(next.is_ready());
+ c_tx.send(Box::new(())).unwrap();
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+ assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+}
+
+#[test]
+fn iter_mut_cancel() {
+ let (a_tx, a_rx) = oneshot::channel::<u32>();
+ let (b_tx, b_rx) = oneshot::channel::<u32>();
+ let (c_tx, c_rx) = oneshot::channel::<u32>();
+
+ let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]);
+
+ for rx in stream.iter_mut() {
+ rx.close();
+ }
+
+ assert!(a_tx.is_canceled());
+ assert!(b_tx.is_canceled());
+ assert!(c_tx.is_canceled());
+
+ let mut spawn = futures::executor::spawn(stream);
+ assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
+ assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
+ assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
+ assert_eq!(None, spawn.wait_stream());
+}
+
+#[test]
+fn iter_mut_len() {
+ let mut stream = futures_unordered(vec![
+ futures::future::empty::<(),()>(),
+ futures::future::empty::<(),()>(),
+ futures::future::empty::<(),()>()
+ ]);
+
+ let mut iter_mut = stream.iter_mut();
+ assert_eq!(iter_mut.len(), 3);
+ assert!(iter_mut.next().is_some());
+ assert_eq!(iter_mut.len(), 2);
+ assert!(iter_mut.next().is_some());
+ assert_eq!(iter_mut.len(), 1);
+ assert!(iter_mut.next().is_some());
+ assert_eq!(iter_mut.len(), 0);
+ assert!(iter_mut.next().is_none());
+}
+
+#[test]
+fn polled_only_once_at_most_per_iteration() {
+ #[derive(Debug, Clone, Copy, Default)]
+ struct F {
+ polled: bool,
+ }
+
+ impl Future for F {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
+ if self.polled {
+ panic!("polled twice")
+ } else {
+ self.polled = true;
+ Ok(Async::NotReady)
+ }
+ }
+ }
+
+
+ let tasks = FuturesUnordered::from_iter(vec![F::default(); 10]);
+ let mut tasks = futures::executor::spawn(tasks);
+ assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+ assert_eq!(10, tasks.get_mut().iter_mut().filter(|f| f.polled).count());
+
+ let tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
+ let mut tasks = futures::executor::spawn(tasks);
+ assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
+ assert_eq!(33, tasks.get_mut().iter_mut().filter(|f| f.polled).count());
+
+ let tasks = FuturesUnordered::<F>::new();
+ let mut tasks = futures::executor::spawn(tasks);
+ assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
+}
diff --git a/third_party/rust/futures-0.1.31/tests/inspect.rs b/third_party/rust/futures-0.1.31/tests/inspect.rs
new file mode 100644
index 0000000000..c16372ed91
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/inspect.rs
@@ -0,0 +1,23 @@
+extern crate futures;
+
+use futures::prelude::*;
+use futures::future::{ok, err};
+
+#[test]
+fn smoke() {
+ let mut counter = 0;
+
+ {
+ let work = ok::<u32, u32>(40).inspect(|val| { counter += *val; });
+ assert_eq!(work.wait(), Ok(40));
+ }
+
+ assert_eq!(counter, 40);
+
+ {
+ let work = err::<u32, u32>(4).inspect(|val| { counter += *val; });
+ assert_eq!(work.wait(), Err(4));
+ }
+
+ assert_eq!(counter, 40);
+}
diff --git a/third_party/rust/futures-0.1.31/tests/mpsc-close.rs b/third_party/rust/futures-0.1.31/tests/mpsc-close.rs
new file mode 100644
index 0000000000..061616ae06
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/mpsc-close.rs
@@ -0,0 +1,152 @@
+extern crate futures;
+
+use std::sync::{Arc, Weak};
+use std::thread;
+use std::time::{Duration, Instant};
+
+use futures::prelude::*;
+use futures::sync::mpsc::*;
+use futures::task;
+
+#[test]
+fn smoke() {
+ let (mut sender, receiver) = channel(1);
+
+ let t = thread::spawn(move ||{
+ while let Ok(s) = sender.send(42).wait() {
+ sender = s;
+ }
+ });
+
+ receiver.take(3).for_each(|_| Ok(())).wait().unwrap();
+
+ t.join().unwrap()
+}
+
+// Stress test that `try_send()`s occurring concurrently with receiver
+// close/drops don't appear as successful sends.
+#[test]
+fn stress_try_send_as_receiver_closes() {
+ const AMT: usize = 10000;
+ // To provide variable timing characteristics (in the hopes of
+ // reproducing the collision that leads to a race), we busy-re-poll
+ // the test MPSC receiver a variable number of times before actually
+ // stopping. We vary this countdown between 1 and the following
+ // value.
+ const MAX_COUNTDOWN: usize = 20;
+ // When we detect that a successfully sent item is still in the
+ // queue after a disconnect, we spin for up to 100ms to confirm that
+ // it is a persistent condition and not a concurrency illusion.
+ const SPIN_TIMEOUT_S: u64 = 10;
+ const SPIN_SLEEP_MS: u64 = 10;
+ struct TestRx {
+ rx: Receiver<Arc<()>>,
+ // The number of times to query `rx` before dropping it.
+ poll_count: usize
+ }
+ struct TestTask {
+ command_rx: Receiver<TestRx>,
+ test_rx: Option<Receiver<Arc<()>>>,
+ countdown: usize,
+ }
+ impl TestTask {
+ /// Create a new TestTask
+ fn new() -> (TestTask, Sender<TestRx>) {
+ let (command_tx, command_rx) = channel::<TestRx>(0);
+ (
+ TestTask {
+ command_rx: command_rx,
+ test_rx: None,
+ countdown: 0, // 0 means no countdown is in progress.
+ },
+ command_tx,
+ )
+ }
+ }
+ impl Future for TestTask {
+ type Item = ();
+ type Error = ();
+ fn poll(&mut self) -> Poll<(), ()> {
+ // Poll the test channel, if one is present.
+ if let Some(ref mut rx) = self.test_rx {
+ if let Ok(Async::Ready(v)) = rx.poll() {
+ let _ = v.expect("test finished unexpectedly!");
+ }
+ self.countdown -= 1;
+ // Busy-poll until the countdown is finished.
+ task::current().notify();
+ }
+ // Accept any newly submitted MPSC channels for testing.
+ match self.command_rx.poll()? {
+ Async::Ready(Some(TestRx { rx, poll_count })) => {
+ self.test_rx = Some(rx);
+ self.countdown = poll_count;
+ task::current().notify();
+ },
+ Async::Ready(None) => return Ok(Async::Ready(())),
+ _ => {},
+ }
+ if self.countdown == 0 {
+ // Countdown complete -- drop the Receiver.
+ self.test_rx = None;
+ }
+ Ok(Async::NotReady)
+ }
+ }
+ let (f, mut cmd_tx) = TestTask::new();
+ let bg = thread::spawn(move || f.wait());
+ for i in 0..AMT {
+ let (mut test_tx, rx) = channel(0);
+ let poll_count = i % MAX_COUNTDOWN;
+ cmd_tx.try_send(TestRx { rx: rx, poll_count: poll_count }).unwrap();
+ let mut prev_weak: Option<Weak<()>> = None;
+ let mut attempted_sends = 0;
+ let mut successful_sends = 0;
+ loop {
+ // Create a test item.
+ let item = Arc::new(());
+ let weak = Arc::downgrade(&item);
+ match test_tx.try_send(item) {
+ Ok(_) => {
+ prev_weak = Some(weak);
+ successful_sends += 1;
+ }
+ Err(ref e) if e.is_full() => {}
+ Err(ref e) if e.is_disconnected() => {
+ // Test for evidence of the race condition.
+ if let Some(prev_weak) = prev_weak {
+ if prev_weak.upgrade().is_some() {
+ // The previously sent item is still allocated.
+ // However, there appears to be some aspect of the
+ // concurrency that can legitimately cause the Arc
+ // to be momentarily valid. Spin for up to 100ms
+ // waiting for the previously sent item to be
+ // dropped.
+ let t0 = Instant::now();
+ let mut spins = 0;
+ loop {
+ if prev_weak.upgrade().is_none() {
+ break;
+ }
+ assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
+ "item not dropped on iteration {} after \
+ {} sends ({} successful). spin=({})",
+ i, attempted_sends, successful_sends, spins
+ );
+ spins += 1;
+ thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
+ }
+ }
+ }
+ break;
+ }
+ Err(ref e) => panic!("unexpected error: {}", e),
+ }
+ attempted_sends += 1;
+ }
+ }
+ drop(cmd_tx);
+ bg.join()
+ .expect("background thread join")
+ .expect("background thread result");
+}
diff --git a/third_party/rust/futures-0.1.31/tests/mpsc.rs b/third_party/rust/futures-0.1.31/tests/mpsc.rs
new file mode 100644
index 0000000000..9cb83e5952
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/mpsc.rs
@@ -0,0 +1,567 @@
+#![cfg(feature = "use_std")]
+#![allow(bare_trait_objects, unknown_lints)]
+
+#[macro_use]
+extern crate futures;
+
+use futures::prelude::*;
+use futures::future::{lazy, ok};
+use futures::stream::unfold;
+use futures::sync::mpsc;
+use futures::sync::oneshot;
+
+use std::thread;
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+mod support;
+use support::*;
+
+
+trait AssertSend: Send {}
+impl AssertSend for mpsc::Sender<i32> {}
+impl AssertSend for mpsc::Receiver<i32> {}
+
+#[test]
+fn send_recv() {
+ let (tx, rx) = mpsc::channel::<i32>(16);
+ let mut rx = rx.wait();
+
+ tx.send(1).wait().unwrap();
+
+ assert_eq!(rx.next().unwrap(), Ok(1));
+}
+
+#[test]
+fn send_recv_no_buffer() {
+ let (mut tx, mut rx) = mpsc::channel::<i32>(0);
+
+ // Run on a task context
+ lazy(move || {
+ assert!(tx.poll_complete().unwrap().is_ready());
+ assert!(tx.poll_ready().unwrap().is_ready());
+
+ // Send first message
+ let res = tx.start_send(1).unwrap();
+ assert!(is_ready(&res));
+ assert!(tx.poll_ready().unwrap().is_not_ready());
+
+ // Send second message
+ let res = tx.start_send(2).unwrap();
+ assert!(!is_ready(&res));
+
+ // Take the value
+ assert_eq!(rx.poll().unwrap(), Async::Ready(Some(1)));
+ assert!(tx.poll_ready().unwrap().is_ready());
+
+ let res = tx.start_send(2).unwrap();
+ assert!(is_ready(&res));
+ assert!(tx.poll_ready().unwrap().is_not_ready());
+
+ // Take the value
+ assert_eq!(rx.poll().unwrap(), Async::Ready(Some(2)));
+ assert!(tx.poll_ready().unwrap().is_ready());
+
+ Ok::<(), ()>(())
+ }).wait().unwrap();
+}
+
+#[test]
+fn send_shared_recv() {
+ let (tx1, rx) = mpsc::channel::<i32>(16);
+ let tx2 = tx1.clone();
+ let mut rx = rx.wait();
+
+ tx1.send(1).wait().unwrap();
+ assert_eq!(rx.next().unwrap(), Ok(1));
+
+ tx2.send(2).wait().unwrap();
+ assert_eq!(rx.next().unwrap(), Ok(2));
+}
+
+#[test]
+fn send_recv_threads() {
+ let (tx, rx) = mpsc::channel::<i32>(16);
+ let mut rx = rx.wait();
+
+ thread::spawn(move|| {
+ tx.send(1).wait().unwrap();
+ });
+
+ assert_eq!(rx.next().unwrap(), Ok(1));
+}
+
+#[test]
+fn send_recv_threads_no_capacity() {
+ let (tx, rx) = mpsc::channel::<i32>(0);
+ let mut rx = rx.wait();
+
+ let (readytx, readyrx) = mpsc::channel::<()>(2);
+ let mut readyrx = readyrx.wait();
+ let t = thread::spawn(move|| {
+ let readytx = readytx.sink_map_err(|_| panic!());
+ let (a, b) = tx.send(1).join(readytx.send(())).wait().unwrap();
+ a.send(2).join(b.send(())).wait().unwrap();
+ });
+
+ drop(readyrx.next().unwrap());
+ assert_eq!(rx.next().unwrap(), Ok(1));
+ drop(readyrx.next().unwrap());
+ assert_eq!(rx.next().unwrap(), Ok(2));
+
+ t.join().unwrap();
+}
+
+#[test]
+fn recv_close_gets_none() {
+ let (mut tx, mut rx) = mpsc::channel::<i32>(10);
+
+ // Run on a task context
+ lazy(move || {
+ rx.close();
+
+ assert_eq!(rx.poll(), Ok(Async::Ready(None)));
+ assert!(tx.poll_ready().is_err());
+
+ drop(tx);
+
+ Ok::<(), ()>(())
+ }).wait().unwrap();
+}
+
+
+#[test]
+fn tx_close_gets_none() {
+ let (_, mut rx) = mpsc::channel::<i32>(10);
+
+ // Run on a task context
+ lazy(move || {
+ assert_eq!(rx.poll(), Ok(Async::Ready(None)));
+ assert_eq!(rx.poll(), Ok(Async::Ready(None)));
+
+ Ok::<(), ()>(())
+ }).wait().unwrap();
+}
+
+#[test]
+fn spawn_sends_items() {
+ let core = local_executor::Core::new();
+ let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1))));
+ let rx = mpsc::spawn(stream, &core, 1);
+ assert_eq!(core.run(rx.take(4).collect()).unwrap(),
+ [0, 1, 2, 3]);
+}
+
+#[test]
+fn spawn_kill_dead_stream() {
+ use std::thread;
+ use std::time::Duration;
+ use futures::future::Either;
+ use futures::sync::oneshot;
+
+ // a stream which never returns anything (maybe a remote end isn't
+ // responding), but dropping it leads to observable side effects
+ // (like closing connections, releasing limited resources, ...)
+ #[derive(Debug)]
+ struct Dead {
+ // when dropped you should get Err(oneshot::Canceled) on the
+ // receiving end
+ done: oneshot::Sender<()>,
+ }
+ impl Stream for Dead {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ // need to implement a timeout for the test, as it would hang
+ // forever right now
+ let (timeout_tx, timeout_rx) = oneshot::channel();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(1000));
+ let _ = timeout_tx.send(());
+ });
+
+ let core = local_executor::Core::new();
+ let (done_tx, done_rx) = oneshot::channel();
+ let stream = Dead{done: done_tx};
+ let rx = mpsc::spawn(stream, &core, 1);
+ let res = core.run(
+ Ok::<_, ()>(())
+ .into_future()
+ .then(move |_| {
+ // now drop the spawned stream: maybe some timeout exceeded,
+ // or some connection on this end was closed by the remote
+ // end.
+ drop(rx);
+ // and wait for the spawned stream to release its resources
+ done_rx
+ })
+ .select2(timeout_rx)
+ );
+ match res {
+ Err(Either::A((oneshot::Canceled, _))) => (),
+ _ => {
+ panic!("dead stream wasn't canceled");
+ },
+ }
+}
+
+#[test]
+fn stress_shared_unbounded() {
+ const AMT: u32 = 10000;
+ const NTHREADS: u32 = 8;
+ let (tx, rx) = mpsc::unbounded::<i32>();
+ let mut rx = rx.wait();
+
+ let t = thread::spawn(move|| {
+ for _ in 0..AMT * NTHREADS {
+ assert_eq!(rx.next().unwrap(), Ok(1));
+ }
+
+ if rx.next().is_some() {
+ panic!();
+ }
+ });
+
+ for _ in 0..NTHREADS {
+ let tx = tx.clone();
+
+ thread::spawn(move|| {
+ for _ in 0..AMT {
+ tx.unbounded_send(1).unwrap();
+ }
+ });
+ }
+
+ drop(tx);
+
+ t.join().ok().unwrap();
+}
+
+#[test]
+fn stress_shared_bounded_hard() {
+ const AMT: u32 = 10000;
+ const NTHREADS: u32 = 8;
+ let (tx, rx) = mpsc::channel::<i32>(0);
+ let mut rx = rx.wait();
+
+ let t = thread::spawn(move|| {
+ for _ in 0..AMT * NTHREADS {
+ assert_eq!(rx.next().unwrap(), Ok(1));
+ }
+
+ if rx.next().is_some() {
+ panic!();
+ }
+ });
+
+ for _ in 0..NTHREADS {
+ let mut tx = tx.clone();
+
+ thread::spawn(move|| {
+ for _ in 0..AMT {
+ tx = tx.send(1).wait().unwrap();
+ }
+ });
+ }
+
+ drop(tx);
+
+ t.join().ok().unwrap();
+}
+
+#[test]
+fn stress_receiver_multi_task_bounded_hard() {
+ const AMT: usize = 10_000;
+ const NTHREADS: u32 = 2;
+
+ let (mut tx, rx) = mpsc::channel::<usize>(0);
+ let rx = Arc::new(Mutex::new(Some(rx)));
+ let n = Arc::new(AtomicUsize::new(0));
+
+ let mut th = vec![];
+
+ for _ in 0..NTHREADS {
+ let rx = rx.clone();
+ let n = n.clone();
+
+ let t = thread::spawn(move || {
+ let mut i = 0;
+
+ loop {
+ i += 1;
+ let mut lock = rx.lock().ok().unwrap();
+
+ match lock.take() {
+ Some(mut rx) => {
+ if i % 5 == 0 {
+ let (item, rest) = rx.into_future().wait().ok().unwrap();
+
+ if item.is_none() {
+ break;
+ }
+
+ n.fetch_add(1, Ordering::Relaxed);
+ *lock = Some(rest);
+ } else {
+ // Just poll
+ let n = n.clone();
+ let r = lazy(move || {
+ let r = match rx.poll().unwrap() {
+ Async::Ready(Some(_)) => {
+ n.fetch_add(1, Ordering::Relaxed);
+ *lock = Some(rx);
+ false
+ }
+ Async::Ready(None) => {
+ true
+ }
+ Async::NotReady => {
+ *lock = Some(rx);
+ false
+ }
+ };
+
+ Ok::<bool, ()>(r)
+ }).wait().unwrap();
+
+ if r {
+ break;
+ }
+ }
+ }
+ None => break,
+ }
+ }
+ });
+
+ th.push(t);
+ }
+
+ for i in 0..AMT {
+ tx = tx.send(i).wait().unwrap();
+ }
+
+ drop(tx);
+
+ for t in th {
+ t.join().unwrap();
+ }
+
+ assert_eq!(AMT, n.load(Ordering::Relaxed));
+}
+
+/// Stress test that receiver properly receives all the messages
+/// after sender dropped.
+#[test]
+fn stress_drop_sender() {
+ fn list() -> Box<Stream<Item=i32, Error=u32>> {
+ let (tx, rx) = mpsc::channel(1);
+ tx.send(Ok(1))
+ .and_then(|tx| tx.send(Ok(2)))
+ .and_then(|tx| tx.send(Ok(3)))
+ .forget();
+ Box::new(rx.then(|r| r.unwrap()))
+ }
+
+ for _ in 0..10000 {
+ assert_eq!(list().wait().collect::<Result<Vec<_>, _>>(),
+ Ok(vec![1, 2, 3]));
+ }
+}
+
+/// Stress test that after receiver dropped,
+/// no messages are lost.
+fn stress_close_receiver_iter() {
+ let (tx, rx) = mpsc::unbounded();
+ let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel();
+ let th = thread::spawn(move || {
+ for i in 1.. {
+ if let Err(_) = tx.unbounded_send(i) {
+ unwritten_tx.send(i).expect("unwritten_tx");
+ return;
+ }
+ }
+ });
+
+ let mut rx = rx.wait();
+
+ // Read one message to make sure thread effectively started
+ assert_eq!(Some(Ok(1)), rx.next());
+
+ rx.get_mut().close();
+
+ for i in 2.. {
+ match rx.next() {
+ Some(Ok(r)) => assert!(i == r),
+ Some(Err(_)) => unreachable!(),
+ None => {
+ let unwritten = unwritten_rx.recv().expect("unwritten_rx");
+ assert_eq!(unwritten, i);
+ th.join().unwrap();
+ return;
+ }
+ }
+ }
+}
+
+#[test]
+fn stress_close_receiver() {
+ for _ in 0..10000 {
+ stress_close_receiver_iter();
+ }
+}
+
+/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting.
+#[test]
+fn stress_poll_ready() {
+ // A task which checks channel capacity using poll_ready, and pushes items onto the channel when
+ // ready.
+ struct SenderTask {
+ sender: mpsc::Sender<u32>,
+ count: u32,
+ }
+ impl Future for SenderTask {
+ type Item = ();
+ type Error = ();
+ fn poll(&mut self) -> Poll<(), ()> {
+ // In a loop, check if the channel is ready. If so, push an item onto the channel
+ // (asserting that it doesn't attempt to block).
+ while self.count > 0 {
+ try_ready!(self.sender.poll_ready().map_err(|_| ()));
+ assert!(self.sender.start_send(self.count).unwrap().is_ready());
+ self.count -= 1;
+ }
+ Ok(Async::Ready(()))
+ }
+ }
+
+ const AMT: u32 = 1000;
+ const NTHREADS: u32 = 8;
+
+ /// Run a stress test using the specified channel capacity.
+ fn stress(capacity: usize) {
+ let (tx, rx) = mpsc::channel(capacity);
+ let mut threads = Vec::new();
+ for _ in 0..NTHREADS {
+ let sender = tx.clone();
+ threads.push(thread::spawn(move || {
+ SenderTask {
+ sender: sender,
+ count: AMT,
+ }.wait()
+ }));
+ }
+ drop(tx);
+
+ let mut rx = rx.wait();
+ for _ in 0..AMT * NTHREADS {
+ assert!(rx.next().is_some());
+ }
+
+ assert!(rx.next().is_none());
+
+ for thread in threads {
+ thread.join().unwrap().unwrap();
+ }
+ }
+
+ stress(0);
+ stress(1);
+ stress(8);
+ stress(16);
+}
+
+fn is_ready<T>(res: &AsyncSink<T>) -> bool {
+ match *res {
+ AsyncSink::Ready => true,
+ _ => false,
+ }
+}
+
+#[test]
+fn try_send_1() {
+ const N: usize = 3000;
+ let (mut tx, rx) = mpsc::channel(0);
+
+ let t = thread::spawn(move || {
+ for i in 0..N {
+ loop {
+ if tx.try_send(i).is_ok() {
+ break
+ }
+ }
+ }
+ });
+ for (i, j) in rx.wait().enumerate() {
+ assert_eq!(i, j.unwrap());
+ }
+ t.join().unwrap();
+}
+
+#[test]
+fn try_send_2() {
+ let (mut tx, rx) = mpsc::channel(0);
+
+ tx.try_send("hello").unwrap();
+
+ let (readytx, readyrx) = oneshot::channel::<()>();
+
+ let th = thread::spawn(|| {
+ lazy(|| {
+ assert!(tx.start_send("fail").unwrap().is_not_ready());
+ Ok::<_, ()>(())
+ }).wait().unwrap();
+
+ drop(readytx);
+ tx.send("goodbye").wait().unwrap();
+ });
+
+ let mut rx = rx.wait();
+
+ drop(readyrx.wait());
+ assert_eq!(rx.next(), Some(Ok("hello")));
+ assert_eq!(rx.next(), Some(Ok("goodbye")));
+ assert!(rx.next().is_none());
+
+ th.join().unwrap();
+}
+
+#[test]
+fn try_send_fail() {
+ let (mut tx, rx) = mpsc::channel(0);
+ let mut rx = rx.wait();
+
+ tx.try_send("hello").unwrap();
+
+ // This should fail
+ assert!(tx.try_send("fail").is_err());
+
+ assert_eq!(rx.next(), Some(Ok("hello")));
+
+ tx.try_send("goodbye").unwrap();
+ drop(tx);
+
+ assert_eq!(rx.next(), Some(Ok("goodbye")));
+ assert!(rx.next().is_none());
+}
+
+#[test]
+fn bounded_is_really_bounded() {
+ use futures::Async::*;
+ let (mut tx, mut rx) = mpsc::channel(0);
+ lazy(|| {
+ assert!(tx.start_send(1).unwrap().is_ready());
+ // Not ready until we receive
+ assert!(!tx.poll_complete().unwrap().is_ready());
+ // Receive the value
+ assert_eq!(rx.poll().unwrap(), Ready(Some(1)));
+ // Now the sender is ready
+ assert!(tx.poll_complete().unwrap().is_ready());
+ Ok::<_, ()>(())
+ }).wait().unwrap();
+}
diff --git a/third_party/rust/futures-0.1.31/tests/oneshot.rs b/third_party/rust/futures-0.1.31/tests/oneshot.rs
new file mode 100644
index 0000000000..45c1996876
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/oneshot.rs
@@ -0,0 +1,253 @@
+extern crate futures;
+
+use std::sync::mpsc;
+use std::thread;
+
+use futures::prelude::*;
+use futures::future::{lazy, ok};
+use futures::sync::oneshot::*;
+
+mod support;
+use support::*;
+
+#[test]
+fn smoke_poll() {
+ let (mut tx, rx) = channel::<u32>();
+ let mut task = futures::executor::spawn(lazy(|| {
+ assert!(tx.poll_cancel().unwrap().is_not_ready());
+ assert!(tx.poll_cancel().unwrap().is_not_ready());
+ drop(rx);
+ assert!(tx.poll_cancel().unwrap().is_ready());
+ assert!(tx.poll_cancel().unwrap().is_ready());
+ ok::<(), ()>(())
+ }));
+ assert!(task.poll_future_notify(&notify_noop(), 0).unwrap().is_ready());
+}
+
+#[test]
+fn cancel_notifies() {
+ let (tx, rx) = channel::<u32>();
+ let (tx2, rx2) = mpsc::channel();
+
+ WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget();
+ drop(rx);
+ rx2.recv().unwrap().unwrap();
+}
+
+struct WaitForCancel {
+ tx: Sender<u32>,
+}
+
+impl Future for WaitForCancel {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ self.tx.poll_cancel()
+ }
+}
+
+#[test]
+fn cancel_lots() {
+ let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
+ let t = thread::spawn(move || {
+ for (tx, tx2) in rx {
+ WaitForCancel { tx: tx }.then(move |v| tx2.send(v)).forget();
+ }
+
+ });
+
+ for _ in 0..20000 {
+ let (otx, orx) = channel::<u32>();
+ let (tx2, rx2) = mpsc::channel();
+ tx.send((otx, tx2)).unwrap();
+ drop(orx);
+ rx2.recv().unwrap().unwrap();
+ }
+ drop(tx);
+
+ t.join().unwrap();
+}
+
+#[test]
+fn close() {
+ let (mut tx, mut rx) = channel::<u32>();
+ rx.close();
+ assert!(rx.poll().is_err());
+ assert!(tx.poll_cancel().unwrap().is_ready());
+}
+
+#[test]
+fn close_wakes() {
+ let (tx, mut rx) = channel::<u32>();
+ let (tx2, rx2) = mpsc::channel();
+ let t = thread::spawn(move || {
+ rx.close();
+ rx2.recv().unwrap();
+ });
+ WaitForCancel { tx: tx }.wait().unwrap();
+ tx2.send(()).unwrap();
+ t.join().unwrap();
+}
+
+#[test]
+fn is_canceled() {
+ let (tx, rx) = channel::<u32>();
+ assert!(!tx.is_canceled());
+ drop(rx);
+ assert!(tx.is_canceled());
+}
+
+#[test]
+fn cancel_sends() {
+ let (tx, rx) = mpsc::channel::<Sender<_>>();
+ let t = thread::spawn(move || {
+ for otx in rx {
+ let _ = otx.send(42);
+ }
+ });
+
+ for _ in 0..20000 {
+ let (otx, mut orx) = channel::<u32>();
+ tx.send(otx).unwrap();
+
+ orx.close();
+ // Not necessary to wrap in a task because the implementation of oneshot
+ // never calls `task::current()` if the channel has been closed already.
+ let _ = orx.poll();
+ }
+
+ drop(tx);
+ t.join().unwrap();
+}
+
+#[test]
+fn spawn_sends_items() {
+ let core = local_executor::Core::new();
+ let future = ok::<_, ()>(1);
+ let rx = spawn(future, &core);
+ assert_eq!(core.run(rx).unwrap(), 1);
+}
+
+#[test]
+fn spawn_kill_dead_stream() {
+ use std::thread;
+ use std::time::Duration;
+ use futures::future::Either;
+ use futures::sync::oneshot;
+
+ // a future which never returns anything (forever accepting incoming
+ // connections), but dropping it leads to observable side effects
+ // (like closing listening sockets, releasing limited resources,
+ // ...)
+ #[derive(Debug)]
+ struct Dead {
+ // when dropped you should get Err(oneshot::Canceled) on the
+ // receiving end
+ done: oneshot::Sender<()>,
+ }
+ impl Future for Dead {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ // need to implement a timeout for the test, as it would hang
+ // forever right now
+ let (timeout_tx, timeout_rx) = oneshot::channel();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(1000));
+ let _ = timeout_tx.send(());
+ });
+
+ let core = local_executor::Core::new();
+ let (done_tx, done_rx) = oneshot::channel();
+ let future = Dead{done: done_tx};
+ let rx = spawn(future, &core);
+ let res = core.run(
+ Ok::<_, ()>(())
+ .into_future()
+ .then(move |_| {
+ // now drop the spawned future: maybe some timeout exceeded,
+ // or some connection on this end was closed by the remote
+ // end.
+ drop(rx);
+ // and wait for the spawned future to release its resources
+ done_rx
+ })
+ .select2(timeout_rx)
+ );
+ match res {
+ Err(Either::A((oneshot::Canceled, _))) => (),
+ Ok(Either::B(((), _))) => {
+ panic!("dead future wasn't canceled (timeout)");
+ },
+ _ => {
+ panic!("dead future wasn't canceled (unexpected result)");
+ },
+ }
+}
+
+#[test]
+fn spawn_dont_kill_forgot_dead_stream() {
+ use std::thread;
+ use std::time::Duration;
+ use futures::future::Either;
+ use futures::sync::oneshot;
+
+ // a future which never returns anything (forever accepting incoming
+ // connections), but dropping it leads to observable side effects
+ // (like closing listening sockets, releasing limited resources,
+ // ...)
+ #[derive(Debug)]
+ struct Dead {
+ // when dropped you should get Err(oneshot::Canceled) on the
+ // receiving end
+ done: oneshot::Sender<()>,
+ }
+ impl Future for Dead {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ // need to implement a timeout for the test, as it would hang
+ // forever right now
+ let (timeout_tx, timeout_rx) = oneshot::channel();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(1000));
+ let _ = timeout_tx.send(());
+ });
+
+ let core = local_executor::Core::new();
+ let (done_tx, done_rx) = oneshot::channel();
+ let future = Dead{done: done_tx};
+ let rx = spawn(future, &core);
+ let res = core.run(
+ Ok::<_, ()>(())
+ .into_future()
+ .then(move |_| {
+ // forget the spawned future: should keep running, i.e. hit
+ // the timeout below.
+ rx.forget();
+ // and wait for the spawned future to release its resources
+ done_rx
+ })
+ .select2(timeout_rx)
+ );
+ match res {
+ Err(Either::A((oneshot::Canceled, _))) => {
+ panic!("forgotten dead future was canceled");
+ },
+ Ok(Either::B(((), _))) => (), // reached timeout
+ _ => {
+ panic!("forgotten dead future was canceled (unexpected result)");
+ },
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/tests/ready_queue.rs b/third_party/rust/futures-0.1.31/tests/ready_queue.rs
new file mode 100644
index 0000000000..b0dc2375ba
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/ready_queue.rs
@@ -0,0 +1,164 @@
+extern crate futures;
+
+use std::panic::{self, AssertUnwindSafe};
+
+use futures::prelude::*;
+use futures::Async::*;
+use futures::future;
+use futures::stream::FuturesUnordered;
+use futures::sync::oneshot;
+
+trait AssertSendSync: Send + Sync {}
+impl AssertSendSync for FuturesUnordered<()> {}
+
+#[test]
+fn basic_usage() {
+ future::lazy(move || {
+ let mut queue = FuturesUnordered::new();
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = oneshot::channel();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ assert!(!queue.poll().unwrap().is_ready());
+
+ tx2.send("hello").unwrap();
+
+ assert_eq!(Ready(Some("hello")), queue.poll().unwrap());
+ assert!(!queue.poll().unwrap().is_ready());
+
+ tx1.send("world").unwrap();
+ tx3.send("world2").unwrap();
+
+ assert_eq!(Ready(Some("world")), queue.poll().unwrap());
+ assert_eq!(Ready(Some("world2")), queue.poll().unwrap());
+ assert_eq!(Ready(None), queue.poll().unwrap());
+
+ Ok::<_, ()>(())
+ }).wait().unwrap();
+}
+
+#[test]
+fn resolving_errors() {
+ future::lazy(move || {
+ let mut queue = FuturesUnordered::new();
+ let (tx1, rx1) = oneshot::channel();
+ let (tx2, rx2) = oneshot::channel();
+ let (tx3, rx3) = oneshot::channel();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ assert!(!queue.poll().unwrap().is_ready());
+
+ drop(tx2);
+
+ assert!(queue.poll().is_err());
+ assert!(!queue.poll().unwrap().is_ready());
+
+ drop(tx1);
+ tx3.send("world2").unwrap();
+
+ assert!(queue.poll().is_err());
+ assert_eq!(Ready(Some("world2")), queue.poll().unwrap());
+ assert_eq!(Ready(None), queue.poll().unwrap());
+
+ Ok::<_, ()>(())
+ }).wait().unwrap();
+}
+
+#[test]
+fn dropping_ready_queue() {
+ future::lazy(move || {
+ let mut queue = FuturesUnordered::new();
+ let (mut tx1, rx1) = oneshot::channel::<()>();
+ let (mut tx2, rx2) = oneshot::channel::<()>();
+ let (mut tx3, rx3) = oneshot::channel::<()>();
+
+ queue.push(rx1);
+ queue.push(rx2);
+ queue.push(rx3);
+
+ assert!(!tx1.poll_cancel().unwrap().is_ready());
+ assert!(!tx2.poll_cancel().unwrap().is_ready());
+ assert!(!tx3.poll_cancel().unwrap().is_ready());
+
+ drop(queue);
+
+ assert!(tx1.poll_cancel().unwrap().is_ready());
+ assert!(tx2.poll_cancel().unwrap().is_ready());
+ assert!(tx3.poll_cancel().unwrap().is_ready());
+
+ Ok::<_, ()>(())
+ }).wait().unwrap();
+}
+
+#[test]
+fn stress() {
+ const ITER: usize = 300;
+
+ use std::sync::{Arc, Barrier};
+ use std::thread;
+
+ for i in 0..ITER {
+ let n = (i % 10) + 1;
+
+ let mut queue = FuturesUnordered::new();
+
+ for _ in 0..5 {
+ let barrier = Arc::new(Barrier::new(n + 1));
+
+ for num in 0..n {
+ let barrier = barrier.clone();
+ let (tx, rx) = oneshot::channel();
+
+ queue.push(rx);
+
+ thread::spawn(move || {
+ barrier.wait();
+ tx.send(num).unwrap();
+ });
+ }
+
+ barrier.wait();
+
+ let mut sync = queue.wait();
+
+ let mut rx: Vec<_> = (&mut sync)
+ .take(n)
+ .map(|res| res.unwrap())
+ .collect();
+
+ assert_eq!(rx.len(), n);
+
+ rx.sort();
+
+ for num in 0..n {
+ assert_eq!(rx[num], num);
+ }
+
+ queue = sync.into_inner();
+ }
+ }
+}
+
+#[test]
+fn panicking_future_dropped() {
+ future::lazy(move || {
+ let mut queue = FuturesUnordered::new();
+ queue.push(future::poll_fn(|| -> Poll<i32, i32> {
+ panic!()
+ }));
+
+ let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll()));
+ assert!(r.is_err());
+ assert!(queue.is_empty());
+ assert_eq!(Ready(None), queue.poll().unwrap());
+
+ Ok::<_, ()>(())
+ }).wait().unwrap();
+}
diff --git a/third_party/rust/futures-0.1.31/tests/recurse.rs b/third_party/rust/futures-0.1.31/tests/recurse.rs
new file mode 100644
index 0000000000..a521ed13b7
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/recurse.rs
@@ -0,0 +1,25 @@
+#![allow(bare_trait_objects, unknown_lints)]
+
+extern crate futures;
+
+use std::sync::mpsc::channel;
+
+use futures::future::ok;
+use futures::prelude::*;
+
+#[test]
+fn lots() {
+ fn doit(n: usize) -> Box<Future<Item=(), Error=()> + Send> {
+ if n == 0 {
+ Box::new(ok(()))
+ } else {
+ Box::new(ok(n - 1).and_then(doit))
+ }
+ }
+
+ let (tx, rx) = channel();
+ ::std::thread::spawn(|| {
+ doit(1_000).map(move |_| tx.send(()).unwrap()).wait()
+ });
+ rx.recv().unwrap();
+}
diff --git a/third_party/rust/futures-0.1.31/tests/select_all.rs b/third_party/rust/futures-0.1.31/tests/select_all.rs
new file mode 100644
index 0000000000..7780aa306d
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/select_all.rs
@@ -0,0 +1,27 @@
+extern crate futures;
+
+use futures::prelude::*;
+use futures::future::{ok, select_all, err};
+
+#[test]
+fn smoke() {
+ let v = vec![
+ ok(1),
+ err(2),
+ ok(3),
+ ];
+
+ let (i, idx, v) = select_all(v).wait().ok().unwrap();
+ assert_eq!(i, 1);
+ assert_eq!(idx, 0);
+
+ let (i, idx, v) = select_all(v).wait().err().unwrap();
+ assert_eq!(i, 2);
+ assert_eq!(idx, 0);
+
+ let (i, idx, v) = select_all(v).wait().ok().unwrap();
+ assert_eq!(i, 3);
+ assert_eq!(idx, 0);
+
+ assert!(v.is_empty());
+}
diff --git a/third_party/rust/futures-0.1.31/tests/select_ok.rs b/third_party/rust/futures-0.1.31/tests/select_ok.rs
new file mode 100644
index 0000000000..85f39e2d39
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/select_ok.rs
@@ -0,0 +1,40 @@
+extern crate futures;
+
+use futures::future::*;
+
+#[test]
+fn ignore_err() {
+ let v = vec![
+ err(1),
+ err(2),
+ ok(3),
+ ok(4),
+ ];
+
+ let (i, v) = select_ok(v).wait().ok().unwrap();
+ assert_eq!(i, 3);
+
+ assert_eq!(v.len(), 1);
+
+ let (i, v) = select_ok(v).wait().ok().unwrap();
+ assert_eq!(i, 4);
+
+ assert!(v.is_empty());
+}
+
+#[test]
+fn last_err() {
+ let v = vec![
+ ok(1),
+ err(2),
+ err(3),
+ ];
+
+ let (i, v) = select_ok(v).wait().ok().unwrap();
+ assert_eq!(i, 1);
+
+ assert_eq!(v.len(), 2);
+
+ let i = select_ok(v).wait().err().unwrap();
+ assert_eq!(i, 3);
+}
diff --git a/third_party/rust/futures-0.1.31/tests/shared.rs b/third_party/rust/futures-0.1.31/tests/shared.rs
new file mode 100644
index 0000000000..97989fe2cb
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/shared.rs
@@ -0,0 +1,236 @@
+#![allow(bare_trait_objects, unknown_lints)]
+
+extern crate futures;
+
+mod support;
+
+use std::cell::RefCell;
+use std::rc::Rc;
+use std::thread;
+
+use futures::sync::oneshot;
+use futures::prelude::*;
+use futures::future;
+
+fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
+ let (tx, rx) = oneshot::channel::<u32>();
+ let f = rx.shared();
+ let threads = (0..threads_number).map(|_| {
+ let cloned_future = f.clone();
+ thread::spawn(move || {
+ assert_eq!(*cloned_future.wait().unwrap(), 6);
+ })
+ }).collect::<Vec<_>>();
+ tx.send(6).unwrap();
+ assert_eq!(*f.wait().unwrap(), 6);
+ for f in threads {
+ f.join().unwrap();
+ }
+}
+
+#[test]
+fn one_thread() {
+ send_shared_oneshot_and_wait_on_multiple_threads(1);
+}
+
+#[test]
+fn two_threads() {
+ send_shared_oneshot_and_wait_on_multiple_threads(2);
+}
+
+#[test]
+fn many_threads() {
+ send_shared_oneshot_and_wait_on_multiple_threads(1000);
+}
+
+#[test]
+fn drop_on_one_task_ok() {
+ let (tx, rx) = oneshot::channel::<u32>();
+ let f1 = rx.shared();
+ let f2 = f1.clone();
+
+ let (tx2, rx2) = oneshot::channel::<u32>();
+
+ let t1 = thread::spawn(|| {
+ let f = f1.map_err(|_| ()).map(|x| *x).select(rx2.map_err(|_| ()));
+ drop(f.wait());
+ });
+
+ let (tx3, rx3) = oneshot::channel::<u32>();
+
+ let t2 = thread::spawn(|| {
+ let _ = f2.map(|x| tx3.send(*x).unwrap()).map_err(|_| ()).wait();
+ });
+
+ tx2.send(11).unwrap(); // cancel `f1`
+ t1.join().unwrap();
+
+ tx.send(42).unwrap(); // Should cause `f2` and then `rx3` to get resolved.
+ let result = rx3.wait().unwrap();
+ assert_eq!(result, 42);
+ t2.join().unwrap();
+}
+
+#[test]
+fn drop_in_poll() {
+ let slot = Rc::new(RefCell::new(None));
+ let slot2 = slot.clone();
+ let future = future::poll_fn(move || {
+ drop(slot2.borrow_mut().take().unwrap());
+ Ok::<_, u32>(1.into())
+ }).shared();
+ let future2 = Box::new(future.clone()) as Box<Future<Item=_, Error=_>>;
+ *slot.borrow_mut() = Some(future2);
+ assert_eq!(*future.wait().unwrap(), 1);
+}
+
+#[test]
+fn peek() {
+ let core = ::support::local_executor::Core::new();
+
+ let (tx0, rx0) = oneshot::channel::<u32>();
+ let f1 = rx0.shared();
+ let f2 = f1.clone();
+
+ // Repeated calls on the original or clone do not change the outcome.
+ for _ in 0..2 {
+ assert!(f1.peek().is_none());
+ assert!(f2.peek().is_none());
+ }
+
+ // Completing the underlying future has no effect, because the value has not been `poll`ed in.
+ tx0.send(42).unwrap();
+ for _ in 0..2 {
+ assert!(f1.peek().is_none());
+ assert!(f2.peek().is_none());
+ }
+
+ // Once the Shared has been polled, the value is peekable on the clone.
+ core.spawn(f1.map(|_|()).map_err(|_|()));
+ core.run(future::ok::<(),()>(())).unwrap();
+ for _ in 0..2 {
+ assert_eq!(42, *f2.peek().unwrap().unwrap());
+ }
+}
+
+#[test]
+fn polled_then_ignored() {
+ let core = ::support::local_executor::Core::new();
+
+ let (tx0, rx0) = oneshot::channel::<u32>();
+ let f1 = rx0.shared();
+ let f2 = f1.clone();
+
+ let (tx1, rx1) = oneshot::channel::<u32>();
+ let (tx2, rx2) = oneshot::channel::<u32>();
+ let (tx3, rx3) = oneshot::channel::<u32>();
+
+ core.spawn(f1.map(|n| tx3.send(*n).unwrap()).map_err(|_|()));
+
+ core.run(future::ok::<(),()>(())).unwrap(); // Allow f1 to be polled.
+
+ core.spawn(f2.map_err(|_| ()).map(|x| *x).select(rx2.map_err(|_| ())).map_err(|_| ())
+ .and_then(|(_, f2)| rx3.map_err(|_| ()).map(move |n| {drop(f2); tx1.send(n).unwrap()})));
+
+ core.run(future::ok::<(),()>(())).unwrap(); // Allow f2 to be polled.
+
+ tx2.send(11).unwrap(); // Resolve rx2, causing f2 to no longer get polled.
+
+ core.run(future::ok::<(),()>(())).unwrap(); // Let the send() propagate.
+
+ tx0.send(42).unwrap(); // Should cause f1, then rx3, and then rx1 to resolve.
+
+ assert_eq!(core.run(rx1).unwrap(), 42);
+}
+
+#[test]
+fn recursive_poll() {
+ use futures::sync::mpsc;
+ use futures::Stream;
+
+ let core = ::support::local_executor::Core::new();
+ let (tx0, rx0) = mpsc::unbounded::<Box<Future<Item=(),Error=()>>>();
+ let run_stream = rx0.for_each(|f| f);
+
+ let (tx1, rx1) = oneshot::channel::<()>();
+
+ let f1 = run_stream.shared();
+ let f2 = f1.clone();
+ let f3 = f1.clone();
+ tx0.unbounded_send(Box::new(
+ f1.map(|_|()).map_err(|_|())
+ .select(rx1.map_err(|_|()))
+ .map(|_| ()).map_err(|_|()))).unwrap();
+
+ core.spawn(f2.map(|_|()).map_err(|_|()));
+
+ // Call poll() on the spawned future. We want to be sure that this does not trigger a
+ // deadlock or panic due to a recursive lock() on a mutex.
+ core.run(future::ok::<(),()>(())).unwrap();
+
+ tx1.send(()).unwrap(); // Break the cycle.
+ drop(tx0);
+ core.run(f3).unwrap();
+}
+
+#[test]
+fn recursive_poll_with_unpark() {
+ use futures::sync::mpsc;
+ use futures::{Stream, task};
+
+ let core = ::support::local_executor::Core::new();
+ let (tx0, rx0) = mpsc::unbounded::<Box<Future<Item=(),Error=()>>>();
+ let run_stream = rx0.for_each(|f| f);
+
+ let (tx1, rx1) = oneshot::channel::<()>();
+
+ let f1 = run_stream.shared();
+ let f2 = f1.clone();
+ let f3 = f1.clone();
+ tx0.unbounded_send(Box::new(future::lazy(move || {
+ task::current().notify();
+ f1.map(|_|()).map_err(|_|())
+ .select(rx1.map_err(|_|()))
+ .map(|_| ()).map_err(|_|())
+ }))).unwrap();
+
+ core.spawn(f2.map(|_|()).map_err(|_|()));
+
+ // Call poll() on the spawned future. We want to be sure that this does not trigger a
+ // deadlock or panic due to a recursive lock() on a mutex.
+ core.run(future::ok::<(),()>(())).unwrap();
+
+ tx1.send(()).unwrap(); // Break the cycle.
+ drop(tx0);
+ core.run(f3).unwrap();
+}
+
+#[test]
+fn shared_future_that_wakes_itself_until_pending_is_returned() {
+ use futures::Async;
+ use std::cell::Cell;
+
+ let core = ::support::local_executor::Core::new();
+
+ let proceed = Cell::new(false);
+ let fut = futures::future::poll_fn(|| {
+ Ok::<_, ()>(if proceed.get() {
+ Async::Ready(())
+ } else {
+ futures::task::current().notify();
+ Async::NotReady
+ })
+ })
+ .shared()
+ .map(|_| ())
+ .map_err(|_| ());
+
+ // The join future can only complete if the second future gets a chance to run after the first
+ // has returned pending
+ let second = futures::future::lazy(|| {
+ proceed.set(true);
+ Ok::<_, ()>(())
+ });
+
+ core.run(fut.join(second)).unwrap();
+}
diff --git a/third_party/rust/futures-0.1.31/tests/sink.rs b/third_party/rust/futures-0.1.31/tests/sink.rs
new file mode 100644
index 0000000000..460dbdf20c
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/sink.rs
@@ -0,0 +1,446 @@
+#![allow(bare_trait_objects, unknown_lints)]
+
+extern crate futures;
+
+use std::mem;
+use std::sync::Arc;
+use std::rc::Rc;
+use std::cell::{Cell, RefCell};
+use std::sync::atomic::{Ordering, AtomicBool};
+
+use futures::prelude::*;
+use futures::future::ok;
+use futures::stream;
+use futures::sync::{oneshot, mpsc};
+use futures::task::{self, Task};
+use futures::executor::{self, Notify};
+use futures::sink::SinkFromErr;
+
+mod support;
+use support::*;
+
+#[test]
+fn vec_sink() {
+ let mut v = Vec::new();
+ assert_eq!(v.start_send(0), Ok(AsyncSink::Ready));
+ assert_eq!(v.start_send(1), Ok(AsyncSink::Ready));
+ assert_eq!(v, vec![0, 1]);
+ assert_done(move || v.flush(), Ok(vec![0, 1]));
+}
+
+#[test]
+fn send() {
+ let v = Vec::new();
+
+ let v = v.send(0).wait().unwrap();
+ assert_eq!(v, vec![0]);
+
+ let v = v.send(1).wait().unwrap();
+ assert_eq!(v, vec![0, 1]);
+
+ assert_done(move || v.send(2),
+ Ok(vec![0, 1, 2]));
+}
+
+#[test]
+fn send_all() {
+ let v = Vec::new();
+
+ let (v, _) = v.send_all(stream::iter_ok(vec![0, 1])).wait().unwrap();
+ assert_eq!(v, vec![0, 1]);
+
+ let (v, _) = v.send_all(stream::iter_ok(vec![2, 3])).wait().unwrap();
+ assert_eq!(v, vec![0, 1, 2, 3]);
+
+ assert_done(
+ move || v.send_all(stream::iter_ok(vec![4, 5])).map(|(v, _)| v),
+ Ok(vec![0, 1, 2, 3, 4, 5]));
+}
+
+// An Unpark struct that records unpark events for inspection
+struct Flag(pub AtomicBool);
+
+impl Flag {
+ fn new() -> Arc<Flag> {
+ Arc::new(Flag(AtomicBool::new(false)))
+ }
+
+ fn get(&self) -> bool {
+ self.0.load(Ordering::SeqCst)
+ }
+
+ fn set(&self, v: bool) {
+ self.0.store(v, Ordering::SeqCst)
+ }
+}
+
+impl Notify for Flag {
+ fn notify(&self, _id: usize) {
+ self.set(true)
+ }
+}
+
+// Sends a value on an i32 channel sink
+struct StartSendFut<S: Sink>(Option<S>, Option<S::SinkItem>);
+
+impl<S: Sink> StartSendFut<S> {
+ fn new(sink: S, item: S::SinkItem) -> StartSendFut<S> {
+ StartSendFut(Some(sink), Some(item))
+ }
+}
+
+impl<S: Sink> Future for StartSendFut<S> {
+ type Item = S;
+ type Error = S::SinkError;
+
+ fn poll(&mut self) -> Poll<S, S::SinkError> {
+ match self.0.as_mut().unwrap().start_send(self.1.take().unwrap())? {
+ AsyncSink::Ready => Ok(Async::Ready(self.0.take().unwrap())),
+ AsyncSink::NotReady(item) => {
+ self.1 = Some(item);
+ Ok(Async::NotReady)
+ }
+ }
+
+ }
+}
+
+#[test]
+// Test that `start_send` on an `mpsc` channel does indeed block when the
+// channel is full
+fn mpsc_blocking_start_send() {
+ let (mut tx, mut rx) = mpsc::channel::<i32>(0);
+
+ futures::future::lazy(|| {
+ assert_eq!(tx.start_send(0).unwrap(), AsyncSink::Ready);
+
+ let flag = Flag::new();
+ let mut task = executor::spawn(StartSendFut::new(tx, 1));
+
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
+ assert!(!flag.get());
+ sassert_next(&mut rx, 0);
+ assert!(flag.get());
+ flag.set(false);
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready());
+ assert!(!flag.get());
+ sassert_next(&mut rx, 1);
+
+ Ok::<(), ()>(())
+ }).wait().unwrap();
+}
+
+#[test]
+// test `flush` by using `with` to make the first insertion into a sink block
+// until a oneshot is completed
+fn with_flush() {
+ let (tx, rx) = oneshot::channel();
+ let mut block = Box::new(rx) as Box<Future<Item = _, Error = _>>;
+ let mut sink = Vec::new().with(|elem| {
+ mem::replace(&mut block, Box::new(ok(())))
+ .map(move |_| elem + 1).map_err(|_| -> () { panic!() })
+ });
+
+ assert_eq!(sink.start_send(0), Ok(AsyncSink::Ready));
+
+ let flag = Flag::new();
+ let mut task = executor::spawn(sink.flush());
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
+ tx.send(()).unwrap();
+ assert!(flag.get());
+
+ let sink = match task.poll_future_notify(&flag, 0).unwrap() {
+ Async::Ready(sink) => sink,
+ _ => panic!()
+ };
+
+ assert_eq!(sink.send(1).wait().unwrap().get_ref(), &[1, 2]);
+}
+
+#[test]
+// test simple use of with to change data
+fn with_as_map() {
+ let sink = Vec::new().with(|item| -> Result<i32, ()> {
+ Ok(item * 2)
+ });
+ let sink = sink.send(0).wait().unwrap();
+ let sink = sink.send(1).wait().unwrap();
+ let sink = sink.send(2).wait().unwrap();
+ assert_eq!(sink.get_ref(), &[0, 2, 4]);
+}
+
+#[test]
+// test simple use of with_flat_map
+fn with_flat_map() {
+ let sink = Vec::new().with_flat_map(|item| {
+ stream::iter_ok(vec![item; item])
+ });
+ let sink = sink.send(0).wait().unwrap();
+ let sink = sink.send(1).wait().unwrap();
+ let sink = sink.send(2).wait().unwrap();
+ let sink = sink.send(3).wait().unwrap();
+ assert_eq!(sink.get_ref(), &[1,2,2,3,3,3]);
+}
+
+// Immediately accepts all requests to start pushing, but completion is managed
+// by manually flushing
+struct ManualFlush<T> {
+ data: Vec<T>,
+ waiting_tasks: Vec<Task>,
+}
+
+impl<T> Sink for ManualFlush<T> {
+ type SinkItem = Option<T>; // Pass None to flush
+ type SinkError = ();
+
+ fn start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()> {
+ if let Some(item) = op {
+ self.data.push(item);
+ } else {
+ self.force_flush();
+ }
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), ()> {
+ if self.data.is_empty() {
+ Ok(Async::Ready(()))
+ } else {
+ self.waiting_tasks.push(task::current());
+ Ok(Async::NotReady)
+ }
+ }
+
+ fn close(&mut self) -> Poll<(), ()> {
+ Ok(().into())
+ }
+}
+
+impl<T> ManualFlush<T> {
+ fn new() -> ManualFlush<T> {
+ ManualFlush {
+ data: Vec::new(),
+ waiting_tasks: Vec::new()
+ }
+ }
+
+ fn force_flush(&mut self) -> Vec<T> {
+ for task in self.waiting_tasks.drain(..) {
+ task.notify()
+ }
+ mem::replace(&mut self.data, Vec::new())
+ }
+}
+
+#[test]
+// test that the `with` sink doesn't require the underlying sink to flush,
+// but doesn't claim to be flushed until the underlying sink is
+fn with_flush_propagate() {
+ let mut sink = ManualFlush::new().with(|x| -> Result<Option<i32>, ()> { Ok(x) });
+ assert_eq!(sink.start_send(Some(0)).unwrap(), AsyncSink::Ready);
+ assert_eq!(sink.start_send(Some(1)).unwrap(), AsyncSink::Ready);
+
+ let flag = Flag::new();
+ let mut task = executor::spawn(sink.flush());
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
+ assert!(!flag.get());
+ assert_eq!(task.get_mut().get_mut().get_mut().force_flush(), vec![0, 1]);
+ assert!(flag.get());
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready());
+}
+
+#[test]
+// test that a buffer is a no-nop around a sink that always accepts sends
+fn buffer_noop() {
+ let sink = Vec::new().buffer(0);
+ let sink = sink.send(0).wait().unwrap();
+ let sink = sink.send(1).wait().unwrap();
+ assert_eq!(sink.get_ref(), &[0, 1]);
+
+ let sink = Vec::new().buffer(1);
+ let sink = sink.send(0).wait().unwrap();
+ let sink = sink.send(1).wait().unwrap();
+ assert_eq!(sink.get_ref(), &[0, 1]);
+}
+
+struct ManualAllow<T> {
+ data: Vec<T>,
+ allow: Rc<Allow>,
+}
+
+struct Allow {
+ flag: Cell<bool>,
+ tasks: RefCell<Vec<Task>>,
+}
+
+impl Allow {
+ fn new() -> Allow {
+ Allow {
+ flag: Cell::new(false),
+ tasks: RefCell::new(Vec::new()),
+ }
+ }
+
+ fn check(&self) -> bool {
+ if self.flag.get() {
+ true
+ } else {
+ self.tasks.borrow_mut().push(task::current());
+ false
+ }
+ }
+
+ fn start(&self) {
+ self.flag.set(true);
+ let mut tasks = self.tasks.borrow_mut();
+ for task in tasks.drain(..) {
+ task.notify();
+ }
+ }
+}
+
+impl<T> Sink for ManualAllow<T> {
+ type SinkItem = T;
+ type SinkError = ();
+
+ fn start_send(&mut self, item: T) -> StartSend<T, ()> {
+ if self.allow.check() {
+ self.data.push(item);
+ Ok(AsyncSink::Ready)
+ } else {
+ Ok(AsyncSink::NotReady(item))
+ }
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), ()> {
+ Ok(Async::Ready(()))
+ }
+
+ fn close(&mut self) -> Poll<(), ()> {
+ Ok(().into())
+ }
+}
+
+fn manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>) {
+ let allow = Rc::new(Allow::new());
+ let manual_allow = ManualAllow {
+ data: Vec::new(),
+ allow: allow.clone(),
+ };
+ (manual_allow, allow)
+}
+
+#[test]
+// test basic buffer functionality, including both filling up to capacity,
+// and writing out when the underlying sink is ready
+fn buffer() {
+ let (sink, allow) = manual_allow::<i32>();
+ let sink = sink.buffer(2);
+
+ let sink = StartSendFut::new(sink, 0).wait().unwrap();
+ let sink = StartSendFut::new(sink, 1).wait().unwrap();
+
+ let flag = Flag::new();
+ let mut task = executor::spawn(sink.send(2));
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
+ assert!(!flag.get());
+ allow.start();
+ assert!(flag.get());
+ match task.poll_future_notify(&flag, 0).unwrap() {
+ Async::Ready(sink) => {
+ assert_eq!(sink.get_ref().data, vec![0, 1, 2]);
+ }
+ _ => panic!()
+ }
+}
+
+#[test]
+fn fanout_smoke() {
+ let sink1 = Vec::new();
+ let sink2 = Vec::new();
+ let sink = sink1.fanout(sink2);
+ let stream = futures::stream::iter_ok(vec![1,2,3]);
+ let (sink, _) = sink.send_all(stream).wait().unwrap();
+ let (sink1, sink2) = sink.into_inner();
+ assert_eq!(sink1, vec![1,2,3]);
+ assert_eq!(sink2, vec![1,2,3]);
+}
+
+#[test]
+fn fanout_backpressure() {
+ let (left_send, left_recv) = mpsc::channel(0);
+ let (right_send, right_recv) = mpsc::channel(0);
+ let sink = left_send.fanout(right_send);
+
+ let sink = StartSendFut::new(sink, 0).wait().unwrap();
+ let sink = StartSendFut::new(sink, 1).wait().unwrap();
+
+ let flag = Flag::new();
+ let mut task = executor::spawn(sink.send(2));
+ assert!(!flag.get());
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
+ let (item, left_recv) = left_recv.into_future().wait().unwrap();
+ assert_eq!(item, Some(0));
+ assert!(flag.get());
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
+ let (item, right_recv) = right_recv.into_future().wait().unwrap();
+ assert_eq!(item, Some(0));
+ assert!(flag.get());
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
+ let (item, left_recv) = left_recv.into_future().wait().unwrap();
+ assert_eq!(item, Some(1));
+ assert!(flag.get());
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
+ let (item, right_recv) = right_recv.into_future().wait().unwrap();
+ assert_eq!(item, Some(1));
+ assert!(flag.get());
+ let (item, left_recv) = left_recv.into_future().wait().unwrap();
+ assert_eq!(item, Some(2));
+ assert!(flag.get());
+ assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
+ let (item, right_recv) = right_recv.into_future().wait().unwrap();
+ assert_eq!(item, Some(2));
+ match task.poll_future_notify(&flag, 0).unwrap() {
+ Async::Ready(_) => {
+ },
+ _ => panic!()
+ };
+ // make sure receivers live until end of test to prevent send errors
+ drop(left_recv);
+ drop(right_recv);
+}
+
+#[test]
+fn map_err() {
+ {
+ let (tx, _rx) = mpsc::channel(1);
+ let mut tx = tx.sink_map_err(|_| ());
+ assert_eq!(tx.start_send(()), Ok(AsyncSink::Ready));
+ assert_eq!(tx.poll_complete(), Ok(Async::Ready(())));
+ }
+
+ let tx = mpsc::channel(0).0;
+ assert_eq!(tx.sink_map_err(|_| ()).start_send(()), Err(()));
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+struct FromErrTest;
+
+impl<T> From<mpsc::SendError<T>> for FromErrTest {
+ fn from(_: mpsc::SendError<T>) -> FromErrTest {
+ FromErrTest
+ }
+}
+
+#[test]
+fn from_err() {
+ {
+ let (tx, _rx) = mpsc::channel(1);
+ let mut tx: SinkFromErr<mpsc::Sender<()>, FromErrTest> = tx.sink_from_err();
+ assert_eq!(tx.start_send(()), Ok(AsyncSink::Ready));
+ assert_eq!(tx.poll_complete(), Ok(Async::Ready(())));
+ }
+
+ let tx = mpsc::channel(0).0;
+ assert_eq!(tx.sink_from_err().start_send(()), Err(FromErrTest));
+}
diff --git a/third_party/rust/futures-0.1.31/tests/split.rs b/third_party/rust/futures-0.1.31/tests/split.rs
new file mode 100644
index 0000000000..7a0667f135
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/split.rs
@@ -0,0 +1,47 @@
+extern crate futures;
+
+use futures::prelude::*;
+use futures::stream::iter_ok;
+
+struct Join<T, U>(T, U);
+
+impl<T: Stream, U> Stream for Join<T, U> {
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
+ self.0.poll()
+ }
+}
+
+impl<T, U: Sink> Sink for Join<T, U> {
+ type SinkItem = U::SinkItem;
+ type SinkError = U::SinkError;
+
+ fn start_send(&mut self, item: U::SinkItem)
+ -> StartSend<U::SinkItem, U::SinkError>
+ {
+ self.1.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), U::SinkError> {
+ self.1.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), U::SinkError> {
+ self.1.close()
+ }
+}
+
+#[test]
+fn test_split() {
+ let mut dest = Vec::new();
+ {
+ let j = Join(iter_ok(vec![10, 20, 30]), &mut dest);
+ let (sink, stream) = j.split();
+ let j = sink.reunite(stream).expect("test_split: reunite error");
+ let (sink, stream) = j.split();
+ sink.send_all(stream).wait().unwrap();
+ }
+ assert_eq!(dest, vec![10, 20, 30]);
+}
diff --git a/third_party/rust/futures-0.1.31/tests/stream.rs b/third_party/rust/futures-0.1.31/tests/stream.rs
new file mode 100644
index 0000000000..2400a2abb1
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/stream.rs
@@ -0,0 +1,416 @@
+#![allow(bare_trait_objects, unknown_lints)]
+
+#[macro_use]
+extern crate futures;
+
+use futures::prelude::*;
+use futures::executor;
+use futures::future::{err, ok};
+use futures::stream::{empty, iter_ok, poll_fn, Peekable};
+use futures::sync::oneshot;
+use futures::sync::mpsc;
+
+mod support;
+use support::*;
+
+pub struct Iter<I> {
+ iter: I,
+}
+
+pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
+ where J: IntoIterator<Item=Result<T, E>>,
+{
+ Iter {
+ iter: i.into_iter(),
+ }
+}
+
+impl<I, T, E> Stream for Iter<I>
+ where I: Iterator<Item=Result<T, E>>,
+{
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<T>, E> {
+ match self.iter.next() {
+ Some(Ok(e)) => Ok(Async::Ready(Some(e))),
+ Some(Err(e)) => Err(e),
+ None => Ok(Async::Ready(None)),
+ }
+ }
+}
+
+fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
+ let (tx, rx) = mpsc::channel(1);
+ tx.send(Ok(1))
+ .and_then(|tx| tx.send(Ok(2)))
+ .and_then(|tx| tx.send(Ok(3)))
+ .forget();
+ Box::new(rx.then(|r| r.unwrap()))
+}
+
+fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> {
+ let (tx, rx) = mpsc::channel(1);
+ tx.send(Ok(1))
+ .and_then(|tx| tx.send(Ok(2)))
+ .and_then(|tx| tx.send(Err(3)))
+ .forget();
+ Box::new(rx.then(|r| r.unwrap()))
+}
+
+#[test]
+fn map() {
+ assert_done(|| list().map(|a| a + 1).collect(), Ok(vec![2, 3, 4]));
+}
+
+#[test]
+fn map_err() {
+ assert_done(|| err_list().map_err(|a| a + 1).collect(), Err(4));
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+struct FromErrTest(u32);
+
+impl From<u32> for FromErrTest {
+ fn from(i: u32) -> FromErrTest {
+ FromErrTest(i)
+ }
+}
+
+#[test]
+fn from_err() {
+ assert_done(|| err_list().from_err().collect(), Err(FromErrTest(3)));
+}
+
+#[test]
+fn fold() {
+ assert_done(|| list().fold(0, |a, b| ok::<i32, u32>(a + b)), Ok(6));
+ assert_done(|| err_list().fold(0, |a, b| ok::<i32, u32>(a + b)), Err(3));
+}
+
+#[test]
+fn filter() {
+ assert_done(|| list().filter(|a| *a % 2 == 0).collect(), Ok(vec![2]));
+}
+
+#[test]
+fn filter_map() {
+ assert_done(|| list().filter_map(|x| {
+ if x % 2 == 0 {
+ Some(x + 10)
+ } else {
+ None
+ }
+ }).collect(), Ok(vec![12]));
+}
+
+#[test]
+fn and_then() {
+ assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4]));
+ assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect(),
+ Err(1));
+}
+
+#[test]
+fn then() {
+ assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4]));
+
+}
+
+#[test]
+fn or_else() {
+ assert_done(|| err_list().or_else(|a| {
+ ok::<i32, u32>(a as i32)
+ }).collect(), Ok(vec![1, 2, 3]));
+}
+
+#[test]
+fn flatten() {
+ assert_done(|| list().map(|_| list()).flatten().collect(),
+ Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
+
+}
+
+#[test]
+fn skip() {
+ assert_done(|| list().skip(2).collect(), Ok(vec![3]));
+}
+
+#[test]
+fn skip_passes_errors_through() {
+ let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)])
+ .skip(1)
+ .wait();
+ assert_eq!(s.next(), Some(Err(1)));
+ assert_eq!(s.next(), Some(Err(2)));
+ assert_eq!(s.next(), Some(Ok(4)));
+ assert_eq!(s.next(), Some(Ok(5)));
+ assert_eq!(s.next(), None);
+}
+
+#[test]
+fn skip_while() {
+ assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(),
+ Ok(vec![2, 3]));
+}
+#[test]
+fn take() {
+ assert_done(|| list().take(2).collect(), Ok(vec![1, 2]));
+}
+
+#[test]
+fn take_while() {
+ assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(),
+ Ok(vec![1, 2]));
+}
+
+#[test]
+fn take_passes_errors_through() {
+ let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)])
+ .take(1)
+ .wait();
+ assert_eq!(s.next(), Some(Err(1)));
+ assert_eq!(s.next(), Some(Err(2)));
+ assert_eq!(s.next(), Some(Ok(3)));
+ assert_eq!(s.next(), None);
+
+ let mut s = iter(vec![Ok(1), Err(2)]).take(1).wait();
+ assert_eq!(s.next(), Some(Ok(1)));
+ assert_eq!(s.next(), None);
+}
+
+#[test]
+fn peekable() {
+ assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3]));
+}
+
+#[test]
+fn fuse() {
+ let mut stream = list().fuse().wait();
+ assert_eq!(stream.next(), Some(Ok(1)));
+ assert_eq!(stream.next(), Some(Ok(2)));
+ assert_eq!(stream.next(), Some(Ok(3)));
+ assert_eq!(stream.next(), None);
+ assert_eq!(stream.next(), None);
+ assert_eq!(stream.next(), None);
+}
+
+#[test]
+fn buffered() {
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| ()))))
+ .forget();
+
+ let mut rx = rx.buffered(2);
+ sassert_empty(&mut rx);
+ c.send(3).unwrap();
+ sassert_empty(&mut rx);
+ a.send(5).unwrap();
+ let mut rx = rx.wait();
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), Some(Ok(3)));
+ assert_eq!(rx.next(), None);
+
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| ()))))
+ .forget();
+
+ let mut rx = rx.buffered(1);
+ sassert_empty(&mut rx);
+ c.send(3).unwrap();
+ sassert_empty(&mut rx);
+ a.send(5).unwrap();
+ let mut rx = rx.wait();
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), Some(Ok(3)));
+ assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn unordered() {
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| ()))))
+ .forget();
+
+ let mut rx = rx.buffer_unordered(2);
+ sassert_empty(&mut rx);
+ let mut rx = rx.wait();
+ c.send(3).unwrap();
+ assert_eq!(rx.next(), Some(Ok(3)));
+ a.send(5).unwrap();
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), None);
+
+ let (tx, rx) = mpsc::channel(1);
+ let (a, b) = oneshot::channel::<u32>();
+ let (c, d) = oneshot::channel::<u32>();
+
+ tx.send(Box::new(b.map_err(|_| ())) as Box<Future<Item = _, Error = _> + Send>)
+ .and_then(|tx| tx.send(Box::new(d.map_err(|_| ()))))
+ .forget();
+
+ // We don't even get to see `c` until `a` completes.
+ let mut rx = rx.buffer_unordered(1);
+ sassert_empty(&mut rx);
+ c.send(3).unwrap();
+ sassert_empty(&mut rx);
+ a.send(5).unwrap();
+ let mut rx = rx.wait();
+ assert_eq!(rx.next(), Some(Ok(5)));
+ assert_eq!(rx.next(), Some(Ok(3)));
+ assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn zip() {
+ assert_done(|| list().zip(list()).collect(),
+ Ok(vec![(1, 1), (2, 2), (3, 3)]));
+ assert_done(|| list().zip(list().take(2)).collect(),
+ Ok(vec![(1, 1), (2, 2)]));
+ assert_done(|| list().take(2).zip(list()).collect(),
+ Ok(vec![(1, 1), (2, 2)]));
+ assert_done(|| err_list().zip(list()).collect(), Err(3));
+ assert_done(|| list().zip(list().map(|x| x + 1)).collect(),
+ Ok(vec![(1, 2), (2, 3), (3, 4)]));
+}
+
+#[test]
+fn peek() {
+ struct Peek {
+ inner: Peekable<Box<Stream<Item = i32, Error =u32> + Send>>
+ }
+
+ impl Future for Peek {
+ type Item = ();
+ type Error = u32;
+
+ fn poll(&mut self) -> Poll<(), u32> {
+ {
+ let res = try_ready!(self.inner.peek());
+ assert_eq!(res, Some(&1));
+ }
+ assert_eq!(self.inner.peek().unwrap(), Some(&1).into());
+ assert_eq!(self.inner.poll().unwrap(), Some(1).into());
+ Ok(().into())
+ }
+ }
+
+ Peek {
+ inner: list().peekable(),
+ }.wait().unwrap()
+}
+
+#[test]
+fn wait() {
+ assert_eq!(list().wait().collect::<Result<Vec<_>, _>>(),
+ Ok(vec![1, 2, 3]));
+}
+
+#[test]
+fn chunks() {
+ assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]]));
+ assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]]));
+ assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]]));
+ let mut list = executor::spawn(err_list().chunks(3));
+ let i = list.wait_stream().unwrap().unwrap();
+ assert_eq!(i, vec![1, 2]);
+ let i = list.wait_stream().unwrap().unwrap_err();
+ assert_eq!(i, 3);
+}
+
+#[test]
+#[should_panic]
+fn chunks_panic_on_cap_zero() {
+ let _ = list().chunks(0);
+}
+
+#[test]
+fn select() {
+ let a = iter_ok::<_, u32>(vec![1, 2, 3]);
+ let b = iter_ok(vec![4, 5, 6]);
+ assert_done(|| a.select(b).collect(), Ok(vec![1, 4, 2, 5, 3, 6]));
+
+ let a = iter_ok::<_, u32>(vec![1, 2, 3]);
+ let b = iter_ok(vec![1, 2]);
+ assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3]));
+
+ let a = iter_ok(vec![1, 2]);
+ let b = iter_ok::<_, u32>(vec![1, 2, 3]);
+ assert_done(|| a.select(b).collect(), Ok(vec![1, 1, 2, 2, 3]));
+}
+
+#[test]
+fn forward() {
+ let v = Vec::new();
+ let v = iter_ok::<_, ()>(vec![0, 1]).forward(v).wait().unwrap().1;
+ assert_eq!(v, vec![0, 1]);
+
+ let v = iter_ok::<_, ()>(vec![2, 3]).forward(v).wait().unwrap().1;
+ assert_eq!(v, vec![0, 1, 2, 3]);
+
+ assert_done(move || iter_ok(vec![4, 5]).forward(v).map(|(_, s)| s),
+ Ok::<_, ()>(vec![0, 1, 2, 3, 4, 5]));
+}
+
+#[test]
+#[allow(deprecated)]
+fn concat() {
+ let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
+ assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
+
+ let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
+ assert_done(move || b.concat(), Err(()));
+}
+
+#[test]
+fn concat2() {
+ let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
+ assert_done(move || a.concat2(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
+
+ let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]);
+ assert_done(move || b.concat2(), Err(()));
+
+ let c = empty::<Vec<()>, ()>();
+ assert_done(move || c.concat2(), Ok(vec![]))
+}
+
+#[test]
+fn stream_poll_fn() {
+ let mut counter = 5usize;
+
+ let read_stream = poll_fn(move || -> Poll<Option<usize>, std::io::Error> {
+ if counter == 0 {
+ return Ok(Async::Ready(None));
+ }
+ counter -= 1;
+ Ok(Async::Ready(Some(counter)))
+ });
+
+ assert_eq!(read_stream.wait().count(), 5);
+}
+
+#[test]
+fn inspect() {
+ let mut seen = vec![];
+ assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3]));
+ assert_eq!(seen, [1, 2, 3]);
+}
+
+#[test]
+fn inspect_err() {
+ let mut seen = vec![];
+ assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect(), Err(3));
+ assert_eq!(seen, [3]);
+}
diff --git a/third_party/rust/futures-0.1.31/tests/stream_catch_unwind.rs b/third_party/rust/futures-0.1.31/tests/stream_catch_unwind.rs
new file mode 100644
index 0000000000..a06748d09a
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/stream_catch_unwind.rs
@@ -0,0 +1,29 @@
+extern crate futures;
+
+use futures::stream;
+use futures::prelude::*;
+
+#[test]
+fn panic_in_the_middle_of_the_stream() {
+ let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]);
+
+ // panic on second element
+ let stream_panicking = stream.map(|o| o.unwrap());
+ let mut iter = stream_panicking.catch_unwind().wait();
+
+ assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
+ assert!(iter.next().unwrap().is_err());
+ assert!(iter.next().is_none());
+}
+
+#[test]
+fn no_panic() {
+ let stream = stream::iter_ok::<_, bool>(vec![10, 11, 12]);
+
+ let mut iter = stream.catch_unwind().wait();
+
+ assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap());
+ assert_eq!(Ok(11), iter.next().unwrap().ok().unwrap());
+ assert_eq!(Ok(12), iter.next().unwrap().ok().unwrap());
+ assert!(iter.next().is_none());
+}
diff --git a/third_party/rust/futures-0.1.31/tests/support/local_executor.rs b/third_party/rust/futures-0.1.31/tests/support/local_executor.rs
new file mode 100644
index 0000000000..cf89e8152f
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/support/local_executor.rs
@@ -0,0 +1,164 @@
+//! Execution of futures on a single thread
+//!
+//! This module has no special handling of any blocking operations other than
+//! futures-aware inter-thread communications, and is not intended to be used to
+//! manage I/O. For futures that do I/O you'll likely want to use `tokio-core`.
+
+#![allow(bare_trait_objects, unknown_lints)]
+
+use std::cell::{Cell, RefCell};
+use std::sync::{Arc, Mutex, mpsc};
+
+use futures::executor::{self, Spawn, Notify};
+use futures::future::{Executor, ExecuteError};
+use futures::{Future, Async};
+
+/// Main loop object
+pub struct Core {
+ tx: mpsc::Sender<usize>,
+ rx: mpsc::Receiver<usize>,
+ notify: Arc<MyNotify>,
+
+ // Slab of running futures used to track what's running and what slots are
+ // empty. Slot indexes are then sent along tx/rx above to indicate which
+ // future is ready to get polled.
+ tasks: RefCell<Vec<Slot>>,
+ next_vacant: Cell<usize>,
+}
+
+enum Slot {
+ Vacant { next_vacant: usize },
+ Running(Option<Spawn<Box<Future<Item = (), Error = ()>>>>),
+}
+
+impl Core {
+ /// Create a new `Core`.
+ pub fn new() -> Self {
+ let (tx, rx) = mpsc::channel();
+ Core {
+ notify: Arc::new(MyNotify {
+ tx: Mutex::new(tx.clone()),
+ }),
+ tx: tx,
+ rx: rx,
+ next_vacant: Cell::new(0),
+ tasks: RefCell::new(Vec::new()),
+ }
+ }
+
+ /// Spawn a future to be executed by a future call to `run`.
+ ///
+ /// The future `f` provided will not be executed until `run` is called
+ /// below. While futures passed to `run` are executing, the future provided
+ /// here will be executed concurrently as well.
+ pub fn spawn<F>(&self, f: F)
+ where F: Future<Item=(), Error=()> + 'static
+ {
+ let idx = self.next_vacant.get();
+ let mut tasks = self.tasks.borrow_mut();
+ match tasks.get_mut(idx) {
+ Some(&mut Slot::Vacant { next_vacant }) => {
+ self.next_vacant.set(next_vacant);
+ }
+ Some(&mut Slot::Running (_)) => {
+ panic!("vacant points to running future")
+ }
+ None => {
+ assert_eq!(idx, tasks.len());
+ tasks.push(Slot::Vacant { next_vacant: 0 });
+ self.next_vacant.set(idx + 1);
+ }
+ }
+ tasks[idx] = Slot::Running(Some(executor::spawn(Box::new(f))));
+ self.tx.send(idx).unwrap();
+ }
+
+ /// Run the loop until the future `f` completes.
+ ///
+ /// This method will block the current thread until the future `f` has
+ /// resolved. While waiting on `f` to finish it will also execute any
+ /// futures spawned via `spawn` above.
+ pub fn run<F>(&self, f: F) -> Result<F::Item, F::Error>
+ where F: Future,
+ {
+ let id = usize::max_value();
+ self.tx.send(id).unwrap();
+ let mut f = executor::spawn(f);
+ loop {
+ if self.turn() {
+ match f.poll_future_notify(&self.notify, id)? {
+ Async::Ready(e) => return Ok(e),
+ Async::NotReady => {}
+ }
+ }
+ }
+ }
+
+ /// "Turns" this event loop one tick.
+ ///
+ /// This'll block the current thread until something happens, and once an
+ /// event happens this will act on that event.
+ ///
+ /// # Return value
+ ///
+ /// Returns `true` if the future passed to `run` should be polled or `false`
+ /// otherwise.
+ fn turn(&self) -> bool {
+ let task_id = self.rx.recv().unwrap();
+ if task_id == usize::max_value() {
+ return true
+ }
+
+ // This may be a spurious wakeup so we're not guaranteed to have a
+ // future associated with `task_id`, so do a fallible lookup.
+ //
+ // Note that we don't want to borrow `self.tasks` for too long so we
+ // try to extract the future here and leave behind a tombstone future
+ // which'll get replaced or removed later. This is how we support
+ // spawn-in-run.
+ let mut future = match self.tasks.borrow_mut().get_mut(task_id) {
+ Some(&mut Slot::Running(ref mut future)) => future.take().unwrap(),
+ Some(&mut Slot::Vacant { .. }) => return false,
+ None => return false,
+ };
+
+ // Drive this future forward. If it's done we remove it and if it's not
+ // done then we put it back in the tasks array.
+ let done = match future.poll_future_notify(&self.notify, task_id) {
+ Ok(Async::Ready(())) | Err(()) => true,
+ Ok(Async::NotReady) => false,
+ };
+ let mut tasks = self.tasks.borrow_mut();
+ if done {
+ tasks[task_id] = Slot::Vacant { next_vacant: self.next_vacant.get() };
+ self.next_vacant.set(task_id);
+ } else {
+ tasks[task_id] = Slot::Running(Some(future));
+ }
+
+ return false
+ }
+}
+
+impl<F> Executor<F> for Core
+ where F: Future<Item = (), Error = ()> + 'static,
+{
+ fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
+ self.spawn(future);
+ Ok(())
+ }
+}
+
+struct MyNotify {
+ // TODO: it's pretty unfortunate to use a `Mutex` here where the `Sender`
+ // itself is basically `Sync` as-is. Ideally this'd use something like
+ // an off-the-shelf mpsc queue as well as `thread::park` and
+ // `Thread::unpark`.
+ tx: Mutex<mpsc::Sender<usize>>,
+}
+
+impl Notify for MyNotify {
+ fn notify(&self, id: usize) {
+ drop(self.tx.lock().unwrap().send(id));
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/tests/support/mod.rs b/third_party/rust/futures-0.1.31/tests/support/mod.rs
new file mode 100644
index 0000000000..297749777a
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/support/mod.rs
@@ -0,0 +1,134 @@
+#![allow(dead_code)]
+
+use std::fmt;
+use std::sync::Arc;
+use std::thread;
+
+use futures::{Future, IntoFuture, Async, Poll};
+use futures::future::FutureResult;
+use futures::stream::Stream;
+use futures::executor::{self, NotifyHandle, Notify};
+use futures::task;
+
+pub mod local_executor;
+
+pub fn f_ok(a: i32) -> FutureResult<i32, u32> { Ok(a).into_future() }
+pub fn f_err(a: u32) -> FutureResult<i32, u32> { Err(a).into_future() }
+pub fn r_ok(a: i32) -> Result<i32, u32> { Ok(a) }
+pub fn r_err(a: u32) -> Result<i32, u32> { Err(a) }
+
+pub fn assert_done<T, F>(f: F, result: Result<T::Item, T::Error>)
+ where T: Future,
+ T::Item: Eq + fmt::Debug,
+ T::Error: Eq + fmt::Debug,
+ F: FnOnce() -> T,
+{
+ assert_eq!(f().wait(), result);
+}
+
+pub fn assert_empty<T: Future, F: FnMut() -> T>(mut f: F) {
+ assert!(executor::spawn(f()).poll_future_notify(&notify_panic(), 0).ok().unwrap().is_not_ready());
+}
+
+pub fn sassert_done<S: Stream>(s: &mut S) {
+ match executor::spawn(s).poll_stream_notify(&notify_panic(), 0) {
+ Ok(Async::Ready(None)) => {}
+ Ok(Async::Ready(Some(_))) => panic!("stream had more elements"),
+ Ok(Async::NotReady) => panic!("stream wasn't ready"),
+ Err(_) => panic!("stream had an error"),
+ }
+}
+
+pub fn sassert_empty<S: Stream>(s: &mut S) {
+ match executor::spawn(s).poll_stream_notify(&notify_noop(), 0) {
+ Ok(Async::Ready(None)) => panic!("stream is at its end"),
+ Ok(Async::Ready(Some(_))) => panic!("stream had more elements"),
+ Ok(Async::NotReady) => {}
+ Err(_) => panic!("stream had an error"),
+ }
+}
+
+pub fn sassert_next<S: Stream>(s: &mut S, item: S::Item)
+ where S::Item: Eq + fmt::Debug
+{
+ match executor::spawn(s).poll_stream_notify(&notify_panic(), 0) {
+ Ok(Async::Ready(None)) => panic!("stream is at its end"),
+ Ok(Async::Ready(Some(e))) => assert_eq!(e, item),
+ Ok(Async::NotReady) => panic!("stream wasn't ready"),
+ Err(_) => panic!("stream had an error"),
+ }
+}
+
+pub fn sassert_err<S: Stream>(s: &mut S, err: S::Error)
+ where S::Error: Eq + fmt::Debug
+{
+ match executor::spawn(s).poll_stream_notify(&notify_panic(), 0) {
+ Ok(Async::Ready(None)) => panic!("stream is at its end"),
+ Ok(Async::Ready(Some(_))) => panic!("stream had more elements"),
+ Ok(Async::NotReady) => panic!("stream wasn't ready"),
+ Err(e) => assert_eq!(e, err),
+ }
+}
+
+pub fn notify_panic() -> NotifyHandle {
+ struct Foo;
+
+ impl Notify for Foo {
+ fn notify(&self, _id: usize) {
+ panic!("should not be notified");
+ }
+ }
+
+ NotifyHandle::from(Arc::new(Foo))
+}
+
+pub fn notify_noop() -> NotifyHandle {
+ struct Noop;
+
+ impl Notify for Noop {
+ fn notify(&self, _id: usize) {}
+ }
+
+ const NOOP : &'static Noop = &Noop;
+
+ NotifyHandle::from(NOOP)
+}
+
+pub trait ForgetExt {
+ fn forget(self);
+}
+
+impl<F> ForgetExt for F
+ where F: Future + Sized + Send + 'static,
+ F::Item: Send,
+ F::Error: Send
+{
+ fn forget(self) {
+ thread::spawn(|| self.wait());
+ }
+}
+
+pub struct DelayFuture<F>(F,bool);
+
+impl<F: Future> Future for DelayFuture<F> {
+ type Item = F::Item;
+ type Error = F::Error;
+
+ fn poll(&mut self) -> Poll<F::Item,F::Error> {
+ if self.1 {
+ self.0.poll()
+ } else {
+ self.1 = true;
+ task::current().notify();
+ Ok(Async::NotReady)
+ }
+ }
+}
+
+/// Introduces one `Ok(Async::NotReady)` before polling the given future
+pub fn delay_future<F>(f: F) -> DelayFuture<F::Future>
+ where F: IntoFuture,
+{
+ DelayFuture(f.into_future(), false)
+}
+
diff --git a/third_party/rust/futures-0.1.31/tests/unfold.rs b/third_party/rust/futures-0.1.31/tests/unfold.rs
new file mode 100644
index 0000000000..1669a18aa5
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/unfold.rs
@@ -0,0 +1,52 @@
+extern crate futures;
+
+mod support;
+
+use futures::stream;
+
+use support::*;
+
+#[test]
+fn unfold1() {
+ let mut stream = stream::unfold(0, |state| {
+ if state <= 2 {
+ let res: Result<_,()> = Ok((state * 2, state + 1));
+ Some(delay_future(res))
+ } else {
+ None
+ }
+ });
+ // Creates the future with the closure
+ // Not ready (delayed future)
+ sassert_empty(&mut stream);
+ // future is ready, yields the item
+ sassert_next(&mut stream, 0);
+
+ // Repeat
+ sassert_empty(&mut stream);
+ sassert_next(&mut stream, 2);
+
+ sassert_empty(&mut stream);
+ sassert_next(&mut stream, 4);
+
+ // no more items
+ sassert_done(&mut stream);
+}
+
+#[test]
+fn unfold_err1() {
+ let mut stream = stream::unfold(0, |state| {
+ if state <= 2 {
+ Some(Ok((state * 2, state + 1)))
+ } else {
+ Some(Err(-1))
+ }
+ });
+ sassert_next(&mut stream, 0);
+ sassert_next(&mut stream, 2);
+ sassert_next(&mut stream, 4);
+ sassert_err(&mut stream, -1);
+
+ // An error was generated by the stream, it will then finish
+ sassert_done(&mut stream);
+}
diff --git a/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs b/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs
new file mode 100644
index 0000000000..55b0ca5ac2
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/unsync-oneshot.rs
@@ -0,0 +1,189 @@
+extern crate futures;
+
+use futures::prelude::*;
+use futures::future;
+use futures::unsync::oneshot::{channel, Canceled, spawn};
+
+mod support;
+use support::local_executor;
+
+#[test]
+fn smoke() {
+ let (tx, rx) = channel();
+ tx.send(33).unwrap();
+ assert_eq!(rx.wait().unwrap(), 33);
+}
+
+#[test]
+fn canceled() {
+ let (_, rx) = channel::<()>();
+ assert_eq!(rx.wait().unwrap_err(), Canceled);
+}
+
+#[test]
+fn poll_cancel() {
+ let (mut tx, _) = channel::<()>();
+ assert!(tx.poll_cancel().unwrap().is_ready());
+}
+
+#[test]
+fn tx_complete_rx_unparked() {
+ let (tx, rx) = channel();
+
+ let res = rx.join(future::lazy(move || {
+ tx.send(55).unwrap();
+ Ok(11)
+ }));
+ assert_eq!(res.wait().unwrap(), (55, 11));
+}
+
+#[test]
+fn tx_dropped_rx_unparked() {
+ let (tx, rx) = channel::<i32>();
+
+ let res = rx.join(future::lazy(move || {
+ let _tx = tx;
+ Ok(11)
+ }));
+ assert_eq!(res.wait().unwrap_err(), Canceled);
+}
+
+
+#[test]
+fn is_canceled() {
+ let (tx, rx) = channel::<u32>();
+ assert!(!tx.is_canceled());
+ drop(rx);
+ assert!(tx.is_canceled());
+}
+
+#[test]
+fn spawn_sends_items() {
+ let core = local_executor::Core::new();
+ let future = future::ok::<_, ()>(1);
+ let rx = spawn(future, &core);
+ assert_eq!(core.run(rx).unwrap(), 1);
+}
+
+#[test]
+fn spawn_kill_dead_stream() {
+ use std::thread;
+ use std::time::Duration;
+ use futures::future::Either;
+ use futures::sync::oneshot;
+
+ // a future which never returns anything (forever accepting incoming
+ // connections), but dropping it leads to observable side effects
+ // (like closing listening sockets, releasing limited resources,
+ // ...)
+ #[derive(Debug)]
+ struct Dead {
+ // when dropped you should get Err(oneshot::Canceled) on the
+ // receiving end
+ done: oneshot::Sender<()>,
+ }
+ impl Future for Dead {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ // need to implement a timeout for the test, as it would hang
+ // forever right now
+ let (timeout_tx, timeout_rx) = oneshot::channel();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(1000));
+ let _ = timeout_tx.send(());
+ });
+
+ let core = local_executor::Core::new();
+ let (done_tx, done_rx) = oneshot::channel();
+ let future = Dead{done: done_tx};
+ let rx = spawn(future, &core);
+ let res = core.run(
+ Ok::<_, ()>(())
+ .into_future()
+ .then(move |_| {
+ // now drop the spawned future: maybe some timeout exceeded,
+ // or some connection on this end was closed by the remote
+ // end.
+ drop(rx);
+ // and wait for the spawned future to release its resources
+ done_rx
+ })
+ .select2(timeout_rx)
+ );
+ match res {
+ Err(Either::A((oneshot::Canceled, _))) => (),
+ Ok(Either::B(((), _))) => {
+ panic!("dead future wasn't canceled (timeout)");
+ },
+ _ => {
+ panic!("dead future wasn't canceled (unexpected result)");
+ },
+ }
+}
+
+#[test]
+fn spawn_dont_kill_forgot_dead_stream() {
+ use std::thread;
+ use std::time::Duration;
+ use futures::future::Either;
+ use futures::sync::oneshot;
+
+ // a future which never returns anything (forever accepting incoming
+ // connections), but dropping it leads to observable side effects
+ // (like closing listening sockets, releasing limited resources,
+ // ...)
+ #[derive(Debug)]
+ struct Dead {
+ // when dropped you should get Err(oneshot::Canceled) on the
+ // receiving end
+ done: oneshot::Sender<()>,
+ }
+ impl Future for Dead {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ // need to implement a timeout for the test, as it would hang
+ // forever right now
+ let (timeout_tx, timeout_rx) = oneshot::channel();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(1000));
+ let _ = timeout_tx.send(());
+ });
+
+ let core = local_executor::Core::new();
+ let (done_tx, done_rx) = oneshot::channel();
+ let future = Dead{done: done_tx};
+ let rx = spawn(future, &core);
+ let res = core.run(
+ Ok::<_, ()>(())
+ .into_future()
+ .then(move |_| {
+ // forget the spawned future: should keep running, i.e. hit
+ // the timeout below.
+ rx.forget();
+ // and wait for the spawned future to release its resources
+ done_rx
+ })
+ .select2(timeout_rx)
+ );
+ match res {
+ Err(Either::A((oneshot::Canceled, _))) => {
+ panic!("forgotten dead future was canceled");
+ },
+ Ok(Either::B(((), _))) => (), // reached timeout
+ _ => {
+ panic!("forgotten dead future was canceled (unexpected result)");
+ },
+ }
+}
diff --git a/third_party/rust/futures-0.1.31/tests/unsync.rs b/third_party/rust/futures-0.1.31/tests/unsync.rs
new file mode 100644
index 0000000000..490db0af1c
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/tests/unsync.rs
@@ -0,0 +1,266 @@
+#![cfg(feature = "use_std")]
+#![allow(bare_trait_objects, unknown_lints)]
+
+extern crate futures;
+
+mod support;
+
+use futures::prelude::*;
+use futures::unsync::oneshot;
+use futures::unsync::mpsc::{self, SendError};
+use futures::future::lazy;
+use futures::stream::{iter_ok, unfold};
+
+use support::local_executor::Core;
+
+#[test]
+fn mpsc_send_recv() {
+ let (tx, rx) = mpsc::channel::<i32>(1);
+ let mut rx = rx.wait();
+
+ tx.send(42).wait().unwrap();
+
+ assert_eq!(rx.next(), Some(Ok(42)));
+ assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn mpsc_rx_notready() {
+ let (_tx, mut rx) = mpsc::channel::<i32>(1);
+
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::NotReady);
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_rx_end() {
+ let (_, mut rx) = mpsc::channel::<i32>(1);
+
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::Ready(None));
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_tx_clone_weak_rc() {
+ let (tx, mut rx) = mpsc::channel::<i32>(1); // rc = 1
+
+ let tx_clone = tx.clone(); // rc = 2
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::NotReady);
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+
+ drop(tx); // rc = 1
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::NotReady);
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+
+ drop(tx_clone); // rc = 0
+ lazy(|| {
+ assert_eq!(rx.poll().unwrap(), Async::Ready(None));
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_tx_notready() {
+ let (tx, _rx) = mpsc::channel::<i32>(1);
+ let tx = tx.send(1).wait().unwrap();
+ lazy(move || {
+ assert!(tx.send(2).poll().unwrap().is_not_ready());
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_tx_err() {
+ let (tx, _) = mpsc::channel::<i32>(1);
+ lazy(move || {
+ assert!(tx.send(2).poll().is_err());
+ Ok(()) as Result<(), ()>
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_backpressure() {
+ let (tx, rx) = mpsc::channel::<i32>(1);
+ lazy(move || {
+ iter_ok(vec![1, 2, 3])
+ .forward(tx)
+ .map_err(|e: SendError<i32>| panic!("{}", e))
+ .join(rx.take(3).collect().map(|xs| {
+ assert_eq!(xs, [1, 2, 3]);
+ }))
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_unbounded() {
+ let (tx, rx) = mpsc::unbounded::<i32>();
+ lazy(move || {
+ iter_ok(vec![1, 2, 3])
+ .forward(tx)
+ .map_err(|e: SendError<i32>| panic!("{}", e))
+ .join(rx.take(3).collect().map(|xs| {
+ assert_eq!(xs, [1, 2, 3]);
+ }))
+ }).wait().unwrap();
+}
+
+#[test]
+fn mpsc_recv_unpark() {
+ let core = Core::new();
+ let (tx, rx) = mpsc::channel::<i32>(1);
+ let tx2 = tx.clone();
+ core.spawn(rx.collect().map(|xs| assert_eq!(xs, [1, 2])));
+ core.spawn(lazy(move || tx.send(1).map(|_| ()).map_err(|e| panic!("{}", e))));
+ core.run(lazy(move || tx2.send(2))).unwrap();
+}
+
+#[test]
+fn mpsc_send_unpark() {
+ let core = Core::new();
+ let (tx, rx) = mpsc::channel::<i32>(1);
+ let (donetx, donerx) = oneshot::channel();
+ core.spawn(iter_ok(vec![1, 2]).forward(tx)
+ .then(|x: Result<_, SendError<i32>>| {
+ assert!(x.is_err());
+ donetx.send(()).unwrap();
+ Ok(())
+ }));
+ core.spawn(lazy(move || { let _ = rx; Ok(()) }));
+ core.run(donerx).unwrap();
+}
+
+#[test]
+fn spawn_sends_items() {
+ let core = Core::new();
+ let stream = unfold(0, |i| Some(Ok::<_,u8>((i, i + 1))));
+ let rx = mpsc::spawn(stream, &core, 1);
+ assert_eq!(core.run(rx.take(4).collect()).unwrap(),
+ [0, 1, 2, 3]);
+}
+
+#[test]
+fn spawn_kill_dead_stream() {
+ use std::thread;
+ use std::time::Duration;
+ use futures::future::Either;
+
+ // a stream which never returns anything (maybe a remote end isn't
+ // responding), but dropping it leads to observable side effects
+ // (like closing connections, releasing limited resources, ...)
+ #[derive(Debug)]
+ struct Dead {
+ // when dropped you should get Err(oneshot::Canceled) on the
+ // receiving end
+ done: oneshot::Sender<()>,
+ }
+ impl Stream for Dead {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ Ok(Async::NotReady)
+ }
+ }
+
+ // need to implement a timeout for the test, as it would hang
+ // forever right now
+ let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(1000));
+ let _ = timeout_tx.send(());
+ });
+
+ let core = Core::new();
+ let (done_tx, done_rx) = oneshot::channel();
+ let stream = Dead{done: done_tx};
+ let rx = mpsc::spawn(stream, &core, 1);
+ let res = core.run(
+ Ok::<_, ()>(())
+ .into_future()
+ .then(move |_| {
+ // now drop the spawned stream: maybe some timeout exceeded,
+ // or some connection on this end was closed by the remote
+ // end.
+ drop(rx);
+ // and wait for the spawned stream to release its resources
+ done_rx
+ })
+ .select2(timeout_rx)
+ );
+ match res {
+ Err(Either::A((oneshot::Canceled, _))) => (),
+ _ => {
+ panic!("dead stream wasn't canceled");
+ },
+ }
+}
+
+
+/// Test case for PR #768 (issue #766).
+/// The issue was:
+/// Given that an empty channel is polled by the Receiver, and the only Sender
+/// gets dropped without sending anything, then the Receiver would get stuck.
+
+#[test]
+fn dropped_sender_of_unused_channel_notifies_receiver() {
+ let core = Core::new();
+ type FUTURE = Box<futures::Future<Item=u8, Error=()>>;
+
+ // Constructs the channel which we want to test, and two futures which
+ // act on that channel.
+ let pair = |reverse| -> Vec<FUTURE> {
+ // This is the channel which we want to test.
+ let (tx, rx) = mpsc::channel::<u8>(1);
+ let mut futures: Vec<FUTURE> = vec![
+ Box::new(futures::stream::iter_ok(vec![])
+ .forward(tx)
+ .map_err(|_: mpsc::SendError<u8>| ())
+ .map(|_| 42)
+ ),
+ Box::new(rx.fold((), |_, _| Ok(()))
+ .map(|_| 24)
+ ),
+ ];
+ if reverse {
+ futures.reverse();
+ }
+ futures
+ };
+
+ let make_test_future = |reverse| -> Box<Future<Item=Vec<u8>, Error=()>> {
+ let f = futures::future::join_all(pair(reverse));
+
+ // Use a timeout. This is not meant to test the `sync::oneshot` but
+ // merely uses it to implement this timeout.
+ let (timeout_tx, timeout_rx) = futures::sync::oneshot::channel::<Vec<u8>>();
+ std::thread::spawn(move || {
+ std::thread::sleep(std::time::Duration::from_millis(1000));
+ let x = timeout_tx.send(vec![0]);
+ assert!(x.is_err(), "Test timed out.");
+ });
+
+ Box::new(f.select(timeout_rx.map_err(|_|()))
+ .map_err(|x| x.0)
+ .map(|x| x.0)
+ )
+ };
+
+ // The order of the tested futures is important to test fix of PR #768.
+ // We want future_2 to poll on the Receiver before the Sender is dropped.
+ let result = core.run(make_test_future(false));
+ assert!(result.is_ok());
+ assert_eq!(vec![42, 24], result.unwrap());
+
+ // Test also the other ordering:
+ let result = core.run(make_test_future(true));
+ assert!(result.is_ok());
+ assert_eq!(vec![24, 42], result.unwrap());
+}