use crate::stream::{StreamExt, TryStreamExt, Fuse}; use core::fmt; use core::pin::Pin; use futures_core::future::Future; use futures_core::stream::{TryStream, Stream}; use futures_core::task::{Context, Poll}; use futures_sink::Sink; /// Future for the [`send_all`](super::SinkExt::send_all) method. #[allow(explicit_outlives_requirements)] // https://github.com/rust-lang/rust/issues/60993 #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SendAll<'a, Si, St> where Si: ?Sized, St: ?Sized + TryStream, { sink: &'a mut Si, stream: Fuse<&'a mut St>, buffered: Option, } impl fmt::Debug for SendAll<'_, Si, St> where Si: fmt::Debug + ?Sized, St: fmt::Debug + ?Sized + TryStream, St::Ok: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SendAll") .field("sink", &self.sink) .field("stream", &self.stream) .field("buffered", &self.buffered) .finish() } } // Pinning is never projected to any fields impl Unpin for SendAll<'_, Si, St> where Si: Unpin + ?Sized, St: TryStream + Unpin + ?Sized, {} impl<'a, Si, St, Ok, Error> SendAll<'a, Si, St> where Si: Sink + Unpin + ?Sized, St: TryStream + Stream + Unpin + ?Sized, { pub(super) fn new( sink: &'a mut Si, stream: &'a mut St, ) -> SendAll<'a, Si, St> { SendAll { sink, stream: stream.fuse(), buffered: None, } } fn try_start_send( &mut self, cx: &mut Context<'_>, item: St::Ok, ) -> Poll> { debug_assert!(self.buffered.is_none()); match Pin::new(&mut self.sink).poll_ready(cx)? { Poll::Ready(()) => { Poll::Ready(Pin::new(&mut self.sink).start_send(item)) } Poll::Pending => { self.buffered = Some(item); Poll::Pending } } } } impl Future for SendAll<'_, Si, St> where Si: Sink + Unpin + ?Sized, St: Stream> + Unpin + ?Sized, { type Output = Result<(), Error>; fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { let this = &mut *self; // If we've got an item buffered already, we need to write it to the // sink before we can do anything else if let Some(item) = this.buffered.take() { ready!(this.try_start_send(cx, item))? } loop { match this.stream.try_poll_next_unpin(cx)? { Poll::Ready(Some(item)) => { ready!(this.try_start_send(cx, item))? } Poll::Ready(None) => { ready!(Pin::new(&mut this.sink).poll_flush(cx))?; return Poll::Ready(Ok(())) } Poll::Pending => { ready!(Pin::new(&mut this.sink).poll_flush(cx))?; return Poll::Pending } } } } }