1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
#![allow(dead_code)]
use std::fmt;
use std::sync::Arc;
use std::thread;
use futures::{Future, IntoFuture, Async, Poll};
use futures::future::FutureResult;
use futures::stream::Stream;
use futures::executor::{self, NotifyHandle, Notify};
use futures::task;
pub mod local_executor;
pub fn f_ok(a: i32) -> FutureResult<i32, u32> { Ok(a).into_future() }
pub fn f_err(a: u32) -> FutureResult<i32, u32> { Err(a).into_future() }
pub fn r_ok(a: i32) -> Result<i32, u32> { Ok(a) }
pub fn r_err(a: u32) -> Result<i32, u32> { Err(a) }
pub fn assert_done<T, F>(f: F, result: Result<T::Item, T::Error>)
where T: Future,
T::Item: Eq + fmt::Debug,
T::Error: Eq + fmt::Debug,
F: FnOnce() -> T,
{
assert_eq!(f().wait(), result);
}
pub fn assert_empty<T: Future, F: FnMut() -> T>(mut f: F) {
assert!(executor::spawn(f()).poll_future_notify(¬ify_panic(), 0).ok().unwrap().is_not_ready());
}
pub fn sassert_done<S: Stream>(s: &mut S) {
match executor::spawn(s).poll_stream_notify(¬ify_panic(), 0) {
Ok(Async::Ready(None)) => {}
Ok(Async::Ready(Some(_))) => panic!("stream had more elements"),
Ok(Async::NotReady) => panic!("stream wasn't ready"),
Err(_) => panic!("stream had an error"),
}
}
pub fn sassert_empty<S: Stream>(s: &mut S) {
match executor::spawn(s).poll_stream_notify(¬ify_noop(), 0) {
Ok(Async::Ready(None)) => panic!("stream is at its end"),
Ok(Async::Ready(Some(_))) => panic!("stream had more elements"),
Ok(Async::NotReady) => {}
Err(_) => panic!("stream had an error"),
}
}
pub fn sassert_next<S: Stream>(s: &mut S, item: S::Item)
where S::Item: Eq + fmt::Debug
{
match executor::spawn(s).poll_stream_notify(¬ify_panic(), 0) {
Ok(Async::Ready(None)) => panic!("stream is at its end"),
Ok(Async::Ready(Some(e))) => assert_eq!(e, item),
Ok(Async::NotReady) => panic!("stream wasn't ready"),
Err(_) => panic!("stream had an error"),
}
}
pub fn sassert_err<S: Stream>(s: &mut S, err: S::Error)
where S::Error: Eq + fmt::Debug
{
match executor::spawn(s).poll_stream_notify(¬ify_panic(), 0) {
Ok(Async::Ready(None)) => panic!("stream is at its end"),
Ok(Async::Ready(Some(_))) => panic!("stream had more elements"),
Ok(Async::NotReady) => panic!("stream wasn't ready"),
Err(e) => assert_eq!(e, err),
}
}
pub fn notify_panic() -> NotifyHandle {
struct Foo;
impl Notify for Foo {
fn notify(&self, _id: usize) {
panic!("should not be notified");
}
}
NotifyHandle::from(Arc::new(Foo))
}
pub fn notify_noop() -> NotifyHandle {
struct Noop;
impl Notify for Noop {
fn notify(&self, _id: usize) {}
}
const NOOP : &'static Noop = &Noop;
NotifyHandle::from(NOOP)
}
pub trait ForgetExt {
fn forget(self);
}
impl<F> ForgetExt for F
where F: Future + Sized + Send + 'static,
F::Item: Send,
F::Error: Send
{
fn forget(self) {
thread::spawn(|| self.wait());
}
}
pub struct DelayFuture<F>(F,bool);
impl<F: Future> Future for DelayFuture<F> {
type Item = F::Item;
type Error = F::Error;
fn poll(&mut self) -> Poll<F::Item,F::Error> {
if self.1 {
self.0.poll()
} else {
self.1 = true;
task::current().notify();
Ok(Async::NotReady)
}
}
}
/// Introduces one `Ok(Async::NotReady)` before polling the given future
pub fn delay_future<F>(f: F) -> DelayFuture<F::Future>
where F: IntoFuture,
{
DelayFuture(f.into_future(), false)
}
|