//! A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.

/// A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle.
#[cfg(feature = "io-pipe")]
pub mod pipe {
    use std::io;

    use bytes::{Buf, BufMut, BytesMut};

    /// The write-end of the pipe, receiving items to become available in the [`Reader`].
    ///
    /// It's commonly used with the [`std::io::Write`] trait it implements.
    pub struct Writer {
        /// The channel through which bytes are transferred. Useful for sending [`std::io::Error`]s instead.
        pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>,
        /// Staging buffer; each `write()` copies into it and then splits the written bytes off as one chunk.
        buf: BytesMut,
    }

    /// The read-end of the pipe, implementing the [`std::io::Read`] trait.
    pub struct Reader {
        channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>,
        /// The chunk currently being consumed; empty when the next chunk must be received.
        buf: BytesMut,
        /// An error that arrived while a `read()` call had already copied bytes to the caller.
        /// It is handed out by the next call so that neither the bytes nor the error get lost.
        deferred_error: Option<io::Error>,
    }

    impl Reader {
        /// Make sure `self.buf` holds at least one byte, blocking on the channel if needed.
        ///
        /// Returns `Ok(true)` if bytes are available and `Ok(false)` if all senders are gone
        /// and nothing is left to read.
        ///
        /// # Errors
        ///
        /// Returns an error previously deferred by `read()`, or one sent through the channel.
        fn refill(&mut self) -> io::Result<bool> {
            if !self.buf.is_empty() {
                return Ok(true);
            }
            if let Some(err) = self.deferred_error.take() {
                return Err(err);
            }
            // Zero-length chunks (e.g. from `write(&[])`) carry no data. Skip them instead of
            // exposing an empty buffer, which readers would interpret as end-of-stream.
            while self.buf.is_empty() {
                match self.channel.recv() {
                    Ok(Ok(chunk)) => self.buf = chunk,
                    Ok(Err(err)) => return Err(err),
                    Err(_) => return Ok(false),
                }
            }
            Ok(true)
        }
    }

    impl io::BufRead for Reader {
        /// Return the unconsumed bytes of the current chunk, receiving the next non-empty chunk if
        /// there are none. An empty slice is returned only once the write-end has been dropped.
        fn fill_buf(&mut self) -> io::Result<&[u8]> {
            self.refill()?;
            Ok(&self.buf)
        }

        /// Mark `amt` bytes as consumed; values larger than the buffered amount are clamped.
        fn consume(&mut self, amt: usize) {
            self.buf.advance(amt.min(self.buf.len()));
        }
    }

    impl io::Read for Reader {
        /// Fill `out` with as many bytes as possible, blocking until it is full, the write-end
        /// is dropped, or an error is received.
        ///
        /// # Errors
        ///
        /// Errors sent through the channel are returned as-is. If bytes were already copied into
        /// `out` when the error arrived, the byte count is returned first and the error is
        /// returned by the following call.
        fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> {
            let mut written = 0;
            while !out.is_empty() {
                match self.refill() {
                    Ok(true) => {}
                    Ok(false) => break,
                    Err(err) if written == 0 => return Err(err),
                    Err(err) => {
                        // Returning `Err` now would drop the bytes already handed to the caller.
                        self.deferred_error = Some(err);
                        break;
                    }
                }
                let bytes_to_write = self.buf.len().min(out.len());
                let (to_write, rest) = out.split_at_mut(bytes_to_write);
                self.buf.split_to(bytes_to_write).copy_to_slice(to_write);
                out = rest;
                written += bytes_to_write;
            }
            Ok(written)
        }
    }

    impl io::Write for Writer {
        /// Send all of `buf` as a single chunk to the [`Reader`], blocking if the channel is full.
        ///
        /// # Errors
        ///
        /// Fails with [`io::ErrorKind::BrokenPipe`] if the read-end was dropped.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.buf.put_slice(buf);
            self.channel
                .send(Ok(self.buf.split()))
                .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
            Ok(buf.len())
        }

        /// A no-op, as every `write()` hands its bytes to the channel right away.
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Returns the _([`write`][Writer], [`read`][Reader])_ ends of a pipe for transferring bytes, analogous to a unix pipe.
    ///
    /// * `in_flight_writes` defines the amount of chunks of bytes to keep in memory until the `write` end will block when writing.
    ///    If `0`, the `write` end will always block until the `read` end consumes the transferred bytes.
    pub fn unidirectional(in_flight_writes: usize) -> (Writer, Reader) {
        let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes);
        (
            Writer {
                channel: tx,
                buf: BytesMut::with_capacity(4096),
            },
            Reader {
                channel: rx,
                buf: BytesMut::new(),
                deferred_error: None,
            },
        )
    }
}