summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-util/src/io
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 00:47:55 +0000
commit26a029d407be480d791972afb5975cf62c9360a6 (patch)
treef435a8308119effd964b339f76abb83a57c29483 /third_party/rust/futures-util/src/io
parentInitial commit. (diff)
downloadfirefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz
firefox-26a029d407be480d791972afb5975cf62c9360a6.zip
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-util/src/io')
-rw-r--r--third_party/rust/futures-util/src/io/allow_std.rs200
-rw-r--r--third_party/rust/futures-util/src/io/buf_reader.rs263
-rw-r--r--third_party/rust/futures-util/src/io/buf_writer.rs224
-rw-r--r--third_party/rust/futures-util/src/io/chain.rs142
-rw-r--r--third_party/rust/futures-util/src/io/close.rs28
-rw-r--r--third_party/rust/futures-util/src/io/copy.rs58
-rw-r--r--third_party/rust/futures-util/src/io/copy_buf.rs78
-rw-r--r--third_party/rust/futures-util/src/io/copy_buf_abortable.rs124
-rw-r--r--third_party/rust/futures-util/src/io/cursor.rs232
-rw-r--r--third_party/rust/futures-util/src/io/empty.rs59
-rw-r--r--third_party/rust/futures-util/src/io/fill_buf.rs51
-rw-r--r--third_party/rust/futures-util/src/io/flush.rs31
-rw-r--r--third_party/rust/futures-util/src/io/into_sink.rs82
-rw-r--r--third_party/rust/futures-util/src/io/line_writer.rs155
-rw-r--r--third_party/rust/futures-util/src/io/lines.rs47
-rw-r--r--third_party/rust/futures-util/src/io/mod.rs841
-rw-r--r--third_party/rust/futures-util/src/io/read.rs30
-rw-r--r--third_party/rust/futures-util/src/io/read_exact.rs42
-rw-r--r--third_party/rust/futures-util/src/io/read_line.rs57
-rw-r--r--third_party/rust/futures-util/src/io/read_to_end.rs91
-rw-r--r--third_party/rust/futures-util/src/io/read_to_string.rs59
-rw-r--r--third_party/rust/futures-util/src/io/read_until.rs60
-rw-r--r--third_party/rust/futures-util/src/io/read_vectored.rs30
-rw-r--r--third_party/rust/futures-util/src/io/repeat.rs66
-rw-r--r--third_party/rust/futures-util/src/io/seek.rs30
-rw-r--r--third_party/rust/futures-util/src/io/sink.rs67
-rw-r--r--third_party/rust/futures-util/src/io/split.rs115
-rw-r--r--third_party/rust/futures-util/src/io/take.rs125
-rw-r--r--third_party/rust/futures-util/src/io/window.rs104
-rw-r--r--third_party/rust/futures-util/src/io/write.rs30
-rw-r--r--third_party/rust/futures-util/src/io/write_all.rs43
-rw-r--r--third_party/rust/futures-util/src/io/write_all_vectored.rs193
-rw-r--r--third_party/rust/futures-util/src/io/write_vectored.rs30
33 files changed, 3787 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/io/allow_std.rs b/third_party/rust/futures-util/src/io/allow_std.rs
new file mode 100644
index 0000000000..ec30ee31e5
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/allow_std.rs
@@ -0,0 +1,200 @@
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
+use std::pin::Pin;
+use std::{fmt, io};
+
+/// A simple wrapper type which allows types which implement only
+/// implement `std::io::Read` or `std::io::Write`
+/// to be used in contexts which expect an `AsyncRead` or `AsyncWrite`.
+///
+/// If these types issue an error with the kind `io::ErrorKind::WouldBlock`,
+/// it is expected that they will notify the current task on readiness.
+/// Synchronous `std` types should not issue errors of this kind and
+/// are safe to use in this context. However, using these types with
+/// `AllowStdIo` will cause the event loop to block, so they should be used
+/// with care.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct AllowStdIo<T>(T);
+
+impl<T> Unpin for AllowStdIo<T> {}
+
+macro_rules! try_with_interrupt {
+ ($e:expr) => {
+ loop {
+ match $e {
+ Ok(e) => {
+ break e;
+ }
+ Err(ref e) if e.kind() == ::std::io::ErrorKind::Interrupted => {
+ continue;
+ }
+ Err(e) => {
+ return Poll::Ready(Err(e));
+ }
+ }
+ }
+ };
+}
+
+impl<T> AllowStdIo<T> {
+ /// Creates a new `AllowStdIo` from an existing IO object.
+ pub fn new(io: T) -> Self {
+ Self(io)
+ }
+
+ /// Returns a reference to the contained IO object.
+ pub fn get_ref(&self) -> &T {
+ &self.0
+ }
+
+ /// Returns a mutable reference to the contained IO object.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.0
+ }
+
+ /// Consumes self and returns the contained IO object.
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+impl<T> io::Write for AllowStdIo<T>
+where
+ T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0.write(buf)
+ }
+ fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
+ self.0.write_vectored(bufs)
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ self.0.flush()
+ }
+ fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+ self.0.write_all(buf)
+ }
+ fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> {
+ self.0.write_fmt(fmt)
+ }
+}
+
+impl<T> AsyncWrite for AllowStdIo<T>
+where
+ T: io::Write,
+{
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.write(buf))))
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.write_vectored(bufs))))
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ try_with_interrupt!(self.0.flush());
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.poll_flush(cx)
+ }
+}
+
+impl<T> io::Read for AllowStdIo<T>
+where
+ T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.read(buf)
+ }
+ fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
+ self.0.read_vectored(bufs)
+ }
+ fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
+ self.0.read_to_end(buf)
+ }
+ fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
+ self.0.read_to_string(buf)
+ }
+ fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
+ self.0.read_exact(buf)
+ }
+}
+
+impl<T> AsyncRead for AllowStdIo<T>
+where
+ T: io::Read,
+{
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.read(buf))))
+ }
+
+ fn poll_read_vectored(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs))))
+ }
+}
+
+impl<T> io::Seek for AllowStdIo<T>
+where
+ T: io::Seek,
+{
+ fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+ self.0.seek(pos)
+ }
+}
+
+impl<T> AsyncSeek for AllowStdIo<T>
+where
+ T: io::Seek,
+{
+ fn poll_seek(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.seek(pos))))
+ }
+}
+
+impl<T> io::BufRead for AllowStdIo<T>
+where
+ T: io::BufRead,
+{
+ fn fill_buf(&mut self) -> io::Result<&[u8]> {
+ self.0.fill_buf()
+ }
+ fn consume(&mut self, amt: usize) {
+ self.0.consume(amt)
+ }
+}
+
+impl<T> AsyncBufRead for AllowStdIo<T>
+where
+ T: io::BufRead,
+{
+ fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ let this: *mut Self = &mut *self as *mut _;
+ Poll::Ready(Ok(try_with_interrupt!(unsafe { &mut *this }.0.fill_buf())))
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ self.0.consume(amt)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/buf_reader.rs b/third_party/rust/futures-util/src/io/buf_reader.rs
new file mode 100644
index 0000000000..0334a9f081
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/buf_reader.rs
@@ -0,0 +1,263 @@
+use super::DEFAULT_BUF_SIZE;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
+use pin_project_lite::pin_project;
+use std::io::{self, Read};
+use std::pin::Pin;
+use std::{cmp, fmt};
+
+pin_project! {
+ /// The `BufReader` struct adds buffering to any reader.
+ ///
+ /// It can be excessively inefficient to work directly with a [`AsyncRead`]
+ /// instance. A `BufReader` performs large, infrequent reads on the underlying
+ /// [`AsyncRead`] and maintains an in-memory buffer of the results.
+ ///
+ /// `BufReader` can improve the speed of programs that make *small* and
+ /// *repeated* read calls to the same file or network socket. It does not
+ /// help when reading very large amounts at once, or reading just one or a few
+ /// times. It also provides no advantage when reading from a source that is
+ /// already in memory, like a `Vec<u8>`.
+ ///
+ /// When the `BufReader` is dropped, the contents of its buffer will be
+ /// discarded. Creating multiple instances of a `BufReader` on the same
+ /// stream can cause data loss.
+ ///
+ /// [`AsyncRead`]: futures_io::AsyncRead
+ ///
+ // TODO: Examples
+ pub struct BufReader<R> {
+ #[pin]
+ inner: R,
+ buffer: Box<[u8]>,
+ pos: usize,
+ cap: usize,
+ }
+}
+
+impl<R: AsyncRead> BufReader<R> {
+ /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
+ /// but may change in the future.
+ pub fn new(inner: R) -> Self {
+ Self::with_capacity(DEFAULT_BUF_SIZE, inner)
+ }
+
+ /// Creates a new `BufReader` with the specified buffer capacity.
+ pub fn with_capacity(capacity: usize, inner: R) -> Self {
+ unsafe {
+ let mut buffer = Vec::with_capacity(capacity);
+ buffer.set_len(capacity);
+ super::initialize(&inner, &mut buffer);
+ Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
+ }
+ }
+
+ delegate_access_inner!(inner, R, ());
+
+ /// Returns a reference to the internally buffered data.
+ ///
+ /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
+ pub fn buffer(&self) -> &[u8] {
+ &self.buffer[self.pos..self.cap]
+ }
+
+ /// Invalidates all data in the internal buffer.
+ #[inline]
+ fn discard_buffer(self: Pin<&mut Self>) {
+ let this = self.project();
+ *this.pos = 0;
+ *this.cap = 0;
+ }
+}
+
+impl<R: AsyncRead + AsyncSeek> BufReader<R> {
+ /// Seeks relative to the current position. If the new position lies within the buffer,
+ /// the buffer will not be flushed, allowing for more efficient seeks.
+ /// This method does not return the location of the underlying reader, so the caller
+ /// must track this information themselves if it is required.
+ pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
+ SeeKRelative { inner: self, offset, first: true }
+ }
+
+ /// Attempts to seek relative to the current position. If the new position lies within the buffer,
+ /// the buffer will not be flushed, allowing for more efficient seeks.
+ /// This method does not return the location of the underlying reader, so the caller
+ /// must track this information themselves if it is required.
+ pub fn poll_seek_relative(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ offset: i64,
+ ) -> Poll<io::Result<()>> {
+ let pos = self.pos as u64;
+ if offset < 0 {
+ if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
+ *self.project().pos = new_pos as usize;
+ return Poll::Ready(Ok(()));
+ }
+ } else if let Some(new_pos) = pos.checked_add(offset as u64) {
+ if new_pos <= self.cap as u64 {
+ *self.project().pos = new_pos as usize;
+ return Poll::Ready(Ok(()));
+ }
+ }
+ self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
+ }
+}
+
+impl<R: AsyncRead> AsyncRead for BufReader<R> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ // If we don't have any buffered data and we're doing a massive read
+ // (larger than our internal buffer), bypass our internal buffer
+ // entirely.
+ if self.pos == self.cap && buf.len() >= self.buffer.len() {
+ let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
+ self.discard_buffer();
+ return Poll::Ready(res);
+ }
+ let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+ let nread = rem.read(buf)?;
+ self.consume(nread);
+ Poll::Ready(Ok(nread))
+ }
+
+ fn poll_read_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
+ if self.pos == self.cap && total_len >= self.buffer.len() {
+ let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
+ self.discard_buffer();
+ return Poll::Ready(res);
+ }
+ let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+ let nread = rem.read_vectored(bufs)?;
+ self.consume(nread);
+ Poll::Ready(Ok(nread))
+ }
+}
+
+impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ let this = self.project();
+
+ // If we've reached the end of our internal buffer then we need to fetch
+ // some more data from the underlying reader.
+ // Branch using `>=` instead of the more correct `==`
+ // to tell the compiler that the pos..cap slice is always valid.
+ if *this.pos >= *this.cap {
+ debug_assert!(*this.pos == *this.cap);
+ *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
+ *this.pos = 0;
+ }
+ Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ *self.project().pos = cmp::min(self.pos + amt, self.cap);
+ }
+}
+
+impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
+ delegate_async_write!(inner);
+}
+
+impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BufReader")
+ .field("reader", &self.inner)
+ .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
+ .finish()
+ }
+}
+
+impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
+ /// Seek to an offset, in bytes, in the underlying reader.
+ ///
+ /// The position used for seeking with `SeekFrom::Current(_)` is the
+ /// position the underlying reader would be at if the `BufReader` had no
+ /// internal buffer.
+ ///
+ /// Seeking always discards the internal buffer, even if the seek position
+ /// would otherwise fall within it. This guarantees that calling
+ /// `.into_inner()` immediately after a seek yields the underlying reader
+ /// at the same position.
+ ///
+ /// To seek without discarding the internal buffer, use
+ /// [`BufReader::seek_relative`](BufReader::seek_relative) or
+ /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
+ ///
+ /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
+ ///
+ /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
+ /// where `n` minus the internal buffer length overflows an `i64`, two
+ /// seeks will be performed instead of one. If the second seek returns
+ /// `Err`, the underlying reader will be left at the same position it would
+ /// have if you called `seek` with `SeekFrom::Current(0)`.
+ fn poll_seek(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ let result: u64;
+ if let SeekFrom::Current(n) = pos {
+ let remainder = (self.cap - self.pos) as i64;
+ // it should be safe to assume that remainder fits within an i64 as the alternative
+ // means we managed to allocate 8 exbibytes and that's absurd.
+ // But it's not out of the realm of possibility for some weird underlying reader to
+ // support seeking by i64::min_value() so we need to handle underflow when subtracting
+ // remainder.
+ if let Some(offset) = n.checked_sub(remainder) {
+ result =
+ ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
+ } else {
+ // seek backwards by our remainder, and then by the offset
+ ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
+ self.as_mut().discard_buffer();
+ result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
+ }
+ } else {
+ // Seeking with Start/End doesn't care about our buffer length.
+ result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
+ }
+ self.discard_buffer();
+ Poll::Ready(Ok(result))
+ }
+}
+
+/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct SeeKRelative<'a, R> {
+ inner: Pin<&'a mut BufReader<R>>,
+ offset: i64,
+ first: bool,
+}
+
+impl<R> Future for SeeKRelative<'_, R>
+where
+ R: AsyncRead + AsyncSeek,
+{
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let offset = self.offset;
+ if self.first {
+ self.first = false;
+ self.inner.as_mut().poll_seek_relative(cx, offset)
+ } else {
+ self.inner
+ .as_mut()
+ .as_mut()
+ .poll_seek(cx, SeekFrom::Current(offset))
+ .map(|res| res.map(|_| ()))
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/buf_writer.rs b/third_party/rust/futures-util/src/io/buf_writer.rs
new file mode 100644
index 0000000000..cb74863ad0
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/buf_writer.rs
@@ -0,0 +1,224 @@
+use super::DEFAULT_BUF_SIZE;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, SeekFrom};
+use pin_project_lite::pin_project;
+use std::fmt;
+use std::io::{self, Write};
+use std::pin::Pin;
+use std::ptr;
+
+pin_project! {
+ /// Wraps a writer and buffers its output.
+ ///
+ /// It can be excessively inefficient to work directly with something that
+ /// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and
+ /// writes it to an underlying writer in large, infrequent batches.
+ ///
+ /// `BufWriter` can improve the speed of programs that make *small* and
+ /// *repeated* write calls to the same file or network socket. It does not
+ /// help when writing very large amounts at once, or writing just one or a few
+ /// times. It also provides no advantage when writing to a destination that is
+ /// in memory, like a `Vec<u8>`.
+ ///
+ /// When the `BufWriter` is dropped, the contents of its buffer will be
+ /// discarded. Creating multiple instances of a `BufWriter` on the same
+ /// stream can cause data loss. If you need to write out the contents of its
+ /// buffer, you must manually call flush before the writer is dropped.
+ ///
+ /// [`AsyncWrite`]: futures_io::AsyncWrite
+ /// [`flush`]: super::AsyncWriteExt::flush
+ ///
+ // TODO: Examples
+ pub struct BufWriter<W> {
+ #[pin]
+ inner: W,
+ buf: Vec<u8>,
+ written: usize,
+ }
+}
+
+impl<W: AsyncWrite> BufWriter<W> {
+ /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
+ /// but may change in the future.
+ pub fn new(inner: W) -> Self {
+ Self::with_capacity(DEFAULT_BUF_SIZE, inner)
+ }
+
+ /// Creates a new `BufWriter` with the specified buffer capacity.
+ pub fn with_capacity(cap: usize, inner: W) -> Self {
+ Self { inner, buf: Vec::with_capacity(cap), written: 0 }
+ }
+
+ pub(super) fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let mut this = self.project();
+
+ let len = this.buf.len();
+ let mut ret = Ok(());
+ while *this.written < len {
+ match ready!(this.inner.as_mut().poll_write(cx, &this.buf[*this.written..])) {
+ Ok(0) => {
+ ret = Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "failed to write the buffered data",
+ ));
+ break;
+ }
+ Ok(n) => *this.written += n,
+ Err(e) => {
+ ret = Err(e);
+ break;
+ }
+ }
+ }
+ if *this.written > 0 {
+ this.buf.drain(..*this.written);
+ }
+ *this.written = 0;
+ Poll::Ready(ret)
+ }
+
+ delegate_access_inner!(inner, W, ());
+
+ /// Returns a reference to the internally buffered data.
+ pub fn buffer(&self) -> &[u8] {
+ &self.buf
+ }
+
+ /// Capacity of `buf`. how many chars can be held in buffer
+ pub(super) fn capacity(&self) -> usize {
+ self.buf.capacity()
+ }
+
+ /// Remaining number of bytes to reach `buf` 's capacity
+ #[inline]
+ pub(super) fn spare_capacity(&self) -> usize {
+ self.buf.capacity() - self.buf.len()
+ }
+
+ /// Write a byte slice directly into buffer
+ ///
+ /// Will truncate the number of bytes written to `spare_capacity()` so you want to
+ /// calculate the size of your slice to avoid losing bytes
+ ///
+ /// Based on `std::io::BufWriter`
+ pub(super) fn write_to_buf(self: Pin<&mut Self>, buf: &[u8]) -> usize {
+ let available = self.spare_capacity();
+ let amt_to_buffer = available.min(buf.len());
+
+ // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction.
+ unsafe {
+ self.write_to_buffer_unchecked(&buf[..amt_to_buffer]);
+ }
+
+ amt_to_buffer
+ }
+
+ /// Write byte slice directly into `self.buf`
+ ///
+ /// Based on `std::io::BufWriter`
+ #[inline]
+ unsafe fn write_to_buffer_unchecked(self: Pin<&mut Self>, buf: &[u8]) {
+ debug_assert!(buf.len() <= self.spare_capacity());
+ let this = self.project();
+ let old_len = this.buf.len();
+ let buf_len = buf.len();
+ let src = buf.as_ptr();
+ let dst = this.buf.as_mut_ptr().add(old_len);
+ ptr::copy_nonoverlapping(src, dst, buf_len);
+ this.buf.set_len(old_len + buf_len);
+ }
+
+ /// Write directly using `inner`, bypassing buffering
+ pub(super) fn inner_poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_write(cx, buf)
+ }
+
+ /// Write directly using `inner`, bypassing buffering
+ pub(super) fn inner_poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.project().inner.poll_write_vectored(cx, bufs)
+ }
+}
+
+impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.buf.len() + buf.len() > self.buf.capacity() {
+ ready!(self.as_mut().flush_buf(cx))?;
+ }
+ if buf.len() >= self.buf.capacity() {
+ self.project().inner.poll_write(cx, buf)
+ } else {
+ Poll::Ready(self.project().buf.write(buf))
+ }
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
+ if self.buf.len() + total_len > self.buf.capacity() {
+ ready!(self.as_mut().flush_buf(cx))?;
+ }
+ if total_len >= self.buf.capacity() {
+ self.project().inner.poll_write_vectored(cx, bufs)
+ } else {
+ Poll::Ready(self.project().buf.write_vectored(bufs))
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ ready!(self.as_mut().flush_buf(cx))?;
+ self.project().inner.poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ ready!(self.as_mut().flush_buf(cx))?;
+ self.project().inner.poll_close(cx)
+ }
+}
+
+impl<W: AsyncRead> AsyncRead for BufWriter<W> {
+ delegate_async_read!(inner);
+}
+
+impl<W: AsyncBufRead> AsyncBufRead for BufWriter<W> {
+ delegate_async_buf_read!(inner);
+}
+
+impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BufWriter")
+ .field("writer", &self.inner)
+ .field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity()))
+ .field("written", &self.written)
+ .finish()
+ }
+}
+
+impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
+ /// Seek to the offset, in bytes, in the underlying writer.
+ ///
+ /// Seeking always writes out the internal buffer before seeking.
+ fn poll_seek(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ ready!(self.as_mut().flush_buf(cx))?;
+ self.project().inner.poll_seek(cx, pos)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/chain.rs b/third_party/rust/futures-util/src/io/chain.rs
new file mode 100644
index 0000000000..728a3d2dc0
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/chain.rs
@@ -0,0 +1,142 @@
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
+use pin_project_lite::pin_project;
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+
+pin_project! {
+ /// Reader for the [`chain`](super::AsyncReadExt::chain) method.
+ #[must_use = "readers do nothing unless polled"]
+ pub struct Chain<T, U> {
+ #[pin]
+ first: T,
+ #[pin]
+ second: U,
+ done_first: bool,
+ }
+}
+
+impl<T, U> Chain<T, U>
+where
+ T: AsyncRead,
+ U: AsyncRead,
+{
+ pub(super) fn new(first: T, second: U) -> Self {
+ Self { first, second, done_first: false }
+ }
+
+ /// Gets references to the underlying readers in this `Chain`.
+ pub fn get_ref(&self) -> (&T, &U) {
+ (&self.first, &self.second)
+ }
+
+ /// Gets mutable references to the underlying readers in this `Chain`.
+ ///
+ /// Care should be taken to avoid modifying the internal I/O state of the
+ /// underlying readers as doing so may corrupt the internal state of this
+ /// `Chain`.
+ pub fn get_mut(&mut self) -> (&mut T, &mut U) {
+ (&mut self.first, &mut self.second)
+ }
+
+ /// Gets pinned mutable references to the underlying readers in this `Chain`.
+ ///
+ /// Care should be taken to avoid modifying the internal I/O state of the
+ /// underlying readers as doing so may corrupt the internal state of this
+ /// `Chain`.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) {
+ let this = self.project();
+ (this.first, this.second)
+ }
+
+ /// Consumes the `Chain`, returning the wrapped readers.
+ pub fn into_inner(self) -> (T, U) {
+ (self.first, self.second)
+ }
+}
+
+impl<T, U> fmt::Debug for Chain<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Chain")
+ .field("t", &self.first)
+ .field("u", &self.second)
+ .field("done_first", &self.done_first)
+ .finish()
+ }
+}
+
+impl<T, U> AsyncRead for Chain<T, U>
+where
+ T: AsyncRead,
+ U: AsyncRead,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ let this = self.project();
+
+ if !*this.done_first {
+ match ready!(this.first.poll_read(cx, buf)?) {
+ 0 if !buf.is_empty() => *this.done_first = true,
+ n => return Poll::Ready(Ok(n)),
+ }
+ }
+ this.second.poll_read(cx, buf)
+ }
+
+ fn poll_read_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let this = self.project();
+
+ if !*this.done_first {
+ let n = ready!(this.first.poll_read_vectored(cx, bufs)?);
+ if n == 0 && bufs.iter().any(|b| !b.is_empty()) {
+ *this.done_first = true
+ } else {
+ return Poll::Ready(Ok(n));
+ }
+ }
+ this.second.poll_read_vectored(cx, bufs)
+ }
+}
+
+impl<T, U> AsyncBufRead for Chain<T, U>
+where
+ T: AsyncBufRead,
+ U: AsyncBufRead,
+{
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ let this = self.project();
+
+ if !*this.done_first {
+ match ready!(this.first.poll_fill_buf(cx)?) {
+ buf if buf.is_empty() => {
+ *this.done_first = true;
+ }
+ buf => return Poll::Ready(Ok(buf)),
+ }
+ }
+ this.second.poll_fill_buf(cx)
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ let this = self.project();
+
+ if !*this.done_first {
+ this.first.consume(amt)
+ } else {
+ this.second.consume(amt)
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/close.rs b/third_party/rust/futures-util/src/io/close.rs
new file mode 100644
index 0000000000..b94459279a
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/close.rs
@@ -0,0 +1,28 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`close`](super::AsyncWriteExt::close) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Close<'a, W: ?Sized> {
+ writer: &'a mut W,
+}
+
+impl<W: ?Sized + Unpin> Unpin for Close<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> Close<'a, W> {
+ pub(super) fn new(writer: &'a mut W) -> Self {
+ Self { writer }
+ }
+}
+
+impl<W: AsyncWrite + ?Sized + Unpin> Future for Close<'_, W> {
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut *self.writer).poll_close(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/copy.rs b/third_party/rust/futures-util/src/io/copy.rs
new file mode 100644
index 0000000000..c80add271b
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/copy.rs
@@ -0,0 +1,58 @@
+use super::{copy_buf, BufReader, CopyBuf};
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncRead, AsyncWrite};
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+
+/// Creates a future which copies all the bytes from one object to another.
+///
+/// The returned future will copy all the bytes read from this `AsyncRead` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncWriteExt, Cursor};
+///
+/// let reader = Cursor::new([1, 2, 3, 4]);
+/// let mut writer = Cursor::new(vec![0u8; 5]);
+///
+/// let bytes = io::copy(reader, &mut writer).await?;
+/// writer.close().await?;
+///
+/// assert_eq!(bytes, 4);
+/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn copy<R, W>(reader: R, writer: &mut W) -> Copy<'_, R, W>
+where
+ R: AsyncRead,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ Copy { inner: copy_buf(BufReader::new(reader), writer) }
+}
+
+pin_project! {
+ /// Future for the [`copy()`] function.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct Copy<'a, R, W: ?Sized> {
+ #[pin]
+ inner: CopyBuf<'a, BufReader<R>, W>,
+ }
+}
+
+impl<R: AsyncRead, W: AsyncWrite + Unpin + ?Sized> Future for Copy<'_, R, W> {
+ type Output = io::Result<u64>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.project().inner.poll(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/copy_buf.rs b/third_party/rust/futures-util/src/io/copy_buf.rs
new file mode 100644
index 0000000000..50f7abdca9
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/copy_buf.rs
@@ -0,0 +1,78 @@
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncWrite};
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+
+/// Creates a future which copies all the bytes from one object to another.
+///
+/// The returned future will copy all the bytes read from this `AsyncBufRead` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncWriteExt, Cursor};
+///
+/// let reader = Cursor::new([1, 2, 3, 4]);
+/// let mut writer = Cursor::new(vec![0u8; 5]);
+///
+/// let bytes = io::copy_buf(reader, &mut writer).await?;
+/// writer.close().await?;
+///
+/// assert_eq!(bytes, 4);
+/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn copy_buf<R, W>(reader: R, writer: &mut W) -> CopyBuf<'_, R, W>
+where
+ R: AsyncBufRead,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ CopyBuf { reader, writer, amt: 0 }
+}
+
+pin_project! {
+ /// Future for the [`copy_buf()`] function.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct CopyBuf<'a, R, W: ?Sized> {
+ #[pin]
+ reader: R,
+ writer: &'a mut W,
+ amt: u64,
+ }
+}
+
+impl<R, W> Future for CopyBuf<'_, R, W>
+where
+ R: AsyncBufRead,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ type Output = io::Result<u64>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ loop {
+ let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
+ if buffer.is_empty() {
+ ready!(Pin::new(&mut this.writer).poll_flush(cx))?;
+ return Poll::Ready(Ok(*this.amt));
+ }
+
+ let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?;
+ if i == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ *this.amt += i as u64;
+ this.reader.as_mut().consume(i);
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/copy_buf_abortable.rs b/third_party/rust/futures-util/src/io/copy_buf_abortable.rs
new file mode 100644
index 0000000000..fdbc4a5f00
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/copy_buf_abortable.rs
@@ -0,0 +1,124 @@
+use crate::abortable::{AbortHandle, AbortInner, Aborted};
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncWrite};
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+/// Creates a future which copies all the bytes from one object to another, with its `AbortHandle`.
+///
+/// The returned future will copy all the bytes read from this `AsyncBufRead` into the
+/// `writer` specified. This future will only complete once abort has been requested or the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned. If aborted, `Aborted` is returned. Otherwise, the underlying error is returned.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncWriteExt, Cursor};
+/// use futures::future::Aborted;
+///
+/// let reader = Cursor::new([1, 2, 3, 4]);
+/// let mut writer = Cursor::new(vec![0u8; 5]);
+///
+/// let (fut, abort_handle) = io::copy_buf_abortable(reader, &mut writer);
+/// let bytes = fut.await;
+/// abort_handle.abort();
+/// writer.close().await.unwrap();
+/// match bytes {
+/// Ok(Ok(n)) => {
+/// assert_eq!(n, 4);
+/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
+/// Ok(n)
+/// },
+/// Ok(Err(a)) => {
+/// Err::<u64, Aborted>(a)
+/// }
+/// Err(e) => panic!("{}", e)
+/// }
+/// # }).unwrap();
+/// ```
+pub fn copy_buf_abortable<R, W>(
+ reader: R,
+ writer: &mut W,
+) -> (CopyBufAbortable<'_, R, W>, AbortHandle)
+where
+ R: AsyncBufRead,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ let (handle, reg) = AbortHandle::new_pair();
+ (CopyBufAbortable { reader, writer, amt: 0, inner: reg.inner }, handle)
+}
+
+pin_project! {
+ /// Future for the [`copy_buf()`] function.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct CopyBufAbortable<'a, R, W: ?Sized> {
+ #[pin]
+ reader: R,
+ writer: &'a mut W,
+ amt: u64,
+ inner: Arc<AbortInner>
+ }
+}
+
+macro_rules! ready_or_break {
+ ($e:expr $(,)?) => {
+ match $e {
+ $crate::task::Poll::Ready(t) => t,
+ $crate::task::Poll::Pending => break,
+ }
+ };
+}
+
+impl<R, W> Future for CopyBufAbortable<'_, R, W>
+where
+ R: AsyncBufRead,
+ W: AsyncWrite + Unpin + Sized,
+{
+ type Output = Result<Result<u64, Aborted>, io::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ loop {
+ // Check if the task has been aborted
+ if this.inner.aborted.load(Ordering::Relaxed) {
+ return Poll::Ready(Ok(Err(Aborted)));
+ }
+
+ // Read some bytes from the reader, and if we have reached EOF, return total bytes read
+ let buffer = ready_or_break!(this.reader.as_mut().poll_fill_buf(cx))?;
+ if buffer.is_empty() {
+ ready_or_break!(Pin::new(&mut this.writer).poll_flush(cx))?;
+ return Poll::Ready(Ok(Ok(*this.amt)));
+ }
+
+ // Pass the buffer to the writer, and update the amount written
+ let i = ready_or_break!(Pin::new(&mut this.writer).poll_write(cx, buffer))?;
+ if i == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ *this.amt += i as u64;
+ this.reader.as_mut().consume(i);
+ }
+ // Schedule the task to be woken up again.
+ // Never called unless Poll::Pending is returned from io objects.
+ this.inner.waker.register(cx.waker());
+
+ // Check to see if the task was aborted between the first check and
+ // registration.
+ // Checking with `Relaxed` is sufficient because
+ // `register` introduces an `AcqRel` barrier.
+ if this.inner.aborted.load(Ordering::Relaxed) {
+ return Poll::Ready(Ok(Err(Aborted)));
+ }
+ Poll::Pending
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/cursor.rs b/third_party/rust/futures-util/src/io/cursor.rs
new file mode 100644
index 0000000000..c6e2aeea28
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/cursor.rs
@@ -0,0 +1,232 @@
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
+use std::io;
+use std::pin::Pin;
+
+/// A `Cursor` wraps an in-memory buffer and provides it with a
+/// [`AsyncSeek`] implementation.
+///
+/// `Cursor`s are used with in-memory buffers, anything implementing
+/// `AsRef<[u8]>`, to allow them to implement [`AsyncRead`] and/or [`AsyncWrite`],
+/// allowing these buffers to be used anywhere you might use a reader or writer
+/// that does actual I/O.
+///
+/// This library implements some I/O traits on various types which
+/// are commonly used as a buffer, like `Cursor<`[`Vec`]`<u8>>` and
+/// `Cursor<`[`&[u8]`][bytes]`>`.
+///
+/// [`AsyncSeek`]: trait.AsyncSeek.html
+/// [`AsyncRead`]: trait.AsyncRead.html
+/// [`AsyncWrite`]: trait.AsyncWrite.html
+/// [bytes]: https://doc.rust-lang.org/std/primitive.slice.html
+#[derive(Clone, Debug, Default)]
+pub struct Cursor<T> {
+ inner: io::Cursor<T>,
+}
+
+impl<T> Cursor<T> {
+ /// Creates a new cursor wrapping the provided underlying in-memory buffer.
+ ///
+ /// Cursor initial position is `0` even if underlying buffer (e.g., `Vec`)
+ /// is not empty. So writing to cursor starts with overwriting `Vec`
+ /// content, not with appending to it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let buff = Cursor::new(Vec::new());
+ /// # fn force_inference(_: &Cursor<Vec<u8>>) {}
+ /// # force_inference(&buff);
+ /// ```
+ pub fn new(inner: T) -> Self {
+ Self { inner: io::Cursor::new(inner) }
+ }
+
+ /// Consumes this cursor, returning the underlying value.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let buff = Cursor::new(Vec::new());
+ /// # fn force_inference(_: &Cursor<Vec<u8>>) {}
+ /// # force_inference(&buff);
+ ///
+ /// let vec = buff.into_inner();
+ /// ```
+ pub fn into_inner(self) -> T {
+ self.inner.into_inner()
+ }
+
+ /// Gets a reference to the underlying value in this cursor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let buff = Cursor::new(Vec::new());
+ /// # fn force_inference(_: &Cursor<Vec<u8>>) {}
+ /// # force_inference(&buff);
+ ///
+ /// let reference = buff.get_ref();
+ /// ```
+ pub fn get_ref(&self) -> &T {
+ self.inner.get_ref()
+ }
+
+ /// Gets a mutable reference to the underlying value in this cursor.
+ ///
+ /// Care should be taken to avoid modifying the internal I/O state of the
+ /// underlying value as it may corrupt this cursor's position.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let mut buff = Cursor::new(Vec::new());
+ /// # fn force_inference(_: &Cursor<Vec<u8>>) {}
+ /// # force_inference(&buff);
+ ///
+ /// let reference = buff.get_mut();
+ /// ```
+ pub fn get_mut(&mut self) -> &mut T {
+ self.inner.get_mut()
+ }
+
+ /// Returns the current position of this cursor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncSeekExt, Cursor, SeekFrom};
+ ///
+ /// let mut buff = Cursor::new(vec![1, 2, 3, 4, 5]);
+ ///
+ /// assert_eq!(buff.position(), 0);
+ ///
+ /// buff.seek(SeekFrom::Current(2)).await?;
+ /// assert_eq!(buff.position(), 2);
+ ///
+ /// buff.seek(SeekFrom::Current(-1)).await?;
+ /// assert_eq!(buff.position(), 1);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ pub fn position(&self) -> u64 {
+ self.inner.position()
+ }
+
+ /// Sets the position of this cursor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let mut buff = Cursor::new(vec![1, 2, 3, 4, 5]);
+ ///
+ /// assert_eq!(buff.position(), 0);
+ ///
+ /// buff.set_position(2);
+ /// assert_eq!(buff.position(), 2);
+ ///
+ /// buff.set_position(4);
+ /// assert_eq!(buff.position(), 4);
+ /// ```
+ pub fn set_position(&mut self, pos: u64) {
+ self.inner.set_position(pos)
+ }
+}
+
+impl<T> AsyncSeek for Cursor<T>
+where
+ T: AsRef<[u8]> + Unpin,
+{
+ fn poll_seek(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ Poll::Ready(io::Seek::seek(&mut self.inner, pos))
+ }
+}
+
+impl<T: AsRef<[u8]> + Unpin> AsyncRead for Cursor<T> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(io::Read::read(&mut self.inner, buf))
+ }
+
+ fn poll_read_vectored(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(io::Read::read_vectored(&mut self.inner, bufs))
+ }
+}
+
+impl<T> AsyncBufRead for Cursor<T>
+where
+ T: AsRef<[u8]> + Unpin,
+{
+ fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ Poll::Ready(io::BufRead::fill_buf(&mut self.get_mut().inner))
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ io::BufRead::consume(&mut self.inner, amt)
+ }
+}
+
+macro_rules! delegate_async_write_to_stdio {
+ () => {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(io::Write::write(&mut self.inner, buf))
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(io::Write::write_vectored(&mut self.inner, bufs))
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(io::Write::flush(&mut self.inner))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.poll_flush(cx)
+ }
+ };
+}
+
+impl AsyncWrite for Cursor<&mut [u8]> {
+ delegate_async_write_to_stdio!();
+}
+
+impl AsyncWrite for Cursor<&mut Vec<u8>> {
+ delegate_async_write_to_stdio!();
+}
+
+impl AsyncWrite for Cursor<Vec<u8>> {
+ delegate_async_write_to_stdio!();
+}
+
+impl AsyncWrite for Cursor<Box<[u8]>> {
+ delegate_async_write_to_stdio!();
+}
diff --git a/third_party/rust/futures-util/src/io/empty.rs b/third_party/rust/futures-util/src/io/empty.rs
new file mode 100644
index 0000000000..02f6103f54
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/empty.rs
@@ -0,0 +1,59 @@
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncRead};
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+
+/// Reader for the [`empty()`] function.
+#[must_use = "readers do nothing unless polled"]
+pub struct Empty {
+ _priv: (),
+}
+
+/// Constructs a new handle to an empty reader.
+///
+/// All reads from the returned reader will return `Poll::Ready(Ok(0))`.
+///
+/// # Examples
+///
+/// A slightly sad example of not reading anything into a buffer:
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncReadExt};
+///
+/// let mut buffer = String::new();
+/// let mut reader = io::empty();
+/// reader.read_to_string(&mut buffer).await?;
+/// assert!(buffer.is_empty());
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn empty() -> Empty {
+ Empty { _priv: () }
+}
+
+impl AsyncRead for Empty {
+ #[inline]
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ _: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(0))
+ }
+}
+
+impl AsyncBufRead for Empty {
+ #[inline]
+ fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ Poll::Ready(Ok(&[]))
+ }
+ #[inline]
+ fn consume(self: Pin<&mut Self>, _: usize) {}
+}
+
+impl fmt::Debug for Empty {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Empty { .. }")
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/fill_buf.rs b/third_party/rust/futures-util/src/io/fill_buf.rs
new file mode 100644
index 0000000000..a1484c0322
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/fill_buf.rs
@@ -0,0 +1,51 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncBufRead;
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct FillBuf<'a, R: ?Sized> {
+ reader: Option<&'a mut R>,
+}
+
+impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
+
+impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> {
+ pub(super) fn new(reader: &'a mut R) -> Self {
+ Self { reader: Some(reader) }
+ }
+}
+
+impl<'a, R> Future for FillBuf<'a, R>
+where
+ R: AsyncBufRead + ?Sized + Unpin,
+{
+ type Output = io::Result<&'a [u8]>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ let reader = this.reader.take().expect("Polled FillBuf after completion");
+
+ match Pin::new(&mut *reader).poll_fill_buf(cx) {
+ // With polonius it is possible to remove this inner match and just have the correct
+ // lifetime of the reference inferred based on which branch is taken
+ Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
+ Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
+ Poll::Ready(Err(err)) => {
+ unreachable!("reader indicated readiness but then returned an error: {:?}", err)
+ }
+ Poll::Pending => {
+ unreachable!("reader indicated readiness but then returned pending")
+ }
+ },
+ Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
+ Poll::Pending => {
+ this.reader = Some(reader);
+ Poll::Pending
+ }
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/flush.rs b/third_party/rust/futures-util/src/io/flush.rs
new file mode 100644
index 0000000000..b75d14c5d3
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/flush.rs
@@ -0,0 +1,31 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`flush`](super::AsyncWriteExt::flush) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Flush<'a, W: ?Sized> {
+ writer: &'a mut W,
+}
+
+impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> {
+ pub(super) fn new(writer: &'a mut W) -> Self {
+ Self { writer }
+ }
+}
+
+impl<W> Future for Flush<'_, W>
+where
+ W: AsyncWrite + ?Sized + Unpin,
+{
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut *self.writer).poll_flush(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/into_sink.rs b/third_party/rust/futures-util/src/io/into_sink.rs
new file mode 100644
index 0000000000..6a41ee2269
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/into_sink.rs
@@ -0,0 +1,82 @@
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+
+#[derive(Debug)]
+struct Block<Item> {
+ offset: usize,
+ bytes: Item,
+}
+
+pin_project! {
+ /// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
+ #[must_use = "sinks do nothing unless polled"]
+ #[derive(Debug)]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+ pub struct IntoSink<W, Item> {
+ #[pin]
+ writer: W,
+ // An outstanding block for us to push into the underlying writer, along with an offset of how
+ // far into this block we have written already.
+ buffer: Option<Block<Item>>,
+ }
+}
+
+impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> {
+ pub(super) fn new(writer: W) -> Self {
+ Self { writer, buffer: None }
+ }
+
+ /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
+ /// flush the writer after it succeeds in pushing the block into it.
+ fn poll_flush_buffer(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), io::Error>> {
+ let mut this = self.project();
+
+ if let Some(buffer) = this.buffer {
+ loop {
+ let bytes = buffer.bytes.as_ref();
+ let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
+ buffer.offset += written;
+ if buffer.offset == bytes.len() {
+ break;
+ }
+ }
+ }
+ *this.buffer = None;
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> {
+ type Error = io::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.poll_flush_buffer(cx))?;
+ Poll::Ready(Ok(()))
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+ debug_assert!(self.buffer.is_none());
+ *self.project().buffer = Some(Block { offset: 0, bytes: item });
+ Ok(())
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().poll_flush_buffer(cx))?;
+ ready!(self.project().writer.poll_flush(cx))?;
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().poll_flush_buffer(cx))?;
+ ready!(self.project().writer.poll_close(cx))?;
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/line_writer.rs b/third_party/rust/futures-util/src/io/line_writer.rs
new file mode 100644
index 0000000000..71cd668325
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/line_writer.rs
@@ -0,0 +1,155 @@
+use super::buf_writer::BufWriter;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use futures_io::IoSlice;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+
+pin_project! {
+/// Wrap a writer, like [`BufWriter`] does, but prioritizes buffering lines
+///
+/// This was written based on `std::io::LineWriter` which goes into further details
+/// explaining the code.
+///
+/// Buffering is actually done using `BufWriter`. This class will leverage `BufWriter`
+/// to write on-each-line.
+#[derive(Debug)]
+pub struct LineWriter<W: AsyncWrite> {
+ #[pin]
+ buf_writer: BufWriter<W>,
+}
+}
+
+impl<W: AsyncWrite> LineWriter<W> {
+ /// Create a new `LineWriter` with default buffer capacity. The default is currently 1KB
+ /// which was taken from `std::io::LineWriter`
+ pub fn new(inner: W) -> LineWriter<W> {
+ LineWriter::with_capacity(1024, inner)
+ }
+
+ /// Creates a new `LineWriter` with the specified buffer capacity.
+ pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> {
+ LineWriter { buf_writer: BufWriter::with_capacity(capacity, inner) }
+ }
+
+ /// Flush `buf_writer` if last char is "new line"
+ fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let this = self.project();
+ match this.buf_writer.buffer().last().copied() {
+ Some(b'\n') => this.buf_writer.flush_buf(cx),
+ _ => Poll::Ready(Ok(())),
+ }
+ }
+
+ /// Returns a reference to `buf_writer`'s internally buffered data.
+ pub fn buffer(&self) -> &[u8] {
+ self.buf_writer.buffer()
+ }
+
+ /// Acquires a reference to the underlying sink or stream that this combinator is
+ /// pulling from.
+ pub fn get_ref(&self) -> &W {
+ self.buf_writer.get_ref()
+ }
+}
+
+impl<W: AsyncWrite> AsyncWrite for LineWriter<W> {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ let mut this = self.as_mut().project();
+ let newline_index = match memchr::memrchr(b'\n', buf) {
+ None => {
+ ready!(self.as_mut().flush_if_completed_line(cx)?);
+ return self.project().buf_writer.poll_write(cx, buf);
+ }
+ Some(newline_index) => newline_index + 1,
+ };
+
+ ready!(this.buf_writer.as_mut().poll_flush(cx)?);
+
+ let lines = &buf[..newline_index];
+
+ let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write(cx, lines))? };
+
+ if flushed == 0 {
+ return Poll::Ready(Ok(0));
+ }
+
+ let tail = if flushed >= newline_index {
+ &buf[flushed..]
+ } else if newline_index - flushed <= this.buf_writer.capacity() {
+ &buf[flushed..newline_index]
+ } else {
+ let scan_area = &buf[flushed..];
+ let scan_area = &scan_area[..this.buf_writer.capacity()];
+ match memchr::memrchr(b'\n', scan_area) {
+ Some(newline_index) => &scan_area[..newline_index + 1],
+ None => scan_area,
+ }
+ };
+
+ let buffered = this.buf_writer.as_mut().write_to_buf(tail);
+ Poll::Ready(Ok(flushed + buffered))
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let mut this = self.as_mut().project();
+ // `is_write_vectored()` is handled in original code, but not in this crate
+ // see https://github.com/rust-lang/rust/issues/70436
+
+ let last_newline_buf_idx = bufs
+ .iter()
+ .enumerate()
+ .rev()
+ .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
+ let last_newline_buf_idx = match last_newline_buf_idx {
+ None => {
+ ready!(self.as_mut().flush_if_completed_line(cx)?);
+ return self.project().buf_writer.poll_write_vectored(cx, bufs);
+ }
+ Some(i) => i,
+ };
+
+ ready!(this.buf_writer.as_mut().poll_flush(cx)?);
+
+ let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
+
+ let flushed = { ready!(this.buf_writer.as_mut().inner_poll_write_vectored(cx, lines))? };
+ if flushed == 0 {
+ return Poll::Ready(Ok(0));
+ }
+
+ let lines_len = lines.iter().map(|buf| buf.len()).sum();
+ if flushed < lines_len {
+ return Poll::Ready(Ok(flushed));
+ }
+
+ let buffered: usize = tail
+ .iter()
+ .filter(|buf| !buf.is_empty())
+ .map(|buf| this.buf_writer.as_mut().write_to_buf(buf))
+ .take_while(|&n| n > 0)
+ .sum();
+
+ Poll::Ready(Ok(flushed + buffered))
+ }
+
+ /// Forward to `buf_writer` 's `BufWriter::poll_flush()`
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.as_mut().project().buf_writer.poll_flush(cx)
+ }
+
+ /// Forward to `buf_writer` 's `BufWriter::poll_close()`
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.as_mut().project().buf_writer.poll_close(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/lines.rs b/third_party/rust/futures-util/src/io/lines.rs
new file mode 100644
index 0000000000..b5561bfa7d
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/lines.rs
@@ -0,0 +1,47 @@
+use super::read_line::read_line_internal;
+use futures_core::ready;
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncBufRead;
+use pin_project_lite::pin_project;
+use std::io;
+use std::mem;
+use std::pin::Pin;
+
+pin_project! {
+ /// Stream for the [`lines`](super::AsyncBufReadExt::lines) method.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct Lines<R> {
+ #[pin]
+ reader: R,
+ buf: String,
+ bytes: Vec<u8>,
+ read: usize,
+ }
+}
+
+impl<R: AsyncBufRead> Lines<R> {
+ pub(super) fn new(reader: R) -> Self {
+ Self { reader, buf: String::new(), bytes: Vec::new(), read: 0 }
+ }
+}
+
+impl<R: AsyncBufRead> Stream for Lines<R> {
+ type Item = io::Result<String>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let this = self.project();
+ let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?;
+ if n == 0 && this.buf.is_empty() {
+ return Poll::Ready(None);
+ }
+ if this.buf.ends_with('\n') {
+ this.buf.pop();
+ if this.buf.ends_with('\r') {
+ this.buf.pop();
+ }
+ }
+ Poll::Ready(Some(Ok(mem::take(this.buf))))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/mod.rs b/third_party/rust/futures-util/src/io/mod.rs
new file mode 100644
index 0000000000..8ce3ad644b
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/mod.rs
@@ -0,0 +1,841 @@
+//! Asynchronous I/O.
+//!
+//! This module is the asynchronous version of `std::io`. It defines four
+//! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
+//! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
+//! standard library. However, these traits integrate with the asynchronous
+//! task system, so that if an I/O object isn't ready for reading (or writing),
+//! the thread is not blocked, and instead the current task is queued to be
+//! woken when I/O is ready.
+//!
+//! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
+//! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
+//! for operating with asynchronous I/O objects, including ways to work with
+//! them using futures, streams and sinks.
+//!
+//! This module is only available when the `std` feature of this
+//! library is activated, and it is activated by default.
+
+#[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
+use crate::compat::Compat;
+use crate::future::assert_future;
+use crate::stream::assert_stream;
+use std::{pin::Pin, ptr};
+
+// Re-export some types from `std::io` so that users don't have to deal
+// with conflicts when `use`ing `futures::io` and `std::io`.
+#[doc(no_inline)]
+pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
+
+pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
+
+// used by `BufReader` and `BufWriter`
+// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
+const DEFAULT_BUF_SIZE: usize = 8 * 1024;
+
+/// Initializes a buffer if necessary.
+///
+/// A buffer is currently always initialized.
+#[inline]
+unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
+ ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
+}
+
+mod allow_std;
+pub use self::allow_std::AllowStdIo;
+
+mod buf_reader;
+pub use self::buf_reader::{BufReader, SeeKRelative};
+
+mod buf_writer;
+pub use self::buf_writer::BufWriter;
+
+mod line_writer;
+pub use self::line_writer::LineWriter;
+
+mod chain;
+pub use self::chain::Chain;
+
+mod close;
+pub use self::close::Close;
+
+mod copy;
+pub use self::copy::{copy, Copy};
+
+mod copy_buf;
+pub use self::copy_buf::{copy_buf, CopyBuf};
+
+mod copy_buf_abortable;
+pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable};
+
+mod cursor;
+pub use self::cursor::Cursor;
+
+mod empty;
+pub use self::empty::{empty, Empty};
+
+mod fill_buf;
+pub use self::fill_buf::FillBuf;
+
+mod flush;
+pub use self::flush::Flush;
+
+#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+mod into_sink;
+#[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+pub use self::into_sink::IntoSink;
+
+mod lines;
+pub use self::lines::Lines;
+
+mod read;
+pub use self::read::Read;
+
+mod read_vectored;
+pub use self::read_vectored::ReadVectored;
+
+mod read_exact;
+pub use self::read_exact::ReadExact;
+
+mod read_line;
+pub use self::read_line::ReadLine;
+
+mod read_to_end;
+pub use self::read_to_end::ReadToEnd;
+
+mod read_to_string;
+pub use self::read_to_string::ReadToString;
+
+mod read_until;
+pub use self::read_until::ReadUntil;
+
+mod repeat;
+pub use self::repeat::{repeat, Repeat};
+
+mod seek;
+pub use self::seek::Seek;
+
+mod sink;
+pub use self::sink::{sink, Sink};
+
+mod split;
+pub use self::split::{ReadHalf, ReuniteError, WriteHalf};
+
+mod take;
+pub use self::take::Take;
+
+mod window;
+pub use self::window::Window;
+
+mod write;
+pub use self::write::Write;
+
+mod write_vectored;
+pub use self::write_vectored::WriteVectored;
+
+mod write_all;
+pub use self::write_all::WriteAll;
+
+#[cfg(feature = "write-all-vectored")]
+mod write_all_vectored;
+#[cfg(feature = "write-all-vectored")]
+pub use self::write_all_vectored::WriteAllVectored;
+
+/// An extension trait which adds utility methods to `AsyncRead` types.
+pub trait AsyncReadExt: AsyncRead {
+ /// Creates an adaptor which will chain this stream with another.
+ ///
+ /// The returned `AsyncRead` instance will first read all bytes from this object
+ /// until EOF is encountered. Afterwards the output is equivalent to the
+ /// output of `next`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader1 = Cursor::new([1, 2, 3, 4]);
+ /// let reader2 = Cursor::new([5, 6, 7, 8]);
+ ///
+ /// let mut reader = reader1.chain(reader2);
+ /// let mut buffer = Vec::new();
+ ///
+ /// // read the value into a Vec.
+ /// reader.read_to_end(&mut buffer).await?;
+ /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn chain<R>(self, next: R) -> Chain<Self, R>
+ where
+ Self: Sized,
+ R: AsyncRead,
+ {
+ assert_read(Chain::new(self, next))
+ }
+
+ /// Tries to read some bytes directly into the given `buf` in asynchronous
+ /// manner, returning a future type.
+ ///
+ /// The returned future will resolve to the number of bytes read once the read
+ /// operation is completed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut output = [0u8; 5];
+ ///
+ /// let bytes = reader.read(&mut output[..]).await?;
+ ///
+ /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous
+ /// // reader. In a real system you could get anywhere from 1 to
+ /// // `output.len()` bytes in a single read.
+ /// assert_eq!(bytes, 4);
+ /// assert_eq!(output, [1, 2, 3, 4, 0]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<usize>, _>(Read::new(self, buf))
+ }
+
+ /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
+ /// IO operations.
+ ///
+ /// The returned future will resolve to the number of bytes read once the read
+ /// operation is completed.
+ fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
+ }
+
+ /// Creates a future which will read exactly enough bytes to fill `buf`,
+ /// returning an error if end of file (EOF) is hit sooner.
+ ///
+ /// The returned future will resolve once the read operation is completed.
+ ///
+ /// In the case of an error the buffer and the object will be discarded, with
+ /// the error yielded.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut output = [0u8; 4];
+ ///
+ /// reader.read_exact(&mut output).await?;
+ ///
+ /// assert_eq!(output, [1, 2, 3, 4]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ ///
+ /// ## EOF is hit before `buf` is filled
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{self, AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut output = [0u8; 5];
+ ///
+ /// let result = reader.read_exact(&mut output).await;
+ ///
+ /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
+ /// # });
+ /// ```
+ fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<()>, _>(ReadExact::new(self, buf))
+ }
+
+ /// Creates a future which will read all the bytes from this `AsyncRead`.
+ ///
+ /// On success the total number of bytes read is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut output = Vec::with_capacity(4);
+ ///
+ /// let bytes = reader.read_to_end(&mut output).await?;
+ ///
+ /// assert_eq!(bytes, 4);
+ /// assert_eq!(output, vec![1, 2, 3, 4]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
+ }
+
+ /// Creates a future which will read all the bytes from this `AsyncRead`.
+ ///
+ /// On success the total number of bytes read is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new(&b"1234"[..]);
+ /// let mut buffer = String::with_capacity(4);
+ ///
+ /// let bytes = reader.read_to_string(&mut buffer).await?;
+ ///
+ /// assert_eq!(bytes, 4);
+ /// assert_eq!(buffer, String::from("1234"));
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
+ }
+
+ /// Helper method for splitting this read/write object into two halves.
+ ///
+ /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
+ /// traits, respectively.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{self, AsyncReadExt, Cursor};
+ ///
+ /// // Note that for `Cursor` the read and write halves share a single
+ /// // seek position. This may or may not be true for other types that
+ /// // implement both `AsyncRead` and `AsyncWrite`.
+ ///
+ /// let reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
+ /// let mut writer = Cursor::new(vec![0u8; 5]);
+ ///
+ /// {
+ /// let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
+ /// io::copy(reader, &mut buffer_writer).await?;
+ /// io::copy(buffer_reader, &mut writer).await?;
+ /// }
+ ///
+ /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
+ /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
+ where
+ Self: AsyncWrite + Sized,
+ {
+ let (r, w) = split::split(self);
+ (assert_read(r), assert_write(w))
+ }
+
+ /// Creates an AsyncRead adapter which will read at most `limit` bytes
+ /// from the underlying reader.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader = Cursor::new(&b"12345678"[..]);
+ /// let mut buffer = [0; 5];
+ ///
+ /// let mut take = reader.take(4);
+ /// let n = take.read(&mut buffer).await?;
+ ///
+ /// assert_eq!(n, 4);
+ /// assert_eq!(&buffer, b"1234\0");
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn take(self, limit: u64) -> Take<Self>
+ where
+ Self: Sized,
+ {
+ assert_read(Take::new(self, limit))
+ }
+
+ /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
+ /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
+ /// implements [`AsyncWrite`] as well, the result will also implement the
+ /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
+ ///
+ /// Requires the `io-compat` feature to enable.
+ #[cfg(feature = "io-compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
+ fn compat(self) -> Compat<Self>
+ where
+ Self: Sized + Unpin,
+ {
+ Compat::new(self)
+ }
+}
+
+impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
+
+/// An extension trait which adds utility methods to `AsyncWrite` types.
+pub trait AsyncWriteExt: AsyncWrite {
+ /// Creates a future which will entirely flush this `AsyncWrite`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AllowStdIo, AsyncWriteExt};
+ /// use std::io::{BufWriter, Cursor};
+ ///
+ /// let mut output = vec![0u8; 5];
+ ///
+ /// {
+ /// let writer = Cursor::new(&mut output);
+ /// let mut buffered = AllowStdIo::new(BufWriter::new(writer));
+ /// buffered.write_all(&[1, 2]).await?;
+ /// buffered.write_all(&[3, 4]).await?;
+ /// buffered.flush().await?;
+ /// }
+ ///
+ /// assert_eq!(output, [1, 2, 3, 4, 0]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn flush(&mut self) -> Flush<'_, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<()>, _>(Flush::new(self))
+ }
+
+ /// Creates a future which will entirely close this `AsyncWrite`.
+ fn close(&mut self) -> Close<'_, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<()>, _>(Close::new(self))
+ }
+
+ /// Creates a future which will write bytes from `buf` into the object.
+ ///
+ /// The returned future will resolve to the number of bytes written once the write
+ /// operation is completed.
+ fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<usize>, _>(Write::new(self, buf))
+ }
+
+ /// Creates a future which will write bytes from `bufs` into the object using vectored
+ /// IO operations.
+ ///
+ /// The returned future will resolve to the number of bytes written once the write
+ /// operation is completed.
+ fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
+ }
+
+ /// Write data into this object.
+ ///
+ /// Creates a future that will write the entire contents of the buffer `buf` into
+ /// this `AsyncWrite`.
+ ///
+ /// The returned future will not complete until all the data has been written.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncWriteExt, Cursor};
+ ///
+ /// let mut writer = Cursor::new(vec![0u8; 5]);
+ ///
+ /// writer.write_all(&[1, 2, 3, 4]).await?;
+ ///
+ /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<()>, _>(WriteAll::new(self, buf))
+ }
+
+ /// Attempts to write multiple buffers into this writer.
+ ///
+ /// Creates a future that will write the entire contents of `bufs` into this
+ /// `AsyncWrite` using [vectored writes].
+ ///
+ /// The returned future will not complete until all the data has been
+ /// written.
+ ///
+ /// [vectored writes]: std::io::Write::write_vectored
+ ///
+ /// # Notes
+ ///
+ /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
+ /// a slice of `IoSlice`s, not an immutable one. That's because we need to
+ /// modify the slice to keep track of the bytes already written.
+ ///
+ /// Once this futures returns, the contents of `bufs` are unspecified, as
+ /// this depends on how many calls to `write_vectored` were necessary. It is
+ /// best to understand this function as taking ownership of `bufs` and to
+ /// not use `bufs` afterwards. The underlying buffers, to which the
+ /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
+ /// can be reused.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::AsyncWriteExt;
+ /// use futures_util::io::Cursor;
+ /// use std::io::IoSlice;
+ ///
+ /// let mut writer = Cursor::new(Vec::new());
+ /// let bufs = &mut [
+ /// IoSlice::new(&[1]),
+ /// IoSlice::new(&[2, 3]),
+ /// IoSlice::new(&[4, 5, 6]),
+ /// ];
+ ///
+ /// writer.write_all_vectored(bufs).await?;
+ /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
+ ///
+ /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ #[cfg(feature = "write-all-vectored")]
+ fn write_all_vectored<'a>(
+ &'a mut self,
+ bufs: &'a mut [IoSlice<'a>],
+ ) -> WriteAllVectored<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
+ }
+
+ /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
+ /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
+ /// Requires the `io-compat` feature to enable.
+ #[cfg(feature = "io-compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
+ fn compat_write(self) -> Compat<Self>
+ where
+ Self: Sized + Unpin,
+ {
+ Compat::new(self)
+ }
+
+ /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
+ ///
+ /// This adapter produces a sink that will write each value passed to it
+ /// into the underlying writer.
+ ///
+ /// Note that this function consumes the given writer, returning a wrapped
+ /// version.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::AsyncWriteExt;
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
+ ///
+ /// let mut writer = vec![];
+ ///
+ /// stream.forward((&mut writer).into_sink()).await?;
+ ///
+ /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(())
+ /// # })?;
+ /// # Ok::<(), Box<dyn std::error::Error>>(())
+ /// ```
+ #[cfg(feature = "sink")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
+ fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
+ where
+ Self: Sized,
+ {
+ crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
+ }
+}
+
+impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
+
+/// An extension trait which adds utility methods to `AsyncSeek` types.
+pub trait AsyncSeekExt: AsyncSeek {
+ /// Creates a future which will seek an IO object, and then yield the
+ /// new position in the object and the object itself.
+ ///
+ /// In the case of an error the buffer and the object will be discarded, with
+ /// the error yielded.
+ fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<u64>, _>(Seek::new(self, pos))
+ }
+
+ /// Creates a future which will return the current seek position from the
+ /// start of the stream.
+ ///
+ /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
+ fn stream_position(&mut self) -> Seek<'_, Self>
+ where
+ Self: Unpin,
+ {
+ self.seek(SeekFrom::Current(0))
+ }
+}
+
+impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
+
+/// An extension trait which adds utility methods to `AsyncBufRead` types.
+pub trait AsyncBufReadExt: AsyncBufRead {
+ /// Creates a future which will wait for a non-empty buffer to be available from this I/O
+ /// object or EOF to be reached.
+ ///
+ /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
+ ///
+ /// ```rust
+ /// # futures::executor::block_on(async {
+ /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
+ ///
+ /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
+ /// stream.consume_unpin(2);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![3]);
+ /// stream.consume_unpin(1);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
+ /// stream.consume_unpin(3);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn fill_buf(&mut self) -> FillBuf<'_, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
+ }
+
+ /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
+ ///
+ /// ```rust
+ /// # futures::executor::block_on(async {
+ /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
+ ///
+ /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
+ /// stream.consume_unpin(2);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![3]);
+ /// stream.consume_unpin(1);
+ ///
+ /// assert_eq!(stream.fill_buf().await?, vec![]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn consume_unpin(&mut self, amt: usize)
+ where
+ Self: Unpin,
+ {
+ Pin::new(self).consume(amt)
+ }
+
+ /// Creates a future which will read all the bytes associated with this I/O
+ /// object into `buf` until the delimiter `byte` or EOF is reached.
+ /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
+ ///
+ /// This function will read bytes from the underlying stream until the
+ /// delimiter or EOF is found. Once found, all bytes up to, and including,
+ /// the delimiter (if found) will be appended to `buf`.
+ ///
+ /// The returned future will resolve to the number of bytes read once the read
+ /// operation is completed.
+ ///
+ /// In the case of an error the buffer and the object will be discarded, with
+ /// the error yielded.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncBufReadExt, Cursor};
+ ///
+ /// let mut cursor = Cursor::new(b"lorem-ipsum");
+ /// let mut buf = vec![];
+ ///
+ /// // cursor is at 'l'
+ /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
+ /// assert_eq!(num_bytes, 6);
+ /// assert_eq!(buf, b"lorem-");
+ /// buf.clear();
+ ///
+ /// // cursor is at 'i'
+ /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
+ /// assert_eq!(num_bytes, 5);
+ /// assert_eq!(buf, b"ipsum");
+ /// buf.clear();
+ ///
+ /// // cursor is at EOF
+ /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
+ /// assert_eq!(num_bytes, 0);
+ /// assert_eq!(buf, b"");
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
+ }
+
+ /// Creates a future which will read all the bytes associated with this I/O
+ /// object into `buf` until a newline (the 0xA byte) or EOF is reached,
+ /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
+ ///
+ /// This function will read bytes from the underlying stream until the
+ /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
+ /// up to, and including, the delimiter (if found) will be appended to
+ /// `buf`.
+ ///
+ /// The returned future will resolve to the number of bytes read once the read
+ /// operation is completed.
+ ///
+ /// In the case of an error the buffer and the object will be discarded, with
+ /// the error yielded.
+ ///
+ /// # Errors
+ ///
+ /// This function has the same error semantics as [`read_until`] and will
+ /// also return an error if the read bytes are not valid UTF-8. If an I/O
+ /// error is encountered then `buf` may contain some bytes already read in
+ /// the event that all data read so far was valid UTF-8.
+ ///
+ /// [`read_until`]: AsyncBufReadExt::read_until
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncBufReadExt, Cursor};
+ ///
+ /// let mut cursor = Cursor::new(b"foo\nbar");
+ /// let mut buf = String::new();
+ ///
+ /// // cursor is at 'f'
+ /// let num_bytes = cursor.read_line(&mut buf).await?;
+ /// assert_eq!(num_bytes, 4);
+ /// assert_eq!(buf, "foo\n");
+ /// buf.clear();
+ ///
+ /// // cursor is at 'b'
+ /// let num_bytes = cursor.read_line(&mut buf).await?;
+ /// assert_eq!(num_bytes, 3);
+ /// assert_eq!(buf, "bar");
+ /// buf.clear();
+ ///
+ /// // cursor is at EOF
+ /// let num_bytes = cursor.read_line(&mut buf).await?;
+ /// assert_eq!(num_bytes, 0);
+ /// assert_eq!(buf, "");
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
+ where
+ Self: Unpin,
+ {
+ assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
+ }
+
+ /// Returns a stream over the lines of this reader.
+ /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
+ ///
+ /// The stream returned from this function will yield instances of
+ /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
+ /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
+ ///
+ /// [`io::Result`]: std::io::Result
+ /// [`String`]: String
+ ///
+ /// # Errors
+ ///
+ /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
+ ///
+ /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncBufReadExt, Cursor};
+ /// use futures::stream::StreamExt;
+ ///
+ /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
+ ///
+ /// let mut lines_stream = cursor.lines().map(|l| l.unwrap());
+ /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
+ /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum")));
+ /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
+ /// assert_eq!(lines_stream.next().await, None);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn lines(self) -> Lines<Self>
+ where
+ Self: Sized,
+ {
+ assert_stream::<Result<String>, _>(Lines::new(self))
+ }
+}
+
+impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
+
+// Just a helper function to ensure the reader we're returning all have the
+// right implementations.
+pub(crate) fn assert_read<R>(reader: R) -> R
+where
+ R: AsyncRead,
+{
+ reader
+}
+// Just a helper function to ensure the writer we're returning all have the
+// right implementations.
+pub(crate) fn assert_write<W>(writer: W) -> W
+where
+ W: AsyncWrite,
+{
+ writer
+}
diff --git a/third_party/rust/futures-util/src/io/read.rs b/third_party/rust/futures-util/src/io/read.rs
new file mode 100644
index 0000000000..677ba818d9
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read.rs
@@ -0,0 +1,30 @@
+use crate::io::AsyncRead;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`read`](super::AsyncReadExt::read) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Read<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut [u8],
+}
+
+impl<R: ?Sized + Unpin> Unpin for Read<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> Read<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
+ Self { reader, buf }
+ }
+}
+
+impl<R: AsyncRead + ?Sized + Unpin> Future for Read<'_, R> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.reader).poll_read(cx, this.buf)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_exact.rs b/third_party/rust/futures-util/src/io/read_exact.rs
new file mode 100644
index 0000000000..cd0b20e597
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_exact.rs
@@ -0,0 +1,42 @@
+use crate::io::AsyncRead;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use std::io;
+use std::mem;
+use std::pin::Pin;
+
+/// Future for the [`read_exact`](super::AsyncReadExt::read_exact) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadExact<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut [u8],
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadExact<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> ReadExact<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
+ Self { reader, buf }
+ }
+}
+
+impl<R: AsyncRead + ?Sized + Unpin> Future for ReadExact<'_, R> {
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ while !this.buf.is_empty() {
+ let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?;
+ {
+ let (_, rest) = mem::take(&mut this.buf).split_at_mut(n);
+ this.buf = rest;
+ }
+ if n == 0 {
+ return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
+ }
+ }
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_line.rs b/third_party/rust/futures-util/src/io/read_line.rs
new file mode 100644
index 0000000000..e1b8fc9455
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_line.rs
@@ -0,0 +1,57 @@
+use super::read_until::read_until_internal;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncBufRead;
+use std::io;
+use std::mem;
+use std::pin::Pin;
+use std::str;
+
+/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadLine<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut String,
+ bytes: Vec<u8>,
+ read: usize,
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}
+
+impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
+ Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0 }
+ }
+}
+
+pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
+ reader: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ buf: &mut String,
+ bytes: &mut Vec<u8>,
+ read: &mut usize,
+) -> Poll<io::Result<usize>> {
+ let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
+ if str::from_utf8(bytes).is_err() {
+ Poll::Ready(ret.and_then(|_| {
+ Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8"))
+ }))
+ } else {
+ debug_assert!(buf.is_empty());
+ debug_assert_eq!(*read, 0);
+ // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
+ mem::swap(unsafe { buf.as_mut_vec() }, bytes);
+ Poll::Ready(ret)
+ }
+}
+
+impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self { reader, buf, bytes, read } = &mut *self;
+ read_line_internal(Pin::new(reader), cx, buf, bytes, read)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_to_end.rs b/third_party/rust/futures-util/src/io/read_to_end.rs
new file mode 100644
index 0000000000..919d7d13c7
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_to_end.rs
@@ -0,0 +1,91 @@
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncRead;
+use std::io;
+use std::pin::Pin;
+use std::vec::Vec;
+
+/// Future for the [`read_to_end`](super::AsyncReadExt::read_to_end) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadToEnd<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut Vec<u8>,
+ start_len: usize,
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToEnd<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> Self {
+ let start_len = buf.len();
+ Self { reader, buf, start_len }
+ }
+}
+
+struct Guard<'a> {
+ buf: &'a mut Vec<u8>,
+ len: usize,
+}
+
+impl Drop for Guard<'_> {
+ fn drop(&mut self) {
+ unsafe {
+ self.buf.set_len(self.len);
+ }
+ }
+}
+
+// This uses an adaptive system to extend the vector when it fills. We want to
+// avoid paying to allocate and zero a huge chunk of memory if the reader only
+// has 4 bytes while still making large reads if the reader does have a ton
+// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
+// time is 4,500 times (!) slower than this if the reader has a very small
+// amount of data to return.
+//
+// Because we're extending the buffer with uninitialized data for trusted
+// readers, we need to make sure to truncate that if any of this panics.
+pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
+ mut rd: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ buf: &mut Vec<u8>,
+ start_len: usize,
+) -> Poll<io::Result<usize>> {
+ let mut g = Guard { len: buf.len(), buf };
+ loop {
+ if g.len == g.buf.len() {
+ unsafe {
+ g.buf.reserve(32);
+ let capacity = g.buf.capacity();
+ g.buf.set_len(capacity);
+ super::initialize(&rd, &mut g.buf[g.len..]);
+ }
+ }
+
+ let buf = &mut g.buf[g.len..];
+ match ready!(rd.as_mut().poll_read(cx, buf)) {
+ Ok(0) => return Poll::Ready(Ok(g.len - start_len)),
+ Ok(n) => {
+ // We can't allow bogus values from read. If it is too large, the returned vec could have its length
+ // set past its capacity, or if it overflows the vec could be shortened which could create an invalid
+ // string if this is called via read_to_string.
+ assert!(n <= buf.len());
+ g.len += n;
+ }
+ Err(e) => return Poll::Ready(Err(e)),
+ }
+ }
+}
+
+impl<A> Future for ReadToEnd<'_, A>
+where
+ A: AsyncRead + ?Sized + Unpin,
+{
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_to_string.rs b/third_party/rust/futures-util/src/io/read_to_string.rs
new file mode 100644
index 0000000000..c175396d81
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_to_string.rs
@@ -0,0 +1,59 @@
+use super::read_to_end::read_to_end_internal;
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncRead;
+use std::pin::Pin;
+use std::vec::Vec;
+use std::{io, mem, str};
+
+/// Future for the [`read_to_string`](super::AsyncReadExt::read_to_string) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadToString<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut String,
+ bytes: Vec<u8>,
+ start_len: usize,
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
+ let start_len = buf.len();
+ Self { reader, bytes: mem::take(buf).into_bytes(), buf, start_len }
+ }
+}
+
+fn read_to_string_internal<R: AsyncRead + ?Sized>(
+ reader: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ buf: &mut String,
+ bytes: &mut Vec<u8>,
+ start_len: usize,
+) -> Poll<io::Result<usize>> {
+ let ret = ready!(read_to_end_internal(reader, cx, bytes, start_len));
+ if str::from_utf8(bytes).is_err() {
+ Poll::Ready(ret.and_then(|_| {
+ Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8"))
+ }))
+ } else {
+ debug_assert!(buf.is_empty());
+ // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
+ mem::swap(unsafe { buf.as_mut_vec() }, bytes);
+ Poll::Ready(ret)
+ }
+}
+
+impl<A> Future for ReadToString<'_, A>
+where
+ A: AsyncRead + ?Sized + Unpin,
+{
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self { reader, buf, bytes, start_len } = &mut *self;
+ read_to_string_internal(Pin::new(reader), cx, buf, bytes, *start_len)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_until.rs b/third_party/rust/futures-util/src/io/read_until.rs
new file mode 100644
index 0000000000..72b59eab13
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_until.rs
@@ -0,0 +1,60 @@
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncBufRead;
+use std::io;
+use std::mem;
+use std::pin::Pin;
+
+/// Future for the [`read_until`](super::AsyncBufReadExt::read_until) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadUntil<'a, R: ?Sized> {
+ reader: &'a mut R,
+ byte: u8,
+ buf: &'a mut Vec<u8>,
+ read: usize,
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadUntil<'_, R> {}
+
+impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
+ pub(super) fn new(reader: &'a mut R, byte: u8, buf: &'a mut Vec<u8>) -> Self {
+ Self { reader, byte, buf, read: 0 }
+ }
+}
+
+pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
+ mut reader: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ byte: u8,
+ buf: &mut Vec<u8>,
+ read: &mut usize,
+) -> Poll<io::Result<usize>> {
+ loop {
+ let (done, used) = {
+ let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
+ if let Some(i) = memchr::memchr(byte, available) {
+ buf.extend_from_slice(&available[..=i]);
+ (true, i + 1)
+ } else {
+ buf.extend_from_slice(available);
+ (false, available.len())
+ }
+ };
+ reader.as_mut().consume(used);
+ *read += used;
+ if done || used == 0 {
+ return Poll::Ready(Ok(mem::replace(read, 0)));
+ }
+ }
+}
+
+impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self { reader, byte, buf, read } = &mut *self;
+ read_until_internal(Pin::new(reader), cx, *byte, buf, read)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_vectored.rs b/third_party/rust/futures-util/src/io/read_vectored.rs
new file mode 100644
index 0000000000..4e22df57e9
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_vectored.rs
@@ -0,0 +1,30 @@
+use crate::io::AsyncRead;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io::{self, IoSliceMut};
+use std::pin::Pin;
+
+/// Future for the [`read_vectored`](super::AsyncReadExt::read_vectored) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadVectored<'a, R: ?Sized> {
+ reader: &'a mut R,
+ bufs: &'a mut [IoSliceMut<'a>],
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadVectored<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> ReadVectored<'a, R> {
+ pub(super) fn new(reader: &'a mut R, bufs: &'a mut [IoSliceMut<'a>]) -> Self {
+ Self { reader, bufs }
+ }
+}
+
+impl<R: AsyncRead + ?Sized + Unpin> Future for ReadVectored<'_, R> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.reader).poll_read_vectored(cx, this.bufs)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/repeat.rs b/third_party/rust/futures-util/src/io/repeat.rs
new file mode 100644
index 0000000000..2828bf0114
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/repeat.rs
@@ -0,0 +1,66 @@
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncRead, IoSliceMut};
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+
+/// Reader for the [`repeat()`] function.
+#[must_use = "readers do nothing unless polled"]
+pub struct Repeat {
+ byte: u8,
+}
+
+/// Creates an instance of a reader that infinitely repeats one byte.
+///
+/// All reads from this reader will succeed by filling the specified buffer with
+/// the given byte.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncReadExt};
+///
+/// let mut buffer = [0; 3];
+/// let mut reader = io::repeat(0b101);
+/// reader.read_exact(&mut buffer).await.unwrap();
+/// assert_eq!(buffer, [0b101, 0b101, 0b101]);
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn repeat(byte: u8) -> Repeat {
+ Repeat { byte }
+}
+
+impl AsyncRead for Repeat {
+ #[inline]
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ for slot in &mut *buf {
+ *slot = self.byte;
+ }
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ #[inline]
+ fn poll_read_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let mut nwritten = 0;
+ for buf in bufs {
+ nwritten += ready!(self.as_mut().poll_read(cx, buf))?;
+ }
+ Poll::Ready(Ok(nwritten))
+ }
+}
+
+impl fmt::Debug for Repeat {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Repeat { .. }")
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/seek.rs b/third_party/rust/futures-util/src/io/seek.rs
new file mode 100644
index 0000000000..0aa2371393
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/seek.rs
@@ -0,0 +1,30 @@
+use crate::io::{AsyncSeek, SeekFrom};
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Seek<'a, S: ?Sized> {
+ seek: &'a mut S,
+ pos: SeekFrom,
+}
+
+impl<S: ?Sized + Unpin> Unpin for Seek<'_, S> {}
+
+impl<'a, S: AsyncSeek + ?Sized + Unpin> Seek<'a, S> {
+ pub(super) fn new(seek: &'a mut S, pos: SeekFrom) -> Self {
+ Self { seek, pos }
+ }
+}
+
+impl<S: AsyncSeek + ?Sized + Unpin> Future for Seek<'_, S> {
+ type Output = io::Result<u64>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.seek).poll_seek(cx, this.pos)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/sink.rs b/third_party/rust/futures-util/src/io/sink.rs
new file mode 100644
index 0000000000..4a32ca7041
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/sink.rs
@@ -0,0 +1,67 @@
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncWrite, IoSlice};
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+
+/// Writer for the [`sink()`] function.
+#[must_use = "writers do nothing unless polled"]
+pub struct Sink {
+ _priv: (),
+}
+
+/// Creates an instance of a writer which will successfully consume all data.
+///
+/// All calls to `poll_write` on the returned instance will return `Poll::Ready(Ok(buf.len()))`
+/// and the contents of the buffer will not be inspected.
+///
+/// # Examples
+///
+/// ```rust
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncWriteExt};
+///
+/// let buffer = vec![1, 2, 3, 5, 8];
+/// let mut writer = io::sink();
+/// let num_bytes = writer.write(&buffer).await?;
+/// assert_eq!(num_bytes, 5);
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn sink() -> Sink {
+ Sink { _priv: () }
+}
+
+impl AsyncWrite for Sink {
+ #[inline]
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ #[inline]
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(bufs.iter().map(|b| b.len()).sum()))
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+ #[inline]
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl fmt::Debug for Sink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Sink { .. }")
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/split.rs b/third_party/rust/futures-util/src/io/split.rs
new file mode 100644
index 0000000000..3f1b9af456
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/split.rs
@@ -0,0 +1,115 @@
+use crate::lock::BiLock;
+use core::fmt;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
+use std::io;
+use std::pin::Pin;
+
+/// The readable half of an object returned from `AsyncRead::split`.
+#[derive(Debug)]
+pub struct ReadHalf<T> {
+ handle: BiLock<T>,
+}
+
+/// The writable half of an object returned from `AsyncRead::split`.
+#[derive(Debug)]
+pub struct WriteHalf<T> {
+ handle: BiLock<T>,
+}
+
+fn lock_and_then<T, U, E, F>(lock: &BiLock<T>, cx: &mut Context<'_>, f: F) -> Poll<Result<U, E>>
+where
+ F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<U, E>>,
+{
+ let mut l = ready!(lock.poll_lock(cx));
+ f(l.as_pin_mut(), cx)
+}
+
+pub(super) fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {
+ let (a, b) = BiLock::new(t);
+ (ReadHalf { handle: a }, WriteHalf { handle: b })
+}
+
+impl<T: Unpin> ReadHalf<T> {
+ /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back
+ /// together. Succeeds only if the `ReadHalf<T>` and `WriteHalf<T>` are
+ /// a matching pair originating from the same call to `AsyncReadExt::split`.
+ pub fn reunite(self, other: WriteHalf<T>) -> Result<T, ReuniteError<T>> {
+ self.handle
+ .reunite(other.handle)
+ .map_err(|err| ReuniteError(ReadHalf { handle: err.0 }, WriteHalf { handle: err.1 }))
+ }
+}
+
+impl<T: Unpin> WriteHalf<T> {
+ /// Attempts to put the two "halves" of a split `AsyncRead + AsyncWrite` back
+ /// together. Succeeds only if the `ReadHalf<T>` and `WriteHalf<T>` are
+ /// a matching pair originating from the same call to `AsyncReadExt::split`.
+ pub fn reunite(self, other: ReadHalf<T>) -> Result<T, ReuniteError<T>> {
+ other.reunite(self)
+ }
+}
+
+impl<R: AsyncRead> AsyncRead for ReadHalf<R> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf))
+ }
+
+ fn poll_read_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs))
+ }
+}
+
+impl<W: AsyncWrite> AsyncWrite for WriteHalf<W> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf))
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx))
+ }
+}
+
+/// Error indicating a `ReadHalf<T>` and `WriteHalf<T>` were not two halves
+/// of a `AsyncRead + AsyncWrite`, and thus could not be `reunite`d.
+pub struct ReuniteError<T>(pub ReadHalf<T>, pub WriteHalf<T>);
+
+impl<T> fmt::Debug for ReuniteError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_tuple("ReuniteError").field(&"...").finish()
+ }
+}
+
+impl<T> fmt::Display for ReuniteError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "tried to reunite a ReadHalf and WriteHalf that don't form a pair")
+ }
+}
+
+#[cfg(feature = "std")]
+impl<T: core::any::Any> std::error::Error for ReuniteError<T> {}
diff --git a/third_party/rust/futures-util/src/io/take.rs b/third_party/rust/futures-util/src/io/take.rs
new file mode 100644
index 0000000000..2c494804d9
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/take.rs
@@ -0,0 +1,125 @@
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncRead};
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::{cmp, io};
+
+pin_project! {
+ /// Reader for the [`take`](super::AsyncReadExt::take) method.
+ #[derive(Debug)]
+ #[must_use = "readers do nothing unless you `.await` or poll them"]
+ pub struct Take<R> {
+ #[pin]
+ inner: R,
+ limit: u64,
+ }
+}
+
+impl<R: AsyncRead> Take<R> {
+ pub(super) fn new(inner: R, limit: u64) -> Self {
+ Self { inner, limit }
+ }
+
+ /// Returns the remaining number of bytes that can be
+ /// read before this instance will return EOF.
+ ///
+ /// # Note
+ ///
+ /// This instance may reach `EOF` after reading fewer bytes than indicated by
+ /// this method if the underlying [`AsyncRead`] instance reaches EOF.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader = Cursor::new(&b"12345678"[..]);
+ /// let mut buffer = [0; 2];
+ ///
+ /// let mut take = reader.take(4);
+ /// let n = take.read(&mut buffer).await?;
+ ///
+ /// assert_eq!(take.limit(), 2);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ pub fn limit(&self) -> u64 {
+ self.limit
+ }
+
+ /// Sets the number of bytes that can be read before this instance will
+ /// return EOF. This is the same as constructing a new `Take` instance, so
+ /// the amount of bytes read and the previous limit value don't matter when
+ /// calling this method.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader = Cursor::new(&b"12345678"[..]);
+ /// let mut buffer = [0; 4];
+ ///
+ /// let mut take = reader.take(4);
+ /// let n = take.read(&mut buffer).await?;
+ ///
+ /// assert_eq!(n, 4);
+ /// assert_eq!(take.limit(), 0);
+ ///
+ /// take.set_limit(10);
+ /// let n = take.read(&mut buffer).await?;
+ /// assert_eq!(n, 4);
+ ///
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ pub fn set_limit(&mut self, limit: u64) {
+ self.limit = limit
+ }
+
+ delegate_access_inner!(inner, R, ());
+}
+
+impl<R: AsyncRead> AsyncRead for Take<R> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ let this = self.project();
+
+ if *this.limit == 0 {
+ return Poll::Ready(Ok(0));
+ }
+
+ let max = cmp::min(buf.len() as u64, *this.limit) as usize;
+ let n = ready!(this.inner.poll_read(cx, &mut buf[..max]))?;
+ *this.limit -= n as u64;
+ Poll::Ready(Ok(n))
+ }
+}
+
+impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ let this = self.project();
+
+ // Don't call into inner reader at all at EOF because it may still block
+ if *this.limit == 0 {
+ return Poll::Ready(Ok(&[]));
+ }
+
+ let buf = ready!(this.inner.poll_fill_buf(cx)?);
+ let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
+ Poll::Ready(Ok(&buf[..cap]))
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ let this = self.project();
+
+ // Don't let callers reset the limit by passing an overlarge value
+ let amt = cmp::min(amt as u64, *this.limit) as usize;
+ *this.limit -= amt as u64;
+ this.inner.consume(amt);
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/window.rs b/third_party/rust/futures-util/src/io/window.rs
new file mode 100644
index 0000000000..77b7267c69
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/window.rs
@@ -0,0 +1,104 @@
+use std::ops::{Bound, Range, RangeBounds};
+
+/// A owned window around an underlying buffer.
+///
+/// Normally slices work great for considering sub-portions of a buffer, but
+/// unfortunately a slice is a *borrowed* type in Rust which has an associated
+/// lifetime. When working with future and async I/O these lifetimes are not
+/// always appropriate, and are sometimes difficult to store in tasks. This
+/// type strives to fill this gap by providing an "owned slice" around an
+/// underlying buffer of bytes.
+///
+/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable
+/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation
+/// that this type carries.
+///
+/// This type can be particularly useful when working with the `write_all`
+/// combinator in this crate. Data can be sliced via `Window`, consumed by
+/// `write_all`, and then earned back once the write operation finishes through
+/// the `into_inner` method on this type.
+#[derive(Debug)]
+pub struct Window<T> {
+ inner: T,
+ range: Range<usize>,
+}
+
+impl<T: AsRef<[u8]>> Window<T> {
+ /// Creates a new window around the buffer `t` defaulting to the entire
+ /// slice.
+ ///
+ /// Further methods can be called on the returned `Window<T>` to alter the
+ /// window into the data provided.
+ pub fn new(t: T) -> Self {
+ Self { range: 0..t.as_ref().len(), inner: t }
+ }
+
+ /// Gets a shared reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consumes this `Window`, returning the underlying buffer.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ /// Returns the starting index of this window into the underlying buffer
+ /// `T`.
+ pub fn start(&self) -> usize {
+ self.range.start
+ }
+
+ /// Returns the end index of this window into the underlying buffer
+ /// `T`.
+ pub fn end(&self) -> usize {
+ self.range.end
+ }
+
+ /// Changes the range of this window to the range specified.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `range` is out of bounds for the underlying
+ /// slice or if [`start_bound()`] of `range` comes after the [`end_bound()`].
+ ///
+ /// [`start_bound()`]: std::ops::RangeBounds::start_bound
+ /// [`end_bound()`]: std::ops::RangeBounds::end_bound
+ pub fn set<R: RangeBounds<usize>>(&mut self, range: R) {
+ let start = match range.start_bound() {
+ Bound::Included(n) => *n,
+ Bound::Excluded(n) => *n + 1,
+ Bound::Unbounded => 0,
+ };
+ let end = match range.end_bound() {
+ Bound::Included(n) => *n + 1,
+ Bound::Excluded(n) => *n,
+ Bound::Unbounded => self.inner.as_ref().len(),
+ };
+
+ assert!(end <= self.inner.as_ref().len());
+ assert!(start <= end);
+
+ self.range.start = start;
+ self.range.end = end;
+ }
+}
+
+impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> {
+ fn as_ref(&self) -> &[u8] {
+ &self.inner.as_ref()[self.range.start..self.range.end]
+ }
+}
+
+impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> {
+ fn as_mut(&mut self) -> &mut [u8] {
+ &mut self.inner.as_mut()[self.range.start..self.range.end]
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/write.rs b/third_party/rust/futures-util/src/io/write.rs
new file mode 100644
index 0000000000..c47ef9e2eb
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/write.rs
@@ -0,0 +1,30 @@
+use crate::io::AsyncWrite;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`write`](super::AsyncWriteExt::write) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Write<'a, W: ?Sized> {
+ writer: &'a mut W,
+ buf: &'a [u8],
+}
+
+impl<W: ?Sized + Unpin> Unpin for Write<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> Write<'a, W> {
+ pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self {
+ Self { writer, buf }
+ }
+}
+
+impl<W: AsyncWrite + ?Sized + Unpin> Future for Write<'_, W> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.writer).poll_write(cx, this.buf)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/write_all.rs b/third_party/rust/futures-util/src/io/write_all.rs
new file mode 100644
index 0000000000..08c025f94d
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/write_all.rs
@@ -0,0 +1,43 @@
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use std::io;
+use std::mem;
+use std::pin::Pin;
+
+/// Future for the [`write_all`](super::AsyncWriteExt::write_all) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct WriteAll<'a, W: ?Sized> {
+ writer: &'a mut W,
+ buf: &'a [u8],
+}
+
+impl<W: ?Sized + Unpin> Unpin for WriteAll<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAll<'a, W> {
+ pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self {
+ Self { writer, buf }
+ }
+}
+
+impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> {
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let this = &mut *self;
+ while !this.buf.is_empty() {
+ let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?;
+ {
+ let (_, rest) = mem::take(&mut this.buf).split_at(n);
+ this.buf = rest;
+ }
+ if n == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/write_all_vectored.rs b/third_party/rust/futures-util/src/io/write_all_vectored.rs
new file mode 100644
index 0000000000..a8fc4c641c
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/write_all_vectored.rs
@@ -0,0 +1,193 @@
+use futures_core::future::Future;
+use futures_core::ready;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use futures_io::IoSlice;
+use std::io;
+use std::pin::Pin;
+
+/// Future for the
+/// [`write_all_vectored`](super::AsyncWriteExt::write_all_vectored) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct WriteAllVectored<'a, W: ?Sized + Unpin> {
+ writer: &'a mut W,
+ bufs: &'a mut [IoSlice<'a>],
+}
+
+impl<W: ?Sized + Unpin> Unpin for WriteAllVectored<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> {
+ pub(super) fn new(writer: &'a mut W, mut bufs: &'a mut [IoSlice<'a>]) -> Self {
+ IoSlice::advance_slices(&mut bufs, 0);
+ Self { writer, bufs }
+ }
+}
+
+impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAllVectored<'_, W> {
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let this = &mut *self;
+ while !this.bufs.is_empty() {
+ let n = ready!(Pin::new(&mut this.writer).poll_write_vectored(cx, this.bufs))?;
+ if n == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ } else {
+ IoSlice::advance_slices(&mut this.bufs, n);
+ }
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::cmp::min;
+ use std::future::Future;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
+
+ use crate::io::{AsyncWrite, AsyncWriteExt, IoSlice};
+ use crate::task::noop_waker;
+
+ /// Create a new writer that reads from at most `n_bufs` and reads
+ /// `per_call` bytes (in total) per call to write.
+ fn test_writer(n_bufs: usize, per_call: usize) -> TestWriter {
+ TestWriter { n_bufs, per_call, written: Vec::new() }
+ }
+
+ // TODO: maybe move this the future-test crate?
+ struct TestWriter {
+ n_bufs: usize,
+ per_call: usize,
+ written: Vec<u8>,
+ }
+
+ impl AsyncWrite for TestWriter {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.poll_write_vectored(cx, &[IoSlice::new(buf)])
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let mut left = self.per_call;
+ let mut written = 0;
+ for buf in bufs.iter().take(self.n_bufs) {
+ let n = min(left, buf.len());
+ self.written.extend_from_slice(&buf[0..n]);
+ left -= n;
+ written += n;
+ }
+ Poll::Ready(Ok(written))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ // TODO: maybe move this the future-test crate?
+ macro_rules! assert_poll_ok {
+ ($e:expr, $expected:expr) => {
+ let expected = $expected;
+ match $e {
+ Poll::Ready(Ok(ok)) if ok == expected => {}
+ got => {
+ panic!("unexpected result, got: {:?}, wanted: Ready(Ok({:?}))", got, expected)
+ }
+ }
+ };
+ }
+
+ #[test]
+ fn test_writer_read_from_one_buf() {
+ let waker = noop_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ let mut dst = test_writer(1, 2);
+ let mut dst = Pin::new(&mut dst);
+
+ assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[]), 0);
+ assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, &[]), 0);
+
+ // Read at most 2 bytes.
+ assert_poll_ok!(dst.as_mut().poll_write(&mut cx, &[1, 1, 1]), 2);
+ let bufs = &[IoSlice::new(&[2, 2, 2])];
+ assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 2);
+
+ // Only read from first buf.
+ let bufs = &[IoSlice::new(&[3]), IoSlice::new(&[4, 4])];
+ assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 1);
+
+ assert_eq!(dst.written, &[1, 1, 2, 2, 3]);
+ }
+
+ #[test]
+ fn test_writer_read_from_multiple_bufs() {
+ let waker = noop_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ let mut dst = test_writer(3, 3);
+ let mut dst = Pin::new(&mut dst);
+
+ // Read at most 3 bytes from two buffers.
+ let bufs = &[IoSlice::new(&[1]), IoSlice::new(&[2, 2, 2])];
+ assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3);
+
+ // Read at most 3 bytes from three buffers.
+ let bufs = &[IoSlice::new(&[3]), IoSlice::new(&[4]), IoSlice::new(&[5, 5])];
+ assert_poll_ok!(dst.as_mut().poll_write_vectored(&mut cx, bufs), 3);
+
+ assert_eq!(dst.written, &[1, 2, 2, 3, 4, 5]);
+ }
+
+ #[test]
+ fn test_write_all_vectored() {
+ let waker = noop_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ #[rustfmt::skip] // Becomes unreadable otherwise.
+ let tests: Vec<(_, &'static [u8])> = vec![
+ (vec![], &[]),
+ (vec![IoSlice::new(&[]), IoSlice::new(&[])], &[]),
+ (vec![IoSlice::new(&[1])], &[1]),
+ (vec![IoSlice::new(&[1, 2])], &[1, 2]),
+ (vec![IoSlice::new(&[1, 2, 3])], &[1, 2, 3]),
+ (vec![IoSlice::new(&[1, 2, 3, 4])], &[1, 2, 3, 4]),
+ (vec![IoSlice::new(&[1, 2, 3, 4, 5])], &[1, 2, 3, 4, 5]),
+ (vec![IoSlice::new(&[1]), IoSlice::new(&[2])], &[1, 2]),
+ (vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2])], &[1, 1, 2, 2]),
+ (vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2])], &[1, 1, 1, 2, 2, 2]),
+ (vec![IoSlice::new(&[1, 1, 1, 1]), IoSlice::new(&[2, 2, 2, 2])], &[1, 1, 1, 1, 2, 2, 2, 2]),
+ (vec![IoSlice::new(&[1]), IoSlice::new(&[2]), IoSlice::new(&[3])], &[1, 2, 3]),
+ (vec![IoSlice::new(&[1, 1]), IoSlice::new(&[2, 2]), IoSlice::new(&[3, 3])], &[1, 1, 2, 2, 3, 3]),
+ (vec![IoSlice::new(&[1, 1, 1]), IoSlice::new(&[2, 2, 2]), IoSlice::new(&[3, 3, 3])], &[1, 1, 1, 2, 2, 2, 3, 3, 3]),
+ ];
+
+ for (mut input, wanted) in tests {
+ let mut dst = test_writer(2, 2);
+ {
+ let mut future = dst.write_all_vectored(&mut *input);
+ match Pin::new(&mut future).poll(&mut cx) {
+ Poll::Ready(Ok(())) => {}
+ other => panic!("unexpected result polling future: {:?}", other),
+ }
+ }
+ assert_eq!(&*dst.written, &*wanted);
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/write_vectored.rs b/third_party/rust/futures-util/src/io/write_vectored.rs
new file mode 100644
index 0000000000..14a01d7302
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/write_vectored.rs
@@ -0,0 +1,30 @@
+use crate::io::AsyncWrite;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io::{self, IoSlice};
+use std::pin::Pin;
+
+/// Future for the [`write_vectored`](super::AsyncWriteExt::write_vectored) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct WriteVectored<'a, W: ?Sized> {
+ writer: &'a mut W,
+ bufs: &'a [IoSlice<'a>],
+}
+
+impl<W: ?Sized + Unpin> Unpin for WriteVectored<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteVectored<'a, W> {
+ pub(super) fn new(writer: &'a mut W, bufs: &'a [IoSlice<'a>]) -> Self {
+ Self { writer, bufs }
+ }
+}
+
+impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteVectored<'_, W> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.writer).poll_write_vectored(cx, this.bufs)
+ }
+}