diff options
Diffstat (limited to '')
-rw-r--r-- | vendor/gix-features-0.35.0/src/io.rs | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/vendor/gix-features-0.35.0/src/io.rs b/vendor/gix-features-0.35.0/src/io.rs new file mode 100644 index 000000000..fba273693 --- /dev/null +++ b/vendor/gix-features-0.35.0/src/io.rs @@ -0,0 +1,94 @@ +//!A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle. + +/// A unidirectional pipe for bytes, analogous to a unix pipe. Available with the `io-pipe` feature toggle. +#[cfg(feature = "io-pipe")] +pub mod pipe { + use std::io; + + use bytes::{Buf, BufMut, BytesMut}; + + /// The write-end of the pipe, receiving items to become available in the [`Reader`]. + /// + /// It's commonly used with the [`std::io::Write`] trait it implements. + pub struct Writer { + /// The channel through which bytes are transferred. Useful for sending [`std::io::Error`]s instead. + pub channel: std::sync::mpsc::SyncSender<io::Result<BytesMut>>, + buf: BytesMut, + } + + /// The read-end of the pipe, implementing the [`std::io::Read`] trait. + pub struct Reader { + channel: std::sync::mpsc::Receiver<io::Result<BytesMut>>, + buf: BytesMut, + } + + impl io::BufRead for Reader { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + if self.buf.is_empty() { + match self.channel.recv() { + Ok(Ok(buf)) => self.buf = buf, + Ok(Err(err)) => return Err(err), + Err(_) => {} + } + }; + Ok(&self.buf) + } + + fn consume(&mut self, amt: usize) { + self.buf.advance(amt.min(self.buf.len())); + } + } + + impl io::Read for Reader { + fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> { + let mut written = 0; + while !out.is_empty() { + if self.buf.is_empty() { + match self.channel.recv() { + Ok(Ok(buf)) => self.buf = buf, + Ok(Err(err)) => return Err(err), + Err(_) => break, + } + } + let bytes_to_write = self.buf.len().min(out.len()); + let (to_write, rest) = out.split_at_mut(bytes_to_write); + self.buf.split_to(bytes_to_write).copy_to_slice(to_write); + out = rest; + written += bytes_to_write; + } + Ok(written) + } + } + + impl io::Write for Writer { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.buf.put_slice(buf); + self.channel + .send(Ok(self.buf.split())) + .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + /// Returns the _([`write`][Writer], [`read`][Reader])_ ends of a pipe for transferring bytes, analogous to a unix pipe. + /// + /// * `in_flight_writes` defines the amount of chunks of bytes to keep in memory until the `write` end will block when writing. + /// If `0`, the `write` end will always block until the `read` end consumes the transferred bytes. + pub fn unidirectional(in_flight_writes: usize) -> (Writer, Reader) { + let (tx, rx) = std::sync::mpsc::sync_channel(in_flight_writes); + ( + Writer { + channel: tx, + buf: BytesMut::with_capacity(4096), + }, + Reader { + channel: rx, + buf: BytesMut::new(), + }, + ) + } +} |