use crate::io::{self, BufWriter, IoSlice, Write}; use crate::sys_common::memchr; /// Private helper struct for implementing the line-buffered writing logic. /// This shim temporarily wraps a BufWriter, and uses its internals to /// implement a line-buffered writer (specifically by using the internal /// methods like write_to_buf and flush_buf). In this way, a more /// efficient abstraction can be created than one that only had access to /// `write` and `flush`, without needlessly duplicating a lot of the /// implementation details of BufWriter. This also allows existing /// `BufWriters` to be temporarily given line-buffering logic; this is what /// enables Stdout to be alternately in line-buffered or block-buffered mode. #[derive(Debug)] pub struct LineWriterShim<'a, W: Write> { buffer: &'a mut BufWriter, } impl<'a, W: Write> LineWriterShim<'a, W> { pub fn new(buffer: &'a mut BufWriter) -> Self { Self { buffer } } /// Get a reference to the inner writer (that is, the writer /// wrapped by the BufWriter). fn inner(&self) -> &W { self.buffer.get_ref() } /// Get a mutable reference to the inner writer (that is, the writer /// wrapped by the BufWriter). Be careful with this writer, as writes to /// it will bypass the buffer. fn inner_mut(&mut self) -> &mut W { self.buffer.get_mut() } /// Get the content currently buffered in self.buffer fn buffered(&self) -> &[u8] { self.buffer.buffer() } /// Flush the buffer iff the last byte is a newline (indicating that an /// earlier write only succeeded partially, and we want to retry flushing /// the buffered line before continuing with a subsequent write) fn flush_if_completed_line(&mut self) -> io::Result<()> { match self.buffered().last().copied() { Some(b'\n') => self.buffer.flush_buf(), _ => Ok(()), } } } impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// Write some data into this BufReader with line buffering. This means /// that, if any newlines are present in the data, the data up to the last /// newline is sent directly to the underlying writer, and data after it /// is buffered. Returns the number of bytes written. /// /// This function operates on a "best effort basis"; in keeping with the /// convention of `Write::write`, it makes at most one attempt to write /// new data to the underlying writer. If that write only reports a partial /// success, the remaining data will be buffered. /// /// Because this function attempts to send completed lines to the underlying /// writer, it will also flush the existing buffer if it ends with a /// newline, even if the incoming data does not contain any newlines. fn write(&mut self, buf: &[u8]) -> io::Result { let newline_idx = match memchr::memrchr(b'\n', buf) { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write (which may flush if // we exceed the inner buffer's size) None => { self.flush_if_completed_line()?; return self.buffer.write(buf); } // Otherwise, arrange for the lines to be written directly to the // inner writer. Some(newline_idx) => newline_idx + 1, }; // Flush existing content to prepare for our write. We have to do this // before attempting to write `buf` in order to maintain consistency; // if we add `buf` to the buffer then try to flush it all at once, // we're obligated to return Ok(), which would mean suppressing any // errors that occur during flush. self.buffer.flush_buf()?; // This is what we're going to try to write directly to the inner // writer. The rest will be buffered, if nothing goes wrong. let lines = &buf[..newline_idx]; // Write `lines` directly to the inner writer. In keeping with the // `write` convention, make at most one attempt to add new (unbuffered) // data. Because this write doesn't touch the BufWriter state directly, // and the buffer is known to be empty, we don't need to worry about // self.buffer.panicked here. let flushed = self.inner_mut().write(lines)?; // If buffer returns Ok(0), propagate that to the caller without // doing additional buffering; otherwise we're just guaranteeing // an "ErrorKind::WriteZero" later. if flushed == 0 { return Ok(0); } // Now that the write has succeeded, buffer the rest (or as much of // the rest as possible). If there were any unwritten newlines, we // only buffer out to the last unwritten newline that fits in the // buffer; this helps prevent flushing partial lines on subsequent // calls to LineWriterShim::write. // Handle the cases in order of most-common to least-common, under // the presumption that most writes succeed in totality, and that most // writes are smaller than the buffer. // - Is this a partial line (ie, no newlines left in the unwritten tail) // - If not, does the data out to the last unwritten newline fit in // the buffer? // - If not, scan for the last newline that *does* fit in the buffer let tail = if flushed >= newline_idx { &buf[flushed..] } else if newline_idx - flushed <= self.buffer.capacity() { &buf[flushed..newline_idx] } else { let scan_area = &buf[flushed..]; let scan_area = &scan_area[..self.buffer.capacity()]; match memchr::memrchr(b'\n', scan_area) { Some(newline_idx) => &scan_area[..newline_idx + 1], None => scan_area, } }; let buffered = self.buffer.write_to_buf(tail); Ok(flushed + buffered) } fn flush(&mut self) -> io::Result<()> { self.buffer.flush() } /// Write some vectored data into this BufReader with line buffering. This /// means that, if any newlines are present in the data, the data up to /// and including the buffer containing the last newline is sent directly /// to the inner writer, and the data after it is buffered. Returns the /// number of bytes written. /// /// This function operates on a "best effort basis"; in keeping with the /// convention of `Write::write`, it makes at most one attempt to write /// new data to the underlying writer. /// /// Because this function attempts to send completed lines to the underlying /// writer, it will also flush the existing buffer if it contains any /// newlines. /// /// Because sorting through an array of `IoSlice` can be a bit convoluted, /// This method differs from write in the following ways: /// /// - It attempts to write the full content of all the buffers up to and /// including the one containing the last newline. This means that it /// may attempt to write a partial line, that buffer has data past the /// newline. /// - If the write only reports partial success, it does not attempt to /// find the precise location of the written bytes and buffer the rest. /// /// If the underlying vector doesn't support vectored writing, we instead /// simply write the first non-empty buffer with `write`. This way, we /// get the benefits of more granular partial-line handling without losing /// anything in efficiency fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { // If there's no specialized behavior for write_vectored, just use // write. This has the benefit of more granular partial-line handling. if !self.is_write_vectored() { return match bufs.iter().find(|buf| !buf.is_empty()) { Some(buf) => self.write(buf), None => Ok(0), }; } // Find the buffer containing the last newline let last_newline_buf_idx = bufs .iter() .enumerate() .rev() .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i)); // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write let last_newline_buf_idx = match last_newline_buf_idx { // No newlines; just do a normal buffered write None => { self.flush_if_completed_line()?; return self.buffer.write_vectored(bufs); } Some(i) => i, }; // Flush existing content to prepare for our write self.buffer.flush_buf()?; // This is what we're going to try to write directly to the inner // writer. The rest will be buffered, if nothing goes wrong. let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); // Write `lines` directly to the inner writer. In keeping with the // `write` convention, make at most one attempt to add new (unbuffered) // data. Because this write doesn't touch the BufWriter state directly, // and the buffer is known to be empty, we don't need to worry about // self.panicked here. let flushed = self.inner_mut().write_vectored(lines)?; // If inner returns Ok(0), propagate that to the caller without // doing additional buffering; otherwise we're just guaranteeing // an "ErrorKind::WriteZero" later. if flushed == 0 { return Ok(0); } // Don't try to reconstruct the exact amount written; just bail // in the event of a partial write let lines_len = lines.iter().map(|buf| buf.len()).sum(); if flushed < lines_len { return Ok(flushed); } // Now that the write has succeeded, buffer the rest (or as much of the // rest as possible) let buffered: usize = tail .iter() .filter(|buf| !buf.is_empty()) .map(|buf| self.buffer.write_to_buf(buf)) .take_while(|&n| n > 0) .sum(); Ok(flushed + buffered) } fn is_write_vectored(&self) -> bool { self.inner().is_write_vectored() } /// Write some data into this BufReader with line buffering. This means /// that, if any newlines are present in the data, the data up to the last /// newline is sent directly to the underlying writer, and data after it /// is buffered. /// /// Because this function attempts to send completed lines to the underlying /// writer, it will also flush the existing buffer if it contains any /// newlines, even if the incoming data does not contain any newlines. fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { match memchr::memrchr(b'\n', buf) { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write (which may flush if // we exceed the inner buffer's size) None => { self.flush_if_completed_line()?; self.buffer.write_all(buf) } Some(newline_idx) => { let (lines, tail) = buf.split_at(newline_idx + 1); if self.buffered().is_empty() { self.inner_mut().write_all(lines)?; } else { // If there is any buffered data, we add the incoming lines // to that buffer before flushing, which saves us at least // one write call. We can't really do this with `write`, // since we can't do this *and* not suppress errors *and* // report a consistent state to the caller in a return // value, but here in write_all it's fine. self.buffer.write_all(lines)?; self.buffer.flush_buf()?; } self.buffer.write_all(tail) } } } }