summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-0.1.31/src/future/flatten_stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-0.1.31/src/future/flatten_stream.rs')
-rw-r--r--third_party/rust/futures-0.1.31/src/future/flatten_stream.rs99
1 files changed, 99 insertions, 0 deletions
diff --git a/third_party/rust/futures-0.1.31/src/future/flatten_stream.rs b/third_party/rust/futures-0.1.31/src/future/flatten_stream.rs
new file mode 100644
index 0000000000..7bf3b9ca79
--- /dev/null
+++ b/third_party/rust/futures-0.1.31/src/future/flatten_stream.rs
@@ -0,0 +1,99 @@
+use {Async, Future, Poll};
+use core::fmt;
+use stream::Stream;
+
+/// Future for the `flatten_stream` combinator, flattening a
+/// future-of-a-stream to get just the result of the final stream as a stream.
+///
+/// This is created by the `Future::flatten_stream` method.
+#[must_use = "streams do nothing unless polled"]
+pub struct FlattenStream<F>
+ where F: Future,
+ <F as Future>::Item: Stream<Error=F::Error>,
+{
+ state: State<F>
+}
+
+impl<F> fmt::Debug for FlattenStream<F>
+ where F: Future + fmt::Debug,
+ <F as Future>::Item: Stream<Error=F::Error> + fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("FlattenStream")
+ .field("state", &self.state)
+ .finish()
+ }
+}
+
+pub fn new<F>(f: F) -> FlattenStream<F>
+ where F: Future,
+ <F as Future>::Item: Stream<Error=F::Error>,
+{
+ FlattenStream {
+ state: State::Future(f)
+ }
+}
+
+#[derive(Debug)]
+enum State<F>
+ where F: Future,
+ <F as Future>::Item: Stream<Error=F::Error>,
+{
+ // future is not yet called or called and not ready
+ Future(F),
+ // future resolved to Stream
+ Stream(F::Item),
+ // EOF after future resolved to error
+ Eof,
+ // after EOF after future resolved to error
+ Done,
+}
+
+impl<F> Stream for FlattenStream<F>
+ where F: Future,
+ <F as Future>::Item: Stream<Error=F::Error>,
+{
+ type Item = <F::Item as Stream>::Item;
+ type Error = <F::Item as Stream>::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ let (next_state, ret_opt) = match self.state {
+ State::Future(ref mut f) => {
+ match f.poll() {
+ Ok(Async::NotReady) => {
+ // State is not changed, early return.
+ return Ok(Async::NotReady)
+ },
+ Ok(Async::Ready(stream)) => {
+ // Future resolved to stream.
+ // We do not return, but poll that
+ // stream in the next loop iteration.
+ (State::Stream(stream), None)
+ }
+ Err(e) => {
+ (State::Eof, Some(Err(e)))
+ }
+ }
+ }
+ State::Stream(ref mut s) => {
+ // Just forward call to the stream,
+ // do not track its state.
+ return s.poll();
+ }
+ State::Eof => {
+ (State::Done, Some(Ok(Async::Ready(None))))
+ }
+ State::Done => {
+ panic!("poll called after eof");
+ }
+ };
+
+ self.state = next_state;
+ if let Some(ret) = ret_opt {
+ return ret;
+ }
+ }
+ }
+}
+