diff options
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/unfold.rs')
-rw-r--r-- | third_party/rust/futures-0.1.31/src/stream/unfold.rs | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/unfold.rs b/third_party/rust/futures-0.1.31/src/stream/unfold.rs new file mode 100644 index 0000000000..ac427b8c3b --- /dev/null +++ b/third_party/rust/futures-0.1.31/src/stream/unfold.rs @@ -0,0 +1,114 @@ +use core::mem; + +use {Future, IntoFuture, Async, Poll}; +use stream::Stream; + +/// Creates a `Stream` from a seed and a closure returning a `Future`. +/// +/// This function is the dual for the `Stream::fold()` adapter: while +/// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a +/// `Stream` from a seed value. +/// +/// `unfold()` will call the provided closure with the provided seed, then wait +/// for the returned `Future` to complete with `(a, b)`. It will then yield the +/// value `a`, and use `b` as the next internal state. +/// +/// If the closure returns `None` instead of `Some(Future)`, then the `unfold()` +/// will stop producing items and return `Ok(Async::Ready(None))` in future +/// calls to `poll()`. +/// +/// In case of error generated by the returned `Future`, the error will be +/// returned by the `Stream`. The `Stream` will then yield +/// `Ok(Async::Ready(None))` in future calls to `poll()`. +/// +/// This function can typically be used when wanting to go from the "world of +/// futures" to the "world of streams": the provided closure can build a +/// `Future` using other library functions working on futures, and `unfold()` +/// will turn it into a `Stream` by repeating the operation. +/// +/// # Example +/// +/// ```rust +/// use futures::stream::{self, Stream}; +/// use futures::future::{self, Future}; +/// +/// let mut stream = stream::unfold(0, |state| { +/// if state <= 2 { +/// let next_state = state + 1; +/// let yielded = state * 2; +/// let fut = future::ok::<_, u32>((yielded, next_state)); +/// Some(fut) +/// } else { +/// None +/// } +/// }); +/// +/// let result = stream.collect().wait(); +/// assert_eq!(result, Ok(vec![0, 2, 4])); +/// ``` +pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut> + where F: FnMut(T) -> Option<Fut>, + Fut: IntoFuture<Item = (It, T)>, +{ + Unfold { + f: f, + state: State::Ready(init), + } +} + +/// A stream which creates futures, polls them and return their result +/// +/// This stream is returned by the `futures::stream::unfold` method +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Unfold<T, F, Fut> where Fut: IntoFuture { + f: F, + state: State<T, Fut::Future>, +} + +impl <T, F, Fut, It> Stream for Unfold<T, F, Fut> + where F: FnMut(T) -> Option<Fut>, + Fut: IntoFuture<Item = (It, T)>, +{ + type Item = It; + type Error = Fut::Error; + + fn poll(&mut self) -> Poll<Option<It>, Fut::Error> { + loop { + match mem::replace(&mut self.state, State::Empty) { + // State::Empty may happen if the future returned an error + State::Empty => { return Ok(Async::Ready(None)); } + State::Ready(state) => { + match (self.f)(state) { + Some(fut) => { self.state = State::Processing(fut.into_future()); } + None => { return Ok(Async::Ready(None)); } + } + } + State::Processing(mut fut) => { + match fut.poll()? { + Async:: Ready((item, next_state)) => { + self.state = State::Ready(next_state); + return Ok(Async::Ready(Some(item))); + } + Async::NotReady => { + self.state = State::Processing(fut); + return Ok(Async::NotReady); + } + } + } + } + } + } +} + +#[derive(Debug)] +enum State<T, F> where F: Future { + /// Placeholder state when doing work, or when the returned Future generated an error + Empty, + + /// Ready to generate new future; current internal state is the `T` + Ready(T), + + /// Working on a future generated previously + Processing(F), +} |