use core::pin::Pin; use futures_core::future::{FusedFuture, Future, TryFuture}; use futures_core::ready; use futures_core::stream::{FusedStream, Stream, TryStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project_lite::pin_project; pin_project! { #[project = TryFlattenProj] #[derive(Debug)] pub enum TryFlatten { First { #[pin] f: Fut1 }, Second { #[pin] f: Fut2 }, Empty, } } impl TryFlatten { pub(crate) fn new(future: Fut1) -> Self { Self::First { f: future } } } impl FusedFuture for TryFlatten where Fut: TryFuture, Fut::Ok: TryFuture, { fn is_terminated(&self) -> bool { match self { Self::Empty => true, _ => false, } } } impl Future for TryFlatten where Fut: TryFuture, Fut::Ok: TryFuture, { type Output = Result<::Ok, Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Poll::Ready(loop { match self.as_mut().project() { TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { Ok(f) => self.set(Self::Second { f }), Err(e) => { self.set(Self::Empty); break Err(e); } }, TryFlattenProj::Second { f } => { let output = ready!(f.try_poll(cx)); self.set(Self::Empty); break output; } TryFlattenProj::Empty => panic!("TryFlatten polled after completion"), } }) } } impl FusedStream for TryFlatten where Fut: TryFuture, Fut::Ok: TryStream, { fn is_terminated(&self) -> bool { match self { Self::Empty => true, _ => false, } } } impl Stream for TryFlatten where Fut: TryFuture, Fut::Ok: TryStream, { type Item = Result<::Ok, Fut::Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Poll::Ready(loop { match self.as_mut().project() { TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { Ok(f) => self.set(Self::Second { f }), Err(e) => { self.set(Self::Empty); break Some(Err(e)); } }, TryFlattenProj::Second { f } => { let output = ready!(f.try_poll_next(cx)); if output.is_none() { self.set(Self::Empty); } break output; } TryFlattenProj::Empty => break None, } }) } } #[cfg(feature = "sink")] impl Sink for TryFlatten where Fut: TryFuture, Fut::Ok: Sink, { type Error = Fut::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Poll::Ready(loop { match self.as_mut().project() { TryFlattenProj::First { f } => match ready!(f.try_poll(cx)) { Ok(f) => self.set(Self::Second { f }), Err(e) => { self.set(Self::Empty); break Err(e); } }, TryFlattenProj::Second { f } => { break ready!(f.poll_ready(cx)); } TryFlattenProj::Empty => panic!("poll_ready called after eof"), } }) } fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { match self.project() { TryFlattenProj::First { .. } => panic!("poll_ready not called first"), TryFlattenProj::Second { f } => f.start_send(item), TryFlattenProj::Empty => panic!("start_send called after eof"), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { TryFlattenProj::First { .. } => Poll::Ready(Ok(())), TryFlattenProj::Second { f } => f.poll_flush(cx), TryFlattenProj::Empty => panic!("poll_flush called after eof"), } } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let res = match self.as_mut().project() { TryFlattenProj::Second { f } => f.poll_close(cx), _ => Poll::Ready(Ok(())), }; if res.is_ready() { self.set(Self::Empty); } res } }