diff options
Diffstat (limited to 'vendor/futures-util/src/stream/try_stream/into_async_read.rs')
-rw-r--r-- | vendor/futures-util/src/stream/try_stream/into_async_read.rs | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/vendor/futures-util/src/stream/try_stream/into_async_read.rs b/vendor/futures-util/src/stream/try_stream/into_async_read.rs new file mode 100644 index 000000000..914b277a0 --- /dev/null +++ b/vendor/futures-util/src/stream/try_stream/into_async_read.rs @@ -0,0 +1,165 @@ +use crate::stream::TryStreamExt; +use core::pin::Pin; +use futures_core::ready; +use futures_core::stream::TryStream; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use std::cmp; +use std::io::{Error, Result}; + +/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. +#[derive(Debug)] +#[must_use = "readers do nothing unless polled"] +#[cfg_attr(docsrs, doc(cfg(feature = "io")))] +pub struct IntoAsyncRead<St> +where + St: TryStream<Error = Error> + Unpin, + St::Ok: AsRef<[u8]>, +{ + stream: St, + state: ReadState<St::Ok>, +} + +impl<St> Unpin for IntoAsyncRead<St> +where + St: TryStream<Error = Error> + Unpin, + St::Ok: AsRef<[u8]>, +{ +} + +#[derive(Debug)] +enum ReadState<T: AsRef<[u8]>> { + Ready { chunk: T, chunk_start: usize }, + PendingChunk, + Eof, +} + +impl<St> IntoAsyncRead<St> +where + St: TryStream<Error = Error> + Unpin, + St::Ok: AsRef<[u8]>, +{ + pub(super) fn new(stream: St) -> Self { + Self { stream, state: ReadState::PendingChunk } + } +} + +impl<St> AsyncRead for IntoAsyncRead<St> +where + St: TryStream<Error = Error> + Unpin, + St::Ok: AsRef<[u8]>, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<Result<usize>> { + loop { + match &mut self.state { + ReadState::Ready { chunk, chunk_start } => { + let chunk = chunk.as_ref(); + let len = cmp::min(buf.len(), chunk.len() - *chunk_start); + + buf[..len].copy_from_slice(&chunk[*chunk_start..*chunk_start + len]); + *chunk_start += len; + + if chunk.len() == *chunk_start { + self.state = ReadState::PendingChunk; + } + + return Poll::Ready(Ok(len)); + } + ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) { + Some(Ok(chunk)) => { + if !chunk.as_ref().is_empty() { + self.state = ReadState::Ready { chunk, chunk_start: 0 }; + } + } + Some(Err(err)) => { + self.state = ReadState::Eof; + return Poll::Ready(Err(err)); + } + None => { + self.state = ReadState::Eof; + return Poll::Ready(Ok(0)); + } + }, + ReadState::Eof => { + return Poll::Ready(Ok(0)); + } + } + } + } +} + +impl<St> AsyncWrite for IntoAsyncRead<St> +where + St: TryStream<Error = Error> + AsyncWrite + Unpin, + St::Ok: AsRef<[u8]>, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<Result<usize>> { + Pin::new(&mut self.stream).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + Pin::new(&mut self.stream).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + Pin::new(&mut self.stream).poll_close(cx) + } +} + +impl<St> AsyncBufRead for IntoAsyncRead<St> +where + St: TryStream<Error = Error> + Unpin, + St::Ok: AsRef<[u8]>, +{ + fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { + while let ReadState::PendingChunk = self.state { + match ready!(self.stream.try_poll_next_unpin(cx)) { + Some(Ok(chunk)) => { + if !chunk.as_ref().is_empty() { + self.state = ReadState::Ready { chunk, chunk_start: 0 }; + } + } + Some(Err(err)) => { + self.state = ReadState::Eof; + return Poll::Ready(Err(err)); + } + None => { + self.state = ReadState::Eof; + return Poll::Ready(Ok(&[])); + } + } + } + + if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state { + let chunk = chunk.as_ref(); + return Poll::Ready(Ok(&chunk[chunk_start..])); + } + + // To get to this point we must be in ReadState::Eof + Poll::Ready(Ok(&[])) + } + + fn consume(mut self: Pin<&mut Self>, amount: usize) { + // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 + if amount == 0 { + return; + } + if let ReadState::Ready { chunk, chunk_start } = &mut self.state { + *chunk_start += amount; + debug_assert!(*chunk_start <= chunk.as_ref().len()); + if *chunk_start >= chunk.as_ref().len() { + self.state = ReadState::PendingChunk; + } + } else { + debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); + } + } +} |