summaryrefslogtreecommitdiffstats
path: root/vendor/futures-util/src/stream/try_stream/into_async_read.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/futures-util/src/stream/try_stream/into_async_read.rs')
-rw-r--r--vendor/futures-util/src/stream/try_stream/into_async_read.rs101
1 files changed, 51 insertions, 50 deletions
diff --git a/vendor/futures-util/src/stream/try_stream/into_async_read.rs b/vendor/futures-util/src/stream/try_stream/into_async_read.rs
index 914b277a0..ffbfc7eae 100644
--- a/vendor/futures-util/src/stream/try_stream/into_async_read.rs
+++ b/vendor/futures-util/src/stream/try_stream/into_async_read.rs
@@ -1,30 +1,26 @@
-use crate::stream::TryStreamExt;
use core::pin::Pin;
use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
+use pin_project_lite::pin_project;
use std::cmp;
use std::io::{Error, Result};
-/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
-#[derive(Debug)]
-#[must_use = "readers do nothing unless polled"]
-#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
-pub struct IntoAsyncRead<St>
-where
- St: TryStream<Error = Error> + Unpin,
- St::Ok: AsRef<[u8]>,
-{
- stream: St,
- state: ReadState<St::Ok>,
-}
-
-impl<St> Unpin for IntoAsyncRead<St>
-where
- St: TryStream<Error = Error> + Unpin,
- St::Ok: AsRef<[u8]>,
-{
+pin_project! {
+ /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
+ #[derive(Debug)]
+ #[must_use = "readers do nothing unless polled"]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
+ pub struct IntoAsyncRead<St>
+ where
+ St: TryStream<Error = Error>,
+ St::Ok: AsRef<[u8]>,
+ {
+ #[pin]
+ stream: St,
+ state: ReadState<St::Ok>,
+ }
}
#[derive(Debug)]
@@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> {
impl<St> IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
pub(super) fn new(stream: St) -> Self {
@@ -46,16 +42,18 @@ where
impl<St> AsyncRead for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
fn poll_read(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
+ let mut this = self.project();
+
loop {
- match &mut self.state {
+ match this.state {
ReadState::Ready { chunk, chunk_start } => {
let chunk = chunk.as_ref();
let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
@@ -64,23 +62,23 @@ where
*chunk_start += len;
if chunk.len() == *chunk_start {
- self.state = ReadState::PendingChunk;
+ *this.state = ReadState::PendingChunk;
}
return Poll::Ready(Ok(len));
}
- ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) {
+ ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
- self.state = ReadState::Ready { chunk, chunk_start: 0 };
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Ok(0));
}
},
@@ -94,51 +92,52 @@ where
impl<St> AsyncWrite for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + AsyncWrite + Unpin,
+ St: TryStream<Error = Error> + AsyncWrite,
St::Ok: AsRef<[u8]>,
{
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize>> {
- Pin::new(&mut self.stream).poll_write(cx, buf)
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
+ let this = self.project();
+ this.stream.poll_write(cx, buf)
}
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- Pin::new(&mut self.stream).poll_flush(cx)
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_flush(cx)
}
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- Pin::new(&mut self.stream).poll_close(cx)
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_close(cx)
}
}
impl<St> AsyncBufRead for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
- fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
- while let ReadState::PendingChunk = self.state {
- match ready!(self.stream.try_poll_next_unpin(cx)) {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
+ let mut this = self.project();
+
+ while let ReadState::PendingChunk = this.state {
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
- self.state = ReadState::Ready { chunk, chunk_start: 0 };
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Ok(&[]));
}
}
}
- if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
+ if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
let chunk = chunk.as_ref();
return Poll::Ready(Ok(&chunk[chunk_start..]));
}
@@ -147,16 +146,18 @@ where
Poll::Ready(Ok(&[]))
}
- fn consume(mut self: Pin<&mut Self>, amount: usize) {
+ fn consume(self: Pin<&mut Self>, amount: usize) {
+ let this = self.project();
+
// https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
if amount == 0 {
return;
}
- if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
+ if let ReadState::Ready { chunk, chunk_start } = this.state {
*chunk_start += amount;
debug_assert!(*chunk_start <= chunk.as_ref().len());
if *chunk_start >= chunk.as_ref().len() {
- self.state = ReadState::PendingChunk;
+ *this.state = ReadState::PendingChunk;
}
} else {
debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");