use core::mem;
use core::fmt::{Debug, Formatter, Result as FmtResult};
use core::default::Default;

use {Poll, Async};
use future::Future;
use stream::Stream;

/// A future that concatenates every item yielded by a stream into a single
/// value, starting from the first yielded item.
///
/// If the stream finishes without yielding any item, the future resolves to
/// the item type's `Default` value.
///
/// This structure is produced by the `Stream::concat2` method.
#[must_use = "streams do nothing unless polled"]
pub struct Concat2<S>
where
    S: Stream,
{
    inner: ConcatSafe<S>,
}

impl<S> Debug for Concat2<S>
where
    S: Stream + Debug,
    S::Item: Debug,
{
    fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
        fmt.debug_struct("Concat2")
            .field("inner", &self.inner)
            .finish()
    }
}

/// Wraps `s` in a `Concat2` future.
///
/// The item type must be extendable with its own elements (so later items can
/// be folded into the first one) and must implement `Default`, which supplies
/// the result when the stream turns out to be empty.
pub fn new2<S>(s: S) -> Concat2<S>
where
    S: Stream,
    S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
{
    Concat2 { inner: new_safe(s) }
}

impl<S> Future for Concat2<S>
where
    S: Stream,
    S::Item: Extend<<<S as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
{
    type Item = S::Item;
    type Error = S::Error;

    /// Drives the inner concatenation and maps its `Option` result to a plain
    /// item; errors from the inner future are passed through unchanged.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.inner.poll().map(|a| match a {
            Async::NotReady => Async::NotReady,
            // The stream ended without yielding anything: fall back to the
            // default value instead of panicking.
            Async::Ready(None) => Async::Ready(Default::default()),
            Async::Ready(Some(e)) => Async::Ready(e),
        })
    }
}

/// A future that concatenates every item yielded by a stream into the first
/// yielded item.
///
/// This structure is produced by the `Stream::concat` method.
///
/// # Panics
///
/// Polling this future panics if the underlying stream finishes without
/// yielding any item.
#[deprecated(since="0.1.18", note="please use `Stream::Concat2` instead")] #[must_use = "streams do nothing unless polled"] pub struct Concat where S: Stream, { inner: ConcatSafe } #[allow(deprecated)] impl Debug for Concat where S: Stream, S::Item: Debug { fn fmt(&self, fmt: &mut Formatter) -> FmtResult { fmt.debug_struct("Concat") .field("inner", &self.inner) .finish() } } #[allow(deprecated)] pub fn new(s: S) -> Concat where S: Stream, S::Item: Extend<<::Item as IntoIterator>::Item> + IntoIterator, { Concat { inner: new_safe(s) } } #[allow(deprecated)] impl Future for Concat where S: Stream, S::Item: Extend<<::Item as IntoIterator>::Item> + IntoIterator, { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Poll { self.inner.poll().map(|a| { match a { Async::NotReady => Async::NotReady, Async::Ready(None) => panic!("attempted concatenation of empty stream"), Async::Ready(Some(e)) => Async::Ready(e) } }) } } #[derive(Debug)] struct ConcatSafe where S: Stream, { stream: S, extend: Inner, } fn new_safe(s: S) -> ConcatSafe where S: Stream, S::Item: Extend<<::Item as IntoIterator>::Item> + IntoIterator, { ConcatSafe { stream: s, extend: Inner::First, } } impl Future for ConcatSafe where S: Stream, S::Item: Extend<<::Item as IntoIterator>::Item> + IntoIterator, { type Item = Option; type Error = S::Error; fn poll(&mut self) -> Poll { loop { match self.stream.poll() { Ok(Async::Ready(Some(i))) => { match self.extend { Inner::First => { self.extend = Inner::Extending(i); }, Inner::Extending(ref mut e) => { e.extend(i); }, Inner::Done => unreachable!(), } }, Ok(Async::Ready(None)) => { match mem::replace(&mut self.extend, Inner::Done) { Inner::First => return Ok(Async::Ready(None)), Inner::Extending(e) => return Ok(Async::Ready(Some(e))), Inner::Done => panic!("cannot poll Concat again") } }, Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => { self.extend = Inner::Done; return Err(e) } } } } } #[derive(Debug)] enum Inner { First, Extending(E), 
Done, }