diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000 |
commit | 698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch) | |
tree | 173a775858bd501c378080a10dca74132f05bc50 /library/std/src/io/buffered | |
parent | Initial commit. (diff) | |
download | rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip |
Adding upstream version 1.64.0+dfsg1.upstream/1.64.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'library/std/src/io/buffered')
-rw-r--r-- | library/std/src/io/buffered/bufreader.rs | 496 | ||||
-rw-r--r-- | library/std/src/io/buffered/bufreader/buffer.rs | 105 | ||||
-rw-r--r-- | library/std/src/io/buffered/bufwriter.rs | 674 | ||||
-rw-r--r-- | library/std/src/io/buffered/linewriter.rs | 232 | ||||
-rw-r--r-- | library/std/src/io/buffered/linewritershim.rs | 276 | ||||
-rw-r--r-- | library/std/src/io/buffered/mod.rs | 196 | ||||
-rw-r--r-- | library/std/src/io/buffered/tests.rs | 1039 |
7 files changed, 3018 insertions, 0 deletions
diff --git a/library/std/src/io/buffered/bufreader.rs b/library/std/src/io/buffered/bufreader.rs new file mode 100644 index 000000000..f7fbaa9c2 --- /dev/null +++ b/library/std/src/io/buffered/bufreader.rs @@ -0,0 +1,496 @@ +mod buffer; + +use crate::fmt; +use crate::io::{ + self, BufRead, IoSliceMut, Read, ReadBuf, Seek, SeekFrom, SizeHint, DEFAULT_BUF_SIZE, +}; +use buffer::Buffer; + +/// The `BufReader<R>` struct adds buffering to any reader. +/// +/// It can be excessively inefficient to work directly with a [`Read`] instance. +/// For example, every call to [`read`][`TcpStream::read`] on [`TcpStream`] +/// results in a system call. A `BufReader<R>` performs large, infrequent reads on +/// the underlying [`Read`] and maintains an in-memory buffer of the results. +/// +/// `BufReader<R>` can improve the speed of programs that make *small* and +/// *repeated* read calls to the same file or network socket. It does not +/// help when reading very large amounts at once, or reading just one or a few +/// times. It also provides no advantage when reading from a source that is +/// already in memory, like a <code>[Vec]\<u8></code>. +/// +/// When the `BufReader<R>` is dropped, the contents of its buffer will be +/// discarded. Creating multiple instances of a `BufReader<R>` on the same +/// stream can cause data loss. Reading from the underlying reader after +/// unwrapping the `BufReader<R>` with [`BufReader::into_inner`] can also cause +/// data loss. +/// +// HACK(#78696): can't use `crate` for associated items +/// [`TcpStream::read`]: super::super::super::net::TcpStream::read +/// [`TcpStream`]: crate::net::TcpStream +/// +/// # Examples +/// +/// ```no_run +/// use std::io::prelude::*; +/// use std::io::BufReader; +/// use std::fs::File; +/// +/// fn main() -> std::io::Result<()> { +/// let f = File::open("log.txt")?; +/// let mut reader = BufReader::new(f); +/// +/// let mut line = String::new(); +/// let len = reader.read_line(&mut line)?; +/// println!("First line is {len} bytes long"); +/// Ok(()) +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct BufReader<R> { + inner: R, + buf: Buffer, +} + +impl<R: Read> BufReader<R> { + /// Creates a new `BufReader<R>` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufReader; + /// use std::fs::File; + /// + /// fn main() -> std::io::Result<()> { + /// let f = File::open("log.txt")?; + /// let reader = BufReader::new(f); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn new(inner: R) -> BufReader<R> { + BufReader::with_capacity(DEFAULT_BUF_SIZE, inner) + } + + /// Creates a new `BufReader<R>` with the specified buffer capacity. + /// + /// # Examples + /// + /// Creating a buffer with ten bytes of capacity: + /// + /// ```no_run + /// use std::io::BufReader; + /// use std::fs::File; + /// + /// fn main() -> std::io::Result<()> { + /// let f = File::open("log.txt")?; + /// let reader = BufReader::with_capacity(10, f); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> { + BufReader { inner, buf: Buffer::with_capacity(capacity) } + } +} + +impl<R> BufReader<R> { + /// Gets a reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufReader; + /// use std::fs::File; + /// + /// fn main() -> std::io::Result<()> { + /// let f1 = File::open("log.txt")?; + /// let reader = BufReader::new(f1); + /// + /// let f2 = reader.get_ref(); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn get_ref(&self) -> &R { + &self.inner + } + + /// Gets a mutable reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufReader; + /// use std::fs::File; + /// + /// fn main() -> std::io::Result<()> { + /// let f1 = File::open("log.txt")?; + /// let mut reader = BufReader::new(f1); + /// + /// let f2 = reader.get_mut(); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn get_mut(&mut self) -> &mut R { + &mut self.inner + } + + /// Returns a reference to the internally buffered data. + /// + /// Unlike [`fill_buf`], this will not attempt to fill the buffer if it is empty. + /// + /// [`fill_buf`]: BufRead::fill_buf + /// + /// # Examples + /// + /// ```no_run + /// use std::io::{BufReader, BufRead}; + /// use std::fs::File; + /// + /// fn main() -> std::io::Result<()> { + /// let f = File::open("log.txt")?; + /// let mut reader = BufReader::new(f); + /// assert!(reader.buffer().is_empty()); + /// + /// if reader.fill_buf()?.len() > 0 { + /// assert!(!reader.buffer().is_empty()); + /// } + /// Ok(()) + /// } + /// ``` + #[stable(feature = "bufreader_buffer", since = "1.37.0")] + pub fn buffer(&self) -> &[u8] { + self.buf.buffer() + } + + /// Returns the number of bytes the internal buffer can hold at once. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::{BufReader, BufRead}; + /// use std::fs::File; + /// + /// fn main() -> std::io::Result<()> { + /// let f = File::open("log.txt")?; + /// let mut reader = BufReader::new(f); + /// + /// let capacity = reader.capacity(); + /// let buffer = reader.fill_buf()?; + /// assert!(buffer.len() <= capacity); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "buffered_io_capacity", since = "1.46.0")] + pub fn capacity(&self) -> usize { + self.buf.capacity() + } + + /// Unwraps this `BufReader<R>`, returning the underlying reader. + /// + /// Note that any leftover data in the internal buffer is lost. Therefore, + /// a following read from the underlying reader may lead to data loss. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufReader; + /// use std::fs::File; + /// + /// fn main() -> std::io::Result<()> { + /// let f1 = File::open("log.txt")?; + /// let reader = BufReader::new(f1); + /// + /// let f2 = reader.into_inner(); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn into_inner(self) -> R { + self.inner + } + + /// Invalidates all data in the internal buffer. + #[inline] + fn discard_buffer(&mut self) { + self.buf.discard_buffer() + } +} + +impl<R: Seek> BufReader<R> { + /// Seeks relative to the current position. If the new position lies within the buffer, + /// the buffer will not be flushed, allowing for more efficient seeks. + /// This method does not return the location of the underlying reader, so the caller + /// must track this information themselves if it is required. + #[stable(feature = "bufreader_seek_relative", since = "1.53.0")] + pub fn seek_relative(&mut self, offset: i64) -> io::Result<()> { + let pos = self.buf.pos() as u64; + if offset < 0 { + if let Some(_) = pos.checked_sub((-offset) as u64) { + self.buf.unconsume((-offset) as usize); + return Ok(()); + } + } else if let Some(new_pos) = pos.checked_add(offset as u64) { + if new_pos <= self.buf.filled() as u64 { + self.buf.consume(offset as usize); + return Ok(()); + } + } + + self.seek(SeekFrom::Current(offset)).map(drop) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<R: Read> Read for BufReader<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + if self.buf.pos() == self.buf.filled() && buf.len() >= self.capacity() { + self.discard_buffer(); + return self.inner.read(buf); + } + let nread = { + let mut rem = self.fill_buf()?; + rem.read(buf)? + }; + self.consume(nread); + Ok(nread) + } + + fn read_buf(&mut self, buf: &mut ReadBuf<'_>) -> io::Result<()> { + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + if self.buf.pos() == self.buf.filled() && buf.remaining() >= self.capacity() { + self.discard_buffer(); + return self.inner.read_buf(buf); + } + + let prev = buf.filled_len(); + + let mut rem = self.fill_buf()?; + rem.read_buf(buf)?; + + self.consume(buf.filled_len() - prev); //slice impl of read_buf known to never unfill buf + + Ok(()) + } + + // Small read_exacts from a BufReader are extremely common when used with a deserializer. + // The default implementation calls read in a loop, which results in surprisingly poor code + // generation for the common path where the buffer has enough bytes to fill the passed-in + // buffer. + fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { + if self.buf.consume_with(buf.len(), |claimed| buf.copy_from_slice(claimed)) { + return Ok(()); + } + + crate::io::default_read_exact(self, buf) + } + + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { + let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); + if self.buf.pos() == self.buf.filled() && total_len >= self.capacity() { + self.discard_buffer(); + return self.inner.read_vectored(bufs); + } + let nread = { + let mut rem = self.fill_buf()?; + rem.read_vectored(bufs)? + }; + self.consume(nread); + Ok(nread) + } + + fn is_read_vectored(&self) -> bool { + self.inner.is_read_vectored() + } + + // The inner reader might have an optimized `read_to_end`. Drain our buffer and then + // delegate to the inner implementation. + fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { + let inner_buf = self.buffer(); + buf.extend_from_slice(inner_buf); + let nread = inner_buf.len(); + self.discard_buffer(); + Ok(nread + self.inner.read_to_end(buf)?) + } + + // The inner reader might have an optimized `read_to_end`. Drain our buffer and then + // delegate to the inner implementation. + fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> { + // In the general `else` case below we must read bytes into a side buffer, check + // that they are valid UTF-8, and then append them to `buf`. This requires a + // potentially large memcpy. + // + // If `buf` is empty--the most common case--we can leverage `append_to_string` + // to read directly into `buf`'s internal byte buffer, saving an allocation and + // a memcpy. + if buf.is_empty() { + // `append_to_string`'s safety relies on the buffer only being appended to since + // it only checks the UTF-8 validity of new data. If there were existing content in + // `buf` then an untrustworthy reader (i.e. `self.inner`) could not only append + // bytes but also modify existing bytes and render them invalid. On the other hand, + // if `buf` is empty then by definition any writes must be appends and + // `append_to_string` will validate all of the new bytes. + unsafe { crate::io::append_to_string(buf, |b| self.read_to_end(b)) } + } else { + // We cannot append our byte buffer directly onto the `buf` String as there could + // be an incomplete UTF-8 sequence that has only been partially read. We must read + // everything into a side buffer first and then call `from_utf8` on the complete + // buffer. + let mut bytes = Vec::new(); + self.read_to_end(&mut bytes)?; + let string = crate::str::from_utf8(&bytes).map_err(|_| { + io::const_io_error!( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + ) + })?; + *buf += string; + Ok(string.len()) + } + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<R: Read> BufRead for BufReader<R> { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + self.buf.fill_buf(&mut self.inner) + } + + fn consume(&mut self, amt: usize) { + self.buf.consume(amt) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<R> fmt::Debug for BufReader<R> +where + R: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BufReader") + .field("reader", &self.inner) + .field( + "buffer", + &format_args!("{}/{}", self.buf.filled() - self.buf.pos(), self.capacity()), + ) + .finish() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<R: Seek> Seek for BufReader<R> { + /// Seek to an offset, in bytes, in the underlying reader. + /// + /// The position used for seeking with <code>[SeekFrom::Current]\(_)</code> is the + /// position the underlying reader would be at if the `BufReader<R>` had no + /// internal buffer. + /// + /// Seeking always discards the internal buffer, even if the seek position + /// would otherwise fall within it. This guarantees that calling + /// [`BufReader::into_inner()`] immediately after a seek yields the underlying reader + /// at the same position. + /// + /// To seek without discarding the internal buffer, use [`BufReader::seek_relative`]. + /// + /// See [`std::io::Seek`] for more details. + /// + /// Note: In the edge case where you're seeking with <code>[SeekFrom::Current]\(n)</code> + /// where `n` minus the internal buffer length overflows an `i64`, two + /// seeks will be performed instead of one. If the second seek returns + /// [`Err`], the underlying reader will be left at the same position it would + /// have if you called `seek` with <code>[SeekFrom::Current]\(0)</code>. + /// + /// [`std::io::Seek`]: Seek + fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { + let result: u64; + if let SeekFrom::Current(n) = pos { + let remainder = (self.buf.filled() - self.buf.pos()) as i64; + // it should be safe to assume that remainder fits within an i64 as the alternative + // means we managed to allocate 8 exbibytes and that's absurd. + // But it's not out of the realm of possibility for some weird underlying reader to + // support seeking by i64::MIN so we need to handle underflow when subtracting + // remainder. + if let Some(offset) = n.checked_sub(remainder) { + result = self.inner.seek(SeekFrom::Current(offset))?; + } else { + // seek backwards by our remainder, and then by the offset + self.inner.seek(SeekFrom::Current(-remainder))?; + self.discard_buffer(); + result = self.inner.seek(SeekFrom::Current(n))?; + } + } else { + // Seeking with Start/End doesn't care about our buffer length. + result = self.inner.seek(pos)?; + } + self.discard_buffer(); + Ok(result) + } + + /// Returns the current seek position from the start of the stream. + /// + /// The value returned is equivalent to `self.seek(SeekFrom::Current(0))` + /// but does not flush the internal buffer. Due to this optimization the + /// function does not guarantee that calling `.into_inner()` immediately + /// afterwards will yield the underlying reader at the same position. Use + /// [`BufReader::seek`] instead if you require that guarantee. + /// + /// # Panics + /// + /// This function will panic if the position of the inner reader is smaller + /// than the amount of buffered data. That can happen if the inner reader + /// has an incorrect implementation of [`Seek::stream_position`], or if the + /// position has gone out of sync due to calling [`Seek::seek`] directly on + /// the underlying reader. + /// + /// # Example + /// + /// ```no_run + /// use std::{ + /// io::{self, BufRead, BufReader, Seek}, + /// fs::File, + /// }; + /// + /// fn main() -> io::Result<()> { + /// let mut f = BufReader::new(File::open("foo.txt")?); + /// + /// let before = f.stream_position()?; + /// f.read_line(&mut String::new())?; + /// let after = f.stream_position()?; + /// + /// println!("The first line was {} bytes long", after - before); + /// Ok(()) + /// } + /// ``` + fn stream_position(&mut self) -> io::Result<u64> { + let remainder = (self.buf.filled() - self.buf.pos()) as u64; + self.inner.stream_position().map(|pos| { + pos.checked_sub(remainder).expect( + "overflow when subtracting remaining buffer size from inner stream position", + ) + }) + } +} + +impl<T> SizeHint for BufReader<T> { + #[inline] + fn lower_bound(&self) -> usize { + SizeHint::lower_bound(self.get_ref()) + self.buffer().len() + } + + #[inline] + fn upper_bound(&self) -> Option<usize> { + SizeHint::upper_bound(self.get_ref()).and_then(|up| self.buffer().len().checked_add(up)) + } +} diff --git a/library/std/src/io/buffered/bufreader/buffer.rs b/library/std/src/io/buffered/bufreader/buffer.rs new file mode 100644 index 000000000..8ae01f3b0 --- /dev/null +++ b/library/std/src/io/buffered/bufreader/buffer.rs @@ -0,0 +1,105 @@ +///! An encapsulation of `BufReader`'s buffer management logic. +/// +/// This module factors out the basic functionality of `BufReader` in order to protect two core +/// invariants: +/// * `filled` bytes of `buf` are always initialized +/// * `pos` is always <= `filled` +/// Since this module encapsulates the buffer management logic, we can ensure that the range +/// `pos..filled` is always a valid index into the initialized region of the buffer. This means +/// that user code which wants to do reads from a `BufReader` via `buffer` + `consume` can do so +/// without encountering any runtime bounds checks. +use crate::cmp; +use crate::io::{self, Read, ReadBuf}; +use crate::mem::MaybeUninit; + +pub struct Buffer { + // The buffer. + buf: Box<[MaybeUninit<u8>]>, + // The current seek offset into `buf`, must always be <= `filled`. + pos: usize, + // Each call to `fill_buf` sets `filled` to indicate how many bytes at the start of `buf` are + // initialized with bytes from a read. + filled: usize, +} + +impl Buffer { + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + let buf = Box::new_uninit_slice(capacity); + Self { buf, pos: 0, filled: 0 } + } + + #[inline] + pub fn buffer(&self) -> &[u8] { + // SAFETY: self.pos and self.cap are valid, and self.cap => self.pos, and + // that region is initialized because those are all invariants of this type. + unsafe { MaybeUninit::slice_assume_init_ref(self.buf.get_unchecked(self.pos..self.filled)) } + } + + #[inline] + pub fn capacity(&self) -> usize { + self.buf.len() + } + + #[inline] + pub fn filled(&self) -> usize { + self.filled + } + + #[inline] + pub fn pos(&self) -> usize { + self.pos + } + + #[inline] + pub fn discard_buffer(&mut self) { + self.pos = 0; + self.filled = 0; + } + + #[inline] + pub fn consume(&mut self, amt: usize) { + self.pos = cmp::min(self.pos + amt, self.filled); + } + + /// If there are `amt` bytes available in the buffer, pass a slice containing those bytes to + /// `visitor` and return true. If there are not enough bytes available, return false. + #[inline] + pub fn consume_with<V>(&mut self, amt: usize, mut visitor: V) -> bool + where + V: FnMut(&[u8]), + { + if let Some(claimed) = self.buffer().get(..amt) { + visitor(claimed); + // If the indexing into self.buffer() succeeds, amt must be a valid increment. + self.pos += amt; + true + } else { + false + } + } + + #[inline] + pub fn unconsume(&mut self, amt: usize) { + self.pos = self.pos.saturating_sub(amt); + } + + #[inline] + pub fn fill_buf(&mut self, mut reader: impl Read) -> io::Result<&[u8]> { + // If we've reached the end of our internal buffer then we need to fetch + // some more data from the reader. + // Branch using `>=` instead of the more correct `==` + // to tell the compiler that the pos..cap slice is always valid. + if self.pos >= self.filled { + debug_assert!(self.pos == self.filled); + + let mut readbuf = ReadBuf::uninit(&mut self.buf); + + reader.read_buf(&mut readbuf)?; + + self.filled = readbuf.filled_len(); + self.pos = 0; + } + Ok(self.buffer()) + } +} diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs new file mode 100644 index 000000000..6acb937e7 --- /dev/null +++ b/library/std/src/io/buffered/bufwriter.rs @@ -0,0 +1,674 @@ +use crate::error; +use crate::fmt; +use crate::io::{ + self, ErrorKind, IntoInnerError, IoSlice, Seek, SeekFrom, Write, DEFAULT_BUF_SIZE, +}; +use crate::mem; +use crate::ptr; + +/// Wraps a writer and buffers its output. +/// +/// It can be excessively inefficient to work directly with something that +/// implements [`Write`]. For example, every call to +/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A +/// `BufWriter<W>` keeps an in-memory buffer of data and writes it to an underlying +/// writer in large, infrequent batches. +/// +/// `BufWriter<W>` can improve the speed of programs that make *small* and +/// *repeated* write calls to the same file or network socket. It does not +/// help when writing very large amounts at once, or writing just one or a few +/// times. It also provides no advantage when writing to a destination that is +/// in memory, like a <code>[Vec]\<u8></code>. +/// +/// It is critical to call [`flush`] before `BufWriter<W>` is dropped. Though +/// dropping will attempt to flush the contents of the buffer, any errors +/// that happen in the process of dropping will be ignored. Calling [`flush`] +/// ensures that the buffer is empty and thus dropping will not even attempt +/// file operations. +/// +/// # Examples +/// +/// Let's write the numbers one through ten to a [`TcpStream`]: +/// +/// ```no_run +/// use std::io::prelude::*; +/// use std::net::TcpStream; +/// +/// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap(); +/// +/// for i in 0..10 { +/// stream.write(&[i+1]).unwrap(); +/// } +/// ``` +/// +/// Because we're not buffering, we write each one in turn, incurring the +/// overhead of a system call per byte written. We can fix this with a +/// `BufWriter<W>`: +/// +/// ```no_run +/// use std::io::prelude::*; +/// use std::io::BufWriter; +/// use std::net::TcpStream; +/// +/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); +/// +/// for i in 0..10 { +/// stream.write(&[i+1]).unwrap(); +/// } +/// stream.flush().unwrap(); +/// ``` +/// +/// By wrapping the stream with a `BufWriter<W>`, these ten writes are all grouped +/// together by the buffer and will all be written out in one system call when +/// the `stream` is flushed. +/// +// HACK(#78696): can't use `crate` for associated items +/// [`TcpStream::write`]: super::super::super::net::TcpStream::write +/// [`TcpStream`]: crate::net::TcpStream +/// [`flush`]: BufWriter::flush +#[stable(feature = "rust1", since = "1.0.0")] +pub struct BufWriter<W: Write> { + inner: W, + // The buffer. Avoid using this like a normal `Vec` in common code paths. + // That is, don't use `buf.push`, `buf.extend_from_slice`, or any other + // methods that require bounds checking or the like. This makes an enormous + // difference to performance (we may want to stop using a `Vec` entirely). + buf: Vec<u8>, + // #30888: If the inner writer panics in a call to write, we don't want to + // write the buffered data a second time in BufWriter's destructor. This + // flag tells the Drop impl if it should skip the flush. + panicked: bool, +} + +impl<W: Write> BufWriter<W> { + /// Creates a new `BufWriter<W>` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn new(inner: W) -> BufWriter<W> { + BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner) + } + + /// Creates a new `BufWriter<W>` with at least the specified buffer capacity. + /// + /// # Examples + /// + /// Creating a buffer with a buffer of at least a hundred bytes. + /// + /// ```no_run + /// use std::io::BufWriter; + /// use std::net::TcpStream; + /// + /// let stream = TcpStream::connect("127.0.0.1:34254").unwrap(); + /// let mut buffer = BufWriter::with_capacity(100, stream); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> { + BufWriter { inner, buf: Vec::with_capacity(capacity), panicked: false } + } + + /// Send data in our local buffer into the inner writer, looping as + /// necessary until either it's all been sent or an error occurs. + /// + /// Because all the data in the buffer has been reported to our owner as + /// "successfully written" (by returning nonzero success values from + /// `write`), any 0-length writes from `inner` must be reported as i/o + /// errors from this method. + pub(in crate::io) fn flush_buf(&mut self) -> io::Result<()> { + /// Helper struct to ensure the buffer is updated after all the writes + /// are complete. It tracks the number of written bytes and drains them + /// all from the front of the buffer when dropped. + struct BufGuard<'a> { + buffer: &'a mut Vec<u8>, + written: usize, + } + + impl<'a> BufGuard<'a> { + fn new(buffer: &'a mut Vec<u8>) -> Self { + Self { buffer, written: 0 } + } + + /// The unwritten part of the buffer + fn remaining(&self) -> &[u8] { + &self.buffer[self.written..] + } + + /// Flag some bytes as removed from the front of the buffer + fn consume(&mut self, amt: usize) { + self.written += amt; + } + + /// true if all of the bytes have been written + fn done(&self) -> bool { + self.written >= self.buffer.len() + } + } + + impl Drop for BufGuard<'_> { + fn drop(&mut self) { + if self.written > 0 { + self.buffer.drain(..self.written); + } + } + } + + let mut guard = BufGuard::new(&mut self.buf); + while !guard.done() { + self.panicked = true; + let r = self.inner.write(guard.remaining()); + self.panicked = false; + + match r { + Ok(0) => { + return Err(io::const_io_error!( + ErrorKind::WriteZero, + "failed to write the buffered data", + )); + } + Ok(n) => guard.consume(n), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + + /// Buffer some data without flushing it, regardless of the size of the + /// data. Writes as much as possible without exceeding capacity. Returns + /// the number of bytes written. + pub(super) fn write_to_buf(&mut self, buf: &[u8]) -> usize { + let available = self.spare_capacity(); + let amt_to_buffer = available.min(buf.len()); + + // SAFETY: `amt_to_buffer` is <= buffer's spare capacity by construction. + unsafe { + self.write_to_buffer_unchecked(&buf[..amt_to_buffer]); + } + + amt_to_buffer + } + + /// Gets a reference to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // we can use reference just like buffer + /// let reference = buffer.get_ref(); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn get_ref(&self) -> &W { + &self.inner + } + + /// Gets a mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // we can use reference just like buffer + /// let reference = buffer.get_mut(); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn get_mut(&mut self) -> &mut W { + &mut self.inner + } + + /// Returns a reference to the internally buffered data. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufWriter; + /// use std::net::TcpStream; + /// + /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // See how many bytes are currently buffered + /// let bytes_buffered = buf_writer.buffer().len(); + /// ``` + #[stable(feature = "bufreader_buffer", since = "1.37.0")] + pub fn buffer(&self) -> &[u8] { + &self.buf + } + + /// Returns a mutable reference to the internal buffer. + /// + /// This can be used to write data directly into the buffer without triggering writers + /// to the underlying writer. + /// + /// That the buffer is a `Vec` is an implementation detail. + /// Callers should not modify the capacity as there currently is no public API to do so + /// and thus any capacity changes would be unexpected by the user. + pub(in crate::io) fn buffer_mut(&mut self) -> &mut Vec<u8> { + &mut self.buf + } + + /// Returns the number of bytes the internal buffer can hold without flushing. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufWriter; + /// use std::net::TcpStream; + /// + /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // Check the capacity of the inner buffer + /// let capacity = buf_writer.capacity(); + /// // Calculate how many bytes can be written without flushing + /// let without_flush = capacity - buf_writer.buffer().len(); + /// ``` + #[stable(feature = "buffered_io_capacity", since = "1.46.0")] + pub fn capacity(&self) -> usize { + self.buf.capacity() + } + + /// Unwraps this `BufWriter<W>`, returning the underlying writer. + /// + /// The buffer is written out before returning the writer. + /// + /// # Errors + /// + /// An [`Err`] will be returned if an error occurs while flushing the buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // unwrap the TcpStream and flush the buffer + /// let stream = buffer.into_inner().unwrap(); + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn into_inner(mut self) -> Result<W, IntoInnerError<BufWriter<W>>> { + match self.flush_buf() { + Err(e) => Err(IntoInnerError::new(self, e)), + Ok(()) => Ok(self.into_parts().0), + } + } + + /// Disassembles this `BufWriter<W>`, returning the underlying writer, and any buffered but + /// unwritten data. + /// + /// If the underlying writer panicked, it is not known what portion of the data was written. + /// In this case, we return `WriterPanicked` for the buffered data (from which the buffer + /// contents can still be recovered). + /// + /// `into_parts` makes no attempt to flush data and cannot fail. + /// + /// # Examples + /// + /// ``` + /// use std::io::{BufWriter, Write}; + /// + /// let mut buffer = [0u8; 10]; + /// let mut stream = BufWriter::new(buffer.as_mut()); + /// write!(stream, "too much data").unwrap(); + /// stream.flush().expect_err("it doesn't fit"); + /// let (recovered_writer, buffered_data) = stream.into_parts(); + /// assert_eq!(recovered_writer.len(), 0); + /// assert_eq!(&buffered_data.unwrap(), b"ata"); + /// ``` + #[stable(feature = "bufwriter_into_parts", since = "1.56.0")] + pub fn into_parts(mut self) -> (W, Result<Vec<u8>, WriterPanicked>) { + let buf = mem::take(&mut self.buf); + let buf = if !self.panicked { Ok(buf) } else { Err(WriterPanicked { buf }) }; + + // SAFETY: forget(self) prevents double dropping inner + let inner = unsafe { ptr::read(&mut self.inner) }; + mem::forget(self); + + (inner, buf) + } + + // Ensure this function does not get inlined into `write`, so that it + // remains inlineable and its common path remains as short as possible. + // If this function ends up being called frequently relative to `write`, + // it's likely a sign that the client is using an improperly sized buffer + // or their write patterns are somewhat pathological. + #[cold] + #[inline(never)] + fn write_cold(&mut self, buf: &[u8]) -> io::Result<usize> { + if buf.len() > self.spare_capacity() { + self.flush_buf()?; + } + + // Why not len > capacity? To avoid a needless trip through the buffer when the input + // exactly fills it. We'd just need to flush it to the underlying writer anyway. + if buf.len() >= self.buf.capacity() { + self.panicked = true; + let r = self.get_mut().write(buf); + self.panicked = false; + r + } else { + // Write to the buffer. In this case, we write to the buffer even if it fills it + // exactly. Doing otherwise would mean flushing the buffer, then writing this + // input to the inner writer, which in many cases would be a worse strategy. + + // SAFETY: There was either enough spare capacity already, or there wasn't and we + // flushed the buffer to ensure that there is. In the latter case, we know that there + // is because flushing ensured that our entire buffer is spare capacity, and we entered + // this block because the input buffer length is less than that capacity. In either + // case, it's safe to write the input buffer to our buffer. + unsafe { + self.write_to_buffer_unchecked(buf); + } + + Ok(buf.len()) + } + } + + // Ensure this function does not get inlined into `write_all`, so that it + // remains inlineable and its common path remains as short as possible. + // If this function ends up being called frequently relative to `write_all`, + // it's likely a sign that the client is using an improperly sized buffer + // or their write patterns are somewhat pathological. + #[cold] + #[inline(never)] + fn write_all_cold(&mut self, buf: &[u8]) -> io::Result<()> { + // Normally, `write_all` just calls `write` in a loop. We can do better + // by calling `self.get_mut().write_all()` directly, which avoids + // round trips through the buffer in the event of a series of partial + // writes in some circumstances. + + if buf.len() > self.spare_capacity() { + self.flush_buf()?; + } + + // Why not len > capacity? To avoid a needless trip through the buffer when the input + // exactly fills it. We'd just need to flush it to the underlying writer anyway. + if buf.len() >= self.buf.capacity() { + self.panicked = true; + let r = self.get_mut().write_all(buf); + self.panicked = false; + r + } else { + // Write to the buffer. In this case, we write to the buffer even if it fills it + // exactly. Doing otherwise would mean flushing the buffer, then writing this + // input to the inner writer, which in many cases would be a worse strategy. + + // SAFETY: There was either enough spare capacity already, or there wasn't and we + // flushed the buffer to ensure that there is. In the latter case, we know that there + // is because flushing ensured that our entire buffer is spare capacity, and we entered + // this block because the input buffer length is less than that capacity. In either + // case, it's safe to write the input buffer to our buffer. + unsafe { + self.write_to_buffer_unchecked(buf); + } + + Ok(()) + } + } + + // SAFETY: Requires `buf.len() <= self.buf.capacity() - self.buf.len()`, + // i.e., that input buffer length is less than or equal to spare capacity. + #[inline] + unsafe fn write_to_buffer_unchecked(&mut self, buf: &[u8]) { + debug_assert!(buf.len() <= self.spare_capacity()); + let old_len = self.buf.len(); + let buf_len = buf.len(); + let src = buf.as_ptr(); + let dst = self.buf.as_mut_ptr().add(old_len); + ptr::copy_nonoverlapping(src, dst, buf_len); + self.buf.set_len(old_len + buf_len); + } + + #[inline] + fn spare_capacity(&self) -> usize { + self.buf.capacity() - self.buf.len() + } +} + +#[stable(feature = "bufwriter_into_parts", since = "1.56.0")] +/// Error returned for the buffered data from `BufWriter::into_parts`, when the underlying +/// writer has previously panicked. Contains the (possibly partly written) buffered data. +/// +/// # Example +/// +/// ``` +/// use std::io::{self, BufWriter, Write}; +/// use std::panic::{catch_unwind, AssertUnwindSafe}; +/// +/// struct PanickingWriter; +/// impl Write for PanickingWriter { +/// fn write(&mut self, buf: &[u8]) -> io::Result<usize> { panic!() } +/// fn flush(&mut self) -> io::Result<()> { panic!() } +/// } +/// +/// let mut stream = BufWriter::new(PanickingWriter); +/// write!(stream, "some data").unwrap(); +/// let result = catch_unwind(AssertUnwindSafe(|| { +/// stream.flush().unwrap() +/// })); +/// assert!(result.is_err()); +/// let (recovered_writer, buffered_data) = stream.into_parts(); +/// assert!(matches!(recovered_writer, PanickingWriter)); +/// assert_eq!(buffered_data.unwrap_err().into_inner(), b"some data"); +/// ``` +pub struct WriterPanicked { + buf: Vec<u8>, +} + +impl WriterPanicked { + /// Returns the perhaps-unwritten data. Some of this data may have been written by the + /// panicking call(s) to the underlying writer, so simply writing it again is not a good idea. + #[must_use = "`self` will be dropped if the result is not used"] + #[stable(feature = "bufwriter_into_parts", since = "1.56.0")] + pub fn into_inner(self) -> Vec<u8> { + self.buf + } + + const DESCRIPTION: &'static str = + "BufWriter inner writer panicked, what data remains unwritten is not known"; +} + +#[stable(feature = "bufwriter_into_parts", since = "1.56.0")] +impl error::Error for WriterPanicked { + #[allow(deprecated, deprecated_in_future)] + fn description(&self) -> &str { + Self::DESCRIPTION + } +} + +#[stable(feature = "bufwriter_into_parts", since = "1.56.0")] +impl fmt::Display for WriterPanicked { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", Self::DESCRIPTION) + } +} + +#[stable(feature = "bufwriter_into_parts", since = "1.56.0")] +impl fmt::Debug for WriterPanicked { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WriterPanicked") + .field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity())) + .finish() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<W: Write> Write for BufWriter<W> { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + // Use < instead of <= to avoid a needless trip through the buffer in some cases. + // See `write_cold` for details. + if buf.len() < self.spare_capacity() { + // SAFETY: safe by above conditional. + unsafe { + self.write_to_buffer_unchecked(buf); + } + + Ok(buf.len()) + } else { + self.write_cold(buf) + } + } + + #[inline] + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + // Use < instead of <= to avoid a needless trip through the buffer in some cases. + // See `write_all_cold` for details. + if buf.len() < self.spare_capacity() { + // SAFETY: safe by above conditional. + unsafe { + self.write_to_buffer_unchecked(buf); + } + + Ok(()) + } else { + self.write_all_cold(buf) + } + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + // FIXME: Consider applying `#[inline]` / `#[inline(never)]` optimizations already applied + // to `write` and `write_all`. The performance benefits can be significant. See #79930. + if self.get_ref().is_write_vectored() { + // We have to handle the possibility that the total length of the buffers overflows + // `usize` (even though this can only happen if multiple `IoSlice`s reference the + // same underlying buffer, as otherwise the buffers wouldn't fit in memory). If the + // computation overflows, then surely the input cannot fit in our buffer, so we forward + // to the inner writer's `write_vectored` method to let it handle it appropriately. + let saturated_total_len = + bufs.iter().fold(0usize, |acc, b| acc.saturating_add(b.len())); + + if saturated_total_len > self.spare_capacity() { + // Flush if the total length of the input exceeds our buffer's spare capacity. + // If we would have overflowed, this condition also holds, and we need to flush. + self.flush_buf()?; + } + + if saturated_total_len >= self.buf.capacity() { + // Forward to our inner writer if the total length of the input is greater than or + // equal to our buffer capacity. If we would have overflowed, this condition also + // holds, and we punt to the inner writer. + self.panicked = true; + let r = self.get_mut().write_vectored(bufs); + self.panicked = false; + r + } else { + // `saturated_total_len < self.buf.capacity()` implies that we did not saturate. + + // SAFETY: We checked whether or not the spare capacity was large enough above. If + // it was, then we're safe already. If it wasn't, we flushed, making sufficient + // room for any input <= the buffer size, which includes this input. + unsafe { + bufs.iter().for_each(|b| self.write_to_buffer_unchecked(b)); + }; + + Ok(saturated_total_len) + } + } else { + let mut iter = bufs.iter(); + let mut total_written = if let Some(buf) = iter.by_ref().find(|&buf| !buf.is_empty()) { + // This is the first non-empty slice to write, so if it does + // not fit in the buffer, we still get to flush and proceed. + if buf.len() > self.spare_capacity() { + self.flush_buf()?; + } + if buf.len() >= self.buf.capacity() { + // The slice is at least as large as the buffering capacity, + // so it's better to write it directly, bypassing the buffer. + self.panicked = true; + let r = self.get_mut().write(buf); + self.panicked = false; + return r; + } else { + // SAFETY: We checked whether or not the spare capacity was large enough above. + // If it was, then we're safe already. If it wasn't, we flushed, making + // sufficient room for any input <= the buffer size, which includes this input. + unsafe { + self.write_to_buffer_unchecked(buf); + } + + buf.len() + } + } else { + return Ok(0); + }; + debug_assert!(total_written != 0); + for buf in iter { + if buf.len() <= self.spare_capacity() { + // SAFETY: safe by above conditional. + unsafe { + self.write_to_buffer_unchecked(buf); + } + + // This cannot overflow `usize`. If we are here, we've written all of the bytes + // so far to our buffer, and we've ensured that we never exceed the buffer's + // capacity. Therefore, `total_written` <= `self.buf.capacity()` <= `usize::MAX`. + total_written += buf.len(); + } else { + break; + } + } + Ok(total_written) + } + } + + fn is_write_vectored(&self) -> bool { + true + } + + fn flush(&mut self) -> io::Result<()> { + self.flush_buf().and_then(|()| self.get_mut().flush()) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<W: Write> fmt::Debug for BufWriter<W> +where + W: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BufWriter") + .field("writer", &self.inner) + .field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity())) + .finish() + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<W: Write + Seek> Seek for BufWriter<W> { + /// Seek to the offset, in bytes, in the underlying writer. + /// + /// Seeking always writes out the internal buffer before seeking. + fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { + self.flush_buf()?; + self.get_mut().seek(pos) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<W: Write> Drop for BufWriter<W> { + fn drop(&mut self) { + if !self.panicked { + // dtors should not panic, so we ignore a failed flush + let _r = self.flush_buf(); + } + } +} diff --git a/library/std/src/io/buffered/linewriter.rs b/library/std/src/io/buffered/linewriter.rs new file mode 100644 index 000000000..a26a4ab33 --- /dev/null +++ b/library/std/src/io/buffered/linewriter.rs @@ -0,0 +1,232 @@ +use crate::fmt; +use crate::io::{self, buffered::LineWriterShim, BufWriter, IntoInnerError, IoSlice, Write}; + +/// Wraps a writer and buffers output to it, flushing whenever a newline +/// (`0x0a`, `'\n'`) is detected. +/// +/// The [`BufWriter`] struct wraps a writer and buffers its output. +/// But it only does this batched write when it goes out of scope, or when the +/// internal buffer is full. Sometimes, you'd prefer to write each line as it's +/// completed, rather than the entire buffer at once. Enter `LineWriter`. It +/// does exactly that. +/// +/// Like [`BufWriter`], a `LineWriter`’s buffer will also be flushed when the +/// `LineWriter` goes out of scope or when its internal buffer is full. +/// +/// If there's still a partial line in the buffer when the `LineWriter` is +/// dropped, it will flush those contents. +/// +/// # Examples +/// +/// We can use `LineWriter` to write one line at a time, significantly +/// reducing the number of actual writes to the file. +/// +/// ```no_run +/// use std::fs::{self, File}; +/// use std::io::prelude::*; +/// use std::io::LineWriter; +/// +/// fn main() -> std::io::Result<()> { +/// let road_not_taken = b"I shall be telling this with a sigh +/// Somewhere ages and ages hence: +/// Two roads diverged in a wood, and I - +/// I took the one less traveled by, +/// And that has made all the difference."; +/// +/// let file = File::create("poem.txt")?; +/// let mut file = LineWriter::new(file); +/// +/// file.write_all(b"I shall be telling this with a sigh")?; +/// +/// // No bytes are written until a newline is encountered (or +/// // the internal buffer is filled). +/// assert_eq!(fs::read_to_string("poem.txt")?, ""); +/// file.write_all(b"\n")?; +/// assert_eq!( +/// fs::read_to_string("poem.txt")?, +/// "I shall be telling this with a sigh\n", +/// ); +/// +/// // Write the rest of the poem. +/// file.write_all(b"Somewhere ages and ages hence: +/// Two roads diverged in a wood, and I - +/// I took the one less traveled by, +/// And that has made all the difference.")?; +/// +/// // The last line of the poem doesn't end in a newline, so +/// // we have to flush or drop the `LineWriter` to finish +/// // writing. +/// file.flush()?; +/// +/// // Confirm the whole poem was written. +/// assert_eq!(fs::read("poem.txt")?, &road_not_taken[..]); +/// Ok(()) +/// } +/// ``` +#[stable(feature = "rust1", since = "1.0.0")] +pub struct LineWriter<W: Write> { + inner: BufWriter<W>, +} + +impl<W: Write> LineWriter<W> { + /// Creates a new `LineWriter`. + /// + /// # Examples + /// + /// ```no_run + /// use std::fs::File; + /// use std::io::LineWriter; + /// + /// fn main() -> std::io::Result<()> { + /// let file = File::create("poem.txt")?; + /// let file = LineWriter::new(file); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn new(inner: W) -> LineWriter<W> { + // Lines typically aren't that long, don't use a giant buffer + LineWriter::with_capacity(1024, inner) + } + + /// Creates a new `LineWriter` with at least the specified capacity for the + /// internal buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::fs::File; + /// use std::io::LineWriter; + /// + /// fn main() -> std::io::Result<()> { + /// let file = File::create("poem.txt")?; + /// let file = LineWriter::with_capacity(100, file); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> { + LineWriter { inner: BufWriter::with_capacity(capacity, inner) } + } + + /// Gets a reference to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// use std::fs::File; + /// use std::io::LineWriter; + /// + /// fn main() -> std::io::Result<()> { + /// let file = File::create("poem.txt")?; + /// let file = LineWriter::new(file); + /// + /// let reference = file.get_ref(); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn get_ref(&self) -> &W { + self.inner.get_ref() + } + + /// Gets a mutable reference to the underlying writer. + /// + /// Caution must be taken when calling methods on the mutable reference + /// returned as extra writes could corrupt the output stream. + /// + /// # Examples + /// + /// ```no_run + /// use std::fs::File; + /// use std::io::LineWriter; + /// + /// fn main() -> std::io::Result<()> { + /// let file = File::create("poem.txt")?; + /// let mut file = LineWriter::new(file); + /// + /// // we can use reference just like file + /// let reference = file.get_mut(); + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn get_mut(&mut self) -> &mut W { + self.inner.get_mut() + } + + /// Unwraps this `LineWriter`, returning the underlying writer. + /// + /// The internal buffer is written out before returning the writer. + /// + /// # Errors + /// + /// An [`Err`] will be returned if an error occurs while flushing the buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::fs::File; + /// use std::io::LineWriter; + /// + /// fn main() -> std::io::Result<()> { + /// let file = File::create("poem.txt")?; + /// + /// let writer: LineWriter<File> = LineWriter::new(file); + /// + /// let file: File = writer.into_inner()?; + /// Ok(()) + /// } + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn into_inner(self) -> Result<W, IntoInnerError<LineWriter<W>>> { + self.inner.into_inner().map_err(|err| err.new_wrapped(|inner| LineWriter { inner })) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<W: Write> Write for LineWriter<W> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + LineWriterShim::new(&mut self.inner).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + LineWriterShim::new(&mut self.inner).write_vectored(bufs) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + LineWriterShim::new(&mut self.inner).write_all(buf) + } + + fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> io::Result<()> { + LineWriterShim::new(&mut self.inner).write_all_vectored(bufs) + } + + fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> { + LineWriterShim::new(&mut self.inner).write_fmt(fmt) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<W: Write> fmt::Debug for LineWriter<W> +where + W: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("LineWriter") + .field("writer", &self.get_ref()) + .field( + "buffer", + &format_args!("{}/{}", self.inner.buffer().len(), self.inner.capacity()), + ) + .finish_non_exhaustive() + } +} diff --git a/library/std/src/io/buffered/linewritershim.rs b/library/std/src/io/buffered/linewritershim.rs new file mode 100644 index 000000000..0175d2693 --- /dev/null +++ b/library/std/src/io/buffered/linewritershim.rs @@ -0,0 +1,276 @@ +use crate::io::{self, BufWriter, IoSlice, Write}; +use crate::sys_common::memchr; + +/// Private helper struct for implementing the line-buffered writing logic. +/// This shim temporarily wraps a BufWriter, and uses its internals to +/// implement a line-buffered writer (specifically by using the internal +/// methods like write_to_buf and flush_buf). In this way, a more +/// efficient abstraction can be created than one that only had access to +/// `write` and `flush`, without needlessly duplicating a lot of the +/// implementation details of BufWriter. This also allows existing +/// `BufWriters` to be temporarily given line-buffering logic; this is what +/// enables Stdout to be alternately in line-buffered or block-buffered mode. +#[derive(Debug)] +pub struct LineWriterShim<'a, W: Write> { + buffer: &'a mut BufWriter<W>, +} + +impl<'a, W: Write> LineWriterShim<'a, W> { + pub fn new(buffer: &'a mut BufWriter<W>) -> Self { + Self { buffer } + } + + /// Get a reference to the inner writer (that is, the writer + /// wrapped by the BufWriter). + fn inner(&self) -> &W { + self.buffer.get_ref() + } + + /// Get a mutable reference to the inner writer (that is, the writer + /// wrapped by the BufWriter). Be careful with this writer, as writes to + /// it will bypass the buffer. + fn inner_mut(&mut self) -> &mut W { + self.buffer.get_mut() + } + + /// Get the content currently buffered in self.buffer + fn buffered(&self) -> &[u8] { + self.buffer.buffer() + } + + /// Flush the buffer iff the last byte is a newline (indicating that an + /// earlier write only succeeded partially, and we want to retry flushing + /// the buffered line before continuing with a subsequent write) + fn flush_if_completed_line(&mut self) -> io::Result<()> { + match self.buffered().last().copied() { + Some(b'\n') => self.buffer.flush_buf(), + _ => Ok(()), + } + } +} + +impl<'a, W: Write> Write for LineWriterShim<'a, W> { + /// Write some data into this BufReader with line buffering. This means + /// that, if any newlines are present in the data, the data up to the last + /// newline is sent directly to the underlying writer, and data after it + /// is buffered. Returns the number of bytes written. + /// + /// This function operates on a "best effort basis"; in keeping with the + /// convention of `Write::write`, it makes at most one attempt to write + /// new data to the underlying writer. If that write only reports a partial + /// success, the remaining data will be buffered. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it ends with a + /// newline, even if the incoming data does not contain any newlines. + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + let newline_idx = match memchr::memrchr(b'\n', buf) { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write (which may flush if + // we exceed the inner buffer's size) + None => { + self.flush_if_completed_line()?; + return self.buffer.write(buf); + } + // Otherwise, arrange for the lines to be written directly to the + // inner writer. + Some(newline_idx) => newline_idx + 1, + }; + + // Flush existing content to prepare for our write. We have to do this + // before attempting to write `buf` in order to maintain consistency; + // if we add `buf` to the buffer then try to flush it all at once, + // we're obligated to return Ok(), which would mean suppressing any + // errors that occur during flush. + self.buffer.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let lines = &buf[..newline_idx]; + + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.buffer.panicked here. + let flushed = self.inner_mut().write(lines)?; + + // If buffer returns Ok(0), propagate that to the caller without + // doing additional buffering; otherwise we're just guaranteeing + // an "ErrorKind::WriteZero" later. + if flushed == 0 { + return Ok(0); + } + + // Now that the write has succeeded, buffer the rest (or as much of + // the rest as possible). If there were any unwritten newlines, we + // only buffer out to the last unwritten newline that fits in the + // buffer; this helps prevent flushing partial lines on subsequent + // calls to LineWriterShim::write. + + // Handle the cases in order of most-common to least-common, under + // the presumption that most writes succeed in totality, and that most + // writes are smaller than the buffer. + // - Is this a partial line (ie, no newlines left in the unwritten tail) + // - If not, does the data out to the last unwritten newline fit in + // the buffer? + // - If not, scan for the last newline that *does* fit in the buffer + let tail = if flushed >= newline_idx { + &buf[flushed..] + } else if newline_idx - flushed <= self.buffer.capacity() { + &buf[flushed..newline_idx] + } else { + let scan_area = &buf[flushed..]; + let scan_area = &scan_area[..self.buffer.capacity()]; + match memchr::memrchr(b'\n', scan_area) { + Some(newline_idx) => &scan_area[..newline_idx + 1], + None => scan_area, + } + }; + + let buffered = self.buffer.write_to_buf(tail); + Ok(flushed + buffered) + } + + fn flush(&mut self) -> io::Result<()> { + self.buffer.flush() + } + + /// Write some vectored data into this BufReader with line buffering. This + /// means that, if any newlines are present in the data, the data up to + /// and including the buffer containing the last newline is sent directly + /// to the inner writer, and the data after it is buffered. Returns the + /// number of bytes written. + /// + /// This function operates on a "best effort basis"; in keeping with the + /// convention of `Write::write`, it makes at most one attempt to write + /// new data to the underlying writer. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it contains any + /// newlines. + /// + /// Because sorting through an array of `IoSlice` can be a bit convoluted, + /// This method differs from write in the following ways: + /// + /// - It attempts to write the full content of all the buffers up to and + /// including the one containing the last newline. This means that it + /// may attempt to write a partial line, that buffer has data past the + /// newline. + /// - If the write only reports partial success, it does not attempt to + /// find the precise location of the written bytes and buffer the rest. + /// + /// If the underlying vector doesn't support vectored writing, we instead + /// simply write the first non-empty buffer with `write`. This way, we + /// get the benefits of more granular partial-line handling without losing + /// anything in efficiency + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + // If there's no specialized behavior for write_vectored, just use + // write. This has the benefit of more granular partial-line handling. + if !self.is_write_vectored() { + return match bufs.iter().find(|buf| !buf.is_empty()) { + Some(buf) => self.write(buf), + None => Ok(0), + }; + } + + // Find the buffer containing the last newline + let last_newline_buf_idx = bufs + .iter() + .enumerate() + .rev() + .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i)); + + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write + let last_newline_buf_idx = match last_newline_buf_idx { + // No newlines; just do a normal buffered write + None => { + self.flush_if_completed_line()?; + return self.buffer.write_vectored(bufs); + } + Some(i) => i, + }; + + // Flush existing content to prepare for our write + self.buffer.flush_buf()?; + + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1); + + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.panicked here. + let flushed = self.inner_mut().write_vectored(lines)?; + + // If inner returns Ok(0), propagate that to the caller without + // doing additional buffering; otherwise we're just guaranteeing + // an "ErrorKind::WriteZero" later. + if flushed == 0 { + return Ok(0); + } + + // Don't try to reconstruct the exact amount written; just bail + // in the event of a partial write + let lines_len = lines.iter().map(|buf| buf.len()).sum(); + if flushed < lines_len { + return Ok(flushed); + } + + // Now that the write has succeeded, buffer the rest (or as much of the + // rest as possible) + let buffered: usize = tail + .iter() + .filter(|buf| !buf.is_empty()) + .map(|buf| self.buffer.write_to_buf(buf)) + .take_while(|&n| n > 0) + .sum(); + + Ok(flushed + buffered) + } + + fn is_write_vectored(&self) -> bool { + self.inner().is_write_vectored() + } + + /// Write some data into this BufReader with line buffering. This means + /// that, if any newlines are present in the data, the data up to the last + /// newline is sent directly to the underlying writer, and data after it + /// is buffered. + /// + /// Because this function attempts to send completed lines to the underlying + /// writer, it will also flush the existing buffer if it contains any + /// newlines, even if the incoming data does not contain any newlines. + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + match memchr::memrchr(b'\n', buf) { + // If there are no new newlines (that is, if this write is less than + // one line), just do a regular buffered write (which may flush if + // we exceed the inner buffer's size) + None => { + self.flush_if_completed_line()?; + self.buffer.write_all(buf) + } + Some(newline_idx) => { + let (lines, tail) = buf.split_at(newline_idx + 1); + + if self.buffered().is_empty() { + self.inner_mut().write_all(lines)?; + } else { + // If there is any buffered data, we add the incoming lines + // to that buffer before flushing, which saves us at least + // one write call. We can't really do this with `write`, + // since we can't do this *and* not suppress errors *and* + // report a consistent state to the caller in a return + // value, but here in write_all it's fine. + self.buffer.write_all(lines)?; + self.buffer.flush_buf()?; + } + + self.buffer.write_all(tail) + } + } + } +} diff --git a/library/std/src/io/buffered/mod.rs b/library/std/src/io/buffered/mod.rs new file mode 100644 index 000000000..100dab1e2 --- /dev/null +++ b/library/std/src/io/buffered/mod.rs @@ -0,0 +1,196 @@ +//! Buffering wrappers for I/O traits + +mod bufreader; +mod bufwriter; +mod linewriter; +mod linewritershim; + +#[cfg(test)] +mod tests; + +use crate::error; +use crate::fmt; +use crate::io::Error; + +#[stable(feature = "rust1", since = "1.0.0")] +pub use self::{bufreader::BufReader, bufwriter::BufWriter, linewriter::LineWriter}; +use linewritershim::LineWriterShim; + +#[stable(feature = "bufwriter_into_parts", since = "1.56.0")] +pub use bufwriter::WriterPanicked; + +/// An error returned by [`BufWriter::into_inner`] which combines an error that +/// happened while writing out the buffer, and the buffered writer object +/// which may be used to recover from the condition. +/// +/// # Examples +/// +/// ```no_run +/// use std::io::BufWriter; +/// use std::net::TcpStream; +/// +/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); +/// +/// // do stuff with the stream +/// +/// // we want to get our `TcpStream` back, so let's try: +/// +/// let stream = match stream.into_inner() { +/// Ok(s) => s, +/// Err(e) => { +/// // Here, e is an IntoInnerError +/// panic!("An error occurred"); +/// } +/// }; +/// ``` +#[derive(Debug)] +#[stable(feature = "rust1", since = "1.0.0")] +pub struct IntoInnerError<W>(W, Error); + +impl<W> IntoInnerError<W> { + /// Construct a new IntoInnerError + fn new(writer: W, error: Error) -> Self { + Self(writer, error) + } + + /// Helper to construct a new IntoInnerError; intended to help with + /// adapters that wrap other adapters + fn new_wrapped<W2>(self, f: impl FnOnce(W) -> W2) -> IntoInnerError<W2> { + let Self(writer, error) = self; + IntoInnerError::new(f(writer), error) + } + + /// Returns the error which caused the call to [`BufWriter::into_inner()`] + /// to fail. + /// + /// This error was returned when attempting to write the internal buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // do stuff with the stream + /// + /// // we want to get our `TcpStream` back, so let's try: + /// + /// let stream = match stream.into_inner() { + /// Ok(s) => s, + /// Err(e) => { + /// // Here, e is an IntoInnerError, let's log the inner error. + /// // + /// // We'll just 'log' to stdout for this example. + /// println!("{}", e.error()); + /// + /// panic!("An unexpected error occurred."); + /// } + /// }; + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn error(&self) -> &Error { + &self.1 + } + + /// Returns the buffered writer instance which generated the error. + /// + /// The returned object can be used for error recovery, such as + /// re-inspecting the buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::io::BufWriter; + /// use std::net::TcpStream; + /// + /// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// + /// // do stuff with the stream + /// + /// // we want to get our `TcpStream` back, so let's try: + /// + /// let stream = match stream.into_inner() { + /// Ok(s) => s, + /// Err(e) => { + /// // Here, e is an IntoInnerError, let's re-examine the buffer: + /// let buffer = e.into_inner(); + /// + /// // do stuff to try to recover + /// + /// // afterwards, let's just return the stream + /// buffer.into_inner().unwrap() + /// } + /// }; + /// ``` + #[stable(feature = "rust1", since = "1.0.0")] + pub fn into_inner(self) -> W { + self.0 + } + + /// Consumes the [`IntoInnerError`] and returns the error which caused the call to + /// [`BufWriter::into_inner()`] to fail. Unlike `error`, this can be used to + /// obtain ownership of the underlying error. + /// + /// # Example + /// ``` + /// use std::io::{BufWriter, ErrorKind, Write}; + /// + /// let mut not_enough_space = [0u8; 10]; + /// let mut stream = BufWriter::new(not_enough_space.as_mut()); + /// write!(stream, "this cannot be actually written").unwrap(); + /// let into_inner_err = stream.into_inner().expect_err("now we discover it's too small"); + /// let err = into_inner_err.into_error(); + /// assert_eq!(err.kind(), ErrorKind::WriteZero); + /// ``` + #[stable(feature = "io_into_inner_error_parts", since = "1.55.0")] + pub fn into_error(self) -> Error { + self.1 + } + + /// Consumes the [`IntoInnerError`] and returns the error which caused the call to + /// [`BufWriter::into_inner()`] to fail, and the underlying writer. + /// + /// This can be used to simply obtain ownership of the underlying error; it can also be used for + /// advanced error recovery. + /// + /// # Example + /// ``` + /// use std::io::{BufWriter, ErrorKind, Write}; + /// + /// let mut not_enough_space = [0u8; 10]; + /// let mut stream = BufWriter::new(not_enough_space.as_mut()); + /// write!(stream, "this cannot be actually written").unwrap(); + /// let into_inner_err = stream.into_inner().expect_err("now we discover it's too small"); + /// let (err, recovered_writer) = into_inner_err.into_parts(); + /// assert_eq!(err.kind(), ErrorKind::WriteZero); + /// assert_eq!(recovered_writer.buffer(), b"t be actually written"); + /// ``` + #[stable(feature = "io_into_inner_error_parts", since = "1.55.0")] + pub fn into_parts(self) -> (Error, W) { + (self.1, self.0) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<W> From<IntoInnerError<W>> for Error { + fn from(iie: IntoInnerError<W>) -> Error { + iie.1 + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<W: Send + fmt::Debug> error::Error for IntoInnerError<W> { + #[allow(deprecated, deprecated_in_future)] + fn description(&self) -> &str { + error::Error::description(self.error()) + } +} + +#[stable(feature = "rust1", since = "1.0.0")] +impl<W> fmt::Display for IntoInnerError<W> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.error().fmt(f) + } +} diff --git a/library/std/src/io/buffered/tests.rs b/library/std/src/io/buffered/tests.rs new file mode 100644 index 000000000..fe45b1326 --- /dev/null +++ b/library/std/src/io/buffered/tests.rs @@ -0,0 +1,1039 @@ +use crate::io::prelude::*; +use crate::io::{self, BufReader, BufWriter, ErrorKind, IoSlice, LineWriter, ReadBuf, SeekFrom}; +use crate::mem::MaybeUninit; +use crate::panic; +use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::thread; + +/// A dummy reader intended at testing short-reads propagation. +pub struct ShortReader { + lengths: Vec<usize>, +} + +// FIXME: rustfmt and tidy disagree about the correct formatting of this +// function. This leads to issues for users with editors configured to +// rustfmt-on-save. +impl Read for ShortReader { + fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { + if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } + } +} + +#[test] +fn test_buffered_reader() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, inner); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf); + assert_eq!(nread.unwrap(), 3); + assert_eq!(buf, [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0, 0]; + let nread = reader.read(&mut buf); + assert_eq!(nread.unwrap(), 2); + assert_eq!(buf, [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [0]; + let nread = reader.read(&mut buf); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [0, 0, 0]; + let nread = reader.read(&mut buf); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [3, 0, 0]); + assert_eq!(reader.buffer(), []); + + let nread = reader.read(&mut buf); + assert_eq!(nread.unwrap(), 1); + assert_eq!(buf, [4, 0, 0]); + assert_eq!(reader.buffer(), []); + + assert_eq!(reader.read(&mut buf).unwrap(), 0); +} + +#[test] +fn test_buffered_reader_read_buf() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, inner); + + let mut buf = [MaybeUninit::uninit(); 3]; + let mut buf = ReadBuf::uninit(&mut buf); + + reader.read_buf(&mut buf).unwrap(); + + assert_eq!(buf.filled(), [5, 6, 7]); + assert_eq!(reader.buffer(), []); + + let mut buf = [MaybeUninit::uninit(); 2]; + let mut buf = ReadBuf::uninit(&mut buf); + + reader.read_buf(&mut buf).unwrap(); + + assert_eq!(buf.filled(), [0, 1]); + assert_eq!(reader.buffer(), []); + + let mut buf = [MaybeUninit::uninit(); 1]; + let mut buf = ReadBuf::uninit(&mut buf); + + reader.read_buf(&mut buf).unwrap(); + + assert_eq!(buf.filled(), [2]); + assert_eq!(reader.buffer(), [3]); + + let mut buf = [MaybeUninit::uninit(); 3]; + let mut buf = ReadBuf::uninit(&mut buf); + + reader.read_buf(&mut buf).unwrap(); + + assert_eq!(buf.filled(), [3]); + assert_eq!(reader.buffer(), []); + + reader.read_buf(&mut buf).unwrap(); + + assert_eq!(buf.filled(), [3, 4]); + assert_eq!(reader.buffer(), []); + + buf.clear(); + + reader.read_buf(&mut buf).unwrap(); + + assert_eq!(buf.filled_len(), 0); +} + +#[test] +fn test_buffered_reader_seek() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, io::Cursor::new(inner)); + + assert_eq!(reader.seek(SeekFrom::Start(3)).ok(), Some(3)); + assert_eq!(reader.fill_buf().ok(), Some(&[0, 1][..])); + assert_eq!(reader.seek(SeekFrom::Current(0)).ok(), Some(3)); + assert_eq!(reader.fill_buf().ok(), Some(&[0, 1][..])); + assert_eq!(reader.seek(SeekFrom::Current(1)).ok(), Some(4)); + assert_eq!(reader.fill_buf().ok(), Some(&[1, 2][..])); + reader.consume(1); + assert_eq!(reader.seek(SeekFrom::Current(-2)).ok(), Some(3)); +} + +#[test] +fn test_buffered_reader_seek_relative() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, io::Cursor::new(inner)); + + assert!(reader.seek_relative(3).is_ok()); + assert_eq!(reader.fill_buf().ok(), Some(&[0, 1][..])); + assert!(reader.seek_relative(0).is_ok()); + assert_eq!(reader.fill_buf().ok(), Some(&[0, 1][..])); + assert!(reader.seek_relative(1).is_ok()); + assert_eq!(reader.fill_buf().ok(), Some(&[1][..])); + assert!(reader.seek_relative(-1).is_ok()); + assert_eq!(reader.fill_buf().ok(), Some(&[0, 1][..])); + assert!(reader.seek_relative(2).is_ok()); + assert_eq!(reader.fill_buf().ok(), Some(&[2, 3][..])); +} + +#[test] +fn test_buffered_reader_stream_position() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(2, io::Cursor::new(inner)); + + assert_eq!(reader.stream_position().ok(), Some(0)); + assert_eq!(reader.seek(SeekFrom::Start(3)).ok(), Some(3)); + assert_eq!(reader.stream_position().ok(), Some(3)); + // relative seeking within the buffer and reading position should keep the buffer + assert_eq!(reader.fill_buf().ok(), Some(&[0, 1][..])); + assert!(reader.seek_relative(0).is_ok()); + assert_eq!(reader.stream_position().ok(), Some(3)); + assert_eq!(reader.buffer(), &[0, 1][..]); + assert!(reader.seek_relative(1).is_ok()); + assert_eq!(reader.stream_position().ok(), Some(4)); + assert_eq!(reader.buffer(), &[1][..]); + assert!(reader.seek_relative(-1).is_ok()); + assert_eq!(reader.stream_position().ok(), Some(3)); + assert_eq!(reader.buffer(), &[0, 1][..]); + // relative seeking outside the buffer will discard it + assert!(reader.seek_relative(2).is_ok()); + assert_eq!(reader.stream_position().ok(), Some(5)); + assert_eq!(reader.buffer(), &[][..]); +} + +#[test] +fn test_buffered_reader_stream_position_panic() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(4, io::Cursor::new(inner)); + + // cause internal buffer to be filled but read only partially + let mut buffer = [0, 0]; + assert!(reader.read_exact(&mut buffer).is_ok()); + // rewinding the internal reader will cause buffer to loose sync + let inner = reader.get_mut(); + assert!(inner.seek(SeekFrom::Start(0)).is_ok()); + // overflow when subtracting the remaining buffer size from current position + let result = panic::catch_unwind(panic::AssertUnwindSafe(|| reader.stream_position().ok())); + assert!(result.is_err()); +} + +#[test] +fn test_buffered_reader_invalidated_after_read() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(3, io::Cursor::new(inner)); + + assert_eq!(reader.fill_buf().ok(), Some(&[5, 6, 7][..])); + reader.consume(3); + + let mut buffer = [0, 0, 0, 0, 0]; + assert_eq!(reader.read(&mut buffer).ok(), Some(5)); + assert_eq!(buffer, [0, 1, 2, 3, 4]); + + assert!(reader.seek_relative(-2).is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).ok(), Some(2)); + assert_eq!(buffer, [3, 4]); +} + +#[test] +fn test_buffered_reader_invalidated_after_seek() { + let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; + let mut reader = BufReader::with_capacity(3, io::Cursor::new(inner)); + + assert_eq!(reader.fill_buf().ok(), Some(&[5, 6, 7][..])); + reader.consume(3); + + assert!(reader.seek(SeekFrom::Current(5)).is_ok()); + + assert!(reader.seek_relative(-2).is_ok()); + let mut buffer = [0, 0]; + assert_eq!(reader.read(&mut buffer).ok(), Some(2)); + assert_eq!(buffer, [3, 4]); +} + +#[test] +fn test_buffered_reader_seek_underflow() { + // gimmick reader that yields its position modulo 256 for each byte + struct PositionReader { + pos: u64, + } + impl Read for PositionReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + let len = buf.len(); + for x in buf { + *x = self.pos as u8; + self.pos = self.pos.wrapping_add(1); + } + Ok(len) + } + } + impl Seek for PositionReader { + fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { + match pos { + SeekFrom::Start(n) => { + self.pos = n; + } + SeekFrom::Current(n) => { + self.pos = self.pos.wrapping_add(n as u64); + } + SeekFrom::End(n) => { + self.pos = u64::MAX.wrapping_add(n as u64); + } + } + Ok(self.pos) + } + } + + let mut reader = BufReader::with_capacity(5, PositionReader { pos: 0 }); + assert_eq!(reader.fill_buf().ok(), Some(&[0, 1, 2, 3, 4][..])); + assert_eq!(reader.seek(SeekFrom::End(-5)).ok(), Some(u64::MAX - 5)); + assert_eq!(reader.fill_buf().ok().map(|s| s.len()), Some(5)); + // the following seek will require two underlying seeks + let expected = 9223372036854775802; + assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).ok(), Some(expected)); + assert_eq!(reader.fill_buf().ok().map(|s| s.len()), Some(5)); + // seeking to 0 should empty the buffer. + assert_eq!(reader.seek(SeekFrom::Current(0)).ok(), Some(expected)); + assert_eq!(reader.get_ref().pos, expected); +} + +#[test] +fn test_buffered_reader_seek_underflow_discard_buffer_between_seeks() { + // gimmick reader that returns Err after first seek + struct ErrAfterFirstSeekReader { + first_seek: bool, + } + impl Read for ErrAfterFirstSeekReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + for x in &mut *buf { + *x = 0; + } + Ok(buf.len()) + } + } + impl Seek for ErrAfterFirstSeekReader { + fn seek(&mut self, _: SeekFrom) -> io::Result<u64> { + if self.first_seek { + self.first_seek = false; + Ok(0) + } else { + Err(io::Error::new(io::ErrorKind::Other, "oh no!")) + } + } + } + + let mut reader = BufReader::with_capacity(5, ErrAfterFirstSeekReader { first_seek: true }); + assert_eq!(reader.fill_buf().ok(), Some(&[0, 0, 0, 0, 0][..])); + + // The following seek will require two underlying seeks. The first will + // succeed but the second will fail. This should still invalidate the + // buffer. + assert!(reader.seek(SeekFrom::Current(i64::MIN)).is_err()); + assert_eq!(reader.buffer().len(), 0); +} + +#[test] +fn test_buffered_reader_read_to_end_consumes_buffer() { + let data: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7]; + let mut reader = BufReader::with_capacity(3, data); + let mut buf = Vec::new(); + assert_eq!(reader.fill_buf().ok(), Some(&[0, 1, 2][..])); + assert_eq!(reader.read_to_end(&mut buf).ok(), Some(8)); + assert_eq!(&buf, &[0, 1, 2, 3, 4, 5, 6, 7]); + assert!(reader.buffer().is_empty()); +} + +#[test] +fn test_buffered_reader_read_to_string_consumes_buffer() { + let data: &[u8] = "deadbeef".as_bytes(); + let mut reader = BufReader::with_capacity(3, data); + let mut buf = String::new(); + assert_eq!(reader.fill_buf().ok(), Some("dea".as_bytes())); + assert_eq!(reader.read_to_string(&mut buf).ok(), Some(8)); + assert_eq!(&buf, "deadbeef"); + assert!(reader.buffer().is_empty()); +} + +#[test] +fn test_buffered_writer() { + let inner = Vec::new(); + let mut writer = BufWriter::with_capacity(2, inner); + + writer.write(&[0, 1]).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.write(&[2]).unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.write(&[3]).unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.flush().unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + writer.write(&[4]).unwrap(); + writer.write(&[5]).unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + writer.write(&[6]).unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); + + writer.write(&[7, 8]).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]); + + writer.write(&[9, 10, 11]).unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + writer.flush().unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); +} + +#[test] +fn test_buffered_writer_inner_flushes() { + let mut w = BufWriter::with_capacity(3, Vec::new()); + w.write(&[0, 1]).unwrap(); + assert_eq!(*w.get_ref(), []); + let w = w.into_inner().unwrap(); + assert_eq!(w, [0, 1]); +} + +#[test] +fn test_buffered_writer_seek() { + let mut w = BufWriter::with_capacity(3, io::Cursor::new(Vec::new())); + w.write_all(&[0, 1, 2, 3, 4, 5]).unwrap(); + w.write_all(&[6, 7]).unwrap(); + assert_eq!(w.seek(SeekFrom::Current(0)).ok(), Some(8)); + assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(w.seek(SeekFrom::Start(2)).ok(), Some(2)); + w.write_all(&[8, 9]).unwrap(); + assert_eq!(&w.into_inner().unwrap().into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); +} + +#[test] +fn test_read_until() { + let inner: &[u8] = &[0, 1, 2, 1, 0]; + let mut reader = BufReader::with_capacity(2, inner); + let mut v = Vec::new(); + reader.read_until(0, &mut v).unwrap(); + assert_eq!(v, [0]); + v.truncate(0); + reader.read_until(2, &mut v).unwrap(); + assert_eq!(v, [1, 2]); + v.truncate(0); + reader.read_until(1, &mut v).unwrap(); + assert_eq!(v, [1]); + v.truncate(0); + reader.read_until(8, &mut v).unwrap(); + assert_eq!(v, [0]); + v.truncate(0); + reader.read_until(9, &mut v).unwrap(); + assert_eq!(v, []); +} + +#[test] +fn test_line_buffer() { + let mut writer = LineWriter::new(Vec::new()); + writer.write(&[0]).unwrap(); + assert_eq!(*writer.get_ref(), []); + writer.write(&[1]).unwrap(); + assert_eq!(*writer.get_ref(), []); + writer.flush().unwrap(); + assert_eq!(*writer.get_ref(), [0, 1]); + writer.write(&[0, b'\n', 1, b'\n', 2]).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']); + writer.flush().unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]); + writer.write(&[3, b'\n']).unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); +} + +#[test] +fn test_read_line() { + let in_buf: &[u8] = b"a\nb\nc"; + let mut reader = BufReader::with_capacity(2, in_buf); + let mut s = String::new(); + reader.read_line(&mut s).unwrap(); + assert_eq!(s, "a\n"); + s.truncate(0); + reader.read_line(&mut s).unwrap(); + assert_eq!(s, "b\n"); + s.truncate(0); + reader.read_line(&mut s).unwrap(); + assert_eq!(s, "c"); + s.truncate(0); + reader.read_line(&mut s).unwrap(); + assert_eq!(s, ""); +} + +#[test] +fn test_lines() { + let in_buf: &[u8] = b"a\nb\nc"; + let reader = BufReader::with_capacity(2, in_buf); + let mut it = reader.lines(); + assert_eq!(it.next().unwrap().unwrap(), "a".to_string()); + assert_eq!(it.next().unwrap().unwrap(), "b".to_string()); + assert_eq!(it.next().unwrap().unwrap(), "c".to_string()); + assert!(it.next().is_none()); +} + +#[test] +fn test_short_reads() { + let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; + let mut reader = BufReader::new(inner); + let mut buf = [0, 0]; + assert_eq!(reader.read(&mut buf).unwrap(), 0); + assert_eq!(reader.read(&mut buf).unwrap(), 1); + assert_eq!(reader.read(&mut buf).unwrap(), 2); + assert_eq!(reader.read(&mut buf).unwrap(), 0); + assert_eq!(reader.read(&mut buf).unwrap(), 1); + assert_eq!(reader.read(&mut buf).unwrap(), 0); + assert_eq!(reader.read(&mut buf).unwrap(), 0); +} + +#[test] +#[should_panic] +fn dont_panic_in_drop_on_panicked_flush() { + struct FailFlushWriter; + + impl Write for FailFlushWriter { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Err(io::Error::last_os_error()) + } + } + + let writer = FailFlushWriter; + let _writer = BufWriter::new(writer); + + // If writer panics *again* due to the flush error then the process will + // abort. + panic!(); +} + +#[test] +#[cfg_attr(target_os = "emscripten", ignore)] +fn panic_in_write_doesnt_flush_in_drop() { + static WRITES: AtomicUsize = AtomicUsize::new(0); + + struct PanicWriter; + + impl Write for PanicWriter { + fn write(&mut self, _: &[u8]) -> io::Result<usize> { + WRITES.fetch_add(1, Ordering::SeqCst); + panic!(); + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + thread::spawn(|| { + let mut writer = BufWriter::new(PanicWriter); + let _ = writer.write(b"hello world"); + let _ = writer.flush(); + }) + .join() + .unwrap_err(); + + assert_eq!(WRITES.load(Ordering::SeqCst), 1); +} + +#[bench] +fn bench_buffered_reader(b: &mut test::Bencher) { + b.iter(|| BufReader::new(io::empty())); +} + +#[bench] +fn bench_buffered_reader_small_reads(b: &mut test::Bencher) { + let data = (0..u8::MAX).cycle().take(1024 * 4).collect::<Vec<_>>(); + b.iter(|| { + let mut reader = BufReader::new(&data[..]); + let mut buf = [0u8; 4]; + for _ in 0..1024 { + reader.read_exact(&mut buf).unwrap(); + core::hint::black_box(&buf); + } + }); +} + +#[bench] +fn bench_buffered_writer(b: &mut test::Bencher) { + b.iter(|| BufWriter::new(io::sink())); +} + +/// A simple `Write` target, designed to be wrapped by `LineWriter` / +/// `BufWriter` / etc, that can have its `write` & `flush` behavior +/// configured +#[derive(Default, Clone)] +struct ProgrammableSink { + // Writes append to this slice + pub buffer: Vec<u8>, + + // If true, writes will always be an error + pub always_write_error: bool, + + // If true, flushes will always be an error + pub always_flush_error: bool, + + // If set, only up to this number of bytes will be written in a single + // call to `write` + pub accept_prefix: Option<usize>, + + // If set, counts down with each write, and writes return an error + // when it hits 0 + pub max_writes: Option<usize>, + + // If set, attempting to write when max_writes == Some(0) will be an + // error; otherwise, it will return Ok(0). + pub error_after_max_writes: bool, +} + +impl Write for ProgrammableSink { + fn write(&mut self, data: &[u8]) -> io::Result<usize> { + if self.always_write_error { + return Err(io::Error::new(io::ErrorKind::Other, "test - always_write_error")); + } + + match self.max_writes { + Some(0) if self.error_after_max_writes => { + return Err(io::Error::new(io::ErrorKind::Other, "test - max_writes")); + } + Some(0) => return Ok(0), + Some(ref mut count) => *count -= 1, + None => {} + } + + let len = match self.accept_prefix { + None => data.len(), + Some(prefix) => data.len().min(prefix), + }; + + let data = &data[..len]; + self.buffer.extend_from_slice(data); + + Ok(len) + } + + fn flush(&mut self) -> io::Result<()> { + if self.always_flush_error { + Err(io::Error::new(io::ErrorKind::Other, "test - always_flush_error")) + } else { + Ok(()) + } + } +} + +/// Previously the `LineWriter` could successfully write some bytes but +/// then fail to report that it has done so. Additionally, an erroneous +/// flush after a successful write was permanently ignored. +/// +/// Test that a line writer correctly reports the number of written bytes, +/// and that it attempts to flush buffered lines from previous writes +/// before processing new data +/// +/// Regression test for #37807 +#[test] +fn erroneous_flush_retried() { + let writer = ProgrammableSink { + // Only write up to 4 bytes at a time + accept_prefix: Some(4), + + // Accept the first two writes, then error the others + max_writes: Some(2), + error_after_max_writes: true, + + ..Default::default() + }; + + // This should write the first 4 bytes. The rest will be buffered, out + // to the last newline. + let mut writer = LineWriter::new(writer); + assert_eq!(writer.write(b"a\nb\nc\nd\ne").unwrap(), 8); + + // This write should attempt to flush "c\nd\n", then buffer "e". No + // errors should happen here because no further writes should be + // attempted against `writer`. + assert_eq!(writer.write(b"e").unwrap(), 1); + assert_eq!(&writer.get_ref().buffer, b"a\nb\nc\nd\n"); +} + +#[test] +fn line_vectored() { + let mut a = LineWriter::new(Vec::new()); + assert_eq!( + a.write_vectored(&[ + IoSlice::new(&[]), + IoSlice::new(b"\n"), + IoSlice::new(&[]), + IoSlice::new(b"a"), + ]) + .unwrap(), + 2, + ); + assert_eq!(a.get_ref(), b"\n"); + + assert_eq!( + a.write_vectored(&[ + IoSlice::new(&[]), + IoSlice::new(b"b"), + IoSlice::new(&[]), + IoSlice::new(b"a"), + IoSlice::new(&[]), + IoSlice::new(b"c"), + ]) + .unwrap(), + 3, + ); + assert_eq!(a.get_ref(), b"\n"); + a.flush().unwrap(); + assert_eq!(a.get_ref(), b"\nabac"); + assert_eq!(a.write_vectored(&[]).unwrap(), 0); + assert_eq!( + a.write_vectored(&[ + IoSlice::new(&[]), + IoSlice::new(&[]), + IoSlice::new(&[]), + IoSlice::new(&[]), + ]) + .unwrap(), + 0, + ); + assert_eq!(a.write_vectored(&[IoSlice::new(b"a\nb"),]).unwrap(), 3); + assert_eq!(a.get_ref(), b"\nabaca\nb"); +} + +#[test] +fn line_vectored_partial_and_errors() { + use crate::collections::VecDeque; + + enum Call { + Write { inputs: Vec<&'static [u8]>, output: io::Result<usize> }, + Flush { output: io::Result<()> }, + } + + #[derive(Default)] + struct Writer { + calls: VecDeque<Call>, + } + + impl Write for Writer { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.write_vectored(&[IoSlice::new(buf)]) + } + + fn write_vectored(&mut self, buf: &[IoSlice<'_>]) -> io::Result<usize> { + match self.calls.pop_front().expect("unexpected call to write") { + Call::Write { inputs, output } => { + assert_eq!(inputs, buf.iter().map(|b| &**b).collect::<Vec<_>>()); + output + } + Call::Flush { .. } => panic!("unexpected call to write; expected a flush"), + } + } + + fn is_write_vectored(&self) -> bool { + true + } + + fn flush(&mut self) -> io::Result<()> { + match self.calls.pop_front().expect("Unexpected call to flush") { + Call::Flush { output } => output, + Call::Write { .. } => panic!("unexpected call to flush; expected a write"), + } + } + } + + impl Drop for Writer { + fn drop(&mut self) { + if !thread::panicking() { + assert_eq!(self.calls.len(), 0); + } + } + } + + // partial writes keep going + let mut a = LineWriter::new(Writer::default()); + a.write_vectored(&[IoSlice::new(&[]), IoSlice::new(b"abc")]).unwrap(); + + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"abc"], output: Ok(1) }); + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"bc"], output: Ok(2) }); + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"x", b"\n"], output: Ok(2) }); + + a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\n")]).unwrap(); + + a.get_mut().calls.push_back(Call::Flush { output: Ok(()) }); + a.flush().unwrap(); + + // erroneous writes stop and don't write more + a.get_mut().calls.push_back(Call::Write { inputs: vec![b"x", b"\na"], output: Err(err()) }); + a.get_mut().calls.push_back(Call::Flush { output: Ok(()) }); + assert!(a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\na")]).is_err()); + a.flush().unwrap(); + + fn err() -> io::Error { + io::Error::new(io::ErrorKind::Other, "x") + } +} + +/// Test that, in cases where vectored writing is not enabled, the +/// LineWriter uses the normal `write` call, which more-correctly handles +/// partial lines +#[test] +fn line_vectored_ignored() { + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::new(writer); + + let content = [ + IoSlice::new(&[]), + IoSlice::new(b"Line 1\nLine"), + IoSlice::new(b" 2\nLine 3\nL"), + IoSlice::new(&[]), + IoSlice::new(&[]), + IoSlice::new(b"ine 4"), + IoSlice::new(b"\nLine 5\n"), + ]; + + let count = writer.write_vectored(&content).unwrap(); + assert_eq!(count, 11); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + let count = writer.write_vectored(&content[2..]).unwrap(); + assert_eq!(count, 11); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); + + let count = writer.write_vectored(&content[5..]).unwrap(); + assert_eq!(count, 5); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); + + let count = writer.write_vectored(&content[6..]).unwrap(); + assert_eq!(count, 8); + assert_eq!( + writer.get_ref().buffer.as_slice(), + b"Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n".as_ref() + ); +} + +/// Test that, given this input: +/// +/// Line 1\n +/// Line 2\n +/// Line 3\n +/// Line 4 +/// +/// And given a result that only writes to midway through Line 2 +/// +/// That only up to the end of Line 3 is buffered +/// +/// This behavior is desirable because it prevents flushing partial lines +#[test] +fn partial_write_buffers_line() { + let writer = ProgrammableSink { accept_prefix: Some(13), ..Default::default() }; + let mut writer = LineWriter::new(writer); + + assert_eq!(writer.write(b"Line 1\nLine 2\nLine 3\nLine4").unwrap(), 21); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2"); + + assert_eq!(writer.write(b"Line 4").unwrap(), 6); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n"); +} + +/// Test that, given this input: +/// +/// Line 1\n +/// Line 2\n +/// Line 3 +/// +/// And given that the full write of lines 1 and 2 was successful +/// That data up to Line 3 is buffered +#[test] +fn partial_line_buffered_after_line_write() { + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::new(writer); + + assert_eq!(writer.write(b"Line 1\nLine 2\nLine 3").unwrap(), 20); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\n"); + + assert!(writer.flush().is_ok()); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3"); +} + +/// Test that, given a partial line that exceeds the length of +/// LineBuffer's buffer (that is, without a trailing newline), that that +/// line is written to the inner writer +#[test] +fn long_line_flushed() { + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::with_capacity(5, writer); + + assert_eq!(writer.write(b"0123456789").unwrap(), 10); + assert_eq!(&writer.get_ref().buffer, b"0123456789"); +} + +/// Test that, given a very long partial line *after* successfully +/// flushing a complete line, that that line is buffered unconditionally, +/// and no additional writes take place. This assures the property that +/// `write` should make at-most-one attempt to write new data. +#[test] +fn line_long_tail_not_flushed() { + let writer = ProgrammableSink::default(); + let mut writer = LineWriter::with_capacity(5, writer); + + // Assert that Line 1\n is flushed, and 01234 is buffered + assert_eq!(writer.write(b"Line 1\n0123456789").unwrap(), 12); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + // Because the buffer is full, this subsequent write will flush it + assert_eq!(writer.write(b"5").unwrap(), 1); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n01234"); +} + +/// Test that, if an attempt to pre-flush buffered data returns Ok(0), +/// this is propagated as an error. +#[test] +fn line_buffer_write0_error() { + let writer = ProgrammableSink { + // Accept one write, then return Ok(0) on subsequent ones + max_writes: Some(1), + + ..Default::default() + }; + let mut writer = LineWriter::new(writer); + + // This should write "Line 1\n" and buffer "Partial" + assert_eq!(writer.write(b"Line 1\nPartial").unwrap(), 14); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + // This will attempt to flush "partial", which will return Ok(0), which + // needs to be an error, because we've already informed the client + // that we accepted the write. + let err = writer.write(b" Line End\n").unwrap_err(); + assert_eq!(err.kind(), ErrorKind::WriteZero); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); +} + +/// Test that, if a write returns Ok(0) after a successful pre-flush, this +/// is propagated as Ok(0) +#[test] +fn line_buffer_write0_normal() { + let writer = ProgrammableSink { + // Accept two writes, then return Ok(0) on subsequent ones + max_writes: Some(2), + + ..Default::default() + }; + let mut writer = LineWriter::new(writer); + + // This should write "Line 1\n" and buffer "Partial" + assert_eq!(writer.write(b"Line 1\nPartial").unwrap(), 14); + assert_eq!(&writer.get_ref().buffer, b"Line 1\n"); + + // This will flush partial, which will succeed, but then return Ok(0) + // when flushing " Line End\n" + assert_eq!(writer.write(b" Line End\n").unwrap(), 0); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nPartial"); +} + +/// LineWriter has a custom `write_all`; make sure it works correctly +#[test] +fn line_write_all() { + let writer = ProgrammableSink { + // Only write 5 bytes at a time + accept_prefix: Some(5), + ..Default::default() + }; + let mut writer = LineWriter::new(writer); + + writer.write_all(b"Line 1\nLine 2\nLine 3\nLine 4\nPartial").unwrap(); + assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\nLine 4\n"); + writer.write_all(b" Line 5\n").unwrap(); + assert_eq!( + writer.get_ref().buffer.as_slice(), + b"Line 1\nLine 2\nLine 3\nLine 4\nPartial Line 5\n".as_ref(), + ); +} + +#[test] +fn line_write_all_error() { + let writer = ProgrammableSink { + // Only accept up to 3 writes of up to 5 bytes each + accept_prefix: Some(5), + max_writes: Some(3), + ..Default::default() + }; + + let mut writer = LineWriter::new(writer); + let res = writer.write_all(b"Line 1\nLine 2\nLine 3\nLine 4\nPartial"); + assert!(res.is_err()); + // An error from write_all leaves everything in an indeterminate state, + // so there's nothing else to test here +} + +/// Under certain circumstances, the old implementation of LineWriter +/// would try to buffer "to the last newline" but be forced to buffer +/// less than that, leading to inappropriate partial line writes. +/// Regression test for that issue. +#[test] +fn partial_multiline_buffering() { + let writer = ProgrammableSink { + // Write only up to 5 bytes at a time + accept_prefix: Some(5), + ..Default::default() + }; + + let mut writer = LineWriter::with_capacity(10, writer); + + let content = b"AAAAABBBBB\nCCCCDDDDDD\nEEE"; + + // When content is written, LineWriter will try to write blocks A, B, + // C, and D. Only block A will succeed. Under the old behavior, LineWriter + // would then try to buffer B, C and D, but because its capacity is 10, + // it will only be able to buffer B and C. We don't want to buffer + // partial lines concurrent with whole lines, so the correct behavior + // is to buffer only block B (out to the newline) + assert_eq!(writer.write(content).unwrap(), 11); + assert_eq!(writer.get_ref().buffer, *b"AAAAA"); + + writer.flush().unwrap(); + assert_eq!(writer.get_ref().buffer, *b"AAAAABBBBB\n"); +} + +/// Same as test_partial_multiline_buffering, but in the event NO full lines +/// fit in the buffer, just buffer as much as possible +#[test] +fn partial_multiline_buffering_without_full_line() { + let writer = ProgrammableSink { + // Write only up to 5 bytes at a time + accept_prefix: Some(5), + ..Default::default() + }; + + let mut writer = LineWriter::with_capacity(5, writer); + + let content = b"AAAAABBBBBBBBBB\nCCCCC\nDDDDD"; + + // When content is written, LineWriter will try to write blocks A, B, + // and C. Only block A will succeed. Under the old behavior, LineWriter + // would then try to buffer B and C, but because its capacity is 5, + // it will only be able to buffer part of B. Because it's not possible + // for it to buffer any complete lines, it should buffer as much of B as + // possible + assert_eq!(writer.write(content).unwrap(), 10); + assert_eq!(writer.get_ref().buffer, *b"AAAAA"); + + writer.flush().unwrap(); + assert_eq!(writer.get_ref().buffer, *b"AAAAABBBBB"); +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum RecordedEvent { + Write(String), + Flush, +} + +#[derive(Debug, Clone, Default)] +struct WriteRecorder { + pub events: Vec<RecordedEvent>, +} + +impl Write for WriteRecorder { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + use crate::str::from_utf8; + + self.events.push(RecordedEvent::Write(from_utf8(buf).unwrap().to_string())); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + self.events.push(RecordedEvent::Flush); + Ok(()) + } +} + +/// Test that a normal, formatted writeln only results in a single write +/// call to the underlying writer. A naive implementation of +/// LineWriter::write_all results in two writes: one of the buffered data, +/// and another of the final substring in the formatted set +#[test] +fn single_formatted_write() { + let writer = WriteRecorder::default(); + let mut writer = LineWriter::new(writer); + + // Under a naive implementation of LineWriter, this will result in two + // writes: "hello, world" and "!\n", because write() has to flush the + // buffer before attempting to write the last "!\n". write_all shouldn't + // have this limitation. + writeln!(&mut writer, "{}, {}!", "hello", "world").unwrap(); + assert_eq!(writer.get_ref().events, [RecordedEvent::Write("hello, world!\n".to_string())]); +} |