use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; use pin_project_lite::pin_project; pin_project! { /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method. #[derive(Debug, Clone)] #[must_use = "sinks do nothing unless polled"] pub struct SinkMapErr { #[pin] sink: Si, f: Option, } } impl SinkMapErr { pub(super) fn new(sink: Si, f: F) -> Self { Self { sink, f: Some(f) } } delegate_access_inner!(sink, Si, ()); fn take_f(self: Pin<&mut Self>) -> F { self.project().f.take().expect("polled MapErr after completion") } } impl Sink for SinkMapErr where Si: Sink, F: FnOnce(Si::Error) -> E, { type Error = E; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().project().sink.poll_ready(cx).map_err(|e| self.as_mut().take_f()(e)) } fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { self.as_mut().project().sink.start_send(item).map_err(|e| self.as_mut().take_f()(e)) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().project().sink.poll_flush(cx).map_err(|e| self.as_mut().take_f()(e)) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.as_mut().project().sink.poll_close(cx).map_err(|e| self.as_mut().take_f()(e)) } } // Forwarding impl of Stream from the underlying sink impl Stream for SinkMapErr { type Item = S::Item; delegate_stream!(sink); } impl FusedStream for SinkMapErr { fn is_terminated(&self) -> bool { self.sink.is_terminated() } }