//! I/O streams for wrapping `BufRead` types as encoders/decoders

use std::io::prelude::*;
use std::io;

use lzma_sys;
#[cfg(feature = "tokio")]
use futures::Poll;
#[cfg(feature = "tokio")]
use tokio_io::{AsyncRead, AsyncWrite};

use stream::{Stream, Check, Action, Status};

/// An xz encoder, or compressor.
///
/// This structure implements a `BufRead` interface and will read uncompressed
/// data from an underlying stream and emit a stream of compressed data.
pub struct XzEncoder<R> {
    obj: R,
    data: Stream,
}

/// An xz decoder, or decompressor.
///
/// This structure implements a `BufRead` interface and takes a stream of
/// compressed data as input, providing the decompressed data when read from.
pub struct XzDecoder<R> {
    obj: R,
    data: Stream,
}

impl<R: BufRead> XzEncoder<R> {
    /// Creates a new encoder which will read uncompressed data from the given
    /// stream and emit the compressed stream.
    ///
    /// The `level` argument here is typically 0-9 with 6 being a good default.
    pub fn new(r: R, level: u32) -> XzEncoder<R> {
        let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
        XzEncoder::new_stream(r, stream)
    }

    /// Creates a new encoder with a custom `Stream`.
    ///
    /// The `Stream` can be pre-configured for multithreaded encoding, different
    /// compression options/tuning, etc.
    pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
        XzEncoder {
            obj: r,
            data: stream,
        }
    }
}

impl<R> XzEncoder<R> {
    /// Acquires a reference to the underlying stream
    pub fn get_ref(&self) -> &R {
        &self.obj
    }

    /// Acquires a mutable reference to the underlying stream
    ///
    /// Note that mutation of the stream may result in surprising results if
    /// this encoder continues to be used.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.obj
    }

    /// Consumes this encoder, returning the underlying reader.
    pub fn into_inner(self) -> R {
        self.obj
    }

    /// Returns the number of bytes produced by the compressor
    /// (i.e. the number of bytes read from this stream).
    ///
    /// Note that, due to buffering, this only bears any relation to
    /// `total_in()` when the compressor chooses to flush its data
    /// (unfortunately, this generally won't happen until the end of the
    /// stream, because the compressor doesn't know whether more data is
    /// coming). At that point, `total_out() / total_in()` would be the
    /// compression ratio.
    pub fn total_out(&self) -> u64 {
        self.data.total_out()
    }

    /// Returns the number of bytes consumed by the compressor
    /// (i.e. the number of bytes read from the underlying stream).
    pub fn total_in(&self) -> u64 {
        self.data.total_in()
    }
}

impl<R: BufRead> Read for XzEncoder<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            let (read, consumed, eof, ret);
            {
                let input = self.obj.fill_buf()?;
                eof = input.is_empty();
                let before_out = self.data.total_out();
                let before_in = self.data.total_in();
                let action = if eof { Action::Finish } else { Action::Run };
                ret = self.data.process(input, buf, action);
                read = (self.data.total_out() - before_out) as usize;
                consumed = (self.data.total_in() - before_in) as usize;
            }
            self.obj.consume(consumed);

            ret.unwrap();

            // If we haven't read any data and we haven't hit EOF yet, then we
            // need to keep asking for more data because if we return that 0
            // bytes of data have been read then it will be interpreted as EOF.
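            //
            // With an empty output buffer no bytes can ever be produced, so
            // the `buf.len() > 0` check below lets that case fall through and
            // return `Ok(0)` instead of looping.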
            if read == 0 && !eof && buf.len() > 0 {
                continue
            }
            return Ok(read)
        }
    }
}

#[cfg(feature = "tokio")]
impl<R: AsyncRead + BufRead> AsyncRead for XzEncoder<R> {
}

impl<W: Write> Write for XzEncoder<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.get_mut().write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.get_mut().flush()
    }
}

#[cfg(feature = "tokio")]
impl<R: AsyncWrite> AsyncWrite for XzEncoder<R> {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        self.get_mut().shutdown()
    }
}

impl<R: BufRead> XzDecoder<R> {
    /// Creates a new decoder which will decompress data read from the given
    /// stream.
    pub fn new(r: R) -> XzDecoder<R> {
        let stream = Stream::new_stream_decoder(u64::max_value(), 0).unwrap();
        XzDecoder::new_stream(r, stream)
    }

    /// Creates a new decoder which will decompress data read from the given
    /// input. All the concatenated xz streams from the input will be consumed.
    pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
        let stream = Stream::new_auto_decoder(u64::max_value(), lzma_sys::LZMA_CONCATENATED).unwrap();
        XzDecoder::new_stream(r, stream)
    }

    /// Creates a new decoder with a custom `Stream`.
    ///
    /// The `Stream` can be pre-configured for various checks, different
    /// decompression options/tuning, etc.
    pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
        XzDecoder { obj: r, data: stream }
    }
}

impl<R> XzDecoder<R> {
    /// Acquires a reference to the underlying stream
    pub fn get_ref(&self) -> &R {
        &self.obj
    }

    /// Acquires a mutable reference to the underlying stream
    ///
    /// Note that mutation of the stream may result in surprising results if
    /// this decoder continues to be used.
    pub fn get_mut(&mut self) -> &mut R {
        &mut self.obj
    }

    /// Consumes this decoder, returning the underlying reader.
    pub fn into_inner(self) -> R {
        self.obj
    }

    /// Returns the number of bytes that the decompressor has consumed.
    ///
    /// Note that this will likely be smaller than what the decompressor
    /// actually read from the underlying stream due to buffering.
    pub fn total_in(&self) -> u64 {
        self.data.total_in()
    }

    /// Returns the number of bytes that the decompressor has produced.
    pub fn total_out(&self) -> u64 {
        self.data.total_out()
    }
}

impl<R: BufRead> Read for XzDecoder<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            let (read, consumed, eof, ret);
            {
                let input = self.obj.fill_buf()?;
                eof = input.is_empty();
                let before_out = self.data.total_out();
                let before_in = self.data.total_in();
                ret = self.data.process(input, buf,
                                        if eof { Action::Finish } else { Action::Run });
                read = (self.data.total_out() - before_out) as usize;
                consumed = (self.data.total_in() - before_in) as usize;
            }
            self.obj.consume(consumed);

            let status = ret?;
            if read > 0 || eof || buf.len() == 0 {
                // Input ended before the decoder reached the end of the xz
                // stream, so the input must have been truncated.
                if read == 0 && status != Status::StreamEnd && buf.len() > 0 {
                    return Err(io::Error::new(io::ErrorKind::Other, "premature eof"))
                }
                return Ok(read)
            }
            // No output was produced and no input was consumed, so the decoder
            // cannot make progress on this stream.
            if consumed == 0 {
                return Err(io::Error::new(io::ErrorKind::Other, "corrupt xz stream"))
            }
        }
    }
}

#[cfg(feature = "tokio")]
impl<R: AsyncRead + BufRead> AsyncRead for XzDecoder<R> {
}

impl<W: Write> Write for XzDecoder<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.get_mut().write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.get_mut().flush()
    }
}

#[cfg(feature = "tokio")]
impl<R: AsyncWrite> AsyncWrite for XzDecoder<R> {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        self.get_mut().shutdown()
    }
}

#[cfg(test)]
mod tests {
    use bufread::{XzEncoder, XzDecoder};
    use std::io::Read;

    #[test]
    fn compressed_and_trailing_data() {
        // Make a vector with compressed data...
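        // The exact byte pattern is arbitrary; what matters is that the
        // encoder output forms one complete xz stream before unrelated
        // trailing bytes are appended below.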
        let mut to_compress: Vec<u8> = Vec::new();
        const COMPRESSED_ORIG_SIZE: usize = 1024;
        for num in 0..COMPRESSED_ORIG_SIZE {
            to_compress.push(num as u8)
        }
        let mut encoder = XzEncoder::new(&to_compress[..], 6);

        let mut decoder_input = Vec::new();
        encoder.read_to_end(&mut decoder_input).unwrap();

        // ...plus additional unrelated trailing data
        const ADDITIONAL_SIZE: usize = 123;
        let mut additional_data = Vec::new();
        for num in 0..ADDITIONAL_SIZE {
            additional_data.push(((25 + num) % 256) as u8)
        }
        decoder_input.extend(&additional_data);

        // Decoder must be able to read the compressed xz stream, and keep the
        // trailing data.
        let mut decoder_reader = &decoder_input[..];
        {
            let mut decoder = XzDecoder::new(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; to_compress.len()];
            assert_eq!(decoder.read(&mut decompressed_data).unwrap(), COMPRESSED_ORIG_SIZE);
            assert_eq!(decompressed_data, &to_compress[..]);
        }
        let mut remaining_data = Vec::new();
        let nb_read = decoder_reader.read_to_end(&mut remaining_data).unwrap();
        assert_eq!(nb_read, ADDITIONAL_SIZE);
        assert_eq!(remaining_data, &additional_data[..]);
    }
}
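
// A minimal round-trip sketch: it assumes only the `XzEncoder`/`XzDecoder`
// `BufRead` API defined above and checks that compressing and then
// decompressing a buffer returns the original bytes. The module name, test
// name, and buffer contents are illustrative placeholders, not taken from the
// original test suite.
#[cfg(test)]
mod roundtrip_sketch {
    use bufread::{XzEncoder, XzDecoder};
    use std::io::Read;

    #[test]
    fn compress_then_decompress_roundtrip() {
        // Arbitrary, deterministic input data.
        let original: Vec<u8> = (0..4096u32).map(|n| (n % 251) as u8).collect();

        // Reading from the encoder pulls uncompressed bytes from `&original[..]`
        // (a `&[u8]`, which implements `BufRead`) and yields an xz stream.
        let mut compressed = Vec::new();
        XzEncoder::new(&original[..], 6)
            .read_to_end(&mut compressed)
            .unwrap();

        // Reading from the decoder pulls compressed bytes and yields the
        // original data again.
        let mut decompressed = Vec::new();
        XzDecoder::new(&compressed[..])
            .read_to_end(&mut decompressed)
            .unwrap();

        assert_eq!(decompressed, original);
    }
}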