use core::fmt; use core::marker::PhantomData; use core::pin::Pin; use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use pin_project_lite::pin_project; pin_project! { /// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method. #[must_use = "sinks do nothing unless polled"] pub struct WithFlatMap { #[pin] sink: Si, f: F, #[pin] stream: Option, buffer: Option, _marker: PhantomData, } } impl fmt::Debug for WithFlatMap where Si: fmt::Debug, St: fmt::Debug, Item: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WithFlatMap") .field("sink", &self.sink) .field("stream", &self.stream) .field("buffer", &self.buffer) .finish() } } impl WithFlatMap where Si: Sink, F: FnMut(U) -> St, St: Stream>, { pub(super) fn new(sink: Si, f: F) -> Self { Self { sink, f, stream: None, buffer: None, _marker: PhantomData } } delegate_access_inner!(sink, Si, ()); fn try_empty_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); if this.buffer.is_some() { ready!(this.sink.as_mut().poll_ready(cx))?; let item = this.buffer.take().unwrap(); this.sink.as_mut().start_send(item)?; } if let Some(mut some_stream) = this.stream.as_mut().as_pin_mut() { while let Some(item) = ready!(some_stream.as_mut().poll_next(cx)?) { match this.sink.as_mut().poll_ready(cx)? { Poll::Ready(()) => this.sink.as_mut().start_send(item)?, Poll::Pending => { *this.buffer = Some(item); return Poll::Pending; } }; } } this.stream.set(None); Poll::Ready(Ok(())) } } // Forwarding impl of Stream from the underlying sink impl Stream for WithFlatMap where S: Stream + Sink, F: FnMut(U) -> St, St: Stream>, { type Item = S::Item; delegate_stream!(sink); } impl FusedStream for WithFlatMap where S: FusedStream + Sink, F: FnMut(U) -> St, St: Stream>, { fn is_terminated(&self) -> bool { self.sink.is_terminated() } } impl Sink for WithFlatMap where Si: Sink, F: FnMut(U) -> St, St: Stream>, { type Error = Si::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.try_empty_stream(cx) } fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> { let mut this = self.project(); assert!(this.stream.is_none()); this.stream.set(Some((this.f)(item))); Ok(()) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().try_empty_stream(cx)?); self.project().sink.poll_flush(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().try_empty_stream(cx)?); self.project().sink.poll_close(cx) } }