use bytes::Buf; use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; pin_project! { /// Convert a [`Stream`] of byte chunks into an [`AsyncRead`]. /// /// This type performs the inverse operation of [`ReaderStream`]. /// /// # Example /// /// ``` /// use bytes::Bytes; /// use tokio::io::{AsyncReadExt, Result}; /// use tokio_util::io::StreamReader; /// # #[tokio::main] /// # async fn main() -> std::io::Result<()> { /// /// // Create a stream from an iterator. /// let stream = tokio_stream::iter(vec![ /// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), /// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), /// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), /// ]); /// /// // Convert it to an AsyncRead. /// let mut read = StreamReader::new(stream); /// /// // Read five bytes from the stream. /// let mut buf = [0; 5]; /// read.read_exact(&mut buf).await?; /// assert_eq!(buf, [0, 1, 2, 3, 4]); /// /// // Read the rest of the current chunk. /// assert_eq!(read.read(&mut buf).await?, 3); /// assert_eq!(&buf[..3], [5, 6, 7]); /// /// // Read the next chunk. /// assert_eq!(read.read(&mut buf).await?, 4); /// assert_eq!(&buf[..4], [8, 9, 10, 11]); /// /// // We have now reached the end. /// assert_eq!(read.read(&mut buf).await?, 0); /// /// # Ok(()) /// # } /// ``` /// /// [`AsyncRead`]: tokio::io::AsyncRead /// [`Stream`]: futures_core::Stream /// [`ReaderStream`]: crate::io::ReaderStream #[derive(Debug)] pub struct StreamReader { #[pin] inner: S, chunk: Option, } } impl StreamReader where S: Stream>, B: Buf, E: Into, { /// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead). /// /// The item should be a [`Result`] with the ok variant being something that /// implements the [`Buf`] trait (e.g. `Vec` or `Bytes`). The error /// should be convertible into an [io error]. /// /// [`Result`]: std::result::Result /// [`Buf`]: bytes::Buf /// [io error]: std::io::Error pub fn new(stream: S) -> Self { Self { inner: stream, chunk: None, } } /// Do we have a chunk and is it non-empty? fn has_chunk(&self) -> bool { if let Some(ref chunk) = self.chunk { chunk.remaining() > 0 } else { false } } /// Consumes this `StreamReader`, returning a Tuple consisting /// of the underlying stream and an Option of the interal buffer, /// which is Some in case the buffer contains elements. pub fn into_inner_with_chunk(self) -> (S, Option) { if self.has_chunk() { (self.inner, self.chunk) } else { (self.inner, None) } } } impl StreamReader { /// Gets a reference to the underlying stream. /// /// It is inadvisable to directly read from the underlying stream. pub fn get_ref(&self) -> &S { &self.inner } /// Gets a mutable reference to the underlying stream. /// /// It is inadvisable to directly read from the underlying stream. pub fn get_mut(&mut self) -> &mut S { &mut self.inner } /// Gets a pinned mutable reference to the underlying stream. /// /// It is inadvisable to directly read from the underlying stream. pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> { self.project().inner } /// Consumes this `BufWriter`, returning the underlying stream. /// /// Note that any leftover data in the internal buffer is lost. /// If you additionally want access to the internal buffer use /// [`into_inner_with_chunk`]. /// /// [`into_inner_with_chunk`]: crate::io::StreamReader::into_inner_with_chunk pub fn into_inner(self) -> S { self.inner } } impl AsyncRead for StreamReader where S: Stream>, B: Buf, E: Into, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { if buf.remaining() == 0 { return Poll::Ready(Ok(())); } let inner_buf = match self.as_mut().poll_fill_buf(cx) { Poll::Ready(Ok(buf)) => buf, Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => return Poll::Pending, }; let len = std::cmp::min(inner_buf.len(), buf.remaining()); buf.put_slice(&inner_buf[..len]); self.consume(len); Poll::Ready(Ok(())) } } impl AsyncBufRead for StreamReader where S: Stream>, B: Buf, E: Into, { fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if self.as_mut().has_chunk() { // This unwrap is very sad, but it can't be avoided. let buf = self.project().chunk.as_ref().unwrap().chunk(); return Poll::Ready(Ok(buf)); } else { match self.as_mut().project().inner.poll_next(cx) { Poll::Ready(Some(Ok(chunk))) => { // Go around the loop in case the chunk is empty. *self.as_mut().project().chunk = Some(chunk); } Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())), Poll::Ready(None) => return Poll::Ready(Ok(&[])), Poll::Pending => return Poll::Pending, } } } } fn consume(self: Pin<&mut Self>, amt: usize) { if amt > 0 { self.project() .chunk .as_mut() .expect("No chunk present") .advance(amt); } } }