use core::fmt; use core::marker::PhantomData; use core::pin::Pin; use futures_core::future::Future; use futures_core::ready; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use pin_project_lite::pin_project; pin_project! { /// Sink for the [`with`](super::SinkExt::with) method. #[must_use = "sinks do nothing unless polled"] pub struct With { #[pin] sink: Si, f: F, #[pin] state: Option, _phantom: PhantomData Item>, } } impl fmt::Debug for With where Si: fmt::Debug, Fut: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("With").field("sink", &self.sink).field("state", &self.state).finish() } } impl With where Si: Sink, F: FnMut(U) -> Fut, Fut: Future, { pub(super) fn new(sink: Si, f: F) -> Self where Fut: Future>, E: From, { Self { state: None, sink, f, _phantom: PhantomData } } } impl Clone for With where Si: Clone, F: Clone, Fut: Clone, { fn clone(&self) -> Self { Self { state: self.state.clone(), sink: self.sink.clone(), f: self.f.clone(), _phantom: PhantomData, } } } // Forwarding impl of Stream from the underlying sink impl Stream for With where S: Stream + Sink, F: FnMut(U) -> Fut, Fut: Future, { type Item = S::Item; delegate_stream!(sink); } impl With where Si: Sink, F: FnMut(U) -> Fut, Fut: Future>, E: From, { delegate_access_inner!(sink, Si, ()); /// Completes the processing of previous item if any. fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); let item = match this.state.as_mut().as_pin_mut() { None => return Poll::Ready(Ok(())), Some(fut) => ready!(fut.poll(cx))?, }; this.state.set(None); this.sink.start_send(item)?; Poll::Ready(Ok(())) } } impl Sink for With where Si: Sink, F: FnMut(U) -> Fut, Fut: Future>, E: From, { type Error = E; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll(cx))?; ready!(self.project().sink.poll_ready(cx)?); Poll::Ready(Ok(())) } fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> { let mut this = self.project(); assert!(this.state.is_none()); this.state.set(Some((this.f)(item))); Ok(()) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll(cx))?; ready!(self.project().sink.poll_flush(cx)?); Poll::Ready(Ok(())) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll(cx))?; ready!(self.project().sink.poll_close(cx)?); Poll::Ready(Ok(())) } }