summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-util/src/io
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-util/src/io')
-rw-r--r--third_party/rust/tokio-util/src/io/mod.rs24
-rw-r--r--third_party/rust/tokio-util/src/io/read_buf.rs65
-rw-r--r--third_party/rust/tokio-util/src/io/reader_stream.rs118
-rw-r--r--third_party/rust/tokio-util/src/io/stream_reader.rs203
-rw-r--r--third_party/rust/tokio-util/src/io/sync_bridge.rs103
5 files changed, 513 insertions, 0 deletions
diff --git a/third_party/rust/tokio-util/src/io/mod.rs b/third_party/rust/tokio-util/src/io/mod.rs
new file mode 100644
index 0000000000..eb48a21fb9
--- /dev/null
+++ b/third_party/rust/tokio-util/src/io/mod.rs
@@ -0,0 +1,24 @@
+//! Helpers for IO related tasks.
+//!
+//! The stream types are often used in combination with hyper or reqwest, as they
+//! allow converting between a hyper [`Body`] and [`AsyncRead`].
+//!
+//! The [`SyncIoBridge`] type converts from the world of async I/O
+//! to synchronous I/O; this may often come up when using synchronous APIs
+//! inside [`tokio::task::spawn_blocking`].
+//!
+//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html
+//! [`AsyncRead`]: tokio::io::AsyncRead
+
+mod read_buf;
+mod reader_stream;
+mod stream_reader;
+cfg_io_util! {
+ mod sync_bridge;
+ pub use self::sync_bridge::SyncIoBridge;
+}
+
+pub use self::read_buf::read_buf;
+pub use self::reader_stream::ReaderStream;
+pub use self::stream_reader::StreamReader;
+pub use crate::util::{poll_read_buf, poll_write_buf};
diff --git a/third_party/rust/tokio-util/src/io/read_buf.rs b/third_party/rust/tokio-util/src/io/read_buf.rs
new file mode 100644
index 0000000000..d7938a3bc1
--- /dev/null
+++ b/third_party/rust/tokio-util/src/io/read_buf.rs
@@ -0,0 +1,65 @@
+use bytes::BufMut;
+use std::future::Future;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::AsyncRead;
+
+/// Read data from an `AsyncRead` into an implementer of the [`BufMut`] trait.
+///
+/// [`BufMut`]: bytes::BufMut
+///
+/// # Example
+///
+/// ```
+/// use bytes::{Bytes, BytesMut};
+/// use tokio_stream as stream;
+/// use tokio::io::Result;
+/// use tokio_util::io::{StreamReader, read_buf};
+/// # #[tokio::main]
+/// # async fn main() -> std::io::Result<()> {
+///
+/// // Create a reader from an iterator. This particular reader will always be
+/// // ready.
+/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))]));
+///
+/// let mut buf = BytesMut::new();
+/// let mut reads = 0;
+///
+/// loop {
+/// reads += 1;
+/// let n = read_buf(&mut read, &mut buf).await?;
+///
+/// if n == 0 {
+/// break;
+/// }
+/// }
+///
+/// // one or more reads might be necessary.
+/// assert!(reads >= 1);
+/// assert_eq!(&buf[..], &[0, 1, 2, 3]);
+/// # Ok(())
+/// # }
+/// ```
+pub async fn read_buf<R, B>(read: &mut R, buf: &mut B) -> io::Result<usize>
+where
+ R: AsyncRead + Unpin,
+ B: BufMut,
+{
+ return ReadBufFn(read, buf).await;
+
+ struct ReadBufFn<'a, R, B>(&'a mut R, &'a mut B);
+
+ impl<'a, R, B> Future for ReadBufFn<'a, R, B>
+ where
+ R: AsyncRead + Unpin,
+ B: BufMut,
+ {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ crate::util::poll_read_buf(Pin::new(this.0), cx, this.1)
+ }
+ }
+}
diff --git a/third_party/rust/tokio-util/src/io/reader_stream.rs b/third_party/rust/tokio-util/src/io/reader_stream.rs
new file mode 100644
index 0000000000..866c11408d
--- /dev/null
+++ b/third_party/rust/tokio-util/src/io/reader_stream.rs
@@ -0,0 +1,118 @@
+use bytes::{Bytes, BytesMut};
+use futures_core::stream::Stream;
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::AsyncRead;
+
+const DEFAULT_CAPACITY: usize = 4096;
+
+pin_project! {
+ /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
+ ///
+ /// This stream is fused. It performs the inverse operation of
+ /// [`StreamReader`].
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # #[tokio::main]
+ /// # async fn main() -> std::io::Result<()> {
+ /// use tokio_stream::StreamExt;
+ /// use tokio_util::io::ReaderStream;
+ ///
+ /// // Create a stream of data.
+ /// let data = b"hello, world!";
+ /// let mut stream = ReaderStream::new(&data[..]);
+ ///
+ /// // Read all of the chunks into a vector.
+ /// let mut stream_contents = Vec::new();
+ /// while let Some(chunk) = stream.next().await {
+ /// stream_contents.extend_from_slice(&chunk?);
+ /// }
+ ///
+ /// // Once the chunks are concatenated, we should have the
+ /// // original data.
+ /// assert_eq!(stream_contents, data);
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// [`AsyncRead`]: tokio::io::AsyncRead
+ /// [`StreamReader`]: crate::io::StreamReader
+ /// [`Stream`]: futures_core::Stream
+ #[derive(Debug)]
+ pub struct ReaderStream<R> {
+ // Reader itself.
+ //
+ // This value is `None` if the stream has terminated.
+ #[pin]
+ reader: Option<R>,
+ // Working buffer, used to optimize allocations.
+ buf: BytesMut,
+ capacity: usize,
+ }
+}
+
+impl<R: AsyncRead> ReaderStream<R> {
+ /// Convert an [`AsyncRead`] into a [`Stream`] with item type
+ /// `Result<Bytes, std::io::Error>`.
+ ///
+ /// [`AsyncRead`]: tokio::io::AsyncRead
+ /// [`Stream`]: futures_core::Stream
+ pub fn new(reader: R) -> Self {
+ ReaderStream {
+ reader: Some(reader),
+ buf: BytesMut::new(),
+ capacity: DEFAULT_CAPACITY,
+ }
+ }
+
+ /// Convert an [`AsyncRead`] into a [`Stream`] with item type
+ /// `Result<Bytes, std::io::Error>`,
+ /// with a specific read buffer initial capacity.
+ ///
+ /// [`AsyncRead`]: tokio::io::AsyncRead
+ /// [`Stream`]: futures_core::Stream
+ pub fn with_capacity(reader: R, capacity: usize) -> Self {
+ ReaderStream {
+ reader: Some(reader),
+ buf: BytesMut::with_capacity(capacity),
+ capacity,
+ }
+ }
+}
+
+impl<R: AsyncRead> Stream for ReaderStream<R> {
+ type Item = std::io::Result<Bytes>;
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ use crate::util::poll_read_buf;
+
+ let mut this = self.as_mut().project();
+
+ let reader = match this.reader.as_pin_mut() {
+ Some(r) => r,
+ None => return Poll::Ready(None),
+ };
+
+ if this.buf.capacity() == 0 {
+ this.buf.reserve(*this.capacity);
+ }
+
+ match poll_read_buf(reader, cx, &mut this.buf) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Err(err)) => {
+ self.project().reader.set(None);
+ Poll::Ready(Some(Err(err)))
+ }
+ Poll::Ready(Ok(0)) => {
+ self.project().reader.set(None);
+ Poll::Ready(None)
+ }
+ Poll::Ready(Ok(_)) => {
+ let chunk = this.buf.split();
+ Poll::Ready(Some(Ok(chunk.freeze())))
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio-util/src/io/stream_reader.rs b/third_party/rust/tokio-util/src/io/stream_reader.rs
new file mode 100644
index 0000000000..05ae886557
--- /dev/null
+++ b/third_party/rust/tokio-util/src/io/stream_reader.rs
@@ -0,0 +1,203 @@
+use bytes::Buf;
+use futures_core::stream::Stream;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
+
+pin_project! {
+ /// Convert a [`Stream`] of byte chunks into an [`AsyncRead`].
+ ///
+ /// This type performs the inverse operation of [`ReaderStream`].
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// use bytes::Bytes;
+ /// use tokio::io::{AsyncReadExt, Result};
+ /// use tokio_util::io::StreamReader;
+ /// # #[tokio::main]
+ /// # async fn main() -> std::io::Result<()> {
+ ///
+ /// // Create a stream from an iterator.
+ /// let stream = tokio_stream::iter(vec![
+ /// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
+ /// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
+ /// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])),
+ /// ]);
+ ///
+ /// // Convert it to an AsyncRead.
+ /// let mut read = StreamReader::new(stream);
+ ///
+ /// // Read five bytes from the stream.
+ /// let mut buf = [0; 5];
+ /// read.read_exact(&mut buf).await?;
+ /// assert_eq!(buf, [0, 1, 2, 3, 4]);
+ ///
+ /// // Read the rest of the current chunk.
+ /// assert_eq!(read.read(&mut buf).await?, 3);
+ /// assert_eq!(&buf[..3], [5, 6, 7]);
+ ///
+ /// // Read the next chunk.
+ /// assert_eq!(read.read(&mut buf).await?, 4);
+ /// assert_eq!(&buf[..4], [8, 9, 10, 11]);
+ ///
+ /// // We have now reached the end.
+ /// assert_eq!(read.read(&mut buf).await?, 0);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// [`AsyncRead`]: tokio::io::AsyncRead
+ /// [`Stream`]: futures_core::Stream
+ /// [`ReaderStream`]: crate::io::ReaderStream
+ #[derive(Debug)]
+ pub struct StreamReader<S, B> {
+ #[pin]
+ inner: S,
+ chunk: Option<B>,
+ }
+}
+
+impl<S, B, E> StreamReader<S, B>
+where
+ S: Stream<Item = Result<B, E>>,
+ B: Buf,
+ E: Into<std::io::Error>,
+{
+ /// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead).
+ ///
+ /// The item should be a [`Result`] with the ok variant being something that
+ /// implements the [`Buf`] trait (e.g. `Vec<u8>` or `Bytes`). The error
+ /// should be convertible into an [io error].
+ ///
+ /// [`Result`]: std::result::Result
+ /// [`Buf`]: bytes::Buf
+ /// [io error]: std::io::Error
+ pub fn new(stream: S) -> Self {
+ Self {
+ inner: stream,
+ chunk: None,
+ }
+ }
+
+ /// Do we have a chunk and is it non-empty?
+ fn has_chunk(&self) -> bool {
+ if let Some(ref chunk) = self.chunk {
+ chunk.remaining() > 0
+ } else {
+ false
+ }
+ }
+
+ /// Consumes this `StreamReader`, returning a Tuple consisting
+ /// of the underlying stream and an Option of the interal buffer,
+ /// which is Some in case the buffer contains elements.
+ pub fn into_inner_with_chunk(self) -> (S, Option<B>) {
+ if self.has_chunk() {
+ (self.inner, self.chunk)
+ } else {
+ (self.inner, None)
+ }
+ }
+}
+
+impl<S, B> StreamReader<S, B> {
+ /// Gets a reference to the underlying stream.
+ ///
+ /// It is inadvisable to directly read from the underlying stream.
+ pub fn get_ref(&self) -> &S {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying stream.
+ ///
+ /// It is inadvisable to directly read from the underlying stream.
+ pub fn get_mut(&mut self) -> &mut S {
+ &mut self.inner
+ }
+
+ /// Gets a pinned mutable reference to the underlying stream.
+ ///
+ /// It is inadvisable to directly read from the underlying stream.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> {
+ self.project().inner
+ }
+
+ /// Consumes this `BufWriter`, returning the underlying stream.
+ ///
+ /// Note that any leftover data in the internal buffer is lost.
+ /// If you additionally want access to the internal buffer use
+ /// [`into_inner_with_chunk`].
+ ///
+ /// [`into_inner_with_chunk`]: crate::io::StreamReader::into_inner_with_chunk
+ pub fn into_inner(self) -> S {
+ self.inner
+ }
+}
+
+impl<S, B, E> AsyncRead for StreamReader<S, B>
+where
+ S: Stream<Item = Result<B, E>>,
+ B: Buf,
+ E: Into<std::io::Error>,
+{
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ if buf.remaining() == 0 {
+ return Poll::Ready(Ok(()));
+ }
+
+ let inner_buf = match self.as_mut().poll_fill_buf(cx) {
+ Poll::Ready(Ok(buf)) => buf,
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ let len = std::cmp::min(inner_buf.len(), buf.remaining());
+ buf.put_slice(&inner_buf[..len]);
+
+ self.consume(len);
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<S, B, E> AsyncBufRead for StreamReader<S, B>
+where
+ S: Stream<Item = Result<B, E>>,
+ B: Buf,
+ E: Into<std::io::Error>,
+{
+ fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ loop {
+ if self.as_mut().has_chunk() {
+ // This unwrap is very sad, but it can't be avoided.
+ let buf = self.project().chunk.as_ref().unwrap().chunk();
+ return Poll::Ready(Ok(buf));
+ } else {
+ match self.as_mut().project().inner.poll_next(cx) {
+ Poll::Ready(Some(Ok(chunk))) => {
+ // Go around the loop in case the chunk is empty.
+ *self.as_mut().project().chunk = Some(chunk);
+ }
+ Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
+ Poll::Ready(None) => return Poll::Ready(Ok(&[])),
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+ }
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ if amt > 0 {
+ self.project()
+ .chunk
+ .as_mut()
+ .expect("No chunk present")
+ .advance(amt);
+ }
+ }
+}
diff --git a/third_party/rust/tokio-util/src/io/sync_bridge.rs b/third_party/rust/tokio-util/src/io/sync_bridge.rs
new file mode 100644
index 0000000000..9be9446a7d
--- /dev/null
+++ b/third_party/rust/tokio-util/src/io/sync_bridge.rs
@@ -0,0 +1,103 @@
+use std::io::{Read, Write};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+
+/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
+/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
+#[derive(Debug)]
+pub struct SyncIoBridge<T> {
+ src: T,
+ rt: tokio::runtime::Handle,
+}
+
+impl<T: AsyncRead + Unpin> Read for SyncIoBridge<T> {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let src = &mut self.src;
+ self.rt.block_on(AsyncReadExt::read(src, buf))
+ }
+
+ fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
+ let src = &mut self.src;
+ self.rt.block_on(src.read_to_end(buf))
+ }
+
+ fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
+ let src = &mut self.src;
+ self.rt.block_on(src.read_to_string(buf))
+ }
+
+ fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
+ let src = &mut self.src;
+ // The AsyncRead trait returns the count, synchronous doesn't.
+ let _n = self.rt.block_on(src.read_exact(buf))?;
+ Ok(())
+ }
+}
+
+impl<T: AsyncWrite + Unpin> Write for SyncIoBridge<T> {
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ let src = &mut self.src;
+ self.rt.block_on(src.write(buf))
+ }
+
+ fn flush(&mut self) -> std::io::Result<()> {
+ let src = &mut self.src;
+ self.rt.block_on(src.flush())
+ }
+
+ fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
+ let src = &mut self.src;
+ self.rt.block_on(src.write_all(buf))
+ }
+
+ fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
+ let src = &mut self.src;
+ self.rt.block_on(src.write_vectored(bufs))
+ }
+}
+
+// Because https://doc.rust-lang.org/std/io/trait.Write.html#method.is_write_vectored is at the time
+// of this writing still unstable, we expose this as part of a standalone method.
+impl<T: AsyncWrite> SyncIoBridge<T> {
+ /// Determines if the underlying [`tokio::io::AsyncWrite`] target supports efficient vectored writes.
+ ///
+ /// See [`tokio::io::AsyncWrite::is_write_vectored`].
+ pub fn is_write_vectored(&self) -> bool {
+ self.src.is_write_vectored()
+ }
+}
+
+impl<T: Unpin> SyncIoBridge<T> {
+ /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
+ /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
+ ///
+ /// When this struct is created, it captures a handle to the current thread's runtime with [`tokio::runtime::Handle::current`].
+ /// It is hence OK to move this struct into a separate thread outside the runtime, as created
+ /// by e.g. [`tokio::task::spawn_blocking`].
+ ///
+ /// Stated even more strongly: to make use of this bridge, you *must* move
+ /// it into a separate thread outside the runtime. The synchronous I/O will use the
+ /// underlying handle to block on the backing asynchronous source, via
+ /// [`tokio::runtime::Handle::block_on`]. As noted in the documentation for that
+ /// function, an attempt to `block_on` from an asynchronous execution context
+ /// will panic.
+ ///
+ /// # Wrapping `!Unpin` types
+ ///
+ /// Use e.g. `SyncIoBridge::new(Box::pin(src))`.
+ ///
+ /// # Panic
+ ///
+ /// This will panic if called outside the context of a Tokio runtime.
+ pub fn new(src: T) -> Self {
+ Self::new_with_handle(src, tokio::runtime::Handle::current())
+ }
+
+ /// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
+ /// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
+ ///
+ /// This is the same as [`SyncIoBridge::new`], but allows passing an arbitrary handle and hence may
+ /// be initially invoked outside of an asynchronous context.
+ pub fn new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self {
+ Self { src, rt }
+ }
+}