1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
//! A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
/// A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
#[cfg(feature = "io-pipe")]
pub mod pipe {
use std::io;
use bytes::{Buf, BufMut, BytesMut};
/// The write-end of the pipe, receiving items to become available in the [`Reader`].
///
/// It's commonly used with the [`std::io::Write`] trait it implements: every non-empty
/// `write` copies the given bytes into an internal buffer and sends them as one chunk
/// through [`channel`][Writer::channel].
pub struct Writer {
/// The channel through which bytes are transferred. Useful for sending [`std::io::Error`]s instead.
///
/// An `Err` sent through it is returned as-is by the read methods of the [`Reader`]
/// once they receive it.
pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>,
/// Staging buffer: written bytes are copied into it and then split off to form the chunk that is sent.
buf: BytesMut,
}
/// The read-end of the pipe, implementing the [`std::io::Read`] trait.
///
/// It implements [`std::io::BufRead`] as well. Both read from the most recently received
/// chunk first and receive the next one from the channel only once that chunk is used up.
/// When receiving fails, which signals that the channel is disconnected, no more bytes
/// are produced.
pub struct Reader {
/// The receiving side of the channel, yielding either a chunk of bytes or an error to hand to the caller.
channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>,
/// The not yet consumed remainder of the chunk that was received last.
buf: BytesMut,
}
impl io::BufRead for Reader {
    /// Returns the bytes that are currently buffered, blocking to receive the next chunk if none are left.
    ///
    /// An empty slice is only returned once the channel is disconnected, as `BufRead` consumers
    /// interpret an empty slice as the end of the stream.
    ///
    /// # Errors
    ///
    /// Returns the error that was sent through the channel in place of a chunk.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        // Keep receiving instead of receiving just once: a chunk may be empty, and handing
        // out an empty slice while the pipe is still connected would look like EOF.
        while self.buf.is_empty() {
            match self.channel.recv() {
                Ok(Ok(chunk)) => self.buf = chunk,
                Ok(Err(err)) => return Err(err),
                // Receiving failed, so the channel is disconnected: the empty buffer signals EOF.
                Err(_) => break,
            }
        }
        Ok(&self.buf)
    }

    /// Marks `amt` bytes of the buffered chunk as read.
    ///
    /// `amt` is clamped to the amount of buffered bytes, so passing a value that is too large
    /// discards what is buffered instead of panicking.
    fn consume(&mut self, amt: usize) {
        let consumed = amt.min(self.buf.len());
        self.buf.advance(consumed);
    }
}
impl io::Read for Reader {
    /// Copies buffered bytes into `out` and returns how many were copied, which may be less than `out.len()`.
    ///
    /// It blocks only while no bytes are available at all. As soon as a non-empty chunk is at
    /// hand, as many of its bytes as fit are copied and the call returns, without waiting for
    /// further chunks to fill `out` entirely.
    ///
    /// `Ok(0)` is returned if `out` is empty or once the channel is disconnected.
    ///
    /// # Errors
    ///
    /// Returns the error that was sent through the channel in place of a chunk. The channel is
    /// only consulted while nothing was copied into `out`, hence no bytes are ever lost
    /// alongside an error.
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if out.is_empty() {
            return Ok(0);
        }
        // Empty chunks carry no data, so keep waiting for one that does.
        while self.buf.is_empty() {
            match self.channel.recv() {
                Ok(Ok(chunk)) => self.buf = chunk,
                Ok(Err(err)) => return Err(err),
                // Receiving failed, so the channel is disconnected: report EOF.
                Err(_) => return Ok(0),
            }
        }
        let bytes_to_copy = self.buf.len().min(out.len());
        // `copy_to_slice` also advances `self.buf` past the copied bytes.
        self.buf.copy_to_slice(&mut out[..bytes_to_copy]);
        Ok(bytes_to_copy)
    }
}
impl io::Write for Writer {
    /// Sends a copy of `buf` as a single chunk through the channel and returns `buf.len()`.
    ///
    /// Sending may block depending on the capacity the channel was created with.
    /// An empty `buf` is not sent at all and `Ok(0)` is returned right away: an empty chunk
    /// carries no data, yet it could block the writer and be mistaken for the end of the
    /// stream by readers.
    ///
    /// # Errors
    ///
    /// Fails with [`io::ErrorKind::BrokenPipe`] if the chunk could not be sent.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        self.buf.put_slice(buf);
        // `split()` hands all bytes written so far to the chunk and leaves `self.buf` empty.
        let chunk = self.buf.split();
        self.channel
            .send(Ok(chunk))
            .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
        Ok(buf.len())
    }

    /// Does nothing, as every `write` passes its bytes to the channel immediately.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
/// Creates a pipe for transferring bytes and returns its _([`write`][Writer], [`read`][Reader])_ ends,
/// analogous to a unix pipe.
///
/// * `in_flight_writes` is the number of chunks of bytes that may be kept in memory before the `write` end
///   blocks when writing. If `None` or `0`, the `write` end will always block until the `read` end
///   receives the transferred bytes.
pub fn unidirectional(in_flight_writes: impl Into<Option<usize>>) -> (Writer, Reader) {
    let requested: Option<usize> = in_flight_writes.into();
    // A missing value means the same as `0`.
    let capacity = requested.unwrap_or_default();
    let (sender, receiver) = std::sync::mpsc::sync_channel(capacity);

    let writer = Writer {
        channel: sender,
        buf: BytesMut::with_capacity(4096),
    };
    let reader = Reader {
        channel: receiver,
        buf: BytesMut::new(),
    };
    (writer, reader)
}
}
|