diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/futures-util/src/io | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/futures-util/src/io')
29 files changed, 3207 insertions, 0 deletions
diff --git a/third_party/rust/futures-util/src/io/allow_std.rs b/third_party/rust/futures-util/src/io/allow_std.rs new file mode 100644 index 0000000000..346e9babea --- /dev/null +++ b/third_party/rust/futures-util/src/io/allow_std.rs @@ -0,0 +1,179 @@ +use futures_core::task::{Context, Poll}; +#[cfg(feature = "read-initializer")] +use futures_io::Initializer; +use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoSlice, IoSliceMut, SeekFrom}; +use std::{fmt, io}; +use std::pin::Pin; + +/// A simple wrapper type which allows types which implement only +/// implement `std::io::Read` or `std::io::Write` +/// to be used in contexts which expect an `AsyncRead` or `AsyncWrite`. +/// +/// If these types issue an error with the kind `io::ErrorKind::WouldBlock`, +/// it is expected that they will notify the current task on readiness. +/// Synchronous `std` types should not issue errors of this kind and +/// are safe to use in this context. However, using these types with +/// `AllowStdIo` will cause the event loop to block, so they should be used +/// with care. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct AllowStdIo<T>(T); + +impl<T> Unpin for AllowStdIo<T> {} + +macro_rules! try_with_interrupt { + ($e:expr) => { + loop { + match $e { + Ok(e) => { + break e; + } + Err(ref e) if e.kind() == ::std::io::ErrorKind::Interrupted => { + continue; + } + Err(e) => { + return Poll::Ready(Err(e)); + } + } + } + } +} + +impl<T> AllowStdIo<T> { + /// Creates a new `AllowStdIo` from an existing IO object. + pub fn new(io: T) -> Self { + AllowStdIo(io) + } + + /// Returns a reference to the contained IO object. + pub fn get_ref(&self) -> &T { + &self.0 + } + + /// Returns a mutable reference to the contained IO object. + pub fn get_mut(&mut self) -> &mut T { + &mut self.0 + } + + /// Consumes self and returns the contained IO object. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl<T> io::Write for AllowStdIo<T> where T: io::Write { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.write(buf) + } + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + self.0.write_vectored(bufs) + } + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + self.0.write_all(buf) + } + fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> { + self.0.write_fmt(fmt) + } +} + +impl<T> AsyncWrite for AllowStdIo<T> where T: io::Write { + fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) + -> Poll<io::Result<usize>> + { + Poll::Ready(Ok(try_with_interrupt!(self.0.write(buf)))) + } + + fn poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) + -> Poll<io::Result<usize>> + { + Poll::Ready(Ok(try_with_interrupt!(self.0.write_vectored(bufs)))) + } + + fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + try_with_interrupt!(self.0.flush()); + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.poll_flush(cx) + } +} + +impl<T> io::Read for AllowStdIo<T> where T: io::Read { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.read(buf) + } + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { + self.0.read_vectored(bufs) + } + #[cfg(feature = "read-initializer")] + unsafe fn initializer(&self) -> Initializer { + self.0.initializer() + } + fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { + self.0.read_to_end(buf) + } + fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> { + self.0.read_to_string(buf) + } + fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { + self.0.read_exact(buf) + } +} + +impl<T> AsyncRead for AllowStdIo<T> where T: io::Read { + fn poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) + -> Poll<io::Result<usize>> + { + Poll::Ready(Ok(try_with_interrupt!(self.0.read(buf)))) + } + + fn poll_read_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) + -> Poll<io::Result<usize>> + { + Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs)))) + } + + #[cfg(feature = "read-initializer")] + unsafe fn initializer(&self) -> Initializer { + self.0.initializer() + } +} + +impl<T> io::Seek for AllowStdIo<T> where T: io::Seek { + fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { + self.0.seek(pos) + } +} + +impl<T> AsyncSeek for AllowStdIo<T> where T: io::Seek { + fn poll_seek(mut self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom) + -> Poll<io::Result<u64>> + { + Poll::Ready(Ok(try_with_interrupt!(self.0.seek(pos)))) + } +} + +impl<T> io::BufRead for AllowStdIo<T> where T: io::BufRead { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + self.0.fill_buf() + } + fn consume(&mut self, amt: usize) { + self.0.consume(amt) + } +} + +impl<T> AsyncBufRead for AllowStdIo<T> where T: io::BufRead { + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) + -> Poll<io::Result<&[u8]>> + { + let this: *mut Self = &mut *self as *mut _; + Poll::Ready(Ok(try_with_interrupt!(unsafe { &mut *this }.0.fill_buf()))) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.0.consume(amt) + } +} diff --git a/third_party/rust/futures-util/src/io/buf_reader.rs b/third_party/rust/futures-util/src/io/buf_reader.rs new file mode 100644 index 0000000000..96d3f2815e --- /dev/null +++ b/third_party/rust/futures-util/src/io/buf_reader.rs @@ -0,0 +1,259 @@ +use futures_core::task::{Context, Poll}; +#[cfg(feature = "read-initializer")] +use futures_io::Initializer; +use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::io::{self, Read}; +use std::pin::Pin; +use std::{cmp, fmt}; +use super::DEFAULT_BUF_SIZE; + +/// The `BufReader` struct adds buffering to any reader. +/// +/// It can be excessively inefficient to work directly with a [`AsyncRead`] +/// instance. A `BufReader` performs large, infrequent reads on the underlying +/// [`AsyncRead`] and maintains an in-memory buffer of the results. +/// +/// `BufReader` can improve the speed of programs that make *small* and +/// *repeated* read calls to the same file or network socket. It does not +/// help when reading very large amounts at once, or reading just one or a few +/// times. It also provides no advantage when reading from a source that is +/// already in memory, like a `Vec<u8>`. +/// +/// When the `BufReader` is dropped, the contents of its buffer will be +/// discarded. Creating multiple instances of a `BufReader` on the same +/// stream can cause data loss. +/// +/// [`AsyncRead`]: futures_io::AsyncRead +/// +// TODO: Examples +pub struct BufReader<R> { + inner: R, + buf: Box<[u8]>, + pos: usize, + cap: usize, +} + +impl<R> BufReader<R> { + unsafe_pinned!(inner: R); + unsafe_unpinned!(pos: usize); + unsafe_unpinned!(cap: usize); +} + +impl<R: AsyncRead> BufReader<R> { + /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + pub fn new(inner: R) -> Self { + Self::with_capacity(DEFAULT_BUF_SIZE, inner) + } + + /// Creates a new `BufReader` with the specified buffer capacity. + pub fn with_capacity(capacity: usize, inner: R) -> Self { + unsafe { + let mut buffer = Vec::with_capacity(capacity); + buffer.set_len(capacity); + super::initialize(&inner, &mut buffer); + Self { + inner, + buf: buffer.into_boxed_slice(), + pos: 0, + cap: 0, + } + } + } + + /// Gets a reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_ref(&self) -> &R { + &self.inner + } + + /// Gets a mutable reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_mut(&mut self) -> &mut R { + &mut self.inner + } + + /// Gets a pinned mutable reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.inner() + } + + /// Consumes this `BufWriter`, returning the underlying reader. + /// + /// Note that any leftover data in the internal buffer is lost. + pub fn into_inner(self) -> R { + self.inner + } + + /// Returns a reference to the internally buffered data. + /// + /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. + pub fn buffer(&self) -> &[u8] { + &self.buf[self.pos..self.cap] + } + + /// Invalidates all data in the internal buffer. + #[inline] + fn discard_buffer(mut self: Pin<&mut Self>) { + *self.as_mut().pos() = 0; + *self.cap() = 0; + } +} + +impl<R: AsyncRead> AsyncRead for BufReader<R> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + if self.pos == self.cap && buf.len() >= self.buf.len() { + let res = ready!(self.as_mut().inner().poll_read(cx, buf)); + self.discard_buffer(); + return Poll::Ready(res); + } + let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; + let nread = rem.read(buf)?; + self.consume(nread); + Poll::Ready(Ok(nread)) + } + + fn poll_read_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll<io::Result<usize>> { + let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); + if self.pos == self.cap && total_len >= self.buf.len() { + let res = ready!(self.as_mut().inner().poll_read_vectored(cx, bufs)); + self.discard_buffer(); + return Poll::Ready(res); + } + let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; + let nread = rem.read_vectored(bufs)?; + self.consume(nread); + Poll::Ready(Ok(nread)) + } + + // we can't skip unconditionally because of the large buffer case in read. + #[cfg(feature = "read-initializer")] + unsafe fn initializer(&self) -> Initializer { + self.inner.initializer() + } +} + +impl<R: AsyncRead> AsyncBufRead for BufReader<R> { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<&[u8]>> { + let Self { inner, buf, cap, pos } = unsafe { self.get_unchecked_mut() }; + let mut inner = unsafe { Pin::new_unchecked(inner) }; + + // If we've reached the end of our internal buffer then we need to fetch + // some more data from the underlying reader. + // Branch using `>=` instead of the more correct `==` + // to tell the compiler that the pos..cap slice is always valid. + if *pos >= *cap { + debug_assert!(*pos == *cap); + *cap = ready!(inner.as_mut().poll_read(cx, buf))?; + *pos = 0; + } + Poll::Ready(Ok(&buf[*pos..*cap])) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + *self.as_mut().pos() = cmp::min(self.pos + amt, self.cap); + } +} + +impl<R: AsyncWrite> AsyncWrite for BufReader<R> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.inner().poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + self.inner().poll_write_vectored(cx, bufs) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.inner().poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.inner().poll_close(cx) + } +} + +impl<R: fmt::Debug> fmt::Debug for BufReader<R> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufReader") + .field("reader", &self.inner) + .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buf.len())) + .finish() + } +} + +impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> { + /// Seek to an offset, in bytes, in the underlying reader. + /// + /// The position used for seeking with `SeekFrom::Current(_)` is the + /// position the underlying reader would be at if the `BufReader` had no + /// internal buffer. + /// + /// Seeking always discards the internal buffer, even if the seek position + /// would otherwise fall within it. This guarantees that calling + /// `.into_inner()` immediately after a seek yields the underlying reader + /// at the same position. + /// + /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. + /// + /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` + /// where `n` minus the internal buffer length overflows an `i64`, two + /// seeks will be performed instead of one. If the second seek returns + /// `Err`, the underlying reader will be left at the same position it would + /// have if you called `seek` with `SeekFrom::Current(0)`. + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { + let result: u64; + if let SeekFrom::Current(n) = pos { + let remainder = (self.cap - self.pos) as i64; + // it should be safe to assume that remainder fits within an i64 as the alternative + // means we managed to allocate 8 exbibytes and that's absurd. + // But it's not out of the realm of possibility for some weird underlying reader to + // support seeking by i64::min_value() so we need to handle underflow when subtracting + // remainder. + if let Some(offset) = n.checked_sub(remainder) { + result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(offset)))?; + } else { + // seek backwards by our remainder, and then by the offset + ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(-remainder)))?; + self.as_mut().discard_buffer(); + result = ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)))?; + } + } else { + // Seeking with Start/End doesn't care about our buffer length. + result = ready!(self.as_mut().inner().poll_seek(cx, pos))?; + } + self.discard_buffer(); + Poll::Ready(Ok(result)) + } +} diff --git a/third_party/rust/futures-util/src/io/buf_writer.rs b/third_party/rust/futures-util/src/io/buf_writer.rs new file mode 100644 index 0000000000..b0afbd81f2 --- /dev/null +++ b/third_party/rust/futures-util/src/io/buf_writer.rs @@ -0,0 +1,222 @@ +use futures_core::task::{Context, Poll}; +#[cfg(feature = "read-initializer")] +use futures_io::Initializer; +use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::fmt; +use std::io::{self, Write}; +use std::pin::Pin; +use super::DEFAULT_BUF_SIZE; + +/// Wraps a writer and buffers its output. +/// +/// It can be excessively inefficient to work directly with something that +/// implements [`AsyncWrite`]. A `BufWriter` keeps an in-memory buffer of data and +/// writes it to an underlying writer in large, infrequent batches. +/// +/// `BufWriter` can improve the speed of programs that make *small* and +/// *repeated* write calls to the same file or network socket. It does not +/// help when writing very large amounts at once, or writing just one or a few +/// times. It also provides no advantage when writing to a destination that is +/// in memory, like a `Vec<u8>`. +/// +/// When the `BufWriter` is dropped, the contents of its buffer will be +/// discarded. Creating multiple instances of a `BufWriter` on the same +/// stream can cause data loss. If you need to write out the contents of its +/// buffer, you must manually call flush before the writer is dropped. +/// +/// [`AsyncWrite`]: futures_io::AsyncWrite +/// [`flush`]: super::AsyncWriteExt::flush +/// +// TODO: Examples +pub struct BufWriter<W> { + inner: W, + buf: Vec<u8>, + written: usize, +} + +impl<W> BufWriter<W> { + unsafe_pinned!(inner: W); + unsafe_unpinned!(buf: Vec<u8>); +} + +impl<W: AsyncWrite> BufWriter<W> { + /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + pub fn new(inner: W) -> Self { + Self::with_capacity(DEFAULT_BUF_SIZE, inner) + } + + /// Creates a new `BufWriter` with the specified buffer capacity. + pub fn with_capacity(cap: usize, inner: W) -> Self { + Self { + inner, + buf: Vec::with_capacity(cap), + written: 0, + } + } + + fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + let Self { inner, buf, written } = unsafe { self.get_unchecked_mut() }; + let mut inner = unsafe { Pin::new_unchecked(inner) }; + + let len = buf.len(); + let mut ret = Ok(()); + while *written < len { + match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) { + Ok(0) => { + ret = Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write the buffered data", + )); + break; + } + Ok(n) => *written += n, + Err(e) => { + ret = Err(e); + break; + } + } + } + if *written > 0 { + buf.drain(..*written); + } + *written = 0; + Poll::Ready(ret) + } + + /// Gets a reference to the underlying writer. + pub fn get_ref(&self) -> &W { + &self.inner + } + + /// Gets a mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + pub fn get_mut(&mut self) -> &mut W { + &mut self.inner + } + + /// Gets a pinned mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + self.inner() + } + + /// Consumes this `BufWriter`, returning the underlying writer. + /// + /// Note that any leftover data in the internal buffer is lost. + pub fn into_inner(self) -> W { + self.inner + } + + /// Returns a reference to the internally buffered data. + pub fn buffer(&self) -> &[u8] { + &self.buf + } +} + +impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + if self.buf.len() + buf.len() > self.buf.capacity() { + ready!(self.as_mut().flush_buf(cx))?; + } + if buf.len() >= self.buf.capacity() { + self.inner().poll_write(cx, buf) + } else { + Poll::Ready(self.buf().write(buf)) + } + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); + if self.buf.len() + total_len > self.buf.capacity() { + ready!(self.as_mut().flush_buf(cx))?; + } + if total_len >= self.buf.capacity() { + self.inner().poll_write_vectored(cx, bufs) + } else { + Poll::Ready(self.buf().write_vectored(bufs)) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + ready!(self.as_mut().flush_buf(cx))?; + self.inner().poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + ready!(self.as_mut().flush_buf(cx))?; + self.inner().poll_close(cx) + } +} + +impl<W: AsyncRead> AsyncRead for BufWriter<W> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.inner().poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll<io::Result<usize>> { + self.inner().poll_read_vectored(cx, bufs) + } + + // we can't skip unconditionally because of the large buffer case in read. + #[cfg(feature = "read-initializer")] + unsafe fn initializer(&self) -> Initializer { + self.inner.initializer() + } +} + +impl<W: AsyncBufRead> AsyncBufRead for BufWriter<W> { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<&[u8]>> { + self.inner().poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.inner().consume(amt) + } +} + +impl<W: fmt::Debug> fmt::Debug for BufWriter<W> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufWriter") + .field("writer", &self.inner) + .field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity())) + .field("written", &self.written) + .finish() + } +} + +impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> { + /// Seek to the offset, in bytes, in the underlying writer. + /// + /// Seeking always writes out the internal buffer before seeking. + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { + ready!(self.as_mut().flush_buf(cx))?; + self.inner().poll_seek(cx, pos) + } +} diff --git a/third_party/rust/futures-util/src/io/chain.rs b/third_party/rust/futures-util/src/io/chain.rs new file mode 100644 index 0000000000..64bbdec87a --- /dev/null +++ b/third_party/rust/futures-util/src/io/chain.rs @@ -0,0 +1,166 @@ +use futures_core::task::{Context, Poll}; +#[cfg(feature = "read-initializer")] +use futures_io::Initializer; +use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::fmt; +use std::io; +use std::pin::Pin; + +/// Reader for the [`chain`](super::AsyncReadExt::chain) method. +#[must_use = "readers do nothing unless polled"] +pub struct Chain<T, U> { + first: T, + second: U, + done_first: bool, +} + +impl<T, U> Unpin for Chain<T, U> +where + T: Unpin, + U: Unpin, +{ +} + +impl<T, U> Chain<T, U> +where + T: AsyncRead, + U: AsyncRead, +{ + unsafe_pinned!(first: T); + unsafe_pinned!(second: U); + unsafe_unpinned!(done_first: bool); + + pub(super) fn new(first: T, second: U) -> Self { + Self { + first, + second, + done_first: false, + } + } + + /// Gets references to the underlying readers in this `Chain`. + pub fn get_ref(&self) -> (&T, &U) { + (&self.first, &self.second) + } + + /// Gets mutable references to the underlying readers in this `Chain`. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying readers as doing so may corrupt the internal state of this + /// `Chain`. + pub fn get_mut(&mut self) -> (&mut T, &mut U) { + (&mut self.first, &mut self.second) + } + + /// Gets pinned mutable references to the underlying readers in this `Chain`. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying readers as doing so may corrupt the internal state of this + /// `Chain`. + pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) { + unsafe { + let Self { first, second, .. } = self.get_unchecked_mut(); + (Pin::new_unchecked(first), Pin::new_unchecked(second)) + } + } + + /// Consumes the `Chain`, returning the wrapped readers. + pub fn into_inner(self) -> (T, U) { + (self.first, self.second) + } +} + +impl<T, U> fmt::Debug for Chain<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Chain") + .field("t", &self.first) + .field("u", &self.second) + .field("done_first", &self.done_first) + .finish() + } +} + +impl<T, U> AsyncRead for Chain<T, U> +where + T: AsyncRead, + U: AsyncRead, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + if !self.done_first { + match ready!(self.as_mut().first().poll_read(cx, buf)?) { + 0 if !buf.is_empty() => *self.as_mut().done_first() = true, + n => return Poll::Ready(Ok(n)), + } + } + self.second().poll_read(cx, buf) + } + + fn poll_read_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll<io::Result<usize>> { + if !self.done_first { + let n = ready!(self.as_mut().first().poll_read_vectored(cx, bufs)?); + if n == 0 && bufs.iter().any(|b| !b.is_empty()) { + *self.as_mut().done_first() = true + } else { + return Poll::Ready(Ok(n)); + } + } + self.second().poll_read_vectored(cx, bufs) + } + + #[cfg(feature = "read-initializer")] + unsafe fn initializer(&self) -> Initializer { + let initializer = self.first.initializer(); + if initializer.should_initialize() { + initializer + } else { + self.second.initializer() + } + } +} + +impl<T, U> AsyncBufRead for Chain<T, U> +where + T: AsyncBufRead, + U: AsyncBufRead, +{ + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + let Self { + first, + second, + done_first, + } = unsafe { self.get_unchecked_mut() }; + let first = unsafe { Pin::new_unchecked(first) }; + let second = unsafe { Pin::new_unchecked(second) }; + + if !*done_first { + match ready!(first.poll_fill_buf(cx)?) { + buf if buf.is_empty() => { + *done_first = true; + } + buf => return Poll::Ready(Ok(buf)), + } + } + second.poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + if !self.done_first { + self.first().consume(amt) + } else { + self.second().consume(amt) + } + } +} diff --git a/third_party/rust/futures-util/src/io/close.rs b/third_party/rust/futures-util/src/io/close.rs new file mode 100644 index 0000000000..4d5669680b --- /dev/null +++ b/third_party/rust/futures-util/src/io/close.rs @@ -0,0 +1,28 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncWrite; +use std::io; +use std::pin::Pin; + +/// Future for the [`close`](super::AsyncWriteExt::close) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Close<'a, W: ?Sized> { + writer: &'a mut W, +} + +impl<W: ?Sized + Unpin> Unpin for Close<'_, W> {} + +impl<'a, W: AsyncWrite + ?Sized + Unpin> Close<'a, W> { + pub(super) fn new(writer: &'a mut W) -> Self { + Close { writer } + } +} + +impl<W: AsyncWrite + ?Sized + Unpin> Future for Close<'_, W> { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut *self.writer).poll_close(cx) + } +} diff --git a/third_party/rust/futures-util/src/io/copy.rs b/third_party/rust/futures-util/src/io/copy.rs new file mode 100644 index 0000000000..9531aab996 --- /dev/null +++ b/third_party/rust/futures-util/src/io/copy.rs @@ -0,0 +1,63 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncRead, AsyncWrite}; +use std::io; +use std::pin::Pin; +use super::{BufReader, copy_buf, CopyBuf}; +use pin_utils::unsafe_pinned; + +/// Creates a future which copies all the bytes from one object to another. +/// +/// The returned future will copy all the bytes read from this `AsyncRead` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncWriteExt, Cursor}; +/// +/// let reader = Cursor::new([1, 2, 3, 4]); +/// let mut writer = Cursor::new(vec![0u8; 5]); +/// +/// let bytes = io::copy(reader, &mut writer).await?; +/// writer.close().await?; +/// +/// assert_eq!(bytes, 4); +/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); +/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); +/// ``` +pub fn copy<R, W>(reader: R, writer: &mut W) -> Copy<'_, R, W> +where + R: AsyncRead, + W: AsyncWrite + Unpin + ?Sized, +{ + Copy { + inner: copy_buf(BufReader::new(reader), writer), + } +} + +/// Future for the [`copy()`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Copy<'a, R, W: ?Sized> { + inner: CopyBuf<'a, BufReader<R>, W>, +} + +impl<'a, R: AsyncRead, W: ?Sized> Unpin for Copy<'a, R, W> where CopyBuf<'a, BufReader<R>, W>: Unpin {} + +impl<'a, R: AsyncRead, W: ?Sized> Copy<'a, R, W> { + unsafe_pinned!(inner: CopyBuf<'a, BufReader<R>, W>); +} + +impl<R: AsyncRead, W: AsyncWrite + Unpin + ?Sized> Future for Copy<'_, R, W> { + type Output = io::Result<u64>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.inner().poll(cx) + } +} diff --git a/third_party/rust/futures-util/src/io/copy_buf.rs b/third_party/rust/futures-util/src/io/copy_buf.rs new file mode 100644 index 0000000000..98811825e0 --- /dev/null +++ b/third_party/rust/futures-util/src/io/copy_buf.rs @@ -0,0 +1,87 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncWrite}; +use std::io; +use std::pin::Pin; + +/// Creates a future which copies all the bytes from one object to another. +/// +/// The returned future will copy all the bytes read from this `AsyncBufRead` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncWriteExt, Cursor}; +/// +/// let reader = Cursor::new([1, 2, 3, 4]); +/// let mut writer = Cursor::new(vec![0u8; 5]); +/// +/// let bytes = io::copy_buf(reader, &mut writer).await?; +/// writer.close().await?; +/// +/// assert_eq!(bytes, 4); +/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); +/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); +/// ``` +pub fn copy_buf<R, W>(reader: R, writer: &mut W) -> CopyBuf<'_, R, W> +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, +{ + CopyBuf { + reader, + writer, + amt: 0, + } +} + +/// Future for the [`copy_buf()`] function. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct CopyBuf<'a, R, W: ?Sized> { + reader: R, + writer: &'a mut W, + amt: u64, +} + +impl<R: Unpin, W: ?Sized> Unpin for CopyBuf<'_, R, W> {} + +impl<R, W: Unpin + ?Sized> CopyBuf<'_, R, W> { + fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, Pin<&mut W>, &mut u64) { + unsafe { + let this = self.get_unchecked_mut(); + (Pin::new_unchecked(&mut this.reader), Pin::new(&mut *this.writer), &mut this.amt) + } + } +} + +impl<R, W> Future for CopyBuf<'_, R, W> + where R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, +{ + type Output = io::Result<u64>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let (mut reader, mut writer, amt) = self.project(); + loop { + let buffer = ready!(reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(writer.as_mut().poll_flush(cx))?; + return Poll::Ready(Ok(*amt)); + } + + let i = ready!(writer.as_mut().poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) + } + *amt += i as u64; + reader.as_mut().consume(i); + } + } +} diff --git a/third_party/rust/futures-util/src/io/cursor.rs b/third_party/rust/futures-util/src/io/cursor.rs new file mode 100644 index 0000000000..d1359231c5 --- /dev/null +++ b/third_party/rust/futures-util/src/io/cursor.rs @@ -0,0 +1,238 @@ +use futures_core::task::{Context, Poll}; +#[cfg(feature = "read_initializer")] +use futures_io::Initializer; +use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; +use std::io; +use std::pin::Pin; + +/// A `Cursor` wraps an in-memory buffer and provides it with a +/// [`AsyncSeek`] implementation. +/// +/// `Cursor`s are used with in-memory buffers, anything implementing +/// `AsRef<[u8]>`, to allow them to implement [`AsyncRead`] and/or [`AsyncWrite`], +/// allowing these buffers to be used anywhere you might use a reader or writer +/// that does actual I/O. +/// +/// The standard library implements some I/O traits on various types which +/// are commonly used as a buffer, like `Cursor<`[`Vec`]`<u8>>` and +/// `Cursor<`[`&[u8]`][bytes]`>`. +/// +/// [`AsyncSeek`]: trait.AsyncSeek.html +/// [`AsyncRead`]: trait.AsyncRead.html +/// [`AsyncWrite`]: trait.AsyncWrite.html +/// [bytes]: https://doc.rust-lang.org/std/primitive.slice.html +#[derive(Clone, Debug, Default)] +pub struct Cursor<T> { + inner: io::Cursor<T>, +} + +impl<T> Cursor<T> { + /// Creates a new cursor wrapping the provided underlying in-memory buffer. + /// + /// Cursor initial position is `0` even if underlying buffer (e.g., `Vec`) + /// is not empty. So writing to cursor starts with overwriting `Vec` + /// content, not with appending to it. + /// + /// # Examples + /// + /// ``` + /// use futures::io::Cursor; + /// + /// let buff = Cursor::new(Vec::new()); + /// # fn force_inference(_: &Cursor<Vec<u8>>) {} + /// # force_inference(&buff); + /// ``` + pub fn new(inner: T) -> Cursor<T> { + Cursor { + inner: io::Cursor::new(inner), + } + } + + /// Consumes this cursor, returning the underlying value. + /// + /// # Examples + /// + /// ``` + /// use futures::io::Cursor; + /// + /// let buff = Cursor::new(Vec::new()); + /// # fn force_inference(_: &Cursor<Vec<u8>>) {} + /// # force_inference(&buff); + /// + /// let vec = buff.into_inner(); + /// ``` + pub fn into_inner(self) -> T { + self.inner.into_inner() + } + + /// Gets a reference to the underlying value in this cursor. + /// + /// # Examples + /// + /// ``` + /// use futures::io::Cursor; + /// + /// let buff = Cursor::new(Vec::new()); + /// # fn force_inference(_: &Cursor<Vec<u8>>) {} + /// # force_inference(&buff); + /// + /// let reference = buff.get_ref(); + /// ``` + pub fn get_ref(&self) -> &T { + self.inner.get_ref() + } + + /// Gets a mutable reference to the underlying value in this cursor. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying value as it may corrupt this cursor's position. + /// + /// # Examples + /// + /// ``` + /// use futures::io::Cursor; + /// + /// let mut buff = Cursor::new(Vec::new()); + /// # fn force_inference(_: &Cursor<Vec<u8>>) {} + /// # force_inference(&buff); + /// + /// let reference = buff.get_mut(); + /// ``` + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } + + /// Returns the current position of this cursor. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncSeekExt, Cursor, SeekFrom}; + /// + /// let mut buff = Cursor::new(vec![1, 2, 3, 4, 5]); + /// + /// assert_eq!(buff.position(), 0); + /// + /// buff.seek(SeekFrom::Current(2)).await?; + /// assert_eq!(buff.position(), 2); + /// + /// buff.seek(SeekFrom::Current(-1)).await?; + /// assert_eq!(buff.position(), 1); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + pub fn position(&self) -> u64 { + self.inner.position() + } + + /// Sets the position of this cursor. + /// + /// # Examples + /// + /// ``` + /// use futures::io::Cursor; + /// + /// let mut buff = Cursor::new(vec![1, 2, 3, 4, 5]); + /// + /// assert_eq!(buff.position(), 0); + /// + /// buff.set_position(2); + /// assert_eq!(buff.position(), 2); + /// + /// buff.set_position(4); + /// assert_eq!(buff.position(), 4); + /// ``` + pub fn set_position(&mut self, pos: u64) { + self.inner.set_position(pos) + } +} + +impl<T> AsyncSeek for Cursor<T> +where + T: AsRef<[u8]> + Unpin, +{ + fn poll_seek( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll<io::Result<u64>> { + Poll::Ready(io::Seek::seek(&mut self.inner, pos)) + } +} + +impl<T: AsRef<[u8]> + Unpin> AsyncRead for Cursor<T> { + #[cfg(feature = "read_initializer")] + #[inline] + unsafe fn initializer(&self) -> Initializer { + io::Read::initializer(&self.inner) + } + + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + Poll::Ready(io::Read::read(&mut self.inner, buf)) + } + + fn poll_read_vectored( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll<io::Result<usize>> { + Poll::Ready(io::Read::read_vectored(&mut self.inner, bufs)) + } +} + +impl<T> AsyncBufRead for Cursor<T> +where + T: AsRef<[u8]> + Unpin, +{ + fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + Poll::Ready(io::BufRead::fill_buf(&mut self.get_mut().inner)) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + io::BufRead::consume(&mut self.inner, amt) + } +} + +macro_rules! delegate_async_write_to_stdio { + () => { + fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) + -> Poll<io::Result<usize>> + { + Poll::Ready(io::Write::write(&mut self.inner, buf)) + } + + fn poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) + -> Poll<io::Result<usize>> + { + Poll::Ready(io::Write::write_vectored(&mut self.inner, bufs)) + } + + fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(io::Write::flush(&mut self.inner)) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.poll_flush(cx) + } + } +} + +impl AsyncWrite for Cursor<&mut [u8]> { + delegate_async_write_to_stdio!(); +} + +impl AsyncWrite for Cursor<&mut Vec<u8>> { + delegate_async_write_to_stdio!(); +} + +impl AsyncWrite for Cursor<Vec<u8>> { + delegate_async_write_to_stdio!(); +} + +impl AsyncWrite for Cursor<Box<[u8]>> { + delegate_async_write_to_stdio!(); +} diff --git a/third_party/rust/futures-util/src/io/empty.rs b/third_party/rust/futures-util/src/io/empty.rs new file mode 100644 index 0000000000..ab2395a8af --- /dev/null +++ b/third_party/rust/futures-util/src/io/empty.rs @@ -0,0 +1,67 @@ +use futures_core::task::{Context, Poll}; +#[cfg(feature = "read-initializer")] +use futures_io::Initializer; +use futures_io::{AsyncBufRead, AsyncRead}; +use std::fmt; +use std::io; +use std::pin::Pin; + +/// Reader for the [`empty()`] function. +#[must_use = "readers do nothing unless polled"] +pub struct Empty { + _priv: (), +} + +/// Constructs a new handle to an empty reader. +/// +/// All reads from the returned reader will return `Poll::Ready(Ok(0))`. +/// +/// # Examples +/// +/// A slightly sad example of not reading anything into a buffer: +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncReadExt}; +/// +/// let mut buffer = String::new(); +/// let mut reader = io::empty(); +/// reader.read_to_string(&mut buffer).await?; +/// assert!(buffer.is_empty()); +/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); +/// ``` +pub fn empty() -> Empty { + Empty { _priv: () } +} + +impl AsyncRead for Empty { + #[inline] + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &mut [u8], + ) -> Poll<io::Result<usize>> { + Poll::Ready(Ok(0)) + } + + #[cfg(feature = "read-initializer")] + #[inline] + unsafe fn initializer(&self) -> Initializer { + Initializer::nop() + } +} + +impl AsyncBufRead for Empty { + #[inline] + fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + Poll::Ready(Ok(&[])) + } + #[inline] + fn consume(self: Pin<&mut Self>, _: usize) {} +} + +impl fmt::Debug for Empty { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Empty { .. }") + } +} diff --git a/third_party/rust/futures-util/src/io/flush.rs b/third_party/rust/futures-util/src/io/flush.rs new file mode 100644 index 0000000000..70b867a207 --- /dev/null +++ b/third_party/rust/futures-util/src/io/flush.rs @@ -0,0 +1,30 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncWrite; +use std::io; +use std::pin::Pin; + +/// Future for the [`flush`](super::AsyncWriteExt::flush) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Flush<'a, W: ?Sized> { + writer: &'a mut W, +} + +impl<W: ?Sized + Unpin> Unpin for Flush<'_, W> {} + +impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> { + pub(super) fn new(writer: &'a mut W) -> Self { + Flush { writer } + } +} + +impl<W> Future for Flush<'_, W> + where W: AsyncWrite + ?Sized + Unpin, +{ + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut *self.writer).poll_flush(cx) + } +} diff --git a/third_party/rust/futures-util/src/io/into_sink.rs b/third_party/rust/futures-util/src/io/into_sink.rs new file mode 100644 index 0000000000..bdc6b34140 --- /dev/null +++ b/third_party/rust/futures-util/src/io/into_sink.rs @@ -0,0 +1,108 @@ +use futures_core::task::{Context, Poll}; +use futures_io::AsyncWrite; +use futures_sink::Sink; +use std::io; +use std::pin::Pin; +use std::marker::Unpin; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; + +#[derive(Debug)] +struct Block<Item> { + offset: usize, + bytes: Item, +} + +/// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method. +#[must_use = "sinks do nothing unless polled"] +#[derive(Debug)] +pub struct IntoSink<W, Item> { + writer: W, + /// An outstanding block for us to push into the underlying writer, along with an offset of how + /// far into this block we have written already. + buffer: Option<Block<Item>>, +} + +impl<W: Unpin, Item> Unpin for IntoSink<W, Item> {} + +impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> { + unsafe_pinned!(writer: W); + unsafe_unpinned!(buffer: Option<Block<Item>>); + + pub(super) fn new(writer: W) -> Self { + IntoSink { writer, buffer: None } + } + + fn project(self: Pin<&mut Self>) -> (Pin<&mut W>, &mut Option<Block<Item>>) { + unsafe { + let this = self.get_unchecked_mut(); + (Pin::new_unchecked(&mut this.writer), &mut this.buffer) + } + } + + /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_ + /// flush the writer after it succeeds in pushing the block into it. + fn poll_flush_buffer( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), io::Error>> + { + let (mut writer, buffer) = self.project(); + if let Some(buffer) = buffer { + loop { + let bytes = buffer.bytes.as_ref(); + let written = ready!(writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?; + buffer.offset += written; + if buffer.offset == bytes.len() { + break; + } + } + } + *buffer = None; + Poll::Ready(Ok(())) + } + +} + +impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> { + type Error = io::Error; + + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> + { + ready!(self.as_mut().poll_flush_buffer(cx))?; + Poll::Ready(Ok(())) + } + + #[allow(clippy::debug_assert_with_mut_call)] + fn start_send( + mut self: Pin<&mut Self>, + item: Item, + ) -> Result<(), Self::Error> + { + debug_assert!(self.as_mut().buffer().is_none()); + *self.as_mut().buffer() = Some(Block { offset: 0, bytes: item }); + Ok(()) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> + { + ready!(self.as_mut().poll_flush_buffer(cx))?; + ready!(self.as_mut().writer().poll_flush(cx))?; + Poll::Ready(Ok(())) + } + + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> + { + ready!(self.as_mut().poll_flush_buffer(cx))?; + ready!(self.as_mut().writer().poll_close(cx))?; + Poll::Ready(Ok(())) + } +} diff --git a/third_party/rust/futures-util/src/io/lines.rs b/third_party/rust/futures-util/src/io/lines.rs new file mode 100644 index 0000000000..2e1261689b --- /dev/null +++ b/third_party/rust/futures-util/src/io/lines.rs @@ -0,0 +1,50 @@ +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncBufRead; +use std::io; +use std::mem; +use std::pin::Pin; +use super::read_line::read_line_internal; + +/// Stream for the [`lines`](super::AsyncBufReadExt::lines) method. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Lines<R> { + reader: R, + buf: String, + bytes: Vec<u8>, + read: usize, +} + +impl<R: Unpin> Unpin for Lines<R> {} + +impl<R: AsyncBufRead> Lines<R> { + pub(super) fn new(reader: R) -> Self { + Self { + reader, + buf: String::new(), + bytes: Vec::new(), + read: 0, + } + } +} + +impl<R: AsyncBufRead> Stream for Lines<R> { + type Item = io::Result<String>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let Self { reader, buf, bytes, read } = unsafe { self.get_unchecked_mut() }; + let reader = unsafe { Pin::new_unchecked(reader) }; + let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?; + if n == 0 && buf.is_empty() { + return Poll::Ready(None) + } + if buf.ends_with('\n') { + buf.pop(); + if buf.ends_with('\r') { + buf.pop(); + } + } + Poll::Ready(Some(Ok(mem::replace(buf, String::new())))) + } +} diff --git a/third_party/rust/futures-util/src/io/mod.rs b/third_party/rust/futures-util/src/io/mod.rs new file mode 100644 index 0000000000..43f183f424 --- /dev/null +++ b/third_party/rust/futures-util/src/io/mod.rs @@ -0,0 +1,675 @@ +//! IO +//! +//! This module contains a number of functions for working with +//! `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and `AsyncBufRead` types, including +//! the `AsyncReadExt`, `AsyncWriteExt`, `AsyncSeekExt`, and `AsyncBufReadExt` +//! traits which add methods to the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, +//! and `AsyncBufRead` types. +//! +//! This module is only available when the `io` and `std` features of this +//! library is activated, and it is activated by default. + +#[cfg(feature = "io-compat")] +use crate::compat::Compat; +use std::ptr; + +pub use futures_io::{ + AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind, + IoSlice, IoSliceMut, Result, SeekFrom, +}; +#[cfg(feature = "read-initializer")] +pub use futures_io::Initializer; + +// used by `BufReader` and `BufWriter` +// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 +const DEFAULT_BUF_SIZE: usize = 8 * 1024; + +/// Initializes a buffer if necessary. +/// +/// A buffer is always initialized if `read-initializer` feature is disabled. +#[inline] +unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) { + #[cfg(feature = "read-initializer")] + { + if !_reader.initializer().should_initialize() { + return; + } + } + ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) +} + +mod allow_std; +pub use self::allow_std::AllowStdIo; + +mod buf_reader; +pub use self::buf_reader::BufReader; + +mod buf_writer; +pub use self::buf_writer::BufWriter; + +mod chain; +pub use self::chain::Chain; + +mod close; +pub use self::close::Close; + +mod copy; +pub use self::copy::{copy, Copy}; + +mod copy_buf; +pub use self::copy_buf::{copy_buf, CopyBuf}; + +mod cursor; +pub use self::cursor::Cursor; + +mod empty; +pub use self::empty::{empty, Empty}; + +mod flush; +pub use self::flush::Flush; + +#[cfg(feature = "sink")] +mod into_sink; +#[cfg(feature = "sink")] +pub use self::into_sink::IntoSink; + +mod lines; +pub use self::lines::Lines; + +mod read; +pub use self::read::Read; + +mod read_vectored; +pub use self::read_vectored::ReadVectored; + +mod read_exact; +pub use self::read_exact::ReadExact; + +mod read_line; +pub use self::read_line::ReadLine; + +mod read_to_end; +pub use self::read_to_end::ReadToEnd; + +mod read_to_string; +pub use self::read_to_string::ReadToString; + +mod read_until; +pub use self::read_until::ReadUntil; + +mod repeat; +pub use self::repeat::{repeat, Repeat}; + +mod seek; +pub use self::seek::Seek; + +mod sink; +pub use self::sink::{sink, Sink}; + +mod split; +pub use self::split::{ReadHalf, WriteHalf}; + +mod take; +pub use self::take::Take; + +mod window; +pub use self::window::Window; + +mod write; +pub use self::write::Write; + +mod write_vectored; +pub use self::write_vectored::WriteVectored; + +mod write_all; +pub use self::write_all::WriteAll; + +/// An extension trait which adds utility methods to `AsyncRead` types. +pub trait AsyncReadExt: AsyncRead { + /// Creates an adaptor which will chain this stream with another. + /// + /// The returned `AsyncRead` instance will first read all bytes from this object + /// until EOF is encountered. Afterwards the output is equivalent to the + /// output of `next`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let reader1 = Cursor::new([1, 2, 3, 4]); + /// let reader2 = Cursor::new([5, 6, 7, 8]); + /// + /// let mut reader = reader1.chain(reader2); + /// let mut buffer = Vec::new(); + /// + /// // read the value into a Vec. + /// reader.read_to_end(&mut buffer).await?; + /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn chain<R>(self, next: R) -> Chain<Self, R> + where + Self: Sized, + R: AsyncRead, + { + Chain::new(self, next) + } + + /// Tries to read some bytes directly into the given `buf` in asynchronous + /// manner, returning a future type. + /// + /// The returned future will resolve to the number of bytes read once the read + /// operation is completed. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let mut reader = Cursor::new([1, 2, 3, 4]); + /// let mut output = [0u8; 5]; + /// + /// let bytes = reader.read(&mut output[..]).await?; + /// + /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous + /// // reader. In a real system you could get anywhere from 1 to + /// // `output.len()` bytes in a single read. + /// assert_eq!(bytes, 4); + /// assert_eq!(output, [1, 2, 3, 4, 0]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> + where Self: Unpin, + { + Read::new(self, buf) + } + + /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored + /// IO operations. + /// + /// The returned future will resolve to the number of bytes read once the read + /// operation is completed. + fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self> + where Self: Unpin, + { + ReadVectored::new(self, bufs) + } + + /// Creates a future which will read exactly enough bytes to fill `buf`, + /// returning an error if end of file (EOF) is hit sooner. + /// + /// The returned future will resolve once the read operation is completed. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let mut reader = Cursor::new([1, 2, 3, 4]); + /// let mut output = [0u8; 4]; + /// + /// reader.read_exact(&mut output).await?; + /// + /// assert_eq!(output, [1, 2, 3, 4]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + /// + /// ## EOF is hit before `buf` is filled + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{self, AsyncReadExt, Cursor}; + /// + /// let mut reader = Cursor::new([1, 2, 3, 4]); + /// let mut output = [0u8; 5]; + /// + /// let result = reader.read_exact(&mut output).await; + /// + /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof); + /// # }); + /// ``` + fn read_exact<'a>( + &'a mut self, + buf: &'a mut [u8], + ) -> ReadExact<'a, Self> + where Self: Unpin, + { + ReadExact::new(self, buf) + } + + /// Creates a future which will read all the bytes from this `AsyncRead`. + /// + /// On success the total number of bytes read is returned. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let mut reader = Cursor::new([1, 2, 3, 4]); + /// let mut output = Vec::with_capacity(4); + /// + /// let bytes = reader.read_to_end(&mut output).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn read_to_end<'a>( + &'a mut self, + buf: &'a mut Vec<u8>, + ) -> ReadToEnd<'a, Self> + where Self: Unpin, + { + ReadToEnd::new(self, buf) + } + + /// Creates a future which will read all the bytes from this `AsyncRead`. + /// + /// On success the total number of bytes read is returned. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let mut reader = Cursor::new(&b"1234"[..]); + /// let mut buffer = String::with_capacity(4); + /// + /// let bytes = reader.read_to_string(&mut buffer).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(buffer, String::from("1234")); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn read_to_string<'a>( + &'a mut self, + buf: &'a mut String, + ) -> ReadToString<'a, Self> + where Self: Unpin, + { + ReadToString::new(self, buf) + } + + /// Helper method for splitting this read/write object into two halves. + /// + /// The two halves returned implement the `AsyncRead` and `AsyncWrite` + /// traits, respectively. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{self, AsyncReadExt, Cursor}; + /// + /// // Note that for `Cursor` the read and write halves share a single + /// // seek position. This may or may not be true for other types that + /// // implement both `AsyncRead` and `AsyncWrite`. + /// + /// let reader = Cursor::new([1, 2, 3, 4]); + /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]); + /// let mut writer = Cursor::new(vec![0u8; 5]); + /// + /// { + /// let (buffer_reader, mut buffer_writer) = (&mut buffer).split(); + /// io::copy(reader, &mut buffer_writer).await?; + /// io::copy(buffer_reader, &mut writer).await?; + /// } + /// + /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]); + /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>) + where Self: AsyncWrite + Sized, + { + split::split(self) + } + + /// Creates an AsyncRead adapter which will read at most `limit` bytes + /// from the underlying reader. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let reader = Cursor::new(&b"12345678"[..]); + /// let mut buffer = [0; 5]; + /// + /// let mut take = reader.take(4); + /// let n = take.read(&mut buffer).await?; + /// + /// assert_eq!(n, 4); + /// assert_eq!(&buffer, b"1234\0"); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn take(self, limit: u64) -> Take<Self> + where Self: Sized + { + Take::new(self, limit) + } + + /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be + /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type + /// implements [`AsyncWrite`] as well, the result will also implement the + /// futures 0.1 / tokio 0.1 `AsyncWrite` trait. + /// + /// Requires the `io-compat` feature to enable. + #[cfg(feature = "io-compat")] + fn compat(self) -> Compat<Self> + where Self: Sized + Unpin, + { + Compat::new(self) + } +} + +impl<R: AsyncRead + ?Sized> AsyncReadExt for R {} + +/// An extension trait which adds utility methods to `AsyncWrite` types. +pub trait AsyncWriteExt: AsyncWrite { + /// Creates a future which will entirely flush this `AsyncWrite`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AllowStdIo, AsyncWriteExt}; + /// use std::io::{BufWriter, Cursor}; + /// + /// let mut output = vec![0u8; 5]; + /// + /// { + /// let writer = Cursor::new(&mut output); + /// let mut buffered = AllowStdIo::new(BufWriter::new(writer)); + /// buffered.write_all(&[1, 2]).await?; + /// buffered.write_all(&[3, 4]).await?; + /// buffered.flush().await?; + /// } + /// + /// assert_eq!(output, [1, 2, 3, 4, 0]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn flush(&mut self) -> Flush<'_, Self> + where Self: Unpin, + { + Flush::new(self) + } + + /// Creates a future which will entirely close this `AsyncWrite`. + fn close(&mut self) -> Close<'_, Self> + where Self: Unpin, + { + Close::new(self) + } + + /// Creates a future which will write bytes from `buf` into the object. + /// + /// The returned future will resolve to the number of bytes written once the write + /// operation is completed. + fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> + where Self: Unpin, + { + Write::new(self, buf) + } + + /// Creates a future which will write bytes from `bufs` into the object using vectored + /// IO operations. + /// + /// The returned future will resolve to the number of bytes written once the write + /// operation is completed. + fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> + where Self: Unpin, + { + WriteVectored::new(self, bufs) + } + + /// Write data into this object. + /// + /// Creates a future that will write the entire contents of the buffer `buf` into + /// this `AsyncWrite`. + /// + /// The returned future will not complete until all the data has been written. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncWriteExt, Cursor}; + /// + /// let mut writer = Cursor::new(vec![0u8; 5]); + /// + /// writer.write_all(&[1, 2, 3, 4]).await?; + /// + /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> + where Self: Unpin, + { + WriteAll::new(self, buf) + } + + /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be + /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`. + /// Requires the `io-compat` feature to enable. + #[cfg(feature = "io-compat")] + fn compat_write(self) -> Compat<Self> + where Self: Sized + Unpin, + { + Compat::new(self) + } + + + /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`. + /// + /// This adapter produces a sink that will write each value passed to it + /// into the underlying writer. + /// + /// Note that this function consumes the given writer, returning a wrapped + /// version. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::AsyncWriteExt; + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]); + /// + /// let mut writer = vec![]; + /// + /// stream.forward((&mut writer).into_sink()).await?; + /// + /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]); + /// # Ok::<(), Box<dyn std::error::Error>>(()) + /// # })?; + /// # Ok::<(), Box<dyn std::error::Error>>(()) + /// ``` + #[cfg(feature = "sink")] + fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item> + where Self: Sized, + { + IntoSink::new(self) + } +} + +impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {} + +/// An extension trait which adds utility methods to `AsyncSeek` types. +pub trait AsyncSeekExt: AsyncSeek { + /// Creates a future which will seek an IO object, and then yield the + /// new position in the object and the object itself. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> + where Self: Unpin, + { + Seek::new(self, pos) + } +} + +impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {} + +/// An extension trait which adds utility methods to `AsyncBufRead` types. +pub trait AsyncBufReadExt: AsyncBufRead { + /// Creates a future which will read all the bytes associated with this I/O + /// object into `buf` until the delimiter `byte` or EOF is reached. + /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until). + /// + /// This function will read bytes from the underlying stream until the + /// delimiter or EOF is found. Once found, all bytes up to, and including, + /// the delimiter (if found) will be appended to `buf`. + /// + /// The returned future will resolve to the number of bytes read once the read + /// operation is completed. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncBufReadExt, Cursor}; + /// + /// let mut cursor = Cursor::new(b"lorem-ipsum"); + /// let mut buf = vec![]; + /// + /// // cursor is at 'l' + /// let num_bytes = cursor.read_until(b'-', &mut buf).await?; + /// assert_eq!(num_bytes, 6); + /// assert_eq!(buf, b"lorem-"); + /// buf.clear(); + /// + /// // cursor is at 'i' + /// let num_bytes = cursor.read_until(b'-', &mut buf).await?; + /// assert_eq!(num_bytes, 5); + /// assert_eq!(buf, b"ipsum"); + /// buf.clear(); + /// + /// // cursor is at EOF + /// let num_bytes = cursor.read_until(b'-', &mut buf).await?; + /// assert_eq!(num_bytes, 0); + /// assert_eq!(buf, b""); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn read_until<'a>( + &'a mut self, + byte: u8, + buf: &'a mut Vec<u8>, + ) -> ReadUntil<'a, Self> + where Self: Unpin, + { + ReadUntil::new(self, byte, buf) + } + + /// Creates a future which will read all the bytes associated with this I/O + /// object into `buf` until a newline (the 0xA byte) or EOF is reached, + /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line). + /// + /// This function will read bytes from the underlying stream until the + /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes + /// up to, and including, the delimiter (if found) will be appended to + /// `buf`. + /// + /// The returned future will resolve to the number of bytes read once the read + /// operation is completed. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Errors + /// + /// This function has the same error semantics as [`read_until`] and will + /// also return an error if the read bytes are not valid UTF-8. If an I/O + /// error is encountered then `buf` may contain some bytes already read in + /// the event that all data read so far was valid UTF-8. + /// + /// [`read_until`]: AsyncBufReadExt::read_until + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncBufReadExt, Cursor}; + /// + /// let mut cursor = Cursor::new(b"foo\nbar"); + /// let mut buf = String::new(); + /// + /// // cursor is at 'f' + /// let num_bytes = cursor.read_line(&mut buf).await?; + /// assert_eq!(num_bytes, 4); + /// assert_eq!(buf, "foo\n"); + /// buf.clear(); + /// + /// // cursor is at 'b' + /// let num_bytes = cursor.read_line(&mut buf).await?; + /// assert_eq!(num_bytes, 3); + /// assert_eq!(buf, "bar"); + /// buf.clear(); + /// + /// // cursor is at EOF + /// let num_bytes = cursor.read_line(&mut buf).await?; + /// assert_eq!(num_bytes, 0); + /// assert_eq!(buf, ""); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> + where Self: Unpin, + { + ReadLine::new(self, buf) + } + + /// Returns a stream over the lines of this reader. + /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). + /// + /// The stream returned from this function will yield instances of + /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline + /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end. + /// + /// [`io::Result`]: std::io::Result + /// [`String`]: String + /// + /// # Errors + /// + /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`]. + /// + /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncBufReadExt, Cursor}; + /// use futures::stream::StreamExt; + /// + /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); + /// + /// let mut lines_stream = cursor.lines().map(|l| l.unwrap()); + /// assert_eq!(lines_stream.next().await, Some(String::from("lorem"))); + /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum"))); + /// assert_eq!(lines_stream.next().await, Some(String::from("dolor"))); + /// assert_eq!(lines_stream.next().await, None); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + fn lines(self) -> Lines<Self> + where Self: Sized, + { + Lines::new(self) + } +} + +impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {} diff --git a/third_party/rust/futures-util/src/io/read.rs b/third_party/rust/futures-util/src/io/read.rs new file mode 100644 index 0000000000..ea25959056 --- /dev/null +++ b/third_party/rust/futures-util/src/io/read.rs @@ -0,0 +1,30 @@ +use crate::io::AsyncRead; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use std::io; +use std::pin::Pin; + +/// Future for the [`read`](super::AsyncReadExt::read) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Read<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut [u8], +} + +impl<R: ?Sized + Unpin> Unpin for Read<'_, R> {} + +impl<'a, R: AsyncRead + ?Sized + Unpin> Read<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self { + Read { reader, buf } + } +} + +impl<R: AsyncRead + ?Sized + Unpin> Future for Read<'_, R> { + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + Pin::new(&mut this.reader).poll_read(cx, this.buf) + } +} diff --git a/third_party/rust/futures-util/src/io/read_exact.rs b/third_party/rust/futures-util/src/io/read_exact.rs new file mode 100644 index 0000000000..a2bbd400ea --- /dev/null +++ b/third_party/rust/futures-util/src/io/read_exact.rs @@ -0,0 +1,41 @@ +use crate::io::AsyncRead; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use std::io; +use std::mem; +use std::pin::Pin; + +/// Future for the [`read_exact`](super::AsyncReadExt::read_exact) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadExact<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut [u8], +} + +impl<R: ?Sized + Unpin> Unpin for ReadExact<'_, R> {} + +impl<'a, R: AsyncRead + ?Sized + Unpin> ReadExact<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self { + ReadExact { reader, buf } + } +} + +impl<R: AsyncRead + ?Sized + Unpin> Future for ReadExact<'_, R> { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + while !this.buf.is_empty() { + let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?; + { + let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n); + this.buf = rest; + } + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())) + } + } + Poll::Ready(Ok(())) + } +} diff --git a/third_party/rust/futures-util/src/io/read_line.rs b/third_party/rust/futures-util/src/io/read_line.rs new file mode 100644 index 0000000000..d830514b92 --- /dev/null +++ b/third_party/rust/futures-util/src/io/read_line.rs @@ -0,0 +1,61 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncBufRead; +use std::io; +use std::mem; +use std::pin::Pin; +use std::str; +use super::read_until::read_until_internal; + +/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadLine<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut String, + bytes: Vec<u8>, + read: usize, +} + +impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { + Self { + reader, + bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) }, + buf, + read: 0, + } + } +} + +pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>( + reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut String, + bytes: &mut Vec<u8>, + read: &mut usize, +) -> Poll<io::Result<usize>> { + let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8")) + })) + } else { + debug_assert!(buf.is_empty()); + debug_assert_eq!(*read, 0); + // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) + } +} + +impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> { + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let Self { reader, buf, bytes, read } = &mut *self; + read_line_internal(Pin::new(reader), cx, buf, bytes, read) + } +} diff --git a/third_party/rust/futures-util/src/io/read_to_end.rs b/third_party/rust/futures-util/src/io/read_to_end.rs new file mode 100644 index 0000000000..70b057829c --- /dev/null +++ b/third_party/rust/futures-util/src/io/read_to_end.rs @@ -0,0 +1,90 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncRead; +use std::io; +use std::pin::Pin; +use std::vec::Vec; + +/// Future for the [`read_to_end`](super::AsyncReadExt::read_to_end) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadToEnd<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec<u8>, + start_len: usize, +} + +impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {} + +impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToEnd<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> Self { + let start_len = buf.len(); + Self { + reader, + buf, + start_len, + } + } +} + +struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize } + +impl Drop for Guard<'_> { + fn drop(&mut self) { + unsafe { self.buf.set_len(self.len); } + } +} + +// This uses an adaptive system to extend the vector when it fills. We want to +// avoid paying to allocate and zero a huge chunk of memory if the reader only +// has 4 bytes while still making large reads if the reader does have a ton +// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every +// time is 4,500 times (!) slower than this if the reader has a very small +// amount of data to return. +// +// Because we're extending the buffer with uninitialized data for trusted +// readers, we need to make sure to truncate that if any of this panics. +pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>( + mut rd: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec<u8>, + start_len: usize, +) -> Poll<io::Result<usize>> { + let mut g = Guard { len: buf.len(), buf }; + let ret; + loop { + if g.len == g.buf.len() { + unsafe { + g.buf.reserve(32); + let capacity = g.buf.capacity(); + g.buf.set_len(capacity); + super::initialize(&rd, &mut g.buf[g.len..]); + } + } + + match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { + Ok(0) => { + ret = Poll::Ready(Ok(g.len - start_len)); + break; + } + Ok(n) => g.len += n, + Err(e) => { + ret = Poll::Ready(Err(e)); + break; + } + } + } + + ret +} + +impl<A> Future for ReadToEnd<'_, A> + where A: AsyncRead + ?Sized + Unpin, +{ + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len) + } +} diff --git a/third_party/rust/futures-util/src/io/read_to_string.rs b/third_party/rust/futures-util/src/io/read_to_string.rs new file mode 100644 index 0000000000..56c95ce18d --- /dev/null +++ b/third_party/rust/futures-util/src/io/read_to_string.rs @@ -0,0 +1,66 @@ +use super::read_to_end::read_to_end_internal; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncRead; +use std::pin::Pin; +use std::vec::Vec; +use std::{io, mem, str}; + +/// Future for the [`read_to_string`](super::AsyncReadExt::read_to_string) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadToString<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut String, + bytes: Vec<u8>, + start_len: usize, +} + +impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {} + +impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { + let start_len = buf.len(); + Self { + reader, + bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) }, + buf, + start_len, + } + } +} + +fn read_to_string_internal<R: AsyncRead + ?Sized>( + reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut String, + bytes: &mut Vec<u8>, + start_len: usize, +) -> Poll<io::Result<usize>> { + let ret = ready!(read_to_end_internal(reader, cx, bytes, start_len)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + })) + } else { + debug_assert!(buf.is_empty()); + // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) + } +} + +impl<A> Future for ReadToString<'_, A> +where + A: AsyncRead + ?Sized + Unpin, +{ + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let Self { reader, buf, bytes, start_len } = &mut *self; + read_to_string_internal(Pin::new(reader), cx, buf, bytes, *start_len) + } +} diff --git a/third_party/rust/futures-util/src/io/read_until.rs b/third_party/rust/futures-util/src/io/read_until.rs new file mode 100644 index 0000000000..95c47e06b8 --- /dev/null +++ b/third_party/rust/futures-util/src/io/read_until.rs @@ -0,0 +1,59 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncBufRead; +use std::io; +use std::mem; +use std::pin::Pin; + +/// Future for the [`read_until`](super::AsyncBufReadExt::read_until) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadUntil<'a, R: ?Sized> { + reader: &'a mut R, + byte: u8, + buf: &'a mut Vec<u8>, + read: usize, +} + +impl<R: ?Sized + Unpin> Unpin for ReadUntil<'_, R> {} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> { + pub(super) fn new(reader: &'a mut R, byte: u8, buf: &'a mut Vec<u8>) -> Self { + Self { reader, byte, buf, read: 0 } + } +} + +pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>( + mut reader: Pin<&mut R>, + cx: &mut Context<'_>, + byte: u8, + buf: &mut Vec<u8>, + read: &mut usize, +) -> Poll<io::Result<usize>> { + loop { + let (done, used) = { + let available = ready!(reader.as_mut().poll_fill_buf(cx))?; + if let Some(i) = memchr::memchr(byte, available) { + buf.extend_from_slice(&available[..=i]); + (true, i + 1) + } else { + buf.extend_from_slice(available); + (false, available.len()) + } + }; + reader.as_mut().consume(used); + *read += used; + if done || used == 0 { + return Poll::Ready(Ok(mem::replace(read, 0))); + } + } +} + +impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> { + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let Self { reader, byte, buf, read } = &mut *self; + read_until_internal(Pin::new(reader), cx, *byte, buf, read) + } +} diff --git a/third_party/rust/futures-util/src/io/read_vectored.rs b/third_party/rust/futures-util/src/io/read_vectored.rs new file mode 100644 index 0000000000..4e22df57e9 --- /dev/null +++ b/third_party/rust/futures-util/src/io/read_vectored.rs @@ -0,0 +1,30 @@ +use crate::io::AsyncRead; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use std::io::{self, IoSliceMut}; +use std::pin::Pin; + +/// Future for the [`read_vectored`](super::AsyncReadExt::read_vectored) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadVectored<'a, R: ?Sized> { + reader: &'a mut R, + bufs: &'a mut [IoSliceMut<'a>], +} + +impl<R: ?Sized + Unpin> Unpin for ReadVectored<'_, R> {} + +impl<'a, R: AsyncRead + ?Sized + Unpin> ReadVectored<'a, R> { + pub(super) fn new(reader: &'a mut R, bufs: &'a mut [IoSliceMut<'a>]) -> Self { + Self { reader, bufs } + } +} + +impl<R: AsyncRead + ?Sized + Unpin> Future for ReadVectored<'_, R> { + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + Pin::new(&mut this.reader).poll_read_vectored(cx, this.bufs) + } +} diff --git a/third_party/rust/futures-util/src/io/repeat.rs b/third_party/rust/futures-util/src/io/repeat.rs new file mode 100644 index 0000000000..84abd7f769 --- /dev/null +++ b/third_party/rust/futures-util/src/io/repeat.rs @@ -0,0 +1,73 @@ +use futures_core::task::{Context, Poll}; +#[cfg(feature = "read-initializer")] +use futures_io::Initializer; +use futures_io::{AsyncRead, IoSliceMut}; +use std::fmt; +use std::io; +use std::pin::Pin; + +/// Reader for the [`repeat()`] function. +#[must_use = "readers do nothing unless polled"] +pub struct Repeat { + byte: u8, +} + +/// Creates an instance of a reader that infinitely repeats one byte. +/// +/// All reads from this reader will succeed by filling the specified buffer with +/// the given byte. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncReadExt}; +/// +/// let mut buffer = [0; 3]; +/// let mut reader = io::repeat(0b101); +/// reader.read_exact(&mut buffer).await.unwrap(); +/// assert_eq!(buffer, [0b101, 0b101, 0b101]); +/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); +/// ``` +pub fn repeat(byte: u8) -> Repeat { + Repeat { byte } +} + +impl AsyncRead for Repeat { + #[inline] + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + for slot in &mut *buf { + *slot = self.byte; + } + Poll::Ready(Ok(buf.len())) + } + + #[inline] + fn poll_read_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll<io::Result<usize>> { + let mut nwritten = 0; + for buf in bufs { + nwritten += ready!(self.as_mut().poll_read(cx, buf))?; + } + Poll::Ready(Ok(nwritten)) + } + + #[cfg(feature = "read-initializer")] + #[inline] + unsafe fn initializer(&self) -> Initializer { + Initializer::nop() + } +} + +impl fmt::Debug for Repeat { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Repeat { .. }") + } +} diff --git a/third_party/rust/futures-util/src/io/seek.rs b/third_party/rust/futures-util/src/io/seek.rs new file mode 100644 index 0000000000..0aa2371393 --- /dev/null +++ b/third_party/rust/futures-util/src/io/seek.rs @@ -0,0 +1,30 @@ +use crate::io::{AsyncSeek, SeekFrom}; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use std::io; +use std::pin::Pin; + +/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Seek<'a, S: ?Sized> { + seek: &'a mut S, + pos: SeekFrom, +} + +impl<S: ?Sized + Unpin> Unpin for Seek<'_, S> {} + +impl<'a, S: AsyncSeek + ?Sized + Unpin> Seek<'a, S> { + pub(super) fn new(seek: &'a mut S, pos: SeekFrom) -> Self { + Self { seek, pos } + } +} + +impl<S: AsyncSeek + ?Sized + Unpin> Future for Seek<'_, S> { + type Output = io::Result<u64>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + Pin::new(&mut this.seek).poll_seek(cx, this.pos) + } +} diff --git a/third_party/rust/futures-util/src/io/sink.rs b/third_party/rust/futures-util/src/io/sink.rs new file mode 100644 index 0000000000..4a32ca7041 --- /dev/null +++ b/third_party/rust/futures-util/src/io/sink.rs @@ -0,0 +1,67 @@ +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncWrite, IoSlice}; +use std::fmt; +use std::io; +use std::pin::Pin; + +/// Writer for the [`sink()`] function. +#[must_use = "writers do nothing unless polled"] +pub struct Sink { + _priv: (), +} + +/// Creates an instance of a writer which will successfully consume all data. +/// +/// All calls to `poll_write` on the returned instance will return `Poll::Ready(Ok(buf.len()))` +/// and the contents of the buffer will not be inspected. +/// +/// # Examples +/// +/// ```rust +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncWriteExt}; +/// +/// let buffer = vec![1, 2, 3, 5, 8]; +/// let mut writer = io::sink(); +/// let num_bytes = writer.write(&buffer).await?; +/// assert_eq!(num_bytes, 5); +/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); +/// ``` +pub fn sink() -> Sink { + Sink { _priv: () } +} + +impl AsyncWrite for Sink { + #[inline] + fn poll_write( + self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + Poll::Ready(Ok(buf.len())) + } + + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + _: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + Poll::Ready(Ok(bufs.iter().map(|b| b.len()).sum())) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + #[inline] + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } +} + +impl fmt::Debug for Sink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Sink { .. }") + } +} diff --git a/third_party/rust/futures-util/src/io/split.rs b/third_party/rust/futures-util/src/io/split.rs new file mode 100644 index 0000000000..d12df867fc --- /dev/null +++ b/third_party/rust/futures-util/src/io/split.rs @@ -0,0 +1,69 @@ +use crate::lock::BiLock; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut}; +use std::io; +use std::pin::Pin; + +/// The readable half of an object returned from `AsyncRead::split`. +#[derive(Debug)] +pub struct ReadHalf<T> { + handle: BiLock<T>, +} + +/// The writable half of an object returned from `AsyncRead::split`. +#[derive(Debug)] +pub struct WriteHalf<T> { + handle: BiLock<T>, +} + +fn lock_and_then<T, U, E, F>( + lock: &BiLock<T>, + cx: &mut Context<'_>, + f: F +) -> Poll<Result<U, E>> + where F: FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<U, E>> +{ + let mut l = ready!(lock.poll_lock(cx)); + f(l.as_pin_mut(), cx) +} + +pub(super) fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) { + let (a, b) = BiLock::new(t); + (ReadHalf { handle: a }, WriteHalf { handle: b }) +} + +impl<R: AsyncRead> AsyncRead for ReadHalf<R> { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) + -> Poll<io::Result<usize>> + { + lock_and_then(&self.handle, cx, |l, cx| l.poll_read(cx, buf)) + } + + fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) + -> Poll<io::Result<usize>> + { + lock_and_then(&self.handle, cx, |l, cx| l.poll_read_vectored(cx, bufs)) + } +} + +impl<W: AsyncWrite> AsyncWrite for WriteHalf<W> { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) + -> Poll<io::Result<usize>> + { + lock_and_then(&self.handle, cx, |l, cx| l.poll_write(cx, buf)) + } + + fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) + -> Poll<io::Result<usize>> + { + lock_and_then(&self.handle, cx, |l, cx| l.poll_write_vectored(cx, bufs)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_flush(cx)) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + lock_and_then(&self.handle, cx, |l, cx| l.poll_close(cx)) + } +} diff --git a/third_party/rust/futures-util/src/io/take.rs b/third_party/rust/futures-util/src/io/take.rs new file mode 100644 index 0000000000..b1f33fa468 --- /dev/null +++ b/third_party/rust/futures-util/src/io/take.rs @@ -0,0 +1,210 @@ +use futures_core::task::{Context, Poll}; +#[cfg(feature = "read-initializer")] +use futures_io::Initializer; +use futures_io::{AsyncRead, AsyncBufRead}; +use pin_utils::{unsafe_pinned, unsafe_unpinned}; +use std::{cmp, io}; +use std::pin::Pin; + +/// Reader for the [`take`](super::AsyncReadExt::take) method. +#[derive(Debug)] +#[must_use = "readers do nothing unless you `.await` or poll them"] +pub struct Take<R> { + inner: R, + // Add '_' to avoid conflicts with `limit` method. + limit_: u64, +} + +impl<R: Unpin> Unpin for Take<R> { } + +impl<R: AsyncRead> Take<R> { + unsafe_pinned!(inner: R); + unsafe_unpinned!(limit_: u64); + + pub(super) fn new(inner: R, limit: u64) -> Self { + Self { inner, limit_: limit } + } + + /// Returns the remaining number of bytes that can be + /// read before this instance will return EOF. + /// + /// # Note + /// + /// This instance may reach `EOF` after reading fewer bytes than indicated by + /// this method if the underlying [`AsyncRead`] instance reaches EOF. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let reader = Cursor::new(&b"12345678"[..]); + /// let mut buffer = [0; 2]; + /// + /// let mut take = reader.take(4); + /// let n = take.read(&mut buffer).await?; + /// + /// assert_eq!(take.limit(), 2); + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + pub fn limit(&self) -> u64 { + self.limit_ + } + + /// Sets the number of bytes that can be read before this instance will + /// return EOF. This is the same as constructing a new `Take` instance, so + /// the amount of bytes read and the previous limit value don't matter when + /// calling this method. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let reader = Cursor::new(&b"12345678"[..]); + /// let mut buffer = [0; 4]; + /// + /// let mut take = reader.take(4); + /// let n = take.read(&mut buffer).await?; + /// + /// assert_eq!(n, 4); + /// assert_eq!(take.limit(), 0); + /// + /// take.set_limit(10); + /// let n = take.read(&mut buffer).await?; + /// assert_eq!(n, 4); + /// + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + pub fn set_limit(&mut self, limit: u64) { + self.limit_ = limit + } + + /// Gets a reference to the underlying reader. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let reader = Cursor::new(&b"12345678"[..]); + /// let mut buffer = [0; 4]; + /// + /// let mut take = reader.take(4); + /// let n = take.read(&mut buffer).await?; + /// + /// let cursor_ref = take.get_ref(); + /// assert_eq!(cursor_ref.position(), 4); + /// + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + pub fn get_ref(&self) -> &R { + &self.inner + } + + /// Gets a mutable reference to the underlying reader. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying reader as doing so may corrupt the internal limit of this + /// `Take`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let reader = Cursor::new(&b"12345678"[..]); + /// let mut buffer = [0; 4]; + /// + /// let mut take = reader.take(4); + /// let n = take.read(&mut buffer).await?; + /// + /// let cursor_mut = take.get_mut(); + /// + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + pub fn get_mut(&mut self) -> &mut R { + &mut self.inner + } + + /// Gets a pinned mutable reference to the underlying reader. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying reader as doing so may corrupt the internal limit of this + /// `Take`. + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { + self.inner() + } + + /// Consumes the `Take`, returning the wrapped reader. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// + /// let reader = Cursor::new(&b"12345678"[..]); + /// let mut buffer = [0; 4]; + /// + /// let mut take = reader.take(4); + /// let n = take.read(&mut buffer).await?; + /// + /// let cursor = take.into_inner(); + /// assert_eq!(cursor.position(), 4); + /// + /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap(); + /// ``` + pub fn into_inner(self) -> R { + self.inner + } +} + +impl<R: AsyncRead> AsyncRead for Take<R> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<Result<usize, io::Error>> { + if self.limit_ == 0 { + return Poll::Ready(Ok(0)); + } + + let max = std::cmp::min(buf.len() as u64, self.limit_) as usize; + let n = ready!(self.as_mut().inner().poll_read(cx, &mut buf[..max]))?; + *self.as_mut().limit_() -= n as u64; + Poll::Ready(Ok(n)) + } + + #[cfg(feature = "read-initializer")] + unsafe fn initializer(&self) -> Initializer { + self.inner.initializer() + } +} + +impl<R: AsyncBufRead> AsyncBufRead for Take<R> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { + let Self { inner, limit_ } = unsafe { self.get_unchecked_mut() }; + let inner = unsafe { Pin::new_unchecked(inner) }; + + // Don't call into inner reader at all at EOF because it may still block + if *limit_ == 0 { + return Poll::Ready(Ok(&[])); + } + + let buf = ready!(inner.poll_fill_buf(cx)?); + let cap = cmp::min(buf.len() as u64, *limit_) as usize; + Poll::Ready(Ok(&buf[..cap])) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + // Don't let callers reset the limit by passing an overlarge value + let amt = cmp::min(amt as u64, self.limit_) as usize; + *self.as_mut().limit_() -= amt as u64; + self.inner().consume(amt); + } +} diff --git a/third_party/rust/futures-util/src/io/window.rs b/third_party/rust/futures-util/src/io/window.rs new file mode 100644 index 0000000000..3424197d75 --- /dev/null +++ b/third_party/rust/futures-util/src/io/window.rs @@ -0,0 +1,107 @@ +use std::ops::{Bound, Range, RangeBounds}; + +/// A owned window around an underlying buffer. +/// +/// Normally slices work great for considering sub-portions of a buffer, but +/// unfortunately a slice is a *borrowed* type in Rust which has an associated +/// lifetime. When working with future and async I/O these lifetimes are not +/// always appropriate, and are sometimes difficult to store in tasks. This +/// type strives to fill this gap by providing an "owned slice" around an +/// underlying buffer of bytes. +/// +/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable +/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation +/// that this type carries. +/// +/// This type can be particularly useful when working with the `write_all` +/// combinator in this crate. Data can be sliced via `Window`, consumed by +/// `write_all`, and then earned back once the write operation finishes through +/// the `into_inner` method on this type. +#[derive(Debug)] +pub struct Window<T> { + inner: T, + range: Range<usize>, +} + +impl<T: AsRef<[u8]>> Window<T> { + /// Creates a new window around the buffer `t` defaulting to the entire + /// slice. + /// + /// Further methods can be called on the returned `Window<T>` to alter the + /// window into the data provided. + pub fn new(t: T) -> Self { + Self { + range: 0..t.as_ref().len(), + inner: t, + } + } + + /// Gets a shared reference to the underlying buffer inside of this + /// `Window`. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying buffer inside of this + /// `Window`. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this `Window`, returning the underlying buffer. + pub fn into_inner(self) -> T { + self.inner + } + + /// Returns the starting index of this window into the underlying buffer + /// `T`. + pub fn start(&self) -> usize { + self.range.start + } + + /// Returns the end index of this window into the underlying buffer + /// `T`. + pub fn end(&self) -> usize { + self.range.end + } + + /// Changes the range of this window to the range specified. + /// + /// # Panics + /// + /// This method will panic if `range` is out of bounds for the underlying + /// slice or if [`start_bound()`] of `range` comes after the [`end_bound()`]. + /// + /// [`start_bound()`]: std::ops::RangeBounds::start_bound + /// [`end_bound()`]: std::ops::RangeBounds::end_bound + pub fn set<R: RangeBounds<usize>>(&mut self, range: R) { + let start = match range.start_bound() { + Bound::Included(n) => *n, + Bound::Excluded(n) => *n + 1, + Bound::Unbounded => 0, + }; + let end = match range.end_bound() { + Bound::Included(n) => *n + 1, + Bound::Excluded(n) => *n, + Bound::Unbounded => self.inner.as_ref().len(), + }; + + assert!(end <= self.inner.as_ref().len()); + assert!(start <= end); + + self.range.start = start; + self.range.end = end; + } +} + +impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> { + fn as_ref(&self) -> &[u8] { + &self.inner.as_ref()[self.range.start..self.range.end] + } +} + +impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.inner.as_mut()[self.range.start..self.range.end] + } +} diff --git a/third_party/rust/futures-util/src/io/write.rs b/third_party/rust/futures-util/src/io/write.rs new file mode 100644 index 0000000000..c47ef9e2eb --- /dev/null +++ b/third_party/rust/futures-util/src/io/write.rs @@ -0,0 +1,30 @@ +use crate::io::AsyncWrite; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use std::io; +use std::pin::Pin; + +/// Future for the [`write`](super::AsyncWriteExt::write) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Write<'a, W: ?Sized> { + writer: &'a mut W, + buf: &'a [u8], +} + +impl<W: ?Sized + Unpin> Unpin for Write<'_, W> {} + +impl<'a, W: AsyncWrite + ?Sized + Unpin> Write<'a, W> { + pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self { + Self { writer, buf } + } +} + +impl<W: AsyncWrite + ?Sized + Unpin> Future for Write<'_, W> { + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + Pin::new(&mut this.writer).poll_write(cx, this.buf) + } +} diff --git a/third_party/rust/futures-util/src/io/write_all.rs b/third_party/rust/futures-util/src/io/write_all.rs new file mode 100644 index 0000000000..57f1400b0e --- /dev/null +++ b/third_party/rust/futures-util/src/io/write_all.rs @@ -0,0 +1,42 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncWrite; +use std::io; +use std::mem; +use std::pin::Pin; + +/// Future for the [`write_all`](super::AsyncWriteExt::write_all) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct WriteAll<'a, W: ?Sized> { + writer: &'a mut W, + buf: &'a [u8], +} + +impl<W: ?Sized + Unpin> Unpin for WriteAll<'_, W> {} + +impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAll<'a, W> { + pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self { + WriteAll { writer, buf } + } +} + +impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + let this = &mut *self; + while !this.buf.is_empty() { + let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?; + { + let (_, rest) = mem::replace(&mut this.buf, &[]).split_at(n); + this.buf = rest; + } + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) + } + } + + Poll::Ready(Ok(())) + } +} diff --git a/third_party/rust/futures-util/src/io/write_vectored.rs b/third_party/rust/futures-util/src/io/write_vectored.rs new file mode 100644 index 0000000000..14a01d7302 --- /dev/null +++ b/third_party/rust/futures-util/src/io/write_vectored.rs @@ -0,0 +1,30 @@ +use crate::io::AsyncWrite; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use std::io::{self, IoSlice}; +use std::pin::Pin; + +/// Future for the [`write_vectored`](super::AsyncWriteExt::write_vectored) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct WriteVectored<'a, W: ?Sized> { + writer: &'a mut W, + bufs: &'a [IoSlice<'a>], +} + +impl<W: ?Sized + Unpin> Unpin for WriteVectored<'_, W> {} + +impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteVectored<'a, W> { + pub(super) fn new(writer: &'a mut W, bufs: &'a [IoSlice<'a>]) -> Self { + Self { writer, bufs } + } +} + +impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteVectored<'_, W> { + type Output = io::Result<usize>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = &mut *self; + Pin::new(&mut this.writer).poll_write_vectored(cx, this.bufs) + } +} |