summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-util/src/io
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-util/src/io')
-rw-r--r--third_party/rust/futures-util/src/io/allow_std.rs179
-rw-r--r--third_party/rust/futures-util/src/io/buf_reader.rs259
-rw-r--r--third_party/rust/futures-util/src/io/buf_writer.rs222
-rw-r--r--third_party/rust/futures-util/src/io/chain.rs166
-rw-r--r--third_party/rust/futures-util/src/io/close.rs28
-rw-r--r--third_party/rust/futures-util/src/io/copy.rs63
-rw-r--r--third_party/rust/futures-util/src/io/copy_buf.rs87
-rw-r--r--third_party/rust/futures-util/src/io/cursor.rs238
-rw-r--r--third_party/rust/futures-util/src/io/empty.rs67
-rw-r--r--third_party/rust/futures-util/src/io/flush.rs30
-rw-r--r--third_party/rust/futures-util/src/io/into_sink.rs108
-rw-r--r--third_party/rust/futures-util/src/io/lines.rs50
-rw-r--r--third_party/rust/futures-util/src/io/mod.rs675
-rw-r--r--third_party/rust/futures-util/src/io/read.rs30
-rw-r--r--third_party/rust/futures-util/src/io/read_exact.rs41
-rw-r--r--third_party/rust/futures-util/src/io/read_line.rs61
-rw-r--r--third_party/rust/futures-util/src/io/read_to_end.rs90
-rw-r--r--third_party/rust/futures-util/src/io/read_to_string.rs66
-rw-r--r--third_party/rust/futures-util/src/io/read_until.rs59
-rw-r--r--third_party/rust/futures-util/src/io/read_vectored.rs30
-rw-r--r--third_party/rust/futures-util/src/io/repeat.rs73
-rw-r--r--third_party/rust/futures-util/src/io/seek.rs30
-rw-r--r--third_party/rust/futures-util/src/io/sink.rs67
-rw-r--r--third_party/rust/futures-util/src/io/split.rs69
-rw-r--r--third_party/rust/futures-util/src/io/take.rs210
-rw-r--r--third_party/rust/futures-util/src/io/window.rs107
-rw-r--r--third_party/rust/futures-util/src/io/write.rs30
-rw-r--r--third_party/rust/futures-util/src/io/write_all.rs42
-rw-r--r--third_party/rust/futures-util/src/io/write_vectored.rs30
29 files changed, 3207 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/io/allow_std.rs b/third_party/rust/futures-util/src/io/allow_std.rs
new file mode 100644
index 0000000000..346e9babea
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/allow_std.rs
@@ -0,0 +1,179 @@
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "read-initializer")]
+use futures_io::Initializer;
+use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoSlice, IoSliceMut, SeekFrom};
+use std::{fmt, io};
+use std::pin::Pin;
+
+/// A simple wrapper type which allows types which implement only
+/// implement `std::io::Read` or `std::io::Write`
+/// to be used in contexts which expect an `AsyncRead` or `AsyncWrite`.
+///
+/// If these types issue an error with the kind `io::ErrorKind::WouldBlock`,
+/// it is expected that they will notify the current task on readiness.
+/// Synchronous `std` types should not issue errors of this kind and
+/// are safe to use in this context. However, using these types with
+/// `AllowStdIo` will cause the event loop to block, so they should be used
+/// with care.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct AllowStdIo<T>(T);
+
+impl<T> Unpin for AllowStdIo<T> {}
+
+macro_rules! try_with_interrupt {
+ ($e:expr) => {
+ loop {
+ match $e {
+ Ok(e) => {
+ break e;
+ }
+ Err(ref e) if e.kind() == ::std::io::ErrorKind::Interrupted => {
+ continue;
+ }
+ Err(e) => {
+ return Poll::Ready(Err(e));
+ }
+ }
+ }
+ }
+}
+
+impl<T> AllowStdIo<T> {
+ /// Creates a new `AllowStdIo` from an existing IO object.
+ pub fn new(io: T) -> Self {
+ AllowStdIo(io)
+ }
+
+ /// Returns a reference to the contained IO object.
+ pub fn get_ref(&self) -> &T {
+ &self.0
+ }
+
+ /// Returns a mutable reference to the contained IO object.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.0
+ }
+
+ /// Consumes self and returns the contained IO object.
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+impl<T> io::Write for AllowStdIo<T> where T: io::Write {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0.write(buf)
+ }
+ fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
+ self.0.write_vectored(bufs)
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ self.0.flush()
+ }
+ fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+ self.0.write_all(buf)
+ }
+ fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> {
+ self.0.write_fmt(fmt)
+ }
+}
+
+impl<T> AsyncWrite for AllowStdIo<T> where T: io::Write {
+ fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8])
+ -> Poll<io::Result<usize>>
+ {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.write(buf))))
+ }
+
+ fn poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>])
+ -> Poll<io::Result<usize>>
+ {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.write_vectored(bufs))))
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ try_with_interrupt!(self.0.flush());
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.poll_flush(cx)
+ }
+}
+
+impl<T> io::Read for AllowStdIo<T> where T: io::Read {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.read(buf)
+ }
+ fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
+ self.0.read_vectored(bufs)
+ }
+ #[cfg(feature = "read-initializer")]
+ unsafe fn initializer(&self) -> Initializer {
+ self.0.initializer()
+ }
+ fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
+ self.0.read_to_end(buf)
+ }
+ fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
+ self.0.read_to_string(buf)
+ }
+ fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
+ self.0.read_exact(buf)
+ }
+}
+
+impl<T> AsyncRead for AllowStdIo<T> where T: io::Read {
+ fn poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8])
+ -> Poll<io::Result<usize>>
+ {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.read(buf))))
+ }
+
+ fn poll_read_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>])
+ -> Poll<io::Result<usize>>
+ {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs))))
+ }
+
+ #[cfg(feature = "read-initializer")]
+ unsafe fn initializer(&self) -> Initializer {
+ self.0.initializer()
+ }
+}
+
+impl<T> io::Seek for AllowStdIo<T> where T: io::Seek {
+ fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+ self.0.seek(pos)
+ }
+}
+
+impl<T> AsyncSeek for AllowStdIo<T> where T: io::Seek {
+ fn poll_seek(mut self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom)
+ -> Poll<io::Result<u64>>
+ {
+ Poll::Ready(Ok(try_with_interrupt!(self.0.seek(pos))))
+ }
+}
+
+impl<T> io::BufRead for AllowStdIo<T> where T: io::BufRead {
+ fn fill_buf(&mut self) -> io::Result<&[u8]> {
+ self.0.fill_buf()
+ }
+ fn consume(&mut self, amt: usize) {
+ self.0.consume(amt)
+ }
+}
+
+impl<T> AsyncBufRead for AllowStdIo<T> where T: io::BufRead {
+ fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>)
+ -> Poll<io::Result<&[u8]>>
+ {
+ let this: *mut Self = &mut *self as *mut _;
+ Poll::Ready(Ok(try_with_interrupt!(unsafe { &mut *this }.0.fill_buf())))
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ self.0.consume(amt)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/buf_reader.rs b/third_party/rust/futures-util/src/io/buf_reader.rs
new file mode 100644
index 0000000000..96d3f2815e
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/buf_reader.rs
@@ -0,0 +1,259 @@
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "read-initializer")]
+use futures_io::Initializer;
+use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use std::io::{self, Read};
+use std::pin::Pin;
+use std::{cmp, fmt};
+use super::DEFAULT_BUF_SIZE;
+
+/// The `BufReader` struct adds buffering to any reader.
+///
+/// It can be excessively inefficient to work directly with a [`AsyncRead`]
+/// instance. A `BufReader` performs large, infrequent reads on the underlying
+/// [`AsyncRead`] and maintains an in-memory buffer of the results.
+///
+/// `BufReader` can improve the speed of programs that make *small* and
+/// *repeated* read calls to the same file or network socket. It does not
+/// help when reading very large amounts at once, or reading just one or a few
+/// times. It also provides no advantage when reading from a source that is
+/// already in memory, like a `Vec<u8>`.
+///
+/// When the `BufReader` is dropped, the contents of its buffer will be
+/// discarded. Creating multiple instances of a `BufReader` on the same
+/// stream can cause data loss.
+///
+/// [`AsyncRead`]: futures_io::AsyncRead
+///
+// TODO: Examples
+pub struct BufReader<R> {
+ inner: R,
+ buf: Box<[u8]>,
+ pos: usize,
+ cap: usize,
+}
+
+impl<R> BufReader<R> {
+ unsafe_pinned!(inner: R);
+ unsafe_unpinned!(pos: usize);
+ unsafe_unpinned!(cap: usize);
+}
+
+impl<R: AsyncRead> BufReader<R> {
+ /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
+ /// but may change in the future.
+ pub fn new(inner: R) -> Self {
+ Self::with_capacity(DEFAULT_BUF_SIZE, inner)
+ }
+
+ /// Creates a new `BufReader` with the specified buffer capacity.
+ pub fn with_capacity(capacity: usize, inner: R) -> Self {
+ unsafe {
+ let mut buffer = Vec::with_capacity(capacity);
+ buffer.set_len(capacity);
+ super::initialize(&inner, &mut buffer);
+ Self {
+ inner,
+ buf: buffer.into_boxed_slice(),
+ pos: 0,
+ cap: 0,
+ }
+ }
+ }
+
+ /// Gets a reference to the underlying reader.
+ ///
+ /// It is inadvisable to directly read from the underlying reader.
+ pub fn get_ref(&self) -> &R {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying reader.
+ ///
+ /// It is inadvisable to directly read from the underlying reader.
+ pub fn get_mut(&mut self) -> &mut R {
+ &mut self.inner
+ }
+
+ /// Gets a pinned mutable reference to the underlying reader.
+ ///
+ /// It is inadvisable to directly read from the underlying reader.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
+ self.inner()
+ }
+
+ /// Consumes this `BufWriter`, returning the underlying reader.
+ ///
+ /// Note that any leftover data in the internal buffer is lost.
+ pub fn into_inner(self) -> R {
+ self.inner
+ }
+
+ /// Returns a reference to the internally buffered data.
+ ///
+ /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
+ pub fn buffer(&self) -> &[u8] {
+ &self.buf[self.pos..self.cap]
+ }
+
+ /// Invalidates all data in the internal buffer.
+ #[inline]
+ fn discard_buffer(mut self: Pin<&mut Self>) {
+ *self.as_mut().pos() = 0;
+ *self.cap() = 0;
+ }
+}
+
+impl<R: AsyncRead> AsyncRead for BufReader<R> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ // If we don't have any buffered data and we're doing a massive read
+ // (larger than our internal buffer), bypass our internal buffer
+ // entirely.
+ if self.pos == self.cap && buf.len() >= self.buf.len() {
+ let res = ready!(self.as_mut().inner().poll_read(cx, buf));
+ self.discard_buffer();
+ return Poll::Ready(res);
+ }
+ let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+ let nread = rem.read(buf)?;
+ self.consume(nread);
+ Poll::Ready(Ok(nread))
+ }
+
+ fn poll_read_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
+ if self.pos == self.cap && total_len >= self.buf.len() {
+ let res = ready!(self.as_mut().inner().poll_read_vectored(cx, bufs));
+ self.discard_buffer();
+ return Poll::Ready(res);
+ }
+ let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
+ let nread = rem.read_vectored(bufs)?;
+ self.consume(nread);
+ Poll::Ready(Ok(nread))
+ }
+
+ // we can't skip unconditionally because of the large buffer case in read.
+ #[cfg(feature = "read-initializer")]
+ unsafe fn initializer(&self) -> Initializer {
+ self.inner.initializer()
+ }
+}
+
+impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
+ fn poll_fill_buf(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<&[u8]>> {
+ let Self { inner, buf, cap, pos } = unsafe { self.get_unchecked_mut() };
+ let mut inner = unsafe { Pin::new_unchecked(inner) };
+
+ // If we've reached the end of our internal buffer then we need to fetch
+ // some more data from the underlying reader.
+ // Branch using `>=` instead of the more correct `==`
+ // to tell the compiler that the pos..cap slice is always valid.
+ if *pos >= *cap {
+ debug_assert!(*pos == *cap);
+ *cap = ready!(inner.as_mut().poll_read(cx, buf))?;
+ *pos = 0;
+ }
+ Poll::Ready(Ok(&buf[*pos..*cap]))
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ *self.as_mut().pos() = cmp::min(self.pos + amt, self.cap);
+ }
+}
+
+impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner().poll_write(cx, buf)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.inner().poll_write_vectored(cx, bufs)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.inner().poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.inner().poll_close(cx)
+ }
+}
+
+impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BufReader")
+ .field("reader", &self.inner)
+ .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buf.len()))
+ .finish()
+ }
+}
+
+impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
+ /// Seek to an offset, in bytes, in the underlying reader.
+ ///
+ /// The position used for seeking with `SeekFrom::Current(_)` is the
+ /// position the underlying reader would be at if the `BufReader` had no
+ /// internal buffer.
+ ///
+ /// Seeking always discards the internal buffer, even if the seek position
+ /// would otherwise fall within it. This guarantees that calling
+ /// `.into_inner()` immediately after a seek yields the underlying reader
+ /// at the same position.
+ ///
+ /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
+ ///
+ /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
+ /// where `n` minus the internal buffer length overflows an `i64`, two
+ /// seeks will be performed instead of one. If the second seek returns
+ /// `Err`, the underlying reader will be left at the same position it would
+ /// have if you called `seek` with `SeekFrom::Current(0)`.
+ fn poll_seek(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ let result: u64;
+ if let SeekFrom::Current(n) = pos {
+ let remainder = (self.cap - self.pos) as i64;
+ // it should be safe to assume that remainder fits within an i64 as the alternative
+ // means we managed to allocate 8 exbibytes and that's absurd.
+ // But it's not out of the realm of possibility for some weird underlying reader to
+ // support seeking by i64::min_value() so we need to handle underflow when subtracting
+ // remainder.
+ if let Some(offset) = n.checked_sub(remainder) {
+ result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(offset)))?;
+ } else {
+ // seek backwards by our remainder, and then by the offset
+ ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(-remainder)))?;
+ self.as_mut().discard_buffer();
+ result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)))?;
+ }
+ } else {
+ // Seeking with Start/End doesn't care about our buffer length.
+ result = ready!(self.as_mut().inner().poll_seek(cx, pos))?;
+ }
+ self.discard_buffer();
+ Poll::Ready(Ok(result))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/buf_writer.rs b/third_party/rust/futures-util/src/io/buf_writer.rs
new file mode 100644
index 0000000000..b0afbd81f2
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/buf_writer.rs
@@ -0,0 +1,222 @@
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "read-initializer")]
+use futures_io::Initializer;
+use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use std::fmt;
+use std::io::{self, Write};
+use std::pin::Pin;
+use super::DEFAULT_BUF_SIZE;
+
+/// Wraps a writer and buffers its output.
+///
+/// It can be excessively inefficient to work directly with something that
+/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and
+/// writes it to an underlying writer in large, infrequent batches.
+///
+/// `BufWriter` can improve the speed of programs that make *small* and
+/// *repeated* write calls to the same file or network socket. It does not
+/// help when writing very large amounts at once, or writing just one or a few
+/// times. It also provides no advantage when writing to a destination that is
+/// in memory, like a `Vec<u8>`.
+///
+/// When the `BufWriter` is dropped, the contents of its buffer will be
+/// discarded. Creating multiple instances of a `BufWriter` on the same
+/// stream can cause data loss. If you need to write out the contents of its
+/// buffer, you must manually call flush before the writer is dropped.
+///
+/// [`AsyncWrite`]: futures_io::AsyncWrite
+/// [`flush`]: super::AsyncWriteExt::flush
+///
+// TODO: Examples
+pub struct BufWriter<W> {
+ inner: W,
+ buf: Vec<u8>,
+ written: usize,
+}
+
+impl<W> BufWriter<W> {
+ unsafe_pinned!(inner: W);
+ unsafe_unpinned!(buf: Vec<u8>);
+}
+
+impl<W: AsyncWrite> BufWriter<W> {
+ /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
+ /// but may change in the future.
+ pub fn new(inner: W) -> Self {
+ Self::with_capacity(DEFAULT_BUF_SIZE, inner)
+ }
+
+ /// Creates a new `BufWriter` with the specified buffer capacity.
+ pub fn with_capacity(cap: usize, inner: W) -> Self {
+ Self {
+ inner,
+ buf: Vec::with_capacity(cap),
+ written: 0,
+ }
+ }
+
+ fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let Self { inner, buf, written } = unsafe { self.get_unchecked_mut() };
+ let mut inner = unsafe { Pin::new_unchecked(inner) };
+
+ let len = buf.len();
+ let mut ret = Ok(());
+ while *written < len {
+ match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) {
+ Ok(0) => {
+ ret = Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "failed to write the buffered data",
+ ));
+ break;
+ }
+ Ok(n) => *written += n,
+ Err(e) => {
+ ret = Err(e);
+ break;
+ }
+ }
+ }
+ if *written > 0 {
+ buf.drain(..*written);
+ }
+ *written = 0;
+ Poll::Ready(ret)
+ }
+
+ /// Gets a reference to the underlying writer.
+ pub fn get_ref(&self) -> &W {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying writer.
+ ///
+ /// It is inadvisable to directly write to the underlying writer.
+ pub fn get_mut(&mut self) -> &mut W {
+ &mut self.inner
+ }
+
+ /// Gets a pinned mutable reference to the underlying writer.
+ ///
+ /// It is inadvisable to directly write to the underlying writer.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
+ self.inner()
+ }
+
+ /// Consumes this `BufWriter`, returning the underlying writer.
+ ///
+ /// Note that any leftover data in the internal buffer is lost.
+ pub fn into_inner(self) -> W {
+ self.inner
+ }
+
+ /// Returns a reference to the internally buffered data.
+ pub fn buffer(&self) -> &[u8] {
+ &self.buf
+ }
+}
+
+impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ if self.buf.len() + buf.len() > self.buf.capacity() {
+ ready!(self.as_mut().flush_buf(cx))?;
+ }
+ if buf.len() >= self.buf.capacity() {
+ self.inner().poll_write(cx, buf)
+ } else {
+ Poll::Ready(self.buf().write(buf))
+ }
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
+ if self.buf.len() + total_len > self.buf.capacity() {
+ ready!(self.as_mut().flush_buf(cx))?;
+ }
+ if total_len >= self.buf.capacity() {
+ self.inner().poll_write_vectored(cx, bufs)
+ } else {
+ Poll::Ready(self.buf().write_vectored(bufs))
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ ready!(self.as_mut().flush_buf(cx))?;
+ self.inner().poll_flush(cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ ready!(self.as_mut().flush_buf(cx))?;
+ self.inner().poll_close(cx)
+ }
+}
+
+impl<W: AsyncRead> AsyncRead for BufWriter<W> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner().poll_read(cx, buf)
+ }
+
+ fn poll_read_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.inner().poll_read_vectored(cx, bufs)
+ }
+
+ // we can't skip unconditionally because of the large buffer case in read.
+ #[cfg(feature = "read-initializer")]
+ unsafe fn initializer(&self) -> Initializer {
+ self.inner.initializer()
+ }
+}
+
+impl<W: AsyncBufRead> AsyncBufRead for BufWriter<W> {
+ fn poll_fill_buf(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<&[u8]>> {
+ self.inner().poll_fill_buf(cx)
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ self.inner().consume(amt)
+ }
+}
+
+impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("BufWriter")
+ .field("writer", &self.inner)
+ .field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity()))
+ .field("written", &self.written)
+ .finish()
+ }
+}
+
+impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
+ /// Seek to the offset, in bytes, in the underlying writer.
+ ///
+ /// Seeking always writes out the internal buffer before seeking.
+ fn poll_seek(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ ready!(self.as_mut().flush_buf(cx))?;
+ self.inner().poll_seek(cx, pos)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/chain.rs b/third_party/rust/futures-util/src/io/chain.rs
new file mode 100644
index 0000000000..64bbdec87a
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/chain.rs
@@ -0,0 +1,166 @@
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "read-initializer")]
+use futures_io::Initializer;
+use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+
+/// Reader for the [`chain`](super::AsyncReadExt::chain) method.
+#[must_use = "readers do nothing unless polled"]
+pub struct Chain<T, U> {
+ first: T,
+ second: U,
+ done_first: bool,
+}
+
+impl<T, U> Unpin for Chain<T, U>
+where
+ T: Unpin,
+ U: Unpin,
+{
+}
+
+impl<T, U> Chain<T, U>
+where
+ T: AsyncRead,
+ U: AsyncRead,
+{
+ unsafe_pinned!(first: T);
+ unsafe_pinned!(second: U);
+ unsafe_unpinned!(done_first: bool);
+
+ pub(super) fn new(first: T, second: U) -> Self {
+ Self {
+ first,
+ second,
+ done_first: false,
+ }
+ }
+
+ /// Gets references to the underlying readers in this `Chain`.
+ pub fn get_ref(&self) -> (&T, &U) {
+ (&self.first, &self.second)
+ }
+
+ /// Gets mutable references to the underlying readers in this `Chain`.
+ ///
+ /// Care should be taken to avoid modifying the internal I/O state of the
+ /// underlying readers as doing so may corrupt the internal state of this
+ /// `Chain`.
+ pub fn get_mut(&mut self) -> (&mut T, &mut U) {
+ (&mut self.first, &mut self.second)
+ }
+
+ /// Gets pinned mutable references to the underlying readers in this `Chain`.
+ ///
+ /// Care should be taken to avoid modifying the internal I/O state of the
+ /// underlying readers as doing so may corrupt the internal state of this
+ /// `Chain`.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) {
+ unsafe {
+ let Self { first, second, .. } = self.get_unchecked_mut();
+ (Pin::new_unchecked(first), Pin::new_unchecked(second))
+ }
+ }
+
+ /// Consumes the `Chain`, returning the wrapped readers.
+ pub fn into_inner(self) -> (T, U) {
+ (self.first, self.second)
+ }
+}
+
+impl<T, U> fmt::Debug for Chain<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Chain")
+ .field("t", &self.first)
+ .field("u", &self.second)
+ .field("done_first", &self.done_first)
+ .finish()
+ }
+}
+
+impl<T, U> AsyncRead for Chain<T, U>
+where
+ T: AsyncRead,
+ U: AsyncRead,
+{
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ if !self.done_first {
+ match ready!(self.as_mut().first().poll_read(cx, buf)?) {
+ 0 if !buf.is_empty() => *self.as_mut().done_first() = true,
+ n => return Poll::Ready(Ok(n)),
+ }
+ }
+ self.second().poll_read(cx, buf)
+ }
+
+ fn poll_read_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ if !self.done_first {
+ let n = ready!(self.as_mut().first().poll_read_vectored(cx, bufs)?);
+ if n == 0 && bufs.iter().any(|b| !b.is_empty()) {
+ *self.as_mut().done_first() = true
+ } else {
+ return Poll::Ready(Ok(n));
+ }
+ }
+ self.second().poll_read_vectored(cx, bufs)
+ }
+
+ #[cfg(feature = "read-initializer")]
+ unsafe fn initializer(&self) -> Initializer {
+ let initializer = self.first.initializer();
+ if initializer.should_initialize() {
+ initializer
+ } else {
+ self.second.initializer()
+ }
+ }
+}
+
+impl<T, U> AsyncBufRead for Chain<T, U>
+where
+ T: AsyncBufRead,
+ U: AsyncBufRead,
+{
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ let Self {
+ first,
+ second,
+ done_first,
+ } = unsafe { self.get_unchecked_mut() };
+ let first = unsafe { Pin::new_unchecked(first) };
+ let second = unsafe { Pin::new_unchecked(second) };
+
+ if !*done_first {
+ match ready!(first.poll_fill_buf(cx)?) {
+ buf if buf.is_empty() => {
+ *done_first = true;
+ }
+ buf => return Poll::Ready(Ok(buf)),
+ }
+ }
+ second.poll_fill_buf(cx)
+ }
+
+ fn consume(self: Pin<&mut Self>, amt: usize) {
+ if !self.done_first {
+ self.first().consume(amt)
+ } else {
+ self.second().consume(amt)
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/close.rs b/third_party/rust/futures-util/src/io/close.rs
new file mode 100644
index 0000000000..4d5669680b
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/close.rs
@@ -0,0 +1,28 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`close`](super::AsyncWriteExt::close) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Close<'a, W: ?Sized> {
+ writer: &'a mut W,
+}
+
+impl<W: ?Sized + Unpin> Unpin for Close<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> Close<'a, W> {
+ pub(super) fn new(writer: &'a mut W) -> Self {
+ Close { writer }
+ }
+}
+
+impl<W: AsyncWrite + ?Sized + Unpin> Future for Close<'_, W> {
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut *self.writer).poll_close(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/copy.rs b/third_party/rust/futures-util/src/io/copy.rs
new file mode 100644
index 0000000000..9531aab996
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/copy.rs
@@ -0,0 +1,63 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncRead, AsyncWrite};
+use std::io;
+use std::pin::Pin;
+use super::{BufReader, copy_buf, CopyBuf};
+use pin_utils::unsafe_pinned;
+
+/// Creates a future which copies all the bytes from one object to another.
+///
+/// The returned future will copy all the bytes read from this `AsyncRead` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncWriteExt, Cursor};
+///
+/// let reader = Cursor::new([1, 2, 3, 4]);
+/// let mut writer = Cursor::new(vec![0u8; 5]);
+///
+/// let bytes = io::copy(reader, &mut writer).await?;
+/// writer.close().await?;
+///
+/// assert_eq!(bytes, 4);
+/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn copy<R, W>(reader: R, writer: &mut W) -> Copy<'_, R, W>
+where
+ R: AsyncRead,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ Copy {
+ inner: copy_buf(BufReader::new(reader), writer),
+ }
+}
+
+/// Future for the [`copy()`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Copy<'a, R, W: ?Sized> {
+ inner: CopyBuf<'a, BufReader<R>, W>,
+}
+
+impl<'a, R: AsyncRead, W: ?Sized> Unpin for Copy<'a, R, W> where CopyBuf<'a, BufReader<R>, W>: Unpin {}
+
+impl<'a, R: AsyncRead, W: ?Sized> Copy<'a, R, W> {
+ unsafe_pinned!(inner: CopyBuf<'a, BufReader<R>, W>);
+}
+
+impl<R: AsyncRead, W: AsyncWrite + Unpin + ?Sized> Future for Copy<'_, R, W> {
+ type Output = io::Result<u64>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.inner().poll(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/copy_buf.rs b/third_party/rust/futures-util/src/io/copy_buf.rs
new file mode 100644
index 0000000000..98811825e0
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/copy_buf.rs
@@ -0,0 +1,87 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncWrite};
+use std::io;
+use std::pin::Pin;
+
+/// Creates a future which copies all the bytes from one object to another.
+///
+/// The returned future will copy all the bytes read from this `AsyncBufRead` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncWriteExt, Cursor};
+///
+/// let reader = Cursor::new([1, 2, 3, 4]);
+/// let mut writer = Cursor::new(vec![0u8; 5]);
+///
+/// let bytes = io::copy_buf(reader, &mut writer).await?;
+/// writer.close().await?;
+///
+/// assert_eq!(bytes, 4);
+/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn copy_buf<R, W>(reader: R, writer: &mut W) -> CopyBuf<'_, R, W>
+where
+ R: AsyncBufRead,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ CopyBuf {
+ reader,
+ writer,
+ amt: 0,
+ }
+}
+
+/// Future for the [`copy_buf()`] function.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct CopyBuf<'a, R, W: ?Sized> {
+ reader: R,
+ writer: &'a mut W,
+ amt: u64,
+}
+
+impl<R: Unpin, W: ?Sized> Unpin for CopyBuf<'_, R, W> {}
+
+impl<R, W: Unpin + ?Sized> CopyBuf<'_, R, W> {
+ fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, Pin<&mut W>, &mut u64) {
+ unsafe {
+ let this = self.get_unchecked_mut();
+ (Pin::new_unchecked(&mut this.reader), Pin::new(&mut *this.writer), &mut this.amt)
+ }
+ }
+}
+
+impl<R, W> Future for CopyBuf<'_, R, W>
+ where R: AsyncBufRead,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ type Output = io::Result<u64>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let (mut reader, mut writer, amt) = self.project();
+ loop {
+ let buffer = ready!(reader.as_mut().poll_fill_buf(cx))?;
+ if buffer.is_empty() {
+ ready!(writer.as_mut().poll_flush(cx))?;
+ return Poll::Ready(Ok(*amt));
+ }
+
+ let i = ready!(writer.as_mut().poll_write(cx, buffer))?;
+ if i == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
+ }
+ *amt += i as u64;
+ reader.as_mut().consume(i);
+ }
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/cursor.rs b/third_party/rust/futures-util/src/io/cursor.rs
new file mode 100644
index 0000000000..d1359231c5
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/cursor.rs
@@ -0,0 +1,238 @@
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "read_initializer")]
+use futures_io::Initializer;
+use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
+use std::io;
+use std::pin::Pin;
+
+/// A `Cursor` wraps an in-memory buffer and provides it with a
+/// [`AsyncSeek`] implementation.
+///
+/// `Cursor`s are used with in-memory buffers, anything implementing
+/// `AsRef<[u8]>`, to allow them to implement [`AsyncRead`] and/or [`AsyncWrite`],
+/// allowing these buffers to be used anywhere you might use a reader or writer
+/// that does actual I/O.
+///
+/// The standard library implements some I/O traits on various types which
+/// are commonly used as a buffer, like `Cursor<`[`Vec`]`<u8>>` and
+/// `Cursor<`[`&[u8]`][bytes]`>`.
+///
+/// [`AsyncSeek`]: trait.AsyncSeek.html
+/// [`AsyncRead`]: trait.AsyncRead.html
+/// [`AsyncWrite`]: trait.AsyncWrite.html
+/// [bytes]: https://doc.rust-lang.org/std/primitive.slice.html
+#[derive(Clone, Debug, Default)]
+pub struct Cursor<T> {
+ inner: io::Cursor<T>,
+}
+
+impl<T> Cursor<T> {
+ /// Creates a new cursor wrapping the provided underlying in-memory buffer.
+ ///
+ /// Cursor initial position is `0` even if underlying buffer (e.g., `Vec`)
+ /// is not empty. So writing to cursor starts with overwriting `Vec`
+ /// content, not with appending to it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let buff = Cursor::new(Vec::new());
+ /// # fn force_inference(_: &Cursor<Vec<u8>>) {}
+ /// # force_inference(&buff);
+ /// ```
+ pub fn new(inner: T) -> Cursor<T> {
+ Cursor {
+ inner: io::Cursor::new(inner),
+ }
+ }
+
+ /// Consumes this cursor, returning the underlying value.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let buff = Cursor::new(Vec::new());
+ /// # fn force_inference(_: &Cursor<Vec<u8>>) {}
+ /// # force_inference(&buff);
+ ///
+ /// let vec = buff.into_inner();
+ /// ```
+ pub fn into_inner(self) -> T {
+ self.inner.into_inner()
+ }
+
+ /// Gets a reference to the underlying value in this cursor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let buff = Cursor::new(Vec::new());
+ /// # fn force_inference(_: &Cursor<Vec<u8>>) {}
+ /// # force_inference(&buff);
+ ///
+ /// let reference = buff.get_ref();
+ /// ```
+ pub fn get_ref(&self) -> &T {
+ self.inner.get_ref()
+ }
+
+ /// Gets a mutable reference to the underlying value in this cursor.
+ ///
+ /// Care should be taken to avoid modifying the internal I/O state of the
+ /// underlying value as it may corrupt this cursor's position.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let mut buff = Cursor::new(Vec::new());
+ /// # fn force_inference(_: &Cursor<Vec<u8>>) {}
+ /// # force_inference(&buff);
+ ///
+ /// let reference = buff.get_mut();
+ /// ```
+ pub fn get_mut(&mut self) -> &mut T {
+ self.inner.get_mut()
+ }
+
+ /// Returns the current position of this cursor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncSeekExt, Cursor, SeekFrom};
+ ///
+ /// let mut buff = Cursor::new(vec![1, 2, 3, 4, 5]);
+ ///
+ /// assert_eq!(buff.position(), 0);
+ ///
+ /// buff.seek(SeekFrom::Current(2)).await?;
+ /// assert_eq!(buff.position(), 2);
+ ///
+ /// buff.seek(SeekFrom::Current(-1)).await?;
+ /// assert_eq!(buff.position(), 1);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ pub fn position(&self) -> u64 {
+ self.inner.position()
+ }
+
+ /// Sets the position of this cursor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use futures::io::Cursor;
+ ///
+ /// let mut buff = Cursor::new(vec![1, 2, 3, 4, 5]);
+ ///
+ /// assert_eq!(buff.position(), 0);
+ ///
+ /// buff.set_position(2);
+ /// assert_eq!(buff.position(), 2);
+ ///
+ /// buff.set_position(4);
+ /// assert_eq!(buff.position(), 4);
+ /// ```
+ pub fn set_position(&mut self, pos: u64) {
+ self.inner.set_position(pos)
+ }
+}
+
+impl<T> AsyncSeek for Cursor<T>
+where
+ T: AsRef<[u8]> + Unpin,
+{
+ fn poll_seek(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ pos: SeekFrom,
+ ) -> Poll<io::Result<u64>> {
+ Poll::Ready(io::Seek::seek(&mut self.inner, pos))
+ }
+}
+
+impl<T: AsRef<[u8]> + Unpin> AsyncRead for Cursor<T> {
+ #[cfg(feature = "read_initializer")]
+ #[inline]
+ unsafe fn initializer(&self) -> Initializer {
+ io::Read::initializer(&self.inner)
+ }
+
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(io::Read::read(&mut self.inner, buf))
+ }
+
+ fn poll_read_vectored(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(io::Read::read_vectored(&mut self.inner, bufs))
+ }
+}
+
+impl<T> AsyncBufRead for Cursor<T>
+where
+ T: AsRef<[u8]> + Unpin,
+{
+ fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ Poll::Ready(io::BufRead::fill_buf(&mut self.get_mut().inner))
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ io::BufRead::consume(&mut self.inner, amt)
+ }
+}
+
+macro_rules! delegate_async_write_to_stdio {
+ () => {
+ fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8])
+ -> Poll<io::Result<usize>>
+ {
+ Poll::Ready(io::Write::write(&mut self.inner, buf))
+ }
+
+ fn poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>])
+ -> Poll<io::Result<usize>>
+ {
+ Poll::Ready(io::Write::write_vectored(&mut self.inner, bufs))
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(io::Write::flush(&mut self.inner))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.poll_flush(cx)
+ }
+ }
+}
+
+impl AsyncWrite for Cursor<&mut [u8]> {
+ delegate_async_write_to_stdio!();
+}
+
+impl AsyncWrite for Cursor<&mut Vec<u8>> {
+ delegate_async_write_to_stdio!();
+}
+
+impl AsyncWrite for Cursor<Vec<u8>> {
+ delegate_async_write_to_stdio!();
+}
+
+impl AsyncWrite for Cursor<Box<[u8]>> {
+ delegate_async_write_to_stdio!();
+}
diff --git a/third_party/rust/futures-util/src/io/empty.rs b/third_party/rust/futures-util/src/io/empty.rs
new file mode 100644
index 0000000000..ab2395a8af
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/empty.rs
@@ -0,0 +1,67 @@
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "read-initializer")]
+use futures_io::Initializer;
+use futures_io::{AsyncBufRead, AsyncRead};
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+
+/// Reader for the [`empty()`] function.
+#[must_use = "readers do nothing unless polled"]
+pub struct Empty {
+ _priv: (),
+}
+
+/// Constructs a new handle to an empty reader.
+///
+/// All reads from the returned reader will return `Poll::Ready(Ok(0))`.
+///
+/// # Examples
+///
+/// A slightly sad example of not reading anything into a buffer:
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncReadExt};
+///
+/// let mut buffer = String::new();
+/// let mut reader = io::empty();
+/// reader.read_to_string(&mut buffer).await?;
+/// assert!(buffer.is_empty());
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn empty() -> Empty {
+ Empty { _priv: () }
+}
+
+impl AsyncRead for Empty {
+ #[inline]
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ _: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(0))
+ }
+
+ #[cfg(feature = "read-initializer")]
+ #[inline]
+ unsafe fn initializer(&self) -> Initializer {
+ Initializer::nop()
+ }
+}
+
+impl AsyncBufRead for Empty {
+ #[inline]
+ fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ Poll::Ready(Ok(&[]))
+ }
+ #[inline]
+ fn consume(self: Pin<&mut Self>, _: usize) {}
+}
+
+impl fmt::Debug for Empty {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Empty { .. }")
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/flush.rs b/third_party/rust/futures-util/src/io/flush.rs
new file mode 100644
index 0000000000..70b867a207
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/flush.rs
@@ -0,0 +1,30 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`flush`](super::AsyncWriteExt::flush) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Flush<'a, W: ?Sized> {
+ writer: &'a mut W,
+}
+
+impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> {
+ pub(super) fn new(writer: &'a mut W) -> Self {
+ Flush { writer }
+ }
+}
+
+impl<W> Future for Flush<'_, W>
+ where W: AsyncWrite + ?Sized + Unpin,
+{
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut *self.writer).poll_flush(cx)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/into_sink.rs b/third_party/rust/futures-util/src/io/into_sink.rs
new file mode 100644
index 0000000000..bdc6b34140
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/into_sink.rs
@@ -0,0 +1,108 @@
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use futures_sink::Sink;
+use std::io;
+use std::pin::Pin;
+use std::marker::Unpin;
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+
+#[derive(Debug)]
+struct Block<Item> {
+ offset: usize,
+ bytes: Item,
+}
+
+/// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
+#[must_use = "sinks do nothing unless polled"]
+#[derive(Debug)]
+pub struct IntoSink<W, Item> {
+ writer: W,
+ /// An outstanding block for us to push into the underlying writer, along with an offset of how
+ /// far into this block we have written already.
+ buffer: Option<Block<Item>>,
+}
+
+impl<W: Unpin, Item> Unpin for IntoSink<W, Item> {}
+
+impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> {
+ unsafe_pinned!(writer: W);
+ unsafe_unpinned!(buffer: Option<Block<Item>>);
+
+ pub(super) fn new(writer: W) -> Self {
+ IntoSink { writer, buffer: None }
+ }
+
+ fn project(self: Pin<&mut Self>) -> (Pin<&mut W>, &mut Option<Block<Item>>) {
+ unsafe {
+ let this = self.get_unchecked_mut();
+ (Pin::new_unchecked(&mut this.writer), &mut this.buffer)
+ }
+ }
+
+ /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
+ /// flush the writer after it succeeds in pushing the block into it.
+ fn poll_flush_buffer(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), io::Error>>
+ {
+ let (mut writer, buffer) = self.project();
+ if let Some(buffer) = buffer {
+ loop {
+ let bytes = buffer.bytes.as_ref();
+ let written = ready!(writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
+ buffer.offset += written;
+ if buffer.offset == bytes.len() {
+ break;
+ }
+ }
+ }
+ *buffer = None;
+ Poll::Ready(Ok(()))
+ }
+
+}
+
+impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> {
+ type Error = io::Error;
+
+ fn poll_ready(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>>
+ {
+ ready!(self.as_mut().poll_flush_buffer(cx))?;
+ Poll::Ready(Ok(()))
+ }
+
+ #[allow(clippy::debug_assert_with_mut_call)]
+ fn start_send(
+ mut self: Pin<&mut Self>,
+ item: Item,
+ ) -> Result<(), Self::Error>
+ {
+ debug_assert!(self.as_mut().buffer().is_none());
+ *self.as_mut().buffer() = Some(Block { offset: 0, bytes: item });
+ Ok(())
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>>
+ {
+ ready!(self.as_mut().poll_flush_buffer(cx))?;
+ ready!(self.as_mut().writer().poll_flush(cx))?;
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>>
+ {
+ ready!(self.as_mut().poll_flush_buffer(cx))?;
+ ready!(self.as_mut().writer().poll_close(cx))?;
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/lines.rs b/third_party/rust/futures-util/src/io/lines.rs
new file mode 100644
index 0000000000..2e1261689b
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/lines.rs
@@ -0,0 +1,50 @@
+use futures_core::stream::Stream;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncBufRead;
+use std::io;
+use std::mem;
+use std::pin::Pin;
+use super::read_line::read_line_internal;
+
+/// Stream for the [`lines`](super::AsyncBufReadExt::lines) method.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Lines<R> {
+ reader: R,
+ buf: String,
+ bytes: Vec<u8>,
+ read: usize,
+}
+
+impl<R: Unpin> Unpin for Lines<R> {}
+
+impl<R: AsyncBufRead> Lines<R> {
+ pub(super) fn new(reader: R) -> Self {
+ Self {
+ reader,
+ buf: String::new(),
+ bytes: Vec::new(),
+ read: 0,
+ }
+ }
+}
+
+impl<R: AsyncBufRead> Stream for Lines<R> {
+ type Item = io::Result<String>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let Self { reader, buf, bytes, read } = unsafe { self.get_unchecked_mut() };
+ let reader = unsafe { Pin::new_unchecked(reader) };
+ let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?;
+ if n == 0 && buf.is_empty() {
+ return Poll::Ready(None)
+ }
+ if buf.ends_with('\n') {
+ buf.pop();
+ if buf.ends_with('\r') {
+ buf.pop();
+ }
+ }
+ Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/mod.rs b/third_party/rust/futures-util/src/io/mod.rs
new file mode 100644
index 0000000000..43f183f424
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/mod.rs
@@ -0,0 +1,675 @@
+//! IO
+//!
+//! This module contains a number of functions for working with
+//! `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and `AsyncBufRead` types, including
+//! the `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt`, and `AsyncBufReadExt`
+//! traits which add methods to the `AsyncRead`, `AsyncWrite`, `AsyncSeek`,
+//! and `AsyncBufRead` types.
+//!
+//! This module is only available when the `io` and `std` features of this
+//! library is activated, and it is activated by default.
+
+#[cfg(feature = "io-compat")]
+use crate::compat::Compat;
+use std::ptr;
+
+pub use futures_io::{
+ AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
+ IoSlice, IoSliceMut, Result, SeekFrom,
+};
+#[cfg(feature = "read-initializer")]
+pub use futures_io::Initializer;
+
+// used by `BufReader` and `BufWriter`
+// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
+const DEFAULT_BUF_SIZE: usize = 8 * 1024;
+
+/// Initializes a buffer if necessary.
+///
+/// A buffer is always initialized if `read-initializer` feature is disabled.
+#[inline]
+unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
+ #[cfg(feature = "read-initializer")]
+ {
+ if !_reader.initializer().should_initialize() {
+ return;
+ }
+ }
+ ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
+}
+
+mod allow_std;
+pub use self::allow_std::AllowStdIo;
+
+mod buf_reader;
+pub use self::buf_reader::BufReader;
+
+mod buf_writer;
+pub use self::buf_writer::BufWriter;
+
+mod chain;
+pub use self::chain::Chain;
+
+mod close;
+pub use self::close::Close;
+
+mod copy;
+pub use self::copy::{copy, Copy};
+
+mod copy_buf;
+pub use self::copy_buf::{copy_buf, CopyBuf};
+
+mod cursor;
+pub use self::cursor::Cursor;
+
+mod empty;
+pub use self::empty::{empty, Empty};
+
+mod flush;
+pub use self::flush::Flush;
+
+#[cfg(feature = "sink")]
+mod into_sink;
+#[cfg(feature = "sink")]
+pub use self::into_sink::IntoSink;
+
+mod lines;
+pub use self::lines::Lines;
+
+mod read;
+pub use self::read::Read;
+
+mod read_vectored;
+pub use self::read_vectored::ReadVectored;
+
+mod read_exact;
+pub use self::read_exact::ReadExact;
+
+mod read_line;
+pub use self::read_line::ReadLine;
+
+mod read_to_end;
+pub use self::read_to_end::ReadToEnd;
+
+mod read_to_string;
+pub use self::read_to_string::ReadToString;
+
+mod read_until;
+pub use self::read_until::ReadUntil;
+
+mod repeat;
+pub use self::repeat::{repeat, Repeat};
+
+mod seek;
+pub use self::seek::Seek;
+
+mod sink;
+pub use self::sink::{sink, Sink};
+
+mod split;
+pub use self::split::{ReadHalf, WriteHalf};
+
+mod take;
+pub use self::take::Take;
+
+mod window;
+pub use self::window::Window;
+
+mod write;
+pub use self::write::Write;
+
+mod write_vectored;
+pub use self::write_vectored::WriteVectored;
+
+mod write_all;
+pub use self::write_all::WriteAll;
+
+/// An extension trait which adds utility methods to `AsyncRead` types.
+pub trait AsyncReadExt: AsyncRead {
+ /// Creates an adaptor which will chain this stream with another.
+ ///
+ /// The returned `AsyncRead` instance will first read all bytes from this object
+ /// until EOF is encountered. Afterwards the output is equivalent to the
+ /// output of `next`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader1 = Cursor::new([1, 2, 3, 4]);
+ /// let reader2 = Cursor::new([5, 6, 7, 8]);
+ ///
+ /// let mut reader = reader1.chain(reader2);
+ /// let mut buffer = Vec::new();
+ ///
+ /// // read the value into a Vec.
+ /// reader.read_to_end(&mut buffer).await?;
+ /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn chain<R>(self, next: R) -> Chain<Self, R>
+ where
+ Self: Sized,
+ R: AsyncRead,
+ {
+ Chain::new(self, next)
+ }
+
+ /// Tries to read some bytes directly into the given `buf` in asynchronous
+ /// manner, returning a future type.
+ ///
+ /// The returned future will resolve to the number of bytes read once the read
+ /// operation is completed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut output = [0u8; 5];
+ ///
+ /// let bytes = reader.read(&mut output[..]).await?;
+ ///
+ /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous
+ /// // reader. In a real system you could get anywhere from 1 to
+ /// // `output.len()` bytes in a single read.
+ /// assert_eq!(bytes, 4);
+ /// assert_eq!(output, [1, 2, 3, 4, 0]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
+ where Self: Unpin,
+ {
+ Read::new(self, buf)
+ }
+
+ /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
+ /// IO operations.
+ ///
+ /// The returned future will resolve to the number of bytes read once the read
+ /// operation is completed.
+ fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
+ where Self: Unpin,
+ {
+ ReadVectored::new(self, bufs)
+ }
+
+ /// Creates a future which will read exactly enough bytes to fill `buf`,
+ /// returning an error if end of file (EOF) is hit sooner.
+ ///
+ /// The returned future will resolve once the read operation is completed.
+ ///
+ /// In the case of an error the buffer and the object will be discarded, with
+ /// the error yielded.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut output = [0u8; 4];
+ ///
+ /// reader.read_exact(&mut output).await?;
+ ///
+ /// assert_eq!(output, [1, 2, 3, 4]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ ///
+ /// ## EOF is hit before `buf` is filled
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{self, AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut output = [0u8; 5];
+ ///
+ /// let result = reader.read_exact(&mut output).await;
+ ///
+ /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
+ /// # });
+ /// ```
+ fn read_exact<'a>(
+ &'a mut self,
+ buf: &'a mut [u8],
+ ) -> ReadExact<'a, Self>
+ where Self: Unpin,
+ {
+ ReadExact::new(self, buf)
+ }
+
+ /// Creates a future which will read all the bytes from this `AsyncRead`.
+ ///
+ /// On success the total number of bytes read is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut output = Vec::with_capacity(4);
+ ///
+ /// let bytes = reader.read_to_end(&mut output).await?;
+ ///
+ /// assert_eq!(bytes, 4);
+ /// assert_eq!(output, vec![1, 2, 3, 4]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read_to_end<'a>(
+ &'a mut self,
+ buf: &'a mut Vec<u8>,
+ ) -> ReadToEnd<'a, Self>
+ where Self: Unpin,
+ {
+ ReadToEnd::new(self, buf)
+ }
+
+ /// Creates a future which will read all the bytes from this `AsyncRead`.
+ ///
+ /// On success the total number of bytes read is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let mut reader = Cursor::new(&b"1234"[..]);
+ /// let mut buffer = String::with_capacity(4);
+ ///
+ /// let bytes = reader.read_to_string(&mut buffer).await?;
+ ///
+ /// assert_eq!(bytes, 4);
+ /// assert_eq!(buffer, String::from("1234"));
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read_to_string<'a>(
+ &'a mut self,
+ buf: &'a mut String,
+ ) -> ReadToString<'a, Self>
+ where Self: Unpin,
+ {
+ ReadToString::new(self, buf)
+ }
+
+ /// Helper method for splitting this read/write object into two halves.
+ ///
+ /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
+ /// traits, respectively.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{self, AsyncReadExt, Cursor};
+ ///
+ /// // Note that for `Cursor` the read and write halves share a single
+ /// // seek position. This may or may not be true for other types that
+ /// // implement both `AsyncRead` and `AsyncWrite`.
+ ///
+ /// let reader = Cursor::new([1, 2, 3, 4]);
+ /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
+ /// let mut writer = Cursor::new(vec![0u8; 5]);
+ ///
+ /// {
+ /// let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
+ /// io::copy(reader, &mut buffer_writer).await?;
+ /// io::copy(buffer_reader, &mut writer).await?;
+ /// }
+ ///
+ /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
+ /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
+ where Self: AsyncWrite + Sized,
+ {
+ split::split(self)
+ }
+
+ /// Creates an AsyncRead adapter which will read at most `limit` bytes
+ /// from the underlying reader.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader = Cursor::new(&b"12345678"[..]);
+ /// let mut buffer = [0; 5];
+ ///
+ /// let mut take = reader.take(4);
+ /// let n = take.read(&mut buffer).await?;
+ ///
+ /// assert_eq!(n, 4);
+ /// assert_eq!(&buffer, b"1234\0");
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn take(self, limit: u64) -> Take<Self>
+ where Self: Sized
+ {
+ Take::new(self, limit)
+ }
+
+ /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
+ /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
+ /// implements [`AsyncWrite`] as well, the result will also implement the
+ /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
+ ///
+ /// Requires the `io-compat` feature to enable.
+ #[cfg(feature = "io-compat")]
+ fn compat(self) -> Compat<Self>
+ where Self: Sized + Unpin,
+ {
+ Compat::new(self)
+ }
+}
+
+impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
+
+/// An extension trait which adds utility methods to `AsyncWrite` types.
+pub trait AsyncWriteExt: AsyncWrite {
+ /// Creates a future which will entirely flush this `AsyncWrite`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AllowStdIo, AsyncWriteExt};
+ /// use std::io::{BufWriter, Cursor};
+ ///
+ /// let mut output = vec![0u8; 5];
+ ///
+ /// {
+ /// let writer = Cursor::new(&mut output);
+ /// let mut buffered = AllowStdIo::new(BufWriter::new(writer));
+ /// buffered.write_all(&[1, 2]).await?;
+ /// buffered.write_all(&[3, 4]).await?;
+ /// buffered.flush().await?;
+ /// }
+ ///
+ /// assert_eq!(output, [1, 2, 3, 4, 0]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn flush(&mut self) -> Flush<'_, Self>
+ where Self: Unpin,
+ {
+ Flush::new(self)
+ }
+
+ /// Creates a future which will entirely close this `AsyncWrite`.
+ fn close(&mut self) -> Close<'_, Self>
+ where Self: Unpin,
+ {
+ Close::new(self)
+ }
+
+ /// Creates a future which will write bytes from `buf` into the object.
+ ///
+ /// The returned future will resolve to the number of bytes written once the write
+ /// operation is completed.
+ fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
+ where Self: Unpin,
+ {
+ Write::new(self, buf)
+ }
+
+ /// Creates a future which will write bytes from `bufs` into the object using vectored
+ /// IO operations.
+ ///
+ /// The returned future will resolve to the number of bytes written once the write
+ /// operation is completed.
+ fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
+ where Self: Unpin,
+ {
+ WriteVectored::new(self, bufs)
+ }
+
+ /// Write data into this object.
+ ///
+ /// Creates a future that will write the entire contents of the buffer `buf` into
+ /// this `AsyncWrite`.
+ ///
+ /// The returned future will not complete until all the data has been written.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncWriteExt, Cursor};
+ ///
+ /// let mut writer = Cursor::new(vec![0u8; 5]);
+ ///
+ /// writer.write_all(&[1, 2, 3, 4]).await?;
+ ///
+ /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
+ where Self: Unpin,
+ {
+ WriteAll::new(self, buf)
+ }
+
+ /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
+ /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
+ /// Requires the `io-compat` feature to enable.
+ #[cfg(feature = "io-compat")]
+ fn compat_write(self) -> Compat<Self>
+ where Self: Sized + Unpin,
+ {
+ Compat::new(self)
+ }
+
+
+ /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
+ ///
+ /// This adapter produces a sink that will write each value passed to it
+ /// into the underlying writer.
+ ///
+ /// Note that this function consumes the given writer, returning a wrapped
+ /// version.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::AsyncWriteExt;
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
+ ///
+ /// let mut writer = vec![];
+ ///
+ /// stream.forward((&mut writer).into_sink()).await?;
+ ///
+ /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
+ /// # Ok::<(), Box<dyn std::error::Error>>(())
+ /// # })?;
+ /// # Ok::<(), Box<dyn std::error::Error>>(())
+ /// ```
+ #[cfg(feature = "sink")]
+ fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
+ where Self: Sized,
+ {
+ IntoSink::new(self)
+ }
+}
+
+impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
+
+/// An extension trait which adds utility methods to `AsyncSeek` types.
+pub trait AsyncSeekExt: AsyncSeek {
+ /// Creates a future which will seek an IO object, and then yield the
+ /// new position in the object and the object itself.
+ ///
+ /// In the case of an error the buffer and the object will be discarded, with
+ /// the error yielded.
+ fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
+ where Self: Unpin,
+ {
+ Seek::new(self, pos)
+ }
+}
+
+impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
+
+/// An extension trait which adds utility methods to `AsyncBufRead` types.
+pub trait AsyncBufReadExt: AsyncBufRead {
+ /// Creates a future which will read all the bytes associated with this I/O
+ /// object into `buf` until the delimiter `byte` or EOF is reached.
+ /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
+ ///
+ /// This function will read bytes from the underlying stream until the
+ /// delimiter or EOF is found. Once found, all bytes up to, and including,
+ /// the delimiter (if found) will be appended to `buf`.
+ ///
+ /// The returned future will resolve to the number of bytes read once the read
+ /// operation is completed.
+ ///
+ /// In the case of an error the buffer and the object will be discarded, with
+ /// the error yielded.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncBufReadExt, Cursor};
+ ///
+ /// let mut cursor = Cursor::new(b"lorem-ipsum");
+ /// let mut buf = vec![];
+ ///
+ /// // cursor is at 'l'
+ /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
+ /// assert_eq!(num_bytes, 6);
+ /// assert_eq!(buf, b"lorem-");
+ /// buf.clear();
+ ///
+ /// // cursor is at 'i'
+ /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
+ /// assert_eq!(num_bytes, 5);
+ /// assert_eq!(buf, b"ipsum");
+ /// buf.clear();
+ ///
+ /// // cursor is at EOF
+ /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
+ /// assert_eq!(num_bytes, 0);
+ /// assert_eq!(buf, b"");
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read_until<'a>(
+ &'a mut self,
+ byte: u8,
+ buf: &'a mut Vec<u8>,
+ ) -> ReadUntil<'a, Self>
+ where Self: Unpin,
+ {
+ ReadUntil::new(self, byte, buf)
+ }
+
+ /// Creates a future which will read all the bytes associated with this I/O
+ /// object into `buf` until a newline (the 0xA byte) or EOF is reached,
+ /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
+ ///
+ /// This function will read bytes from the underlying stream until the
+ /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
+ /// up to, and including, the delimiter (if found) will be appended to
+ /// `buf`.
+ ///
+ /// The returned future will resolve to the number of bytes read once the read
+ /// operation is completed.
+ ///
+ /// In the case of an error the buffer and the object will be discarded, with
+ /// the error yielded.
+ ///
+ /// # Errors
+ ///
+ /// This function has the same error semantics as [`read_until`] and will
+ /// also return an error if the read bytes are not valid UTF-8. If an I/O
+ /// error is encountered then `buf` may contain some bytes already read in
+ /// the event that all data read so far was valid UTF-8.
+ ///
+ /// [`read_until`]: AsyncBufReadExt::read_until
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncBufReadExt, Cursor};
+ ///
+ /// let mut cursor = Cursor::new(b"foo\nbar");
+ /// let mut buf = String::new();
+ ///
+ /// // cursor is at 'f'
+ /// let num_bytes = cursor.read_line(&mut buf).await?;
+ /// assert_eq!(num_bytes, 4);
+ /// assert_eq!(buf, "foo\n");
+ /// buf.clear();
+ ///
+ /// // cursor is at 'b'
+ /// let num_bytes = cursor.read_line(&mut buf).await?;
+ /// assert_eq!(num_bytes, 3);
+ /// assert_eq!(buf, "bar");
+ /// buf.clear();
+ ///
+ /// // cursor is at EOF
+ /// let num_bytes = cursor.read_line(&mut buf).await?;
+ /// assert_eq!(num_bytes, 0);
+ /// assert_eq!(buf, "");
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
+ where Self: Unpin,
+ {
+ ReadLine::new(self, buf)
+ }
+
+ /// Returns a stream over the lines of this reader.
+ /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
+ ///
+ /// The stream returned from this function will yield instances of
+ /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
+ /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
+ ///
+ /// [`io::Result`]: std::io::Result
+ /// [`String`]: String
+ ///
+ /// # Errors
+ ///
+ /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
+ ///
+ /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncBufReadExt, Cursor};
+ /// use futures::stream::StreamExt;
+ ///
+ /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
+ ///
+ /// let mut lines_stream = cursor.lines().map(|l| l.unwrap());
+ /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
+ /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum")));
+ /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
+ /// assert_eq!(lines_stream.next().await, None);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ fn lines(self) -> Lines<Self>
+ where Self: Sized,
+ {
+ Lines::new(self)
+ }
+}
+
+impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
diff --git a/third_party/rust/futures-util/src/io/read.rs b/third_party/rust/futures-util/src/io/read.rs
new file mode 100644
index 0000000000..ea25959056
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read.rs
@@ -0,0 +1,30 @@
+use crate::io::AsyncRead;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`read`](super::AsyncReadExt::read) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Read<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut [u8],
+}
+
+impl<R: ?Sized + Unpin> Unpin for Read<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> Read<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
+ Read { reader, buf }
+ }
+}
+
+impl<R: AsyncRead + ?Sized + Unpin> Future for Read<'_, R> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.reader).poll_read(cx, this.buf)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_exact.rs b/third_party/rust/futures-util/src/io/read_exact.rs
new file mode 100644
index 0000000000..a2bbd400ea
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_exact.rs
@@ -0,0 +1,41 @@
+use crate::io::AsyncRead;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io;
+use std::mem;
+use std::pin::Pin;
+
+/// Future for the [`read_exact`](super::AsyncReadExt::read_exact) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadExact<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut [u8],
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadExact<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> ReadExact<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self {
+ ReadExact { reader, buf }
+ }
+}
+
+impl<R: AsyncRead + ?Sized + Unpin> Future for ReadExact<'_, R> {
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ while !this.buf.is_empty() {
+ let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?;
+ {
+ let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n);
+ this.buf = rest;
+ }
+ if n == 0 {
+ return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()))
+ }
+ }
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_line.rs b/third_party/rust/futures-util/src/io/read_line.rs
new file mode 100644
index 0000000000..d830514b92
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_line.rs
@@ -0,0 +1,61 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncBufRead;
+use std::io;
+use std::mem;
+use std::pin::Pin;
+use std::str;
+use super::read_until::read_until_internal;
+
+/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadLine<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut String,
+ bytes: Vec<u8>,
+ read: usize,
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}
+
+impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
+ Self {
+ reader,
+ bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
+ buf,
+ read: 0,
+ }
+ }
+}
+
+pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
+ reader: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ buf: &mut String,
+ bytes: &mut Vec<u8>,
+ read: &mut usize,
+) -> Poll<io::Result<usize>> {
+ let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
+ if str::from_utf8(&bytes).is_err() {
+ Poll::Ready(ret.and_then(|_| {
+ Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8"))
+ }))
+ } else {
+ debug_assert!(buf.is_empty());
+ debug_assert_eq!(*read, 0);
+ // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
+ mem::swap(unsafe { buf.as_mut_vec() }, bytes);
+ Poll::Ready(ret)
+ }
+}
+
+impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self { reader, buf, bytes, read } = &mut *self;
+ read_line_internal(Pin::new(reader), cx, buf, bytes, read)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_to_end.rs b/third_party/rust/futures-util/src/io/read_to_end.rs
new file mode 100644
index 0000000000..70b057829c
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_to_end.rs
@@ -0,0 +1,90 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncRead;
+use std::io;
+use std::pin::Pin;
+use std::vec::Vec;
+
+/// Future for the [`read_to_end`](super::AsyncReadExt::read_to_end) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadToEnd<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut Vec<u8>,
+ start_len: usize,
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToEnd<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> Self {
+ let start_len = buf.len();
+ Self {
+ reader,
+ buf,
+ start_len,
+ }
+ }
+}
+
+struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize }
+
+impl Drop for Guard<'_> {
+ fn drop(&mut self) {
+ unsafe { self.buf.set_len(self.len); }
+ }
+}
+
+// This uses an adaptive system to extend the vector when it fills. We want to
+// avoid paying to allocate and zero a huge chunk of memory if the reader only
+// has 4 bytes while still making large reads if the reader does have a ton
+// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
+// time is 4,500 times (!) slower than this if the reader has a very small
+// amount of data to return.
+//
+// Because we're extending the buffer with uninitialized data for trusted
+// readers, we need to make sure to truncate that if any of this panics.
+pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
+ mut rd: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ buf: &mut Vec<u8>,
+ start_len: usize,
+) -> Poll<io::Result<usize>> {
+ let mut g = Guard { len: buf.len(), buf };
+ let ret;
+ loop {
+ if g.len == g.buf.len() {
+ unsafe {
+ g.buf.reserve(32);
+ let capacity = g.buf.capacity();
+ g.buf.set_len(capacity);
+ super::initialize(&rd, &mut g.buf[g.len..]);
+ }
+ }
+
+ match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
+ Ok(0) => {
+ ret = Poll::Ready(Ok(g.len - start_len));
+ break;
+ }
+ Ok(n) => g.len += n,
+ Err(e) => {
+ ret = Poll::Ready(Err(e));
+ break;
+ }
+ }
+ }
+
+ ret
+}
+
+impl<A> Future for ReadToEnd<'_, A>
+ where A: AsyncRead + ?Sized + Unpin,
+{
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_to_string.rs b/third_party/rust/futures-util/src/io/read_to_string.rs
new file mode 100644
index 0000000000..56c95ce18d
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_to_string.rs
@@ -0,0 +1,66 @@
+use super::read_to_end::read_to_end_internal;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncRead;
+use std::pin::Pin;
+use std::vec::Vec;
+use std::{io, mem, str};
+
+/// Future for the [`read_to_string`](super::AsyncReadExt::read_to_string) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadToString<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut String,
+ bytes: Vec<u8>,
+ start_len: usize,
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> {
+ pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
+ let start_len = buf.len();
+ Self {
+ reader,
+ bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
+ buf,
+ start_len,
+ }
+ }
+}
+
+fn read_to_string_internal<R: AsyncRead + ?Sized>(
+ reader: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ buf: &mut String,
+ bytes: &mut Vec<u8>,
+ start_len: usize,
+) -> Poll<io::Result<usize>> {
+ let ret = ready!(read_to_end_internal(reader, cx, bytes, start_len));
+ if str::from_utf8(&bytes).is_err() {
+ Poll::Ready(ret.and_then(|_| {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "stream did not contain valid UTF-8",
+ ))
+ }))
+ } else {
+ debug_assert!(buf.is_empty());
+ // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
+ mem::swap(unsafe { buf.as_mut_vec() }, bytes);
+ Poll::Ready(ret)
+ }
+}
+
+impl<A> Future for ReadToString<'_, A>
+where
+ A: AsyncRead + ?Sized + Unpin,
+{
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self { reader, buf, bytes, start_len } = &mut *self;
+ read_to_string_internal(Pin::new(reader), cx, buf, bytes, *start_len)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_until.rs b/third_party/rust/futures-util/src/io/read_until.rs
new file mode 100644
index 0000000000..95c47e06b8
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_until.rs
@@ -0,0 +1,59 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncBufRead;
+use std::io;
+use std::mem;
+use std::pin::Pin;
+
+/// Future for the [`read_until`](super::AsyncBufReadExt::read_until) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadUntil<'a, R: ?Sized> {
+ reader: &'a mut R,
+ byte: u8,
+ buf: &'a mut Vec<u8>,
+ read: usize,
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadUntil<'_, R> {}
+
+impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
+ pub(super) fn new(reader: &'a mut R, byte: u8, buf: &'a mut Vec<u8>) -> Self {
+ Self { reader, byte, buf, read: 0 }
+ }
+}
+
+pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
+ mut reader: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ byte: u8,
+ buf: &mut Vec<u8>,
+ read: &mut usize,
+) -> Poll<io::Result<usize>> {
+ loop {
+ let (done, used) = {
+ let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
+ if let Some(i) = memchr::memchr(byte, available) {
+ buf.extend_from_slice(&available[..=i]);
+ (true, i + 1)
+ } else {
+ buf.extend_from_slice(available);
+ (false, available.len())
+ }
+ };
+ reader.as_mut().consume(used);
+ *read += used;
+ if done || used == 0 {
+ return Poll::Ready(Ok(mem::replace(read, 0)));
+ }
+ }
+}
+
+impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let Self { reader, byte, buf, read } = &mut *self;
+ read_until_internal(Pin::new(reader), cx, *byte, buf, read)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/read_vectored.rs b/third_party/rust/futures-util/src/io/read_vectored.rs
new file mode 100644
index 0000000000..4e22df57e9
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/read_vectored.rs
@@ -0,0 +1,30 @@
+use crate::io::AsyncRead;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io::{self, IoSliceMut};
+use std::pin::Pin;
+
+/// Future for the [`read_vectored`](super::AsyncReadExt::read_vectored) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ReadVectored<'a, R: ?Sized> {
+ reader: &'a mut R,
+ bufs: &'a mut [IoSliceMut<'a>],
+}
+
+impl<R: ?Sized + Unpin> Unpin for ReadVectored<'_, R> {}
+
+impl<'a, R: AsyncRead + ?Sized + Unpin> ReadVectored<'a, R> {
+ pub(super) fn new(reader: &'a mut R, bufs: &'a mut [IoSliceMut<'a>]) -> Self {
+ Self { reader, bufs }
+ }
+}
+
+impl<R: AsyncRead + ?Sized + Unpin> Future for ReadVectored<'_, R> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.reader).poll_read_vectored(cx, this.bufs)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/repeat.rs b/third_party/rust/futures-util/src/io/repeat.rs
new file mode 100644
index 0000000000..84abd7f769
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/repeat.rs
@@ -0,0 +1,73 @@
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "read-initializer")]
+use futures_io::Initializer;
+use futures_io::{AsyncRead, IoSliceMut};
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+
+/// Reader for the [`repeat()`] function.
+#[must_use = "readers do nothing unless polled"]
+pub struct Repeat {
+ byte: u8,
+}
+
+/// Creates an instance of a reader that infinitely repeats one byte.
+///
+/// All reads from this reader will succeed by filling the specified buffer with
+/// the given byte.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncReadExt};
+///
+/// let mut buffer = [0; 3];
+/// let mut reader = io::repeat(0b101);
+/// reader.read_exact(&mut buffer).await.unwrap();
+/// assert_eq!(buffer, [0b101, 0b101, 0b101]);
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn repeat(byte: u8) -> Repeat {
+ Repeat { byte }
+}
+
+impl AsyncRead for Repeat {
+ #[inline]
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ for slot in &mut *buf {
+ *slot = self.byte;
+ }
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ #[inline]
+ fn poll_read_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ let mut nwritten = 0;
+ for buf in bufs {
+ nwritten += ready!(self.as_mut().poll_read(cx, buf))?;
+ }
+ Poll::Ready(Ok(nwritten))
+ }
+
+ #[cfg(feature = "read-initializer")]
+ #[inline]
+ unsafe fn initializer(&self) -> Initializer {
+ Initializer::nop()
+ }
+}
+
+impl fmt::Debug for Repeat {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Repeat { .. }")
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/seek.rs b/third_party/rust/futures-util/src/io/seek.rs
new file mode 100644
index 0000000000..0aa2371393
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/seek.rs
@@ -0,0 +1,30 @@
+use crate::io::{AsyncSeek, SeekFrom};
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Seek<'a, S: ?Sized> {
+ seek: &'a mut S,
+ pos: SeekFrom,
+}
+
+impl<S: ?Sized + Unpin> Unpin for Seek<'_, S> {}
+
+impl<'a, S: AsyncSeek + ?Sized + Unpin> Seek<'a, S> {
+ pub(super) fn new(seek: &'a mut S, pos: SeekFrom) -> Self {
+ Self { seek, pos }
+ }
+}
+
+impl<S: AsyncSeek + ?Sized + Unpin> Future for Seek<'_, S> {
+ type Output = io::Result<u64>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.seek).poll_seek(cx, this.pos)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/sink.rs b/third_party/rust/futures-util/src/io/sink.rs
new file mode 100644
index 0000000000..4a32ca7041
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/sink.rs
@@ -0,0 +1,67 @@
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncWrite, IoSlice};
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+
+/// Writer for the [`sink()`] function.
+#[must_use = "writers do nothing unless polled"]
+pub struct Sink {
+ _priv: (),
+}
+
+/// Creates an instance of a writer which will successfully consume all data.
+///
+/// All calls to `poll_write` on the returned instance will return `Poll::Ready(Ok(buf.len()))`
+/// and the contents of the buffer will not be inspected.
+///
+/// # Examples
+///
+/// ```rust
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncWriteExt};
+///
+/// let buffer = vec![1, 2, 3, 5, 8];
+/// let mut writer = io::sink();
+/// let num_bytes = writer.write(&buffer).await?;
+/// assert_eq!(num_bytes, 5);
+/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+/// ```
+pub fn sink() -> Sink {
+ Sink { _priv: () }
+}
+
+impl AsyncWrite for Sink {
+ #[inline]
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ #[inline]
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ Poll::Ready(Ok(bufs.iter().map(|b| b.len()).sum()))
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+ #[inline]
+ fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl fmt::Debug for Sink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("Sink { .. }")
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/split.rs b/third_party/rust/futures-util/src/io/split.rs
new file mode 100644
index 0000000000..d12df867fc
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/split.rs
@@ -0,0 +1,69 @@
+use crate::lock::BiLock;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
+use std::io;
+use std::pin::Pin;
+
+/// The readable half of an object returned from `AsyncRead::split`.
+#[derive(Debug)]
+pub struct ReadHalf<T> {
+ handle: BiLock<T>,
+}
+
+/// The writable half of an object returned from `AsyncRead::split`.
+#[derive(Debug)]
+pub struct WriteHalf<T> {
+ handle: BiLock<T>,
+}
+
+fn lock_and_then<T, U, E, F>(
+ lock: &BiLock<T>,
+ cx: &mut Context<'_>,
+ f: F
+) -> Poll<Result<U, E>>
+ where F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<U, E>>
+{
+ let mut l = ready!(lock.poll_lock(cx));
+ f(l.as_pin_mut(), cx)
+}
+
+pub(super) fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {
+ let (a, b) = BiLock::new(t);
+ (ReadHalf { handle: a }, WriteHalf { handle: b })
+}
+
+impl<R: AsyncRead> AsyncRead for ReadHalf<R> {
+ fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
+ -> Poll<io::Result<usize>>
+ {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf))
+ }
+
+ fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>])
+ -> Poll<io::Result<usize>>
+ {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs))
+ }
+}
+
+impl<W: AsyncWrite> AsyncWrite for WriteHalf<W> {
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8])
+ -> Poll<io::Result<usize>>
+ {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf))
+ }
+
+ fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>])
+ -> Poll<io::Result<usize>>
+ {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs))
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx))
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/take.rs b/third_party/rust/futures-util/src/io/take.rs
new file mode 100644
index 0000000000..b1f33fa468
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/take.rs
@@ -0,0 +1,210 @@
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "read-initializer")]
+use futures_io::Initializer;
+use futures_io::{AsyncRead, AsyncBufRead};
+use pin_utils::{unsafe_pinned, unsafe_unpinned};
+use std::{cmp, io};
+use std::pin::Pin;
+
+/// Reader for the [`take`](super::AsyncReadExt::take) method.
+#[derive(Debug)]
+#[must_use = "readers do nothing unless you `.await` or poll them"]
+pub struct Take<R> {
+ inner: R,
+ // Add '_' to avoid conflicts with `limit` method.
+ limit_: u64,
+}
+
+impl<R: Unpin> Unpin for Take<R> { }
+
+impl<R: AsyncRead> Take<R> {
+ unsafe_pinned!(inner: R);
+ unsafe_unpinned!(limit_: u64);
+
+ pub(super) fn new(inner: R, limit: u64) -> Self {
+ Self { inner, limit_: limit }
+ }
+
+ /// Returns the remaining number of bytes that can be
+ /// read before this instance will return EOF.
+ ///
+ /// # Note
+ ///
+ /// This instance may reach `EOF` after reading fewer bytes than indicated by
+ /// this method if the underlying [`AsyncRead`] instance reaches EOF.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader = Cursor::new(&b"12345678"[..]);
+ /// let mut buffer = [0; 2];
+ ///
+ /// let mut take = reader.take(4);
+ /// let n = take.read(&mut buffer).await?;
+ ///
+ /// assert_eq!(take.limit(), 2);
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ pub fn limit(&self) -> u64 {
+ self.limit_
+ }
+
+ /// Sets the number of bytes that can be read before this instance will
+ /// return EOF. This is the same as constructing a new `Take` instance, so
+ /// the amount of bytes read and the previous limit value don't matter when
+ /// calling this method.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader = Cursor::new(&b"12345678"[..]);
+ /// let mut buffer = [0; 4];
+ ///
+ /// let mut take = reader.take(4);
+ /// let n = take.read(&mut buffer).await?;
+ ///
+ /// assert_eq!(n, 4);
+ /// assert_eq!(take.limit(), 0);
+ ///
+ /// take.set_limit(10);
+ /// let n = take.read(&mut buffer).await?;
+ /// assert_eq!(n, 4);
+ ///
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ pub fn set_limit(&mut self, limit: u64) {
+ self.limit_ = limit
+ }
+
+ /// Gets a reference to the underlying reader.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader = Cursor::new(&b"12345678"[..]);
+ /// let mut buffer = [0; 4];
+ ///
+ /// let mut take = reader.take(4);
+ /// let n = take.read(&mut buffer).await?;
+ ///
+ /// let cursor_ref = take.get_ref();
+ /// assert_eq!(cursor_ref.position(), 4);
+ ///
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ pub fn get_ref(&self) -> &R {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying reader.
+ ///
+ /// Care should be taken to avoid modifying the internal I/O state of the
+ /// underlying reader as doing so may corrupt the internal limit of this
+ /// `Take`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader = Cursor::new(&b"12345678"[..]);
+ /// let mut buffer = [0; 4];
+ ///
+ /// let mut take = reader.take(4);
+ /// let n = take.read(&mut buffer).await?;
+ ///
+ /// let cursor_mut = take.get_mut();
+ ///
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ pub fn get_mut(&mut self) -> &mut R {
+ &mut self.inner
+ }
+
+ /// Gets a pinned mutable reference to the underlying reader.
+ ///
+ /// Care should be taken to avoid modifying the internal I/O state of the
+ /// underlying reader as doing so may corrupt the internal limit of this
+ /// `Take`.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
+ self.inner()
+ }
+
+ /// Consumes the `Take`, returning the wrapped reader.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::io::{AsyncReadExt, Cursor};
+ ///
+ /// let reader = Cursor::new(&b"12345678"[..]);
+ /// let mut buffer = [0; 4];
+ ///
+ /// let mut take = reader.take(4);
+ /// let n = take.read(&mut buffer).await?;
+ ///
+ /// let cursor = take.into_inner();
+ /// assert_eq!(cursor.position(), 4);
+ ///
+ /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+ /// ```
+ pub fn into_inner(self) -> R {
+ self.inner
+ }
+}
+
+impl<R: AsyncRead> AsyncRead for Take<R> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ if self.limit_ == 0 {
+ return Poll::Ready(Ok(0));
+ }
+
+ let max = std::cmp::min(buf.len() as u64, self.limit_) as usize;
+ let n = ready!(self.as_mut().inner().poll_read(cx, &mut buf[..max]))?;
+ *self.as_mut().limit_() -= n as u64;
+ Poll::Ready(Ok(n))
+ }
+
+ #[cfg(feature = "read-initializer")]
+ unsafe fn initializer(&self) -> Initializer {
+ self.inner.initializer()
+ }
+}
+
+impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+ let Self { inner, limit_ } = unsafe { self.get_unchecked_mut() };
+ let inner = unsafe { Pin::new_unchecked(inner) };
+
+ // Don't call into inner reader at all at EOF because it may still block
+ if *limit_ == 0 {
+ return Poll::Ready(Ok(&[]));
+ }
+
+ let buf = ready!(inner.poll_fill_buf(cx)?);
+ let cap = cmp::min(buf.len() as u64, *limit_) as usize;
+ Poll::Ready(Ok(&buf[..cap]))
+ }
+
+ fn consume(mut self: Pin<&mut Self>, amt: usize) {
+ // Don't let callers reset the limit by passing an overlarge value
+ let amt = cmp::min(amt as u64, self.limit_) as usize;
+ *self.as_mut().limit_() -= amt as u64;
+ self.inner().consume(amt);
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/window.rs b/third_party/rust/futures-util/src/io/window.rs
new file mode 100644
index 0000000000..3424197d75
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/window.rs
@@ -0,0 +1,107 @@
+use std::ops::{Bound, Range, RangeBounds};
+
+/// A owned window around an underlying buffer.
+///
+/// Normally slices work great for considering sub-portions of a buffer, but
+/// unfortunately a slice is a *borrowed* type in Rust which has an associated
+/// lifetime. When working with future and async I/O these lifetimes are not
+/// always appropriate, and are sometimes difficult to store in tasks. This
+/// type strives to fill this gap by providing an "owned slice" around an
+/// underlying buffer of bytes.
+///
+/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable
+/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation
+/// that this type carries.
+///
+/// This type can be particularly useful when working with the `write_all`
+/// combinator in this crate. Data can be sliced via `Window`, consumed by
+/// `write_all`, and then earned back once the write operation finishes through
+/// the `into_inner` method on this type.
+#[derive(Debug)]
+pub struct Window<T> {
+ inner: T,
+ range: Range<usize>,
+}
+
+impl<T: AsRef<[u8]>> Window<T> {
+ /// Creates a new window around the buffer `t` defaulting to the entire
+ /// slice.
+ ///
+ /// Further methods can be called on the returned `Window<T>` to alter the
+ /// window into the data provided.
+ pub fn new(t: T) -> Self {
+ Self {
+ range: 0..t.as_ref().len(),
+ inner: t,
+ }
+ }
+
+ /// Gets a shared reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consumes this `Window`, returning the underlying buffer.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ /// Returns the starting index of this window into the underlying buffer
+ /// `T`.
+ pub fn start(&self) -> usize {
+ self.range.start
+ }
+
+ /// Returns the end index of this window into the underlying buffer
+ /// `T`.
+ pub fn end(&self) -> usize {
+ self.range.end
+ }
+
+ /// Changes the range of this window to the range specified.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `range` is out of bounds for the underlying
+ /// slice or if [`start_bound()`] of `range` comes after the [`end_bound()`].
+ ///
+ /// [`start_bound()`]: std::ops::RangeBounds::start_bound
+ /// [`end_bound()`]: std::ops::RangeBounds::end_bound
+ pub fn set<R: RangeBounds<usize>>(&mut self, range: R) {
+ let start = match range.start_bound() {
+ Bound::Included(n) => *n,
+ Bound::Excluded(n) => *n + 1,
+ Bound::Unbounded => 0,
+ };
+ let end = match range.end_bound() {
+ Bound::Included(n) => *n + 1,
+ Bound::Excluded(n) => *n,
+ Bound::Unbounded => self.inner.as_ref().len(),
+ };
+
+ assert!(end <= self.inner.as_ref().len());
+ assert!(start <= end);
+
+ self.range.start = start;
+ self.range.end = end;
+ }
+}
+
+impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> {
+ fn as_ref(&self) -> &[u8] {
+ &self.inner.as_ref()[self.range.start..self.range.end]
+ }
+}
+
+impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> {
+ fn as_mut(&mut self) -> &mut [u8] {
+ &mut self.inner.as_mut()[self.range.start..self.range.end]
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/write.rs b/third_party/rust/futures-util/src/io/write.rs
new file mode 100644
index 0000000000..c47ef9e2eb
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/write.rs
@@ -0,0 +1,30 @@
+use crate::io::AsyncWrite;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`write`](super::AsyncWriteExt::write) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct Write<'a, W: ?Sized> {
+ writer: &'a mut W,
+ buf: &'a [u8],
+}
+
+impl<W: ?Sized + Unpin> Unpin for Write<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> Write<'a, W> {
+ pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self {
+ Self { writer, buf }
+ }
+}
+
+impl<W: AsyncWrite + ?Sized + Unpin> Future for Write<'_, W> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.writer).poll_write(cx, this.buf)
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/write_all.rs b/third_party/rust/futures-util/src/io/write_all.rs
new file mode 100644
index 0000000000..57f1400b0e
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/write_all.rs
@@ -0,0 +1,42 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncWrite;
+use std::io;
+use std::mem;
+use std::pin::Pin;
+
+/// Future for the [`write_all`](super::AsyncWriteExt::write_all) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct WriteAll<'a, W: ?Sized> {
+ writer: &'a mut W,
+ buf: &'a [u8],
+}
+
+impl<W: ?Sized + Unpin> Unpin for WriteAll<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAll<'a, W> {
+ pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self {
+ WriteAll { writer, buf }
+ }
+}
+
+impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> {
+ type Output = io::Result<()>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let this = &mut *self;
+ while !this.buf.is_empty() {
+ let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?;
+ {
+ let (_, rest) = mem::replace(&mut this.buf, &[]).split_at(n);
+ this.buf = rest;
+ }
+ if n == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
+ }
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/futures-util/src/io/write_vectored.rs b/third_party/rust/futures-util/src/io/write_vectored.rs
new file mode 100644
index 0000000000..14a01d7302
--- /dev/null
+++ b/third_party/rust/futures-util/src/io/write_vectored.rs
@@ -0,0 +1,30 @@
+use crate::io::AsyncWrite;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use std::io::{self, IoSlice};
+use std::pin::Pin;
+
+/// Future for the [`write_vectored`](super::AsyncWriteExt::write_vectored) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct WriteVectored<'a, W: ?Sized> {
+ writer: &'a mut W,
+ bufs: &'a [IoSlice<'a>],
+}
+
+impl<W: ?Sized + Unpin> Unpin for WriteVectored<'_, W> {}
+
+impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteVectored<'a, W> {
+ pub(super) fn new(writer: &'a mut W, bufs: &'a [IoSlice<'a>]) -> Self {
+ Self { writer, bufs }
+ }
+}
+
+impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteVectored<'_, W> {
+ type Output = io::Result<usize>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = &mut *self;
+ Pin::new(&mut this.writer).poll_write_vectored(cx, this.bufs)
+ }
+}