1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
use bytes::{Bytes, BytesMut};
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;
const DEFAULT_CAPACITY: usize = 4096;
pin_project! {
/// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
///
/// This stream is fused. It performs the inverse operation of
/// [`StreamReader`].
///
/// # Example
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// use tokio_stream::StreamExt;
/// use tokio_util::io::ReaderStream;
///
/// // Create a stream of data.
/// let data = b"hello, world!";
/// let mut stream = ReaderStream::new(&data[..]);
///
/// // Read all of the chunks into a vector.
/// let mut stream_contents = Vec::new();
/// while let Some(chunk) = stream.next().await {
/// stream_contents.extend_from_slice(&chunk?);
/// }
///
/// // Once the chunks are concatenated, we should have the
/// // original data.
/// assert_eq!(stream_contents, data);
/// # Ok(())
/// # }
/// ```
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`StreamReader`]: crate::io::StreamReader
/// [`Stream`]: futures_core::Stream
#[derive(Debug)]
pub struct ReaderStream<R> {
// Reader itself.
//
// This value is `None` if the stream has terminated.
#[pin]
reader: Option<R>,
// Working buffer, used to optimize allocations.
buf: BytesMut,
capacity: usize,
}
}
impl<R: AsyncRead> ReaderStream<R> {
/// Convert an [`AsyncRead`] into a [`Stream`] with item type
/// `Result<Bytes, std::io::Error>`.
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Stream`]: futures_core::Stream
pub fn new(reader: R) -> Self {
ReaderStream {
reader: Some(reader),
buf: BytesMut::new(),
capacity: DEFAULT_CAPACITY,
}
}
/// Convert an [`AsyncRead`] into a [`Stream`] with item type
/// `Result<Bytes, std::io::Error>`,
/// with a specific read buffer initial capacity.
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Stream`]: futures_core::Stream
pub fn with_capacity(reader: R, capacity: usize) -> Self {
ReaderStream {
reader: Some(reader),
buf: BytesMut::with_capacity(capacity),
capacity,
}
}
}
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::util::poll_read_buf;
let mut this = self.as_mut().project();
let reader = match this.reader.as_pin_mut() {
Some(r) => r,
None => return Poll::Ready(None),
};
if this.buf.capacity() == 0 {
this.buf.reserve(*this.capacity);
}
match poll_read_buf(reader, cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Poll::Ready(Some(Err(err)))
}
Poll::Ready(Ok(0)) => {
self.project().reader.set(None);
Poll::Ready(None)
}
Poll::Ready(Ok(_)) => {
let chunk = this.buf.split();
Poll::Ready(Some(Ok(chunk.freeze())))
}
}
}
}
|