use {Async, Future, Poll}; use core::fmt; use stream::Stream; /// Future for the `flatten_stream` combinator, flattening a /// future-of-a-stream to get just the result of the final stream as a stream. /// /// This is created by the `Future::flatten_stream` method. #[must_use = "streams do nothing unless polled"] pub struct FlattenStream where F: Future, ::Item: Stream, { state: State } impl fmt::Debug for FlattenStream where F: Future + fmt::Debug, ::Item: Stream + fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("FlattenStream") .field("state", &self.state) .finish() } } pub fn new(f: F) -> FlattenStream where F: Future, ::Item: Stream, { FlattenStream { state: State::Future(f) } } #[derive(Debug)] enum State where F: Future, ::Item: Stream, { // future is not yet called or called and not ready Future(F), // future resolved to Stream Stream(F::Item), // EOF after future resolved to error Eof, // after EOF after future resolved to error Done, } impl Stream for FlattenStream where F: Future, ::Item: Stream, { type Item = ::Item; type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { loop { let (next_state, ret_opt) = match self.state { State::Future(ref mut f) => { match f.poll() { Ok(Async::NotReady) => { // State is not changed, early return. return Ok(Async::NotReady) }, Ok(Async::Ready(stream)) => { // Future resolved to stream. // We do not return, but poll that // stream in the next loop iteration. (State::Stream(stream), None) } Err(e) => { (State::Eof, Some(Err(e))) } } } State::Stream(ref mut s) => { // Just forward call to the stream, // do not track its state. return s.poll(); } State::Eof => { (State::Done, Some(Ok(Async::Ready(None)))) } State::Done => { panic!("poll called after eof"); } }; self.state = next_state; if let Some(ret) = ret_opt { return ret; } } } }