summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/stream/unfold.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/stream/unfold.rs')
-rw-r--r--third_party/rust/futures-0.1.31/src/stream/unfold.rs114
1 files changed, 114 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/stream/unfold.rs b/third_party/rust/futures-0.1.31/src/stream/unfold.rs
new file mode 100644
index 0000000000..ac427b8c3b
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/stream/unfold.rs
@@ -0,0 +1,114 @@
+use core::mem;
+
+use {Future, IntoFuture, Async, Poll};
+use stream::Stream;
+
+/// Creates a `Stream` from a seed and a closure returning a `Future`.
+///
+/// This function is the dual for the `Stream::fold()` adapter: while
+/// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a
+/// `Stream` from a seed value.
+///
+/// `unfold()` will call the provided closure with the provided seed, then wait
+/// for the returned `Future` to complete with `(a, b)`. It will then yield the
+/// value `a`, and use `b` as the next internal state.
+///
+/// If the closure returns `None` instead of `Some(Future)`, then the `unfold()`
+/// will stop producing items and return `Ok(Async::Ready(None))` in future
+/// calls to `poll()`.
+///
+/// In case of error generated by the returned `Future`, the error will be
+/// returned by the `Stream`. The `Stream` will then yield
+/// `Ok(Async::Ready(None))` in future calls to `poll()`.
+///
+/// This function can typically be used when wanting to go from the "world of
+/// futures" to the "world of streams": the provided closure can build a
+/// `Future` using other library functions working on futures, and `unfold()`
+/// will turn it into a `Stream` by repeating the operation.
+///
+/// # Example
+///
+/// ```rust
+/// use futures::stream::{self, Stream};
+/// use futures::future::{self, Future};
+///
+/// let mut stream = stream::unfold(0, |state| {
+/// if state <= 2 {
+/// let next_state = state + 1;
+/// let yielded = state * 2;
+/// let fut = future::ok::<_, u32>((yielded, next_state));
+/// Some(fut)
+/// } else {
+/// None
+/// }
+/// });
+///
+/// let result = stream.collect().wait();
+/// assert_eq!(result, Ok(vec![0, 2, 4]));
+/// ```
+pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut>
+ where F: FnMut(T) -> Option<Fut>,
+ Fut: IntoFuture<Item = (It, T)>,
+{
+ Unfold {
+ f: f,
+ state: State::Ready(init),
+ }
+}
+
+/// A stream which creates futures, polls them and return their result
+///
+/// This stream is returned by the `futures::stream::unfold` method
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Unfold<T, F, Fut> where Fut: IntoFuture {
+ f: F,
+ state: State<T, Fut::Future>,
+}
+
+impl <T, F, Fut, It> Stream for Unfold<T, F, Fut>
+ where F: FnMut(T) -> Option<Fut>,
+ Fut: IntoFuture<Item = (It, T)>,
+{
+ type Item = It;
+ type Error = Fut::Error;
+
+ fn poll(&mut self) -> Poll<Option<It>, Fut::Error> {
+ loop {
+ match mem::replace(&mut self.state, State::Empty) {
+ // State::Empty may happen if the future returned an error
+ State::Empty => { return Ok(Async::Ready(None)); }
+ State::Ready(state) => {
+ match (self.f)(state) {
+ Some(fut) => { self.state = State::Processing(fut.into_future()); }
+ None => { return Ok(Async::Ready(None)); }
+ }
+ }
+ State::Processing(mut fut) => {
+ match fut.poll()? {
+ Async:: Ready((item, next_state)) => {
+ self.state = State::Ready(next_state);
+ return Ok(Async::Ready(Some(item)));
+ }
+ Async::NotReady => {
+ self.state = State::Processing(fut);
+ return Ok(Async::NotReady);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+enum State<T, F> where F: Future {
+ /// Placeholder state when doing work, or when the returned Future generated an error
+ Empty,
+
+ /// Ready to generate new future; current internal state is the `T`
+ Ready(T),
+
+ /// Working on a future generated previously
+ Processing(F),
+}