author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000
---|---|---
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-17 12:02:58 +0000
commit | 698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch) |
tree | 173a775858bd501c378080a10dca74132f05bc50 /vendor/snap/src/read.rs |
parent | Initial commit. (diff) |
download | rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip |
Adding upstream version 1.64.0+dfsg1. (upstream/1.64.0+dfsg1)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/snap/src/read.rs')
-rw-r--r-- | vendor/snap/src/read.rs | 450 |
1 file changed, 450 insertions, 0 deletions
diff --git a/vendor/snap/src/read.rs b/vendor/snap/src/read.rs
new file mode 100644
index 000000000..a924bf91d
--- /dev/null
+++ b/vendor/snap/src/read.rs
@@ -0,0 +1,450 @@
+/*!
+This module provides two `std::io::Read` implementations:
+
+* [`read::FrameDecoder`](struct.FrameDecoder.html)
+  wraps another `std::io::Read` implementation, and decompresses data encoded
+  using the Snappy frame format. Use this if you have a compressed data source
+  and wish to read it as uncompressed data.
+* [`read::FrameEncoder`](struct.FrameEncoder.html)
+  wraps another `std::io::Read` implementation, and compresses data encoded
+  using the Snappy frame format. Use this if you have an uncompressed data
+  source and wish to read it as compressed data.
+
+Typically, `read::FrameDecoder` is the version that you'll want.
+*/
+
+use std::cmp;
+use std::fmt;
+use std::io;
+
+use crate::bytes;
+use crate::compress::Encoder;
+use crate::crc32::CheckSummer;
+use crate::decompress::{decompress_len, Decoder};
+use crate::error::Error;
+use crate::frame::{
+    compress_frame, ChunkType, CHUNK_HEADER_AND_CRC_SIZE,
+    MAX_COMPRESS_BLOCK_SIZE, STREAM_BODY, STREAM_IDENTIFIER,
+};
+use crate::MAX_BLOCK_SIZE;
+
+/// The maximum size of a compressed block, including the header and stream
+/// identifier, that can be emitted by FrameEncoder.
+const MAX_READ_FRAME_ENCODER_BLOCK_SIZE: usize = STREAM_IDENTIFIER.len()
+    + CHUNK_HEADER_AND_CRC_SIZE
+    + MAX_COMPRESS_BLOCK_SIZE;
+
+/// A reader for decompressing a Snappy stream.
+///
+/// This `FrameDecoder` wraps any other reader that implements `std::io::Read`.
+/// Bytes read from this reader are decompressed using the
+/// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt)
+/// (file extension `sz`, MIME type `application/x-snappy-framed`).
+///
+/// This reader can potentially make many small reads from the underlying
+/// stream depending on its format; therefore, passing in a buffered reader
+/// may be beneficial.
+pub struct FrameDecoder<R: io::Read> {
+    /// The underlying reader.
+    r: R,
+    /// A Snappy decoder that we reuse that does the actual block based
+    /// decompression.
+    dec: Decoder,
+    /// A CRC32 checksummer that is configured to either use the portable
+    /// fallback version or the SSE4.2 accelerated version when the right CPU
+    /// features are available.
+    checksummer: CheckSummer,
+    /// The compressed bytes buffer, taken from the underlying reader.
+    src: Vec<u8>,
+    /// The decompressed bytes buffer. Bytes are decompressed from src to dst
+    /// before being passed back to the caller.
+    dst: Vec<u8>,
+    /// Index into dst: starting point of bytes not yet given back to caller.
+    dsts: usize,
+    /// Index into dst: ending point of bytes not yet given back to caller.
+    dste: usize,
+    /// Whether we've read the special stream header or not.
+    read_stream_ident: bool,
+}
+
+impl<R: io::Read> FrameDecoder<R> {
+    /// Create a new reader for streaming Snappy decompression.
+    pub fn new(rdr: R) -> FrameDecoder<R> {
+        FrameDecoder {
+            r: rdr,
+            dec: Decoder::new(),
+            checksummer: CheckSummer::new(),
+            src: vec![0; MAX_COMPRESS_BLOCK_SIZE],
+            dst: vec![0; MAX_BLOCK_SIZE],
+            dsts: 0,
+            dste: 0,
+            read_stream_ident: false,
+        }
+    }
+
+    /// Gets a reference to the underlying reader in this decoder.
+    pub fn get_ref(&self) -> &R {
+        &self.r
+    }
+
+    /// Gets a mutable reference to the underlying reader in this decoder.
+    ///
+    /// Note that mutation of the stream may result in surprising results if
+    /// this decoder continues to be used.
+    pub fn get_mut(&mut self) -> &mut R {
+        &mut self.r
+    }
+}
+
+impl<R: io::Read> io::Read for FrameDecoder<R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        macro_rules! fail {
+            ($err:expr) => {
+                return Err(io::Error::from($err))
+            };
+        }
+        loop {
+            if self.dsts < self.dste {
+                let len = cmp::min(self.dste - self.dsts, buf.len());
+                let dste = self.dsts.checked_add(len).unwrap();
+                buf[0..len].copy_from_slice(&self.dst[self.dsts..dste]);
+                self.dsts = dste;
+                return Ok(len);
+            }
+            if !read_exact_eof(&mut self.r, &mut self.src[0..4])? {
+                return Ok(0);
+            }
+            let ty = ChunkType::from_u8(self.src[0]);
+            if !self.read_stream_ident {
+                if ty != Ok(ChunkType::Stream) {
+                    fail!(Error::StreamHeader { byte: self.src[0] });
+                }
+                self.read_stream_ident = true;
+            }
+            let len64 = bytes::read_u24_le(&self.src[1..]) as u64;
+            if len64 > self.src.len() as u64 {
+                fail!(Error::UnsupportedChunkLength {
+                    len: len64,
+                    header: false,
+                });
+            }
+            let len = len64 as usize;
+            match ty {
+                Err(b) if 0x02 <= b && b <= 0x7F => {
+                    // Spec says that chunk types 0x02-0x7F are reserved and
+                    // conformant decoders must return an error.
+                    fail!(Error::UnsupportedChunkType { byte: b });
+                }
+                Err(b) if 0x80 <= b && b <= 0xFD => {
+                    // Spec says that chunk types 0x80-0xFD are reserved but
+                    // skippable.
+                    self.r.read_exact(&mut self.src[0..len])?;
+                }
+                Err(b) => {
+                    // Can never happen. 0x02-0x7F and 0x80-0xFD are handled
+                    // above in the error case. That leaves 0x00, 0x01, 0xFE
+                    // and 0xFF, each of which corresponds to one of the four
+                    // defined chunk types.
+                    unreachable!("BUG: unhandled chunk type: {}", b);
+                }
+                Ok(ChunkType::Padding) => {
+                    // Just read and move on.
+                    self.r.read_exact(&mut self.src[0..len])?;
+                }
+                Ok(ChunkType::Stream) => {
+                    if len != STREAM_BODY.len() {
+                        fail!(Error::UnsupportedChunkLength {
+                            len: len64,
+                            header: true,
+                        })
+                    }
+                    self.r.read_exact(&mut self.src[0..len])?;
+                    if &self.src[0..len] != STREAM_BODY {
+                        fail!(Error::StreamHeaderMismatch {
+                            bytes: self.src[0..len].to_vec(),
+                        });
+                    }
+                }
+                Ok(ChunkType::Uncompressed) => {
+                    if len < 4 {
+                        fail!(Error::UnsupportedChunkLength {
+                            len: len as u64,
+                            header: false,
+                        });
+                    }
+                    let expected_sum = bytes::io_read_u32_le(&mut self.r)?;
+                    let n = len - 4;
+                    if n > self.dst.len() {
+                        fail!(Error::UnsupportedChunkLength {
+                            len: n as u64,
+                            header: false,
+                        });
+                    }
+                    self.r.read_exact(&mut self.dst[0..n])?;
+                    let got_sum =
+                        self.checksummer.crc32c_masked(&self.dst[0..n]);
+                    if expected_sum != got_sum {
+                        fail!(Error::Checksum {
+                            expected: expected_sum,
+                            got: got_sum,
+                        });
+                    }
+                    self.dsts = 0;
+                    self.dste = n;
+                }
+                Ok(ChunkType::Compressed) => {
+                    if len < 4 {
+                        fail!(Error::UnsupportedChunkLength {
+                            len: len as u64,
+                            header: false,
+                        });
+                    }
+                    let expected_sum = bytes::io_read_u32_le(&mut self.r)?;
+                    let sn = len - 4;
+                    if sn > self.src.len() {
+                        fail!(Error::UnsupportedChunkLength {
+                            len: len64,
+                            header: false,
+                        });
+                    }
+                    self.r.read_exact(&mut self.src[0..sn])?;
+                    let dn = decompress_len(&self.src)?;
+                    if dn > self.dst.len() {
+                        fail!(Error::UnsupportedChunkLength {
+                            len: dn as u64,
+                            header: false,
+                        });
+                    }
+                    self.dec
+                        .decompress(&self.src[0..sn], &mut self.dst[0..dn])?;
+                    let got_sum =
+                        self.checksummer.crc32c_masked(&self.dst[0..dn]);
+                    if expected_sum != got_sum {
+                        fail!(Error::Checksum {
+                            expected: expected_sum,
+                            got: got_sum,
+                        });
+                    }
+                    self.dsts = 0;
+                    self.dste = dn;
+                }
+            }
+        }
+    }
+}
+
+impl<R: fmt::Debug + io::Read> fmt::Debug for FrameDecoder<R> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("FrameDecoder")
+            .field("r", &self.r)
+            .field("dec", &self.dec)
+            .field("checksummer", &self.checksummer)
+            .field("src", &"[...]")
+            .field("dst", &"[...]")
+            .field("dsts", &self.dsts)
+            .field("dste", &self.dste)
+            .field("read_stream_ident", &self.read_stream_ident)
+            .finish()
+    }
+}
+
+/// A reader for compressing data using snappy as it is read.
+///
+/// This `FrameEncoder` wraps any other reader that implements `std::io::Read`.
+/// Bytes read from this reader are compressed using the
+/// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt)
+/// (file extension `sz`, MIME type `application/x-snappy-framed`).
+///
+/// Usually you'll want
+/// [`read::FrameDecoder`](struct.FrameDecoder.html)
+/// (for decompressing while reading) or
+/// [`write::FrameEncoder`](../write/struct.FrameEncoder.html)
+/// (for compressing while writing) instead.
+///
+/// Unlike `FrameDecoder`, this will attempt to make large reads roughly
+/// equivalent to the size of a single Snappy block. Therefore, callers may not
+/// benefit from using a buffered reader.
+pub struct FrameEncoder<R: io::Read> {
+    /// Internally, we split `FrameEncoder` in two to keep the borrow checker
+    /// happy. The `inner` member contains everything that `read_frame` needs
+    /// to fetch a frame's worth of data and compress it.
+    inner: Inner<R>,
+    /// Data that we've encoded and are ready to return to our caller.
+    dst: Vec<u8>,
+    /// Starting point of bytes in `dst` not yet given back to the caller.
+    dsts: usize,
+    /// Ending point of bytes in `dst` that we want to give to our caller.
+    dste: usize,
+}
+
+struct Inner<R: io::Read> {
+    /// The underlying data source.
+    r: R,
+    /// An encoder that we reuse that does the actual block based compression.
+    enc: Encoder,
+    /// A CRC32 checksummer that is configured to either use the portable
+    /// fallback version or the SSE4.2 accelerated version when the right CPU
+    /// features are available.
+    checksummer: CheckSummer,
+    /// Data taken from the underlying `r`, and not yet compressed.
+    src: Vec<u8>,
+    /// Have we written the standard snappy header to `dst` yet?
+    wrote_stream_ident: bool,
+}
+
+impl<R: io::Read> FrameEncoder<R> {
+    /// Create a new reader for streaming Snappy compression.
+    pub fn new(rdr: R) -> FrameEncoder<R> {
+        FrameEncoder {
+            inner: Inner {
+                r: rdr,
+                enc: Encoder::new(),
+                checksummer: CheckSummer::new(),
+                src: vec![0; MAX_BLOCK_SIZE],
+                wrote_stream_ident: false,
+            },
+            dst: vec![0; MAX_READ_FRAME_ENCODER_BLOCK_SIZE],
+            dsts: 0,
+            dste: 0,
+        }
+    }
+
+    /// Gets a reference to the underlying reader in this encoder.
+    pub fn get_ref(&self) -> &R {
+        &self.inner.r
+    }
+
+    /// Gets a mutable reference to the underlying reader in this encoder.
+    ///
+    /// Note that mutation of the stream may result in surprising results if
+    /// this encoder continues to be used.
+    pub fn get_mut(&mut self) -> &mut R {
+        &mut self.inner.r
+    }
+
+    /// Read previously compressed data from `self.dst`, returning the number of
+    /// bytes read. If `self.dst` is empty, returns 0.
+    fn read_from_dst(&mut self, buf: &mut [u8]) -> usize {
+        let available_bytes = self.dste - self.dsts;
+        let count = cmp::min(available_bytes, buf.len());
+        buf[..count].copy_from_slice(&self.dst[self.dsts..self.dsts + count]);
+        self.dsts += count;
+        count
+    }
+}
+
+impl<R: io::Read> io::Read for FrameEncoder<R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        // Try reading previously compressed bytes from our `dst` buffer, if
+        // any.
+        let count = self.read_from_dst(buf);
+
+        if count > 0 {
+            // We had some bytes in our `dst` buffer that we used.
+            Ok(count)
+        } else if buf.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE {
+            // Our output `buf` is big enough that we can directly write into
+            // it, so bypass `dst` entirely.
+            self.inner.read_frame(buf)
+        } else {
+            // We need to refill `self.dst`, and then return some bytes from
+            // that.
+            let count = self.inner.read_frame(&mut self.dst)?;
+            self.dsts = 0;
+            self.dste = count;
+            Ok(self.read_from_dst(buf))
+        }
+    }
+}
+
+impl<R: io::Read> Inner<R> {
+    /// Read from `self.r`, and create a new frame, writing it to `dst`, which
+    /// must be at least `MAX_READ_FRAME_ENCODER_BLOCK_SIZE` bytes in size.
+    fn read_frame(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+        debug_assert!(dst.len() >= MAX_READ_FRAME_ENCODER_BLOCK_SIZE);
+
+        // We make one read to the underlying reader. If the underlying reader
+        // doesn't fill the buffer but there are still bytes to be read, then
+        // compression won't be optimal. The alternative would be to block
+        // until our buffer is maximally full (or we see EOF), but this seems
+        // more surprising. In general, io::Read implementations should try to
+        // fill the caller's buffer as much as they can, so this seems like the
+        // better choice.
+        let nread = self.r.read(&mut self.src)?;
+        if nread == 0 {
+            return Ok(0);
+        }
+
+        // If we haven't yet written the stream header to `dst`, write it.
+        let mut dst_write_start = 0;
+        if !self.wrote_stream_ident {
+            dst[0..STREAM_IDENTIFIER.len()].copy_from_slice(STREAM_IDENTIFIER);
+            dst_write_start += STREAM_IDENTIFIER.len();
+            self.wrote_stream_ident = true;
+        }
+
+        // Reserve space for our chunk header. We need to use `split_at_mut` so
+        // that we can get two mutable slices pointing at non-overlapping parts
+        // of `dst`.
+        let (chunk_header, remaining_dst) =
+            dst[dst_write_start..].split_at_mut(CHUNK_HEADER_AND_CRC_SIZE);
+        dst_write_start += CHUNK_HEADER_AND_CRC_SIZE;
+
+        // Compress our frame if possible, telling `compress_frame` to always
+        // put the output in `dst`.
+        let frame_data = compress_frame(
+            &mut self.enc,
+            self.checksummer,
+            &self.src[..nread],
+            chunk_header,
+            remaining_dst,
+            true,
+        )?;
+        Ok(dst_write_start + frame_data.len())
+    }
+}
+
+impl<R: fmt::Debug + io::Read> fmt::Debug for FrameEncoder<R> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("FrameEncoder")
+            .field("inner", &self.inner)
+            .field("dst", &"[...]")
+            .field("dsts", &self.dsts)
+            .field("dste", &self.dste)
+            .finish()
+    }
+}
+
+impl<R: fmt::Debug + io::Read> fmt::Debug for Inner<R> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("Inner")
+            .field("r", &self.r)
+            .field("enc", &self.enc)
+            .field("checksummer", &self.checksummer)
+            .field("src", &"[...]")
+            .field("wrote_stream_ident", &self.wrote_stream_ident)
+            .finish()
+    }
+}
+
+// read_exact_eof is like Read::read_exact, except it detects EOF
+// and returns Ok(false) instead of an error.
+//
+// If buf was read successfully, it returns Ok(true).
+fn read_exact_eof<R: io::Read>(
+    rdr: &mut R,
+    buf: &mut [u8],
+) -> io::Result<bool> {
+    match rdr.read(buf) {
+        // EOF
+        Ok(0) => Ok(false),
+        // Read everything w/ the read call
+        Ok(i) if i == buf.len() => Ok(true),
+        // There are some bytes left to fill, which can be deferred to read_exact
+        Ok(i) => {
+            rdr.read_exact(&mut buf[i..])?;
+            Ok(true)
+        }
+        Err(e) => Err(e),
+    }
+}
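For orientation, here is a minimal round-trip sketch (not part of the vendored file) showing how these two readers are normally consumed through the published `snap` crate as `snap::read::FrameDecoder` and `snap::read::FrameEncoder`; the input byte string is made up for illustration.

```rust
use std::io::Read;

use snap::read::{FrameDecoder, FrameEncoder};

fn main() -> std::io::Result<()> {
    // Any `io::Read` source works (file, socket, &[u8]); this slice is a
    // hypothetical stand-in.
    let uncompressed: &[u8] = b"some bytes worth compressing, some bytes worth compressing";

    // FrameEncoder wraps an uncompressed reader and yields Snappy-framed
    // bytes as you read from it.
    let mut compressed = Vec::new();
    FrameEncoder::new(uncompressed).read_to_end(&mut compressed)?;

    // FrameDecoder wraps a compressed reader and yields the original bytes.
    let mut roundtripped = Vec::new();
    FrameDecoder::new(&compressed[..]).read_to_end(&mut roundtripped)?;

    assert_eq!(uncompressed, &roundtripped[..]);
    Ok(())
}
```

Because both types implement `std::io::Read`, the same pattern applies to any reader; as the doc comments above note, `FrameDecoder` may benefit from a buffered source, while `FrameEncoder` already reads in roughly block-sized chunks.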