diff options
Diffstat (limited to 'vendor/ruzstd/src/frame_decoder.rs')
-rw-r--r-- | vendor/ruzstd/src/frame_decoder.rs | 569 |
1 files changed, 569 insertions, 0 deletions
diff --git a/vendor/ruzstd/src/frame_decoder.rs b/vendor/ruzstd/src/frame_decoder.rs new file mode 100644 index 000000000..560e82810 --- /dev/null +++ b/vendor/ruzstd/src/frame_decoder.rs @@ -0,0 +1,569 @@ +use super::frame; +use crate::decoding::dictionary::Dictionary; +use crate::decoding::scratch::DecoderScratch; +use crate::decoding::{self, dictionary}; +use std::collections::HashMap; +use std::convert::TryInto; +use std::hash::Hasher; +use std::io::{self, Read}; + +/// This implements a decoder for zstd frames. This decoder is able to decode frames only partially and gives control +/// over how many bytes/blocks will be decoded at a time (so you don't have to decode a 10GB file into memory all at once). +/// It reads bytes as needed from a provided source and can be read from to collect partial results. +/// +/// If you want to just read the whole frame with an io::Read without having to deal with manually calling decode_blocks +/// you can use the provided StreamingDecoder with wraps this FrameDecoder +/// +/// Workflow is as follows: +/// ``` +/// use ruzstd::frame_decoder::BlockDecodingStrategy; +/// use std::io::Read; +/// use std::io::Write; +/// +/// +/// fn decode_this(mut file: impl std::io::Read) { +/// //Create a new decoder +/// let mut frame_dec = ruzstd::FrameDecoder::new(); +/// let mut result = Vec::new(); +/// +/// // Use reset or init to make the decoder ready to decode the frame from the io::Read +/// frame_dec.reset(&mut file).unwrap(); +/// +/// // Loop until the frame has been decoded completely +/// while !frame_dec.is_finished() { +/// // decode (roughly) batch_size many bytes +/// frame_dec.decode_blocks(&mut file, BlockDecodingStrategy::UptoBytes(1024)).unwrap(); +/// +/// // read from the decoder to collect bytes from the internal buffer +/// let bytes_read = frame_dec.read(result.as_mut_slice()).unwrap(); +/// +/// // then do something with it +/// do_something(&result[0..bytes_read]); +/// } +/// +/// // handle the last chunk of data +/// while frame_dec.can_collect() > 0 { +/// let x = frame_dec.read(result.as_mut_slice()).unwrap(); +/// +/// do_something(&result[0..x]); +/// } +/// } +/// +/// fn do_something(data: &[u8]) { +/// std::io::stdout().write_all(data).unwrap(); +/// } +/// ``` +pub struct FrameDecoder { + state: Option<FrameDecoderState>, + dicts: HashMap<u32, Dictionary>, +} + +struct FrameDecoderState { + pub frame: frame::Frame, + decoder_scratch: DecoderScratch, + frame_finished: bool, + block_counter: usize, + bytes_read_counter: u64, + check_sum: Option<u32>, + using_dict: Option<u32>, +} + +pub enum BlockDecodingStrategy { + All, + UptoBlocks(usize), + UptoBytes(usize), +} + +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum FrameDecoderError { + #[error(transparent)] + ReadFrameHeaderError(#[from] frame::ReadFrameHeaderError), + #[error(transparent)] + FrameHeaderError(#[from] frame::FrameHeaderError), + #[error(transparent)] + FrameCheckError(#[from] frame::FrameCheckError), + #[error("Specified window_size is too big; Requested: {requested}, Max: {MAX_WINDOW_SIZE}")] + WindowSizeTooBig { requested: u64 }, + #[error(transparent)] + DictionaryDecodeError(#[from] dictionary::DictionaryDecodeError), + #[error("Failed to parse/decode block body: {0}")] + FailedToReadBlockHeader(#[from] decoding::block_decoder::BlockHeaderReadError), + #[error("Failed to parse block header: {0}")] + FailedToReadBlockBody(decoding::block_decoder::DecodeBlockContentError), + #[error("Failed to read checksum: {0}")] + FailedToReadChecksum(#[source] io::Error), + #[error("Decoder must initialized or reset before using it")] + NotYetInitialized, + #[error("Decoder encountered error while initializing: {0}")] + FailedToInitialize(frame::FrameHeaderError), + #[error("Decoder encountered error while draining the decodebuffer: {0}")] + FailedToDrainDecodebuffer(#[source] io::Error), + #[error("Target must have at least as many bytes as the contentsize of the frame reports")] + TargetTooSmall, + #[error("Frame header specified dictionary id that wasnt provided by add_dict() or reset_with_dict()")] + DictNotProvided, +} + +const MAX_WINDOW_SIZE: u64 = 1024 * 1024 * 100; + +impl FrameDecoderState { + pub fn new(source: impl Read) -> Result<FrameDecoderState, FrameDecoderError> { + let (frame, header_size) = frame::read_frame_header(source)?; + let window_size = frame.header.window_size()?; + frame.check_valid()?; + Ok(FrameDecoderState { + frame, + frame_finished: false, + block_counter: 0, + decoder_scratch: DecoderScratch::new(window_size as usize), + bytes_read_counter: u64::from(header_size), + check_sum: None, + using_dict: None, + }) + } + + pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> { + let (frame, header_size) = frame::read_frame_header(source)?; + let window_size = frame.header.window_size()?; + frame.check_valid()?; + + if window_size > MAX_WINDOW_SIZE { + return Err(FrameDecoderError::WindowSizeTooBig { + requested: window_size, + }); + } + + self.frame = frame; + self.frame_finished = false; + self.block_counter = 0; + self.decoder_scratch.reset(window_size as usize); + self.bytes_read_counter = u64::from(header_size); + self.check_sum = None; + self.using_dict = None; + Ok(()) + } +} + +impl Default for FrameDecoder { + fn default() -> Self { + Self::new() + } +} + +impl FrameDecoder { + /// This will create a new decoder without allocating anything yet. + /// init()/reset() will allocate all needed buffers if it is the first time this decoder is used + /// else they just reset these buffers with not further allocations + pub fn new() -> FrameDecoder { + FrameDecoder { + state: None, + dicts: HashMap::new(), + } + } + + /// init() will allocate all needed buffers if it is the first time this decoder is used + /// else they just reset these buffers with not further allocations + /// + /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer() + /// + /// equivalent to reset() + pub fn init(&mut self, source: impl Read) -> Result<(), FrameDecoderError> { + self.reset(source) + } + /// Like init but provides the dict to use for the next frame + pub fn init_with_dict( + &mut self, + source: impl Read, + dict: &[u8], + ) -> Result<(), FrameDecoderError> { + self.reset_with_dict(source, dict) + } + + /// reset() will allocate all needed buffers if it is the first time this decoder is used + /// else they just reset these buffers with not further allocations + /// + /// Note that all bytes currently in the decodebuffer from any previous frame will be lost. Collect them with collect()/collect_to_writer() + /// + /// equivalent to init() + pub fn reset(&mut self, source: impl Read) -> Result<(), FrameDecoderError> { + match &mut self.state { + Some(s) => s.reset(source), + None => { + self.state = Some(FrameDecoderState::new(source)?); + Ok(()) + } + } + } + + /// Like reset but provides the dict to use for the next frame + pub fn reset_with_dict( + &mut self, + source: impl Read, + dict: &[u8], + ) -> Result<(), FrameDecoderError> { + self.reset(source)?; + if let Some(state) = &mut self.state { + let id = state.decoder_scratch.load_dict(dict)?; + state.using_dict = Some(id); + }; + Ok(()) + } + + /// Add a dict to the FrameDecoder that can be used when needed. The FrameDecoder uses the appropriate one dynamically + pub fn add_dict(&mut self, raw_dict: &[u8]) -> Result<(), FrameDecoderError> { + let dict = Dictionary::decode_dict(raw_dict)?; + self.dicts.insert(dict.id, dict); + Ok(()) + } + + /// Returns how many bytes the frame contains after decompression + pub fn content_size(&self) -> Option<u64> { + let state = match &self.state { + None => return Some(0), + Some(s) => s, + }; + + match state.frame.header.frame_content_size() { + Err(_) => None, + Ok(x) => Some(x), + } + } + + /// Returns the checksum that was read from the data. Only available after all bytes have been read. It is the last 4 bytes of a zstd-frame + pub fn get_checksum_from_data(&self) -> Option<u32> { + let state = match &self.state { + None => return None, + Some(s) => s, + }; + + state.check_sum + } + + /// Returns the checksum that was calculated while decoding. + /// Only a sensible value after all decoded bytes have been collected/read from the FrameDecoder + pub fn get_calculated_checksum(&self) -> Option<u32> { + let state = match &self.state { + None => return None, + Some(s) => s, + }; + let cksum_64bit = state.decoder_scratch.buffer.hash.finish(); + //truncate to lower 32bit because reasons... + Some(cksum_64bit as u32) + } + + /// Counter for how many bytes have been consumed while decoding the frame + pub fn bytes_read_from_source(&self) -> u64 { + let state = match &self.state { + None => return 0, + Some(s) => s, + }; + state.bytes_read_counter + } + + /// Whether the current frames last block has been decoded yet + /// If this returns true you can call the drain* functions to get all content + /// (the read() function will drain automatically if this returns true) + pub fn is_finished(&self) -> bool { + let state = match &self.state { + None => return true, + Some(s) => s, + }; + if state.frame.header.descriptor.content_checksum_flag() { + state.frame_finished && state.check_sum.is_some() + } else { + state.frame_finished + } + } + + /// Counter for how many blocks have already been decoded + pub fn blocks_decoded(&self) -> usize { + let state = match &self.state { + None => return 0, + Some(s) => s, + }; + state.block_counter + } + + /// Decodes blocks from a reader. It requires that the framedecoder has been initialized first. + /// The Strategy influences how many blocks will be decoded before the function returns + /// This is important if you want to manage memory consumption carefully. If you don't care + /// about that you can just choose the strategy "All" and have all blocks of the frame decoded into the buffer + pub fn decode_blocks( + &mut self, + mut source: impl Read, + strat: BlockDecodingStrategy, + ) -> Result<bool, FrameDecoderError> { + use FrameDecoderError as err; + let state = self.state.as_mut().ok_or(err::NotYetInitialized)?; + + if let Some(id) = state.frame.header.dictionary_id().map_err( + //should never happen we check this directly after decoding the frame header + err::FailedToInitialize, + )? { + match state.using_dict { + Some(using_id) => { + //happy + debug_assert!(id == using_id); + } + None => { + let dict = self.dicts.get(&id).ok_or(err::DictNotProvided)?; + state.decoder_scratch.use_dict(dict); + state.using_dict = Some(id); + } + } + } + + let mut block_dec = decoding::block_decoder::new(); + + let buffer_size_before = state.decoder_scratch.buffer.len(); + let block_counter_before = state.block_counter; + loop { + if crate::VERBOSE { + println!("################"); + println!("Next Block: {}", state.block_counter); + println!("################"); + } + let (block_header, block_header_size) = block_dec + .read_block_header(&mut source) + .map_err(err::FailedToReadBlockHeader)?; + state.bytes_read_counter += u64::from(block_header_size); + + if crate::VERBOSE { + println!(); + println!( + "Found {} block with size: {}, which will be of size: {}", + block_header.block_type, + block_header.content_size, + block_header.decompressed_size + ); + } + + let bytes_read_in_block_body = block_dec + .decode_block_content(&block_header, &mut state.decoder_scratch, &mut source) + .map_err(err::FailedToReadBlockBody)?; + state.bytes_read_counter += bytes_read_in_block_body; + + state.block_counter += 1; + + if crate::VERBOSE { + println!("Output: {}", state.decoder_scratch.buffer.len()); + } + + if block_header.last_block { + state.frame_finished = true; + if state.frame.header.descriptor.content_checksum_flag() { + let mut chksum = [0u8; 4]; + source + .read_exact(&mut chksum) + .map_err(err::FailedToReadChecksum)?; + state.bytes_read_counter += 4; + let chksum = u32::from_le_bytes(chksum); + state.check_sum = Some(chksum); + } + break; + } + + match strat { + BlockDecodingStrategy::All => { /* keep going */ } + BlockDecodingStrategy::UptoBlocks(n) => { + if state.block_counter - block_counter_before >= n { + break; + } + } + BlockDecodingStrategy::UptoBytes(n) => { + if state.decoder_scratch.buffer.len() - buffer_size_before >= n { + break; + } + } + } + } + + Ok(state.frame_finished) + } + + /// Collect bytes and retain window_size bytes while decoding is still going on. + /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes + pub fn collect(&mut self) -> Option<Vec<u8>> { + let finished = self.is_finished(); + let state = self.state.as_mut()?; + if finished { + Some(state.decoder_scratch.buffer.drain()) + } else { + state.decoder_scratch.buffer.drain_to_window_size() + } + } + + /// Collect bytes and retain window_size bytes while decoding is still going on. + /// After decoding of the frame (is_finished() == true) has finished it will collect all remaining bytes + pub fn collect_to_writer(&mut self, w: impl std::io::Write) -> Result<usize, std::io::Error> { + let finished = self.is_finished(); + let state = match &mut self.state { + None => return Ok(0), + Some(s) => s, + }; + if finished { + state.decoder_scratch.buffer.drain_to_writer(w) + } else { + state.decoder_scratch.buffer.drain_to_window_size_writer(w) + } + } + + /// How many bytes can currently be collected from the decodebuffer, while decoding is going on this will be lower than the actual decodbuffer size + /// because window_size bytes need to be retained for decoding. + /// After decoding of the frame (is_finished() == true) has finished it will report all remaining bytes + pub fn can_collect(&self) -> usize { + let finished = self.is_finished(); + let state = match &self.state { + None => return 0, + Some(s) => s, + }; + if finished { + state.decoder_scratch.buffer.can_drain() + } else { + state + .decoder_scratch + .buffer + .can_drain_to_window_size() + .unwrap_or(0) + } + } + + /// Decodes as many blocks as possible from the source slice and reads from the decodebuffer into the target slice + /// The source slice may contain only parts of a frame but must contain at least one full block to make progress + /// + /// By all means use decode_blocks if you have a io.Reader available. This is just for compatibility with other decompressors + /// which try to serve an old-style c api + /// + /// Returns (read, written), if read == 0 then the source did not contain a full block and further calls with the same + /// input will not make any progress! + /// + /// Note that no kind of block can be bigger than 128kb. + /// So to be safe use at least 128*1024 (max block content size) + 3 (block_header size) + 18 (max frame_header size) bytes as your source buffer + /// + /// You may call this function with an empty source after all bytes have been decoded. This is equivalent to just call decoder.read(&mut target) + pub fn decode_from_to( + &mut self, + source: &[u8], + target: &mut [u8], + ) -> Result<(usize, usize), FrameDecoderError> { + use FrameDecoderError as err; + let bytes_read_at_start = match &self.state { + Some(s) => s.bytes_read_counter, + None => 0, + }; + + if !self.is_finished() || self.state.is_none() { + let mut mt_source = source; + + if self.state.is_none() { + self.init(&mut mt_source)?; + } + + //pseudo block to scope "state" so we can borrow self again after the block + { + let mut state = match &mut self.state { + Some(s) => s, + None => panic!("Bug in library"), + }; + let mut block_dec = decoding::block_decoder::new(); + + if state.frame.header.descriptor.content_checksum_flag() + && state.frame_finished + && state.check_sum.is_none() + { + //this block is needed if the checksum were the only 4 bytes that were not included in the last decode_from_to call for a frame + if mt_source.len() >= 4 { + let chksum = mt_source[..4].try_into().expect("optimized away"); + state.bytes_read_counter += 4; + let chksum = u32::from_le_bytes(chksum); + state.check_sum = Some(chksum); + } + return Ok((4, 0)); + } + + if let Some(id) = state.frame.header.dictionary_id().map_err( + //should never happen we check this directly after decoding the frame header + err::FailedToInitialize, + )? { + match state.using_dict { + Some(using_id) => { + //happy + debug_assert!(id == using_id); + } + None => { + let dict = self.dicts.get(&id).ok_or(err::DictNotProvided)?; + state.decoder_scratch.use_dict(dict); + state.using_dict = Some(id); + } + } + } + + loop { + //check if there are enough bytes for the next header + if mt_source.len() < 3 { + break; + } + let (block_header, block_header_size) = block_dec + .read_block_header(&mut mt_source) + .map_err(err::FailedToReadBlockHeader)?; + + // check the needed size for the block before updating counters. + // If not enough bytes are in the source, the header will have to be read again, so act like we never read it in the first place + if mt_source.len() < block_header.content_size as usize { + break; + } + state.bytes_read_counter += u64::from(block_header_size); + + let bytes_read_in_block_body = block_dec + .decode_block_content( + &block_header, + &mut state.decoder_scratch, + &mut mt_source, + ) + .map_err(err::FailedToReadBlockBody)?; + state.bytes_read_counter += bytes_read_in_block_body; + state.block_counter += 1; + + if block_header.last_block { + state.frame_finished = true; + if state.frame.header.descriptor.content_checksum_flag() { + //if there are enough bytes handle this here. Else the block at the start of this function will handle it at the next call + if mt_source.len() >= 4 { + let chksum = mt_source[..4].try_into().expect("optimized away"); + state.bytes_read_counter += 4; + let chksum = u32::from_le_bytes(chksum); + state.check_sum = Some(chksum); + } + } + break; + } + } + } + } + + let result_len = self.read(target).map_err(err::FailedToDrainDecodebuffer)?; + let bytes_read_at_end = match &mut self.state { + Some(s) => s.bytes_read_counter, + None => panic!("Bug in library"), + }; + let read_len = bytes_read_at_end - bytes_read_at_start; + Ok((read_len as usize, result_len)) + } +} + +/// Read bytes from the decode_buffer that are no longer needed. While the frame is not yet finished +/// this will retain window_size bytes, else it will drain it completely +impl std::io::Read for FrameDecoder { + fn read(&mut self, target: &mut [u8]) -> std::result::Result<usize, std::io::Error> { + let state = match &mut self.state { + None => return Ok(0), + Some(s) => s, + }; + if state.frame_finished { + state.decoder_scratch.buffer.read_all(target) + } else { + state.decoder_scratch.buffer.read(target) + } + } +} |