#![allow(dead_code)] use std::fmt; use std::sync::Arc; use std::thread; use futures::{Future, IntoFuture, Async, Poll}; use futures::future::FutureResult; use futures::stream::Stream; use futures::executor::{self, NotifyHandle, Notify}; use futures::task; pub mod local_executor; pub fn f_ok(a: i32) -> FutureResult { Ok(a).into_future() } pub fn f_err(a: u32) -> FutureResult { Err(a).into_future() } pub fn r_ok(a: i32) -> Result { Ok(a) } pub fn r_err(a: u32) -> Result { Err(a) } pub fn assert_done(f: F, result: Result) where T: Future, T::Item: Eq + fmt::Debug, T::Error: Eq + fmt::Debug, F: FnOnce() -> T, { assert_eq!(f().wait(), result); } pub fn assert_empty T>(mut f: F) { assert!(executor::spawn(f()).poll_future_notify(¬ify_panic(), 0).ok().unwrap().is_not_ready()); } pub fn sassert_done(s: &mut S) { match executor::spawn(s).poll_stream_notify(¬ify_panic(), 0) { Ok(Async::Ready(None)) => {} Ok(Async::Ready(Some(_))) => panic!("stream had more elements"), Ok(Async::NotReady) => panic!("stream wasn't ready"), Err(_) => panic!("stream had an error"), } } pub fn sassert_empty(s: &mut S) { match executor::spawn(s).poll_stream_notify(¬ify_noop(), 0) { Ok(Async::Ready(None)) => panic!("stream is at its end"), Ok(Async::Ready(Some(_))) => panic!("stream had more elements"), Ok(Async::NotReady) => {} Err(_) => panic!("stream had an error"), } } pub fn sassert_next(s: &mut S, item: S::Item) where S::Item: Eq + fmt::Debug { match executor::spawn(s).poll_stream_notify(¬ify_panic(), 0) { Ok(Async::Ready(None)) => panic!("stream is at its end"), Ok(Async::Ready(Some(e))) => assert_eq!(e, item), Ok(Async::NotReady) => panic!("stream wasn't ready"), Err(_) => panic!("stream had an error"), } } pub fn sassert_err(s: &mut S, err: S::Error) where S::Error: Eq + fmt::Debug { match executor::spawn(s).poll_stream_notify(¬ify_panic(), 0) { Ok(Async::Ready(None)) => panic!("stream is at its end"), Ok(Async::Ready(Some(_))) => panic!("stream had more elements"), Ok(Async::NotReady) => panic!("stream wasn't ready"), Err(e) => assert_eq!(e, err), } } pub fn notify_panic() -> NotifyHandle { struct Foo; impl Notify for Foo { fn notify(&self, _id: usize) { panic!("should not be notified"); } } NotifyHandle::from(Arc::new(Foo)) } pub fn notify_noop() -> NotifyHandle { struct Noop; impl Notify for Noop { fn notify(&self, _id: usize) {} } const NOOP : &'static Noop = &Noop; NotifyHandle::from(NOOP) } pub trait ForgetExt { fn forget(self); } impl ForgetExt for F where F: Future + Sized + Send + 'static, F::Item: Send, F::Error: Send { fn forget(self) { thread::spawn(|| self.wait()); } } pub struct DelayFuture(F,bool); impl Future for DelayFuture { type Item = F::Item; type Error = F::Error; fn poll(&mut self) -> Poll { if self.1 { self.0.poll() } else { self.1 = true; task::current().notify(); Ok(Async::NotReady) } } } /// Introduces one `Ok(Async::NotReady)` before polling the given future pub fn delay_future(f: F) -> DelayFuture where F: IntoFuture, { DelayFuture(f.into_future(), false) }