diff options
-rw-r--r-- | ChangeLog | 21 | ||||
-rw-r--r-- | INSTALL | 11 | ||||
-rw-r--r-- | Makefile.in | 39 | ||||
-rw-r--r-- | NEWS | 20 | ||||
-rw-r--r-- | README | 30 | ||||
-rw-r--r-- | arg_parser.cc | 20 | ||||
-rw-r--r-- | arg_parser.h | 24 | ||||
-rw-r--r-- | compress.cc | 184 | ||||
-rwxr-xr-x | configure | 93 | ||||
-rw-r--r-- | dec_stdout.cc | 331 | ||||
-rw-r--r-- | dec_stream.cc | 520 | ||||
-rw-r--r-- | decompress.cc | 551 | ||||
-rw-r--r-- | doc/plzip.1 | 8 | ||||
-rw-r--r-- | doc/plzip.info | 116 | ||||
-rw-r--r-- | doc/plzip.texinfo | 90 | ||||
-rw-r--r-- | file_index.cc | 143 | ||||
-rw-r--r-- | file_index.h | 77 | ||||
-rw-r--r-- | lzip.h | 246 | ||||
-rw-r--r-- | main.cc | 175 | ||||
-rw-r--r-- | plzip.h | 131 | ||||
-rwxr-xr-x | testsuite/check.sh | 67 | ||||
-rw-r--r-- | testsuite/test.txt.lz | bin | 0 -> 11518 bytes | |||
-rw-r--r-- | testsuite/test_v0.lz | bin | 11540 -> 0 bytes | |||
-rw-r--r-- | testsuite/test_v1.lz | bin | 11548 -> 0 bytes |
24 files changed, 1985 insertions, 912 deletions
@@ -1,3 +1,22 @@ +2013-03-08 Antonio Diaz Diaz <ant_diaz@teleline.es> + + * Version 1.0-rc1 released. + * compress.cc: 'deliver_packet' changed to 'deliver_packets'. + * Scalability of decompression from/to regular files has been + increased by removing splitter and muxer when not needed. + * The number of worker threads is now limited to the number of + members when decompressing from a regular file. + * Makefile.in: Added new target 'install-as-lzip'. + * Makefile.in: Added new target 'install-bin'. + * main.cc: Use 'setmode' instead of '_setmode' on Windows and OS/2. + * main.cc: Define 'strtoull' to 'std::strtoul' on Windows. + +2012-03-01 Antonio Diaz Diaz <ant_diaz@teleline.es> + + * Version 0.9 released. + * Minor fixes and cleanups. + * configure: 'datadir' renamed to 'datarootdir'. + 2012-01-17 Antonio Diaz Diaz <ant_diaz@teleline.es> * Version 0.8 released. @@ -76,7 +95,7 @@ until something better appears on the net. -Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. +Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This file is a collection of facts, and thus it is not copyrightable, but just in case, you have unlimited permission to copy, distribute and @@ -1,7 +1,7 @@ Requirements ------------ You will need a C++ compiler and the lzlib compression library installed. -I use gcc 4.3.5 and 3.3.6, but the code should compile with any +I use gcc 4.7.2 and 3.3.6, but the code should compile with any standards compliant compiler. Lzlib must be version 1.0 or newer. Gcc is available at http://gcc.gnu.org. @@ -34,6 +34,13 @@ the main archive. 5. Type 'make install' to install the program and any data files and documentation. + You can install only the program, the info manual or the man page + typing 'make install-bin', 'make install-info' or 'make install-man' + respectively. + +5a. Type 'make install-as-lzip' to install the program and any data + files and documentation, and link the program to the name 'lzip'. + Another way ----------- @@ -52,7 +59,7 @@ After running 'configure', you can run 'make' and 'make install' as explained above. -Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. +Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This file is free documentation: you have unlimited permission to copy, distribute and modify it. diff --git a/Makefile.in b/Makefile.in index 02f9cd8..4c78781 100644 --- a/Makefile.in +++ b/Makefile.in @@ -7,20 +7,21 @@ INSTALL_DIR = $(INSTALL) -d -m 755 LIBS = -llz -lpthread SHELL = /bin/sh -objs = arg_parser.o compress.o decompress.o main.o +objs = arg_parser.o file_index.o compress.o dec_stdout.o dec_stream.o \ + decompress.o main.o -.PHONY : all install install-info install-man install-strip \ - uninstall uninstall-info uninstall-man \ +.PHONY : all install install-bin install-info install-man install-strip \ + install-as-lzip uninstall uninstall-bin uninstall-info uninstall-man \ doc info man check dist clean distclean all : $(progname) $(progname) : $(objs) - $(CXX) $(LDFLAGS) -o $@ $^ $(LIBS) + $(CXX) $(LDFLAGS) -o $@ $(objs) $(LIBS) $(progname)_profiled : $(objs) - $(CXX) $(LDFLAGS) -pg -o $@ $^ $(LIBS) + $(CXX) $(LDFLAGS) -pg -o $@ $(objs) $(LIBS) main.o : main.cc $(CXX) $(CPPFLAGS) $(CXXFLAGS) -DPROGVERSION=\"$(pkgversion)\" -c -o $@ $< @@ -28,11 +29,15 @@ main.o : main.cc %.o : %.cc $(CXX) $(CPPFLAGS) $(CXXFLAGS) -c -o $@ $< -$(objs) : Makefile -arg_parser.o : arg_parser.h -compress.o : plzip.h -decompress.o : plzip.h -main.o : arg_parser.h plzip.h +$(objs) : Makefile +arg_parser.o : arg_parser.h +compress.o : lzip.h +dec_stdout.o : lzip.h file_index.h +dec_stream.o : lzip.h +decompress.o : lzip.h file_index.h +file_index.o : lzip.h file_index.h +main.o : arg_parser.h lzip.h + doc : info man @@ -53,7 +58,9 @@ Makefile : $(VPATH)/configure $(VPATH)/Makefile.in check : all @$(VPATH)/testsuite/check.sh $(VPATH)/testsuite $(pkgversion) -install : all install-info install-man +install : install-bin install-info install-man + +install-bin : all if [ ! -d "$(DESTDIR)$(bindir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(bindir)" ; fi $(INSTALL_PROGRAM) ./$(progname) "$(DESTDIR)$(bindir)/$(progname)" @@ -69,7 +76,13 @@ install-man : install-strip : all $(MAKE) INSTALL_PROGRAM='$(INSTALL_PROGRAM) -s' install -uninstall : uninstall-info uninstall-man +install-as-lzip : install + -rm -f "$(DESTDIR)$(bindir)/lzip" + cd "$(DESTDIR)$(bindir)" && ln -s $(progname) lzip + +uninstall : uninstall-bin uninstall-info uninstall-man + +uninstall-bin : -rm -f "$(DESTDIR)$(bindir)/$(progname)" uninstall-info : @@ -95,7 +108,7 @@ dist : doc $(DISTNAME)/doc/$(pkgname).texinfo \ $(DISTNAME)/testsuite/check.sh \ $(DISTNAME)/testsuite/test.txt \ - $(DISTNAME)/testsuite/test_v[01].lz \ + $(DISTNAME)/testsuite/test.txt.lz \ $(DISTNAME)/*.h \ $(DISTNAME)/*.cc rm -f $(DISTNAME) @@ -1,17 +1,13 @@ -Changes in version 0.8: +Changes in version 1.0: -The option "-F, --recompress", which forces recompression of files whose -name already has the ".lz" or ".tlz" suffix, has been added. +Scalability of compression (max number of useful worker threads) has +been increased. -The options "-d, --decompress" and "-t, --test" now also show -compression ratio. +Scalability when decompressing from/to regular files has been increased. -Inability to change output file attributes has been downgraded from -error to warning. +The number of worker threads is now limited to the number of members in +the input file when decompressing from a regular file. -A small change has been made in the "--help" output and man page. +The target "install-as-lzip" has been added to the Makefile. -Quote characters in messages have been changed as advised by GNU Coding -Standards. - -Stdin and stdout are now set in binary mode on OS2. +The target "install-bin" has been added to the Makefile. @@ -11,8 +11,36 @@ multiprocessor machines, which makes it specially well suited for distribution of big software files and large scale data archiving. On files big enough, plzip can use hundreds of processors. +Plzip replaces every file given in the command line with a compressed +version of itself, with the name "original_name.lz". Each compressed +file has the same modification date, permissions, and, when possible, +ownership as the corresponding original, so that these properties can be +correctly restored at decompression time. Plzip is able to read from some +types of non regular files if the "--stdout" option is specified. -Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. +If no file names are specified, plzip compresses (or decompresses) from +standard input to standard output. In this case, plzip will decline to +write compressed output to a terminal, as this would be entirely +incomprehensible and therefore pointless. + +Plzip will correctly decompress a file which is the concatenation of two +or more compressed files. The result is the concatenation of the +corresponding uncompressed files. Integrity testing of concatenated +compressed files is also supported. + +As a self-check for your protection, plzip stores in the member trailer +the 32-bit CRC of the original data and the size of the original data, +to make sure that the decompressed version of the data is identical to +the original. This guards against corruption of the compressed data, and +against undetected bugs in plzip (hopefully very unlikely). The chances +of data corruption going undetected are microscopic, less than one +chance in 4000 million for each member processed. Be aware, though, that +the check occurs upon decompression, so it can only tell you that +something is wrong. It can't help you recover the original uncompressed +data. + + +Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This file is free documentation: you have unlimited permission to copy, distribute and modify it. diff --git a/arg_parser.cc b/arg_parser.cc index 27137a1..a28d2ba 100644 --- a/arg_parser.cc +++ b/arg_parser.cc @@ -1,5 +1,5 @@ /* Arg_parser - POSIX/GNU command line argument parser. (C++ version) - Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 + Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This library is free software: you can redistribute it and/or modify @@ -36,7 +36,7 @@ bool Arg_parser::parse_long_option( const char * const opt, const char * const arg, const Option options[], int & argind ) { - unsigned int len; + unsigned len; int index = -1; bool exact = false, ambig = false; @@ -44,7 +44,7 @@ bool Arg_parser::parse_long_option( const char * const opt, const char * const a // Test all long options for either exact match or abbreviated matches. for( int i = 0; options[i].code != 0; ++i ) - if( options[i].name && !std::strncmp( options[i].name, &opt[2], len ) ) + if( options[i].name && std::strncmp( options[i].name, &opt[2], len ) == 0 ) { if( std::strlen( options[i].name ) == len ) // Exact match found { index = i; exact = true; break; } @@ -56,30 +56,30 @@ bool Arg_parser::parse_long_option( const char * const opt, const char * const a if( ambig && !exact ) { - error_ = "option `"; error_ += opt; error_ += "' is ambiguous"; + error_ = "option '"; error_ += opt; error_ += "' is ambiguous"; return false; } if( index < 0 ) // nothing found { - error_ = "unrecognized option `"; error_ += opt; error_ += '\''; + error_ = "unrecognized option '"; error_ += opt; error_ += '\''; return false; } ++argind; data.push_back( Record( options[index].code ) ); - if( opt[len+2] ) // `--<long_option>=<argument>' syntax + if( opt[len+2] ) // '--<long_option>=<argument>' syntax { if( options[index].has_arg == no ) { - error_ = "option `--"; error_ += options[index].name; + error_ = "option '--"; error_ += options[index].name; error_ += "' doesn't allow an argument"; return false; } if( options[index].has_arg == yes && !opt[len+3] ) { - error_ = "option `--"; error_ += options[index].name; + error_ = "option '--"; error_ += options[index].name; error_ += "' requires an argument"; return false; } @@ -91,7 +91,7 @@ bool Arg_parser::parse_long_option( const char * const opt, const char * const a { if( !arg || !arg[0] ) { - error_ = "option `--"; error_ += options[index].name; + error_ = "option '--"; error_ += options[index].name; error_ += "' requires an argument"; return false; } @@ -178,7 +178,7 @@ Arg_parser::Arg_parser( const int argc, const char * const argv[], if( error_.size() ) data.clear(); else { - for( unsigned int i = 0; i < non_options.size(); ++i ) + for( unsigned i = 0; i < non_options.size(); ++i ) { data.push_back( Record() ); data.back().argument.swap( non_options[i] ); } while( argind < argc ) { data.push_back( Record() ); data.back().argument = argv[argind++]; } diff --git a/arg_parser.h b/arg_parser.h index 5d036ab..5248cb1 100644 --- a/arg_parser.h +++ b/arg_parser.h @@ -1,5 +1,5 @@ /* Arg_parser - POSIX/GNU command line argument parser. (C++ version) - Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 + Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This library is free software: you can redistribute it and/or modify @@ -26,12 +26,12 @@ Public License. */ -/* Arg_parser reads the arguments in `argv' and creates a number of +/* Arg_parser reads the arguments in 'argv' and creates a number of option codes, option arguments and non-option arguments. - In case of error, `error' returns a non-empty error message. + In case of error, 'error' returns a non-empty error message. - `options' is an array of `struct Option' terminated by an element + 'options' is an array of 'struct Option' terminated by an element containing a code which is zero. A null name means a short-only option. A code value outside the unsigned char range means a long-only option. @@ -40,13 +40,13 @@ were specified before all the non-option arguments for the purposes of parsing, even if the user of your program intermixed option and non-option arguments. If you want the arguments in the exact order - the user typed them, call `Arg_parser' with `in_order' = true. + the user typed them, call 'Arg_parser' with 'in_order' = true. - The argument `--' terminates all options; any following arguments are + The argument '--' terminates all options; any following arguments are treated as non-option arguments, even if they begin with a hyphen. - The syntax for optional option arguments is `-<short_option><argument>' - (without whitespace), or `--<long_option>=<argument>'. + The syntax for optional option arguments is '-<short_option><argument>' + (without whitespace), or '--<long_option>=<argument>'. */ class Arg_parser @@ -85,20 +85,20 @@ public: Arg_parser( const char * const opt, const char * const arg, const Option options[] ); - const std::string & error() const throw() { return error_; } + const std::string & error() const { return error_; } // The number of arguments parsed (may be different from argc) - int arguments() const throw() { return data.size(); } + int arguments() const { return data.size(); } // If code( i ) is 0, argument( i ) is a non-option. // Else argument( i ) is the option's argument (or empty). - int code( const int i ) const throw() + int code( const int i ) const { if( i >= 0 && i < arguments() ) return data[i].code; else return 0; } - const std::string & argument( const int i ) const throw() + const std::string & argument( const int i ) const { if( i >= 0 && i < arguments() ) return data[i].argument; else return error_; diff --git a/compress.cc b/compress.cc index cf0135a..c4428ea 100644 --- a/compress.cc +++ b/compress.cc @@ -1,6 +1,6 @@ /* Plzip - A parallel compressor compatible with lzip Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -28,22 +28,53 @@ #include <queue> #include <string> #include <vector> -#include <inttypes.h> #include <pthread.h> +#include <stdint.h> #include <unistd.h> #include <lzlib.h> -#include "plzip.h" +#include "lzip.h" #ifndef LLONG_MAX #define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL #endif -#ifndef LLONG_MIN -#define LLONG_MIN (-LLONG_MAX - 1LL) -#endif -#ifndef ULLONG_MAX -#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL -#endif + + +// Returns the number of bytes really read. +// If (returned value < size) and (errno == 0), means EOF was reached. +// +int readblock( const int fd, uint8_t * const buf, const int size ) + { + int rest = size; + errno = 0; + while( rest > 0 ) + { + const int n = read( fd, buf + size - rest, rest ); + if( n > 0 ) rest -= n; + else if( n == 0 ) break; // EOF + else if( errno != EINTR && errno != EAGAIN ) break; + errno = 0; + } + return size - rest; + } + + +// Returns the number of bytes really written. +// If (returned value < size), it is always an error. +// +int writeblock( const int fd, const uint8_t * const buf, const int size ) + { + int rest = size; + errno = 0; + while( rest > 0 ) + { + const int n = write( fd, buf + size - rest, rest ); + if( n > 0 ) rest -= n; + else if( n < 0 && errno != EINTR && errno != EAGAIN ) break; + errno = 0; + } + return size - rest; + } void xinit( pthread_mutex_t * const mutex ) @@ -109,13 +140,14 @@ void xbroadcast( pthread_cond_t * const cond ) namespace { -long long in_size = 0; -long long out_size = 0; +unsigned long long in_size = 0; +unsigned long long out_size = 0; +const char * const mem_msg = "Not enough memory. Try a smaller dictionary size"; struct Packet // data block with a serial number { - unsigned long long id; // serial number assigned as received + unsigned id; // serial number assigned as received uint8_t * data; int size; // number of bytes in data (if any) }; @@ -124,16 +156,16 @@ struct Packet // data block with a serial number class Packet_courier // moves packets around { public: - unsigned long icheck_counter; - unsigned long iwait_counter; - unsigned long ocheck_counter; - unsigned long owait_counter; + unsigned icheck_counter; + unsigned iwait_counter; + unsigned ocheck_counter; + unsigned owait_counter; private: - unsigned long long receive_id; // id assigned to next packet received - unsigned long long deliver_id; // id of next packet to be delivered + unsigned receive_id; // id assigned to next packet received + unsigned deliver_id; // id of next packet to be delivered Slot_tally slot_tally; // limits the number of input packets std::queue< Packet * > packet_queue; - std::vector< Packet * > circular_buffer; + std::vector< const Packet * > circular_buffer; int num_working; // number of workers still running const int num_slots; // max packets in circulation pthread_mutex_t imutex; @@ -163,12 +195,10 @@ public: xdestroy( &iav_or_eof ); xdestroy( &imutex ); } - const Slot_tally & tally() const { return slot_tally; } - // make a packet with data received from splitter void receive_packet( uint8_t * const data, const int size ) { - Packet * ipacket = new Packet; + Packet * const ipacket = new Packet; ipacket->id = receive_id++; ipacket->data = data; ipacket->size = size; @@ -189,7 +219,6 @@ public: { ++iwait_counter; xwait( &iav_or_eof, &imutex ); - ++icheck_counter; } if( !packet_queue.empty() ) { @@ -197,7 +226,7 @@ public: packet_queue.pop(); } xunlock( &imutex ); - if( ipacket == 0 ) + if( !ipacket ) { // notify muxer when last worker exits xlock( &omutex ); @@ -208,36 +237,43 @@ public: } // collect a packet from a worker - void collect_packet( Packet * const opacket ) + void collect_packet( const Packet * const opacket ) { + const int i = opacket->id%num_slots; xlock( &omutex ); // id collision shouldn't happen - if( circular_buffer[opacket->id%num_slots] != 0 ) + if( circular_buffer[i] != 0 ) internal_error( "id collision in collect_packet" ); // merge packet into circular buffer - circular_buffer[opacket->id%num_slots] = opacket; + circular_buffer[i] = opacket; if( opacket->id == deliver_id ) xsignal( &oav_or_exit ); xunlock( &omutex ); } - // deliver a packet to muxer - Packet * deliver_packet() + // deliver packets to muxer + void deliver_packets( std::vector< const Packet * > & packet_vector ) { xlock( &omutex ); ++ocheck_counter; - while( circular_buffer[deliver_id%num_slots] == 0 && num_working > 0 ) + int i = deliver_id % num_slots; + while( circular_buffer[i] == 0 && num_working > 0 ) { ++owait_counter; xwait( &oav_or_exit, &omutex ); - ++ocheck_counter; } - Packet * opacket = circular_buffer[deliver_id%num_slots]; - circular_buffer[deliver_id%num_slots] = 0; - ++deliver_id; + packet_vector.clear(); + while( true ) + { + const Packet * const opacket = circular_buffer[i]; + if( !opacket ) break; + packet_vector.push_back( opacket ); + circular_buffer[i] = 0; + ++deliver_id; + i = deliver_id % num_slots; + } xunlock( &omutex ); - if( opacket != 0 ) - slot_tally.leave_slot(); // return a slot to the tally - return opacket; + if( packet_vector.size() ) // return slots to the tally + slot_tally.leave_slots( packet_vector.size() ); } void finish() // splitter has no more packets to send @@ -281,12 +317,12 @@ extern "C" void * csplitter( void * arg ) for( bool first_post = true; ; first_post = false ) { uint8_t * const data = new( std::nothrow ) uint8_t[data_size]; - if( data == 0 ) { pp( "Not enough memory" ); fatal(); } + if( !data ) { pp( mem_msg ); fatal(); } const int size = readblock( infd, data, data_size ); if( size != data_size && errno ) { pp(); show_error( "Read error", errno ); fatal(); } - if( size > 0 || first_post ) // first packet can be empty + if( size > 0 || first_post ) // first packet may be empty { in_size += size; courier.receive_packet( data, size ); @@ -325,11 +361,11 @@ extern "C" void * cworker( void * arg ) while( true ) { Packet * const packet = courier.distribute_packet(); - if( packet == 0 ) break; // no more packets to process + if( !packet ) break; // no more packets to process const int max_compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 ); uint8_t * const new_data = new( std::nothrow ) uint8_t[max_compr_size]; - if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); } + if( !new_data ) { pp( mem_msg ); fatal(); } const int dict_size = std::max( LZ_min_dictionary_size(), std::min( dictionary_size, packet->size ) ); LZ_Encoder * const encoder = @@ -337,14 +373,14 @@ extern "C" void * cworker( void * arg ) if( !encoder || LZ_compress_errno( encoder ) != LZ_ok ) { if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error ) - pp( "Not enough memory. Try a smaller dictionary size" ); + pp( mem_msg ); else internal_error( "invalid argument to encoder" ); fatal(); } int written = 0; - int new_size = 0; + int new_pos = 0; while( true ) { if( LZ_compress_write_size( encoder ) > 0 ) @@ -359,8 +395,8 @@ extern "C" void * cworker( void * arg ) if( written >= packet->size ) { delete[] packet->data; LZ_compress_finish( encoder ); } } - const int rd = LZ_compress_read( encoder, new_data + new_size, - max_compr_size - new_size ); + const int rd = LZ_compress_read( encoder, new_data + new_pos, + max_compr_size - new_pos ); if( rd < 0 ) { pp(); @@ -369,8 +405,8 @@ extern "C" void * cworker( void * arg ) LZ_strerror( LZ_compress_errno( encoder ) ) ); fatal(); } - new_size += rd; - if( new_size > max_compr_size ) + new_pos += rd; + if( new_pos > max_compr_size ) internal_error( "packet size exceeded in worker" ); if( LZ_compress_finished( encoder ) == 1 ) break; } @@ -379,7 +415,7 @@ extern "C" void * cworker( void * arg ) { pp( "LZ_compress_close failed" ); fatal(); } packet->data = new_data; - packet->size = new_size; + packet->size = new_pos; courier.collect_packet( packet ); } return 0; @@ -390,21 +426,26 @@ extern "C" void * cworker( void * arg ) // their contents to the output file. void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) { + std::vector< const Packet * > packet_vector; while( true ) { - Packet * opacket = courier.deliver_packet(); - if( opacket == 0 ) break; // queue is empty. all workers exited - - out_size += opacket->size; + courier.deliver_packets( packet_vector ); + if( packet_vector.size() == 0 ) break; // all workers exited - if( outfd >= 0 ) + for( unsigned i = 0; i < packet_vector.size(); ++i ) { - const int wr = writeblock( outfd, opacket->data, opacket->size ); - if( wr != opacket->size ) - { pp(); show_error( "Write error", errno ); fatal(); } + const Packet * const opacket = packet_vector[i]; + out_size += opacket->size; + + if( outfd >= 0 ) + { + const int wr = writeblock( outfd, opacket->data, opacket->size ); + if( wr != opacket->size ) + { pp(); show_error( "Write error", errno ); fatal(); } + } + delete[] opacket->data; + delete opacket; } - delete[] opacket->data; - delete opacket; } } @@ -419,11 +460,11 @@ int compress( const int data_size, const int dictionary_size, const Pretty_print & pp, const int debug_level ) { const int slots_per_worker = 2; - const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ? - num_workers * slots_per_worker : INT_MAX ); + const int num_slots = + ( ( num_workers > 1 ) ? num_workers * slots_per_worker : 1 ); in_size = 0; out_size = 0; - Packet_courier courier( num_workers, num_slots - 1 ); + Packet_courier courier( num_workers, num_slots ); Splitter_arg splitter_arg; splitter_arg.courier = &courier; @@ -443,8 +484,7 @@ int compress( const int data_size, const int dictionary_size, worker_arg.match_len_limit = match_len_limit; pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; - if( worker_threads == 0 ) - { pp( "Not enough memory" ); fatal(); } + if( !worker_threads ) { pp( mem_msg ); fatal(); } for( int i = 0; i < num_workers; ++i ) { errcode = pthread_create( worker_threads + i, 0, cworker, &worker_arg ); @@ -460,7 +500,7 @@ int compress( const int data_size, const int dictionary_size, if( errcode ) { show_error( "Can't join worker threads", errcode ); fatal(); } } - delete[] worker_threads; worker_threads = 0; + delete[] worker_threads; errcode = pthread_join( splitter_thread, 0 ); if( errcode ) @@ -468,11 +508,11 @@ int compress( const int data_size, const int dictionary_size, if( verbosity >= 1 ) { - if( in_size <= 0 || out_size <= 0 ) + if( in_size == 0 || out_size == 0 ) std::fprintf( stderr, " no data compressed.\n" ); else std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, " - "%5.2f%% saved, %lld in, %lld out.\n", + "%5.2f%% saved, %llu in, %llu out.\n", (double)in_size / out_size, ( 8.0 * out_size ) / in_size, 100.0 * ( 1.0 - ( (double)out_size / in_size ) ), @@ -481,14 +521,10 @@ int compress( const int data_size, const int dictionary_size, if( debug_level & 1 ) std::fprintf( stderr, - "splitter tried to send a packet %8lu times\n" - "splitter had to wait %8lu times\n" - "any worker tried to consume from splitter %8lu times\n" - "any worker had to wait %8lu times\n" - "muxer tried to consume from workers %8lu times\n" - "muxer had to wait %8lu times\n", - courier.tally().check_counter, - courier.tally().wait_counter, + "any worker tried to consume from splitter %8u times\n" + "any worker had to wait %8u times\n" + "muxer tried to consume from workers %8u times\n" + "muxer had to wait %8u times\n", courier.icheck_counter, courier.iwait_counter, courier.ocheck_counter, @@ -1,6 +1,6 @@ #! /bin/sh # configure script for Plzip - A parallel compressor compatible with lzip -# Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. +# Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. # # This configure script is free software: you have unlimited permission # to copy, distribute and modify it. @@ -8,9 +8,9 @@ args= no_create= pkgname=plzip -pkgversion=0.8 +pkgversion=1.0-rc1 progname=plzip -srctrigger=plzip.h +srctrigger=doc/plzip.texinfo # clear some things potentially inherited from environment. LC_ALL=C @@ -19,15 +19,22 @@ srcdir= prefix=/usr/local exec_prefix='$(prefix)' bindir='$(exec_prefix)/bin' -datadir='$(prefix)/share' -infodir='$(datadir)/info' -mandir='$(datadir)/man' -sysconfdir='$(prefix)/etc' -CXX= +datarootdir='$(prefix)/share' +infodir='$(datarootdir)/info' +mandir='$(datarootdir)/man' +CXX=g++ CPPFLAGS= CXXFLAGS='-Wall -W -O2' LDFLAGS= +# checking whether we are using GNU C++. +if [ ! -x /bin/g++ ] && + [ ! -x /usr/bin/g++ ] && + [ ! -x /usr/local/bin/g++ ] ; then + CXX=c++ + CXXFLAGS='-W -O2' +fi + # Loop over all args while [ -n "$1" ] ; do @@ -40,12 +47,12 @@ while [ -n "$1" ] ; do # Split out the argument for options that take them case ${option} in - *=*) optarg=`echo ${option} | sed -e 's,^[^=]*=,,'` ;; + *=*) optarg=`echo ${option} | sed -e 's,^[^=]*=,,;s,/$,,'` ;; esac # Process the options case ${option} in - --help | --he* | -h) + --help | -h) echo "Usage: configure [options]" echo echo "Options: [defaults in brackets]" @@ -55,42 +62,31 @@ while [ -n "$1" ] ; do echo " --prefix=DIR install into DIR [${prefix}]" echo " --exec-prefix=DIR base directory for arch-dependent files [${exec_prefix}]" echo " --bindir=DIR user executables directory [${bindir}]" - echo " --datadir=DIR base directory for doc and data [${datadir}]" + echo " --datarootdir=DIR base directory for doc and data [${datarootdir}]" echo " --infodir=DIR info files directory [${infodir}]" echo " --mandir=DIR man pages directory [${mandir}]" - echo " --sysconfdir=DIR read-only single-machine data directory [${sysconfdir}]" echo " CXX=COMPILER C++ compiler to use [g++]" echo " CPPFLAGS=OPTIONS command line options for the preprocessor [${CPPFLAGS}]" echo " CXXFLAGS=OPTIONS command line options for the C++ compiler [${CXXFLAGS}]" echo " LDFLAGS=OPTIONS command line options for the linker [${LDFLAGS}]" echo exit 0 ;; - --version | --ve* | -V) + --version | -V) echo "Configure script for ${pkgname} version ${pkgversion}" exit 0 ;; - --srcdir* | --sr*) - srcdir=`echo ${optarg} | sed -e 's,/$,,'` ;; - --prefix* | --pr*) - prefix=`echo ${optarg} | sed -e 's,/$,,'` ;; - --exec-prefix* | --ex*) - exec_prefix=`echo ${optarg} | sed -e 's,/$,,'` ;; - --bindir* | --bi*) - bindir=`echo ${optarg} | sed -e 's,/$,,'` ;; - --datadir* | --da*) - datadir=`echo ${optarg} | sed -e 's,/$,,'` ;; - --infodir* | --inf*) - infodir=`echo ${optarg} | sed -e 's,/$,,'` ;; - --mandir* | --ma*) - mandir=`echo ${optarg} | sed -e 's,/$,,'` ;; - --sysconfdir* | --sy*) - sysconfdir=`echo ${optarg} | sed -e 's,/$,,'` ;; - --no-create | --no-c*) - no_create=yes ;; - - CXX=*) CXX=${optarg} ;; + --srcdir=*) srcdir=${optarg} ;; + --prefix=*) prefix=${optarg} ;; + --exec-prefix=*) exec_prefix=${optarg} ;; + --bindir=*) bindir=${optarg} ;; + --datarootdir=*) datarootdir=${optarg} ;; + --infodir=*) infodir=${optarg} ;; + --mandir=*) mandir=${optarg} ;; + --no-create) no_create=yes ;; + + CXX=*) CXX=${optarg} ;; CPPFLAGS=*) CPPFLAGS=${optarg} ;; CXXFLAGS=*) CXXFLAGS=${optarg} ;; - LDFLAGS=*) LDFLAGS=${optarg} ;; + LDFLAGS=*) LDFLAGS=${optarg} ;; --* | *=* | *-*-*) ;; *) @@ -103,14 +99,14 @@ done srcdirtext= if [ -z "${srcdir}" ] ; then srcdirtext="or . or .." ; srcdir=. - if [ ! -r ${srcdir}/${srctrigger} ] ; then srcdir=.. ; fi - if [ ! -r ${srcdir}/${srctrigger} ] ; then + if [ ! -r "${srcdir}/${srctrigger}" ] ; then srcdir=.. ; fi + if [ ! -r "${srcdir}/${srctrigger}" ] ; then ## the sed command below emulates the dirname command srcdir=`echo $0 | sed -e 's,[^/]*$,,;s,/$,,;s,^$,.,'` fi fi -if [ ! -r ${srcdir}/${srctrigger} ] ; then +if [ ! -r "${srcdir}/${srctrigger}" ] ; then exec 1>&2 echo echo "configure: Can't find sources in ${srcdir} ${srcdirtext}" @@ -119,18 +115,7 @@ if [ ! -r ${srcdir}/${srctrigger} ] ; then fi # Set srcdir to . if that's what it is. -if [ "`pwd`" = "`cd ${srcdir} ; pwd`" ] ; then srcdir=. ; fi - -# checking whether we are using GNU C++. -if [ -z "${CXX}" ] ; then # Let the user override the test. - if [ -x /bin/g++ ] || - [ -x /usr/bin/g++ ] || - [ -x /usr/local/bin/g++ ] ; then - CXX="g++" - else - CXX="c++" - fi -fi +if [ "`pwd`" = "`cd "${srcdir}" ; pwd`" ] ; then srcdir=. ; fi echo if [ -z "${no_create}" ] ; then @@ -154,10 +139,9 @@ echo "VPATH = ${srcdir}" echo "prefix = ${prefix}" echo "exec_prefix = ${exec_prefix}" echo "bindir = ${bindir}" -echo "datadir = ${datadir}" +echo "datarootdir = ${datarootdir}" echo "infodir = ${infodir}" echo "mandir = ${mandir}" -echo "sysconfdir = ${sysconfdir}" echo "CXX = ${CXX}" echo "CPPFLAGS = ${CPPFLAGS}" echo "CXXFLAGS = ${CXXFLAGS}" @@ -165,7 +149,7 @@ echo "LDFLAGS = ${LDFLAGS}" rm -f Makefile cat > Makefile << EOF # Makefile for Plzip - A parallel compressor compatible with lzip -# Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. +# Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. # This file was generated automatically by configure. Do not edit. # # This Makefile is free software: you have unlimited permission @@ -178,16 +162,15 @@ VPATH = ${srcdir} prefix = ${prefix} exec_prefix = ${exec_prefix} bindir = ${bindir} -datadir = ${datadir} +datarootdir = ${datarootdir} infodir = ${infodir} mandir = ${mandir} -sysconfdir = ${sysconfdir} CXX = ${CXX} CPPFLAGS = ${CPPFLAGS} CXXFLAGS = ${CXXFLAGS} LDFLAGS = ${LDFLAGS} EOF -cat ${srcdir}/Makefile.in >> Makefile +cat "${srcdir}/Makefile.in" >> Makefile echo "OK. Now you can run make." echo "If make fails, verify that the lzlib compression library is correctly" diff --git a/dec_stdout.cc b/dec_stdout.cc new file mode 100644 index 0000000..36be19b --- /dev/null +++ b/dec_stdout.cc @@ -0,0 +1,331 @@ +/* Plzip - A parallel compressor compatible with lzip + Copyright (C) 2009 Laszlo Ersek. + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#define _FILE_OFFSET_BITS 64 + +#include <algorithm> +#include <cerrno> +#include <climits> +#include <csignal> +#include <cstdio> +#include <cstdlib> +#include <cstring> +#include <queue> +#include <string> +#include <vector> +#include <pthread.h> +#include <stdint.h> +#include <unistd.h> +#include <lzlib.h> + +#include "lzip.h" +#include "file_index.h" + + +namespace { + +enum { max_packet_size = 1 << 20 }; + + +struct Packet // data block + { + uint8_t * data; // data == 0 means end of member + int size; // number of bytes in data (if any) + }; + + +class Packet_courier // moves packets around + { +public: + unsigned ocheck_counter; + unsigned owait_counter; +private: + int deliver_worker_id; // worker queue currently delivering packets + std::vector< std::queue< Packet * > > opacket_queues; + int num_working; // number of workers still running + const int num_workers; // number of workers + const int num_slots; // max output packets in circulation + int num_free; // remaining free output slots + pthread_mutex_t omutex; + pthread_cond_t oav_or_exit; // output packet available or all workers exited + pthread_cond_t slot_av; // free output slot available + + Packet_courier( const Packet_courier & ); // declared as private + void operator=( const Packet_courier & ); // declared as private + +public: + Packet_courier( const int workers, const int slots ) + : ocheck_counter( 0 ), owait_counter( 0 ), + deliver_worker_id( 0 ), + opacket_queues( workers ), num_working( workers ), + num_workers( workers ), num_slots( 8 * slots ), num_free( num_slots ) + { xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av ); } + + ~Packet_courier() + { xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex ); } + + void worker_finished() + { + // notify muxer when last worker exits + xlock( &omutex ); + if( --num_working == 0 ) xsignal( &oav_or_exit ); + xunlock( &omutex ); + } + + // collect a packet from a worker + void collect_packet( Packet * const opacket, const int worker_id ) + { + xlock( &omutex ); + if( opacket->data ) + { + while( worker_id != deliver_worker_id && num_free <= 0 ) + xwait( &slot_av, &omutex ); + --num_free; + } + opacket_queues[worker_id].push( opacket ); + if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit ); + xunlock( &omutex ); + } + + // deliver a packet to muxer + // if packet data == 0, move to next queue and wait again + Packet * deliver_packet() + { + Packet * opacket = 0; + xlock( &omutex ); + ++ocheck_counter; + while( true ) + { + while( opacket_queues[deliver_worker_id].empty() && num_working > 0 ) + { + ++owait_counter; + xwait( &oav_or_exit, &omutex ); + } + if( opacket_queues[deliver_worker_id].empty() ) break; + opacket = opacket_queues[deliver_worker_id].front(); + opacket_queues[deliver_worker_id].pop(); + if( opacket->data ) + { + if( ++num_free == 1 ) xsignal( &slot_av ); + break; + } + if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; + xbroadcast( &slot_av ); // restart deliver_worker_id thread + delete opacket; opacket = 0; + } + xunlock( &omutex ); + return opacket; + } + + bool finished() // all packets delivered to muxer + { + if( num_free != num_slots || num_working != 0 ) return false; + for( int i = 0; i < num_workers; ++i ) + if( !opacket_queues[i].empty() ) return false; + return true; + } + }; + + +struct Worker_arg + { + const File_index * file_index; + Packet_courier * courier; + const Pretty_print * pp; + int worker_id; + int num_workers; + int infd; + }; + + + // read members from file, decompress their contents, and + // give the produced packets to courier. +extern "C" void * dworker_o( void * arg ) + { + const Worker_arg & tmp = *(Worker_arg *)arg; + const File_index & file_index = *tmp.file_index; + Packet_courier & courier = *tmp.courier; + const Pretty_print & pp = *tmp.pp; + const int worker_id = tmp.worker_id; + const int num_workers = tmp.num_workers; + const int infd = tmp.infd; + const int buffer_size = 65536; + + uint8_t * new_data = new( std::nothrow ) uint8_t[max_packet_size]; + uint8_t * const ibuffer = new( std::nothrow ) uint8_t[buffer_size]; + LZ_Decoder * const decoder = LZ_decompress_open(); + if( !new_data || !ibuffer || !decoder || + LZ_decompress_errno( decoder ) != LZ_ok ) + { pp( "Not enough memory" ); fatal(); } + int new_pos = 0; + + for( int i = worker_id; i < file_index.members(); i += num_workers ) + { + long long member_pos = file_index.mblock( i ).pos(); + long long member_rest = file_index.mblock( i ).size(); + + while( member_rest > 0 ) + { + while( LZ_decompress_write_size( decoder ) > 0 ) + { + const int size = std::min( LZ_decompress_write_size( decoder ), + (int)std::min( (long long)buffer_size, member_rest ) ); + if( size > 0 ) + { + if( preadblock( infd, ibuffer, size, member_pos ) != size ) + { pp(); show_error( "Read error", errno ); fatal(); } + member_pos += size; + member_rest -= size; + if( LZ_decompress_write( decoder, ibuffer, size ) != size ) + internal_error( "library error (LZ_decompress_write)" ); + } + if( member_rest <= 0 ) { LZ_decompress_finish( decoder ); break; } + } + while( true ) // read and pack decompressed data + { + const int rd = LZ_decompress_read( decoder, new_data + new_pos, + max_packet_size - new_pos ); + if( rd < 0 ) + fatal( decompress_read_error( decoder, pp, worker_id ) ); + new_pos += rd; + if( new_pos > max_packet_size ) + internal_error( "opacket size exceeded in worker" ); + if( new_pos == max_packet_size || + LZ_decompress_finished( decoder ) == 1 ) + { + if( new_pos > 0 ) // make data packet + { + Packet * opacket = new Packet; + opacket->data = new_data; + opacket->size = new_pos; + courier.collect_packet( opacket, worker_id ); + new_pos = 0; + new_data = new( std::nothrow ) uint8_t[max_packet_size]; + if( !new_data ) { pp( "Not enough memory" ); fatal(); } + } + if( LZ_decompress_finished( decoder ) == 1 ) + { + LZ_decompress_reset( decoder ); // prepare for new member + Packet * opacket = new Packet; // end of member token + opacket->data = 0; + opacket->size = 0; + courier.collect_packet( opacket, worker_id ); + break; + } + } + if( rd == 0 ) break; + } + } + } + + delete[] ibuffer; delete[] new_data; + if( LZ_decompress_member_position( decoder ) != 0 ) + { pp( "Error, some data remains in decoder" ); fatal(); } + if( LZ_decompress_close( decoder ) < 0 ) + { pp( "LZ_decompress_close failed" ); fatal(); } + courier.worker_finished(); + return 0; + } + + + // get from courier the processed and sorted packets, and write + // their contents to the output file. +void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) + { + while( true ) + { + Packet * opacket = courier.deliver_packet(); + if( !opacket ) break; // queue is empty. all workers exited + + if( outfd >= 0 ) + { + const int wr = writeblock( outfd, opacket->data, opacket->size ); + if( wr != opacket->size ) + { pp(); show_error( "Write error", errno ); fatal(); } + } + delete[] opacket->data; + delete opacket; + } + } + +} // end namespace + + + // init the courier, then start the workers and call the muxer. +int dec_stdout( const int num_workers, const int infd, const int outfd, + const Pretty_print & pp, const int debug_level, + const File_index & file_index ) + { + const int slots_per_worker = 2; + const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ? + num_workers * slots_per_worker : INT_MAX ); + + Packet_courier courier( num_workers, num_slots ); + + Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers]; + pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; + if( !worker_args || !worker_threads ) + { pp( "Not enough memory" ); fatal(); } + for( int i = 0; i < num_workers; ++i ) + { + worker_args[i].file_index = &file_index; + worker_args[i].courier = &courier; + worker_args[i].pp = &pp; + worker_args[i].worker_id = i; + worker_args[i].num_workers = num_workers; + worker_args[i].infd = infd; + const int errcode = + pthread_create( &worker_threads[i], 0, dworker_o, &worker_args[i] ); + if( errcode ) + { show_error( "Can't create worker threads", errcode ); fatal(); } + } + + muxer( courier, pp, outfd ); + + for( int i = num_workers - 1; i >= 0; --i ) + { + const int errcode = pthread_join( worker_threads[i], 0 ); + if( errcode ) + { show_error( "Can't join worker threads", errcode ); fatal(); } + } + delete[] worker_threads; + delete[] worker_args; + + const unsigned long long in_size = file_index.file_end(); + const unsigned long long out_size = file_index.data_end(); + if( verbosity >= 2 && out_size > 0 && in_size > 0 ) + std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, %5.2f%% saved. ", + (double)out_size / in_size, + ( 8.0 * in_size ) / out_size, + 100.0 * ( 1.0 - ( (double)in_size / out_size ) ) ); + if( verbosity >= 3 ) + std::fprintf( stderr, "decompressed size %9llu, size %9llu. ", + out_size, in_size ); + + if( verbosity >= 1 ) std::fprintf( stderr, "done\n" ); + + if( debug_level & 1 ) + std::fprintf( stderr, + "muxer tried to consume from workers %8u times\n" + "muxer had to wait %8u times\n", + courier.ocheck_counter, + courier.owait_counter ); + + if( !courier.finished() ) internal_error( "courier not finished" ); + return 0; + } diff --git a/dec_stream.cc b/dec_stream.cc new file mode 100644 index 0000000..91659da --- /dev/null +++ b/dec_stream.cc @@ -0,0 +1,520 @@ +/* Plzip - A parallel compressor compatible with lzip + Copyright (C) 2009 Laszlo Ersek. + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#define _FILE_OFFSET_BITS 64 + +#include <algorithm> +#include <cerrno> +#include <climits> +#include <csignal> +#include <cstdio> +#include <cstdlib> +#include <cstring> +#include <queue> +#include <string> +#include <vector> +#include <pthread.h> +#include <stdint.h> +#include <unistd.h> +#include <lzlib.h> + +#include "lzip.h" + + +namespace { + +enum { max_packet_size = 1 << 20 }; +unsigned long long in_size = 0; +unsigned long long out_size = 0; + + +struct Packet // data block + { + uint8_t * data; // data == 0 means end of member + int size; // number of bytes in data (if any) + }; + + +class Packet_courier // moves packets around + { +public: + unsigned icheck_counter; + unsigned iwait_counter; + unsigned ocheck_counter; + unsigned owait_counter; +private: + int receive_worker_id; // worker queue currently receiving packets + int deliver_worker_id; // worker queue currently delivering packets + Slot_tally slot_tally; // limits the number of input packets + std::vector< std::queue< Packet * > > ipacket_queues; + std::vector< std::queue< Packet * > > opacket_queues; + int num_working; // number of workers still running + const int num_workers; // number of workers + const int num_slots; // max output packets in circulation + int num_free; // remaining free output slots + pthread_mutex_t imutex; + pthread_cond_t iav_or_eof; // input packet available or splitter done + pthread_mutex_t omutex; + pthread_cond_t oav_or_exit; // output packet available or all workers exited + pthread_cond_t slot_av; // free output slot available + bool eof; // splitter done + + Packet_courier( const Packet_courier & ); // declared as private + void operator=( const Packet_courier & ); // declared as private + +public: + Packet_courier( const int workers, const int slots ) + : icheck_counter( 0 ), iwait_counter( 0 ), + ocheck_counter( 0 ), owait_counter( 0 ), + receive_worker_id( 0 ), deliver_worker_id( 0 ), + slot_tally( slots ), ipacket_queues( workers ), + opacket_queues( workers ), num_working( workers ), + num_workers( workers ), num_slots( 8 * slots ), num_free( num_slots ), + eof( false ) + { + xinit( &imutex ); xinit( &iav_or_eof ); + xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av ); + } + + ~Packet_courier() + { + xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex ); + xdestroy( &iav_or_eof ); xdestroy( &imutex ); + } + + // make a packet with data received from splitter + // if data == 0, move to next queue + void receive_packet( uint8_t * const data, const int size ) + { + Packet * ipacket = new Packet; + ipacket->data = data; + ipacket->size = size; + if( data ) + { in_size += size; slot_tally.get_slot(); } // wait for a free slot + xlock( &imutex ); + ipacket_queues[receive_worker_id].push( ipacket ); + xbroadcast( &iav_or_eof ); + xunlock( &imutex ); + if( !data && ++receive_worker_id >= num_workers ) + receive_worker_id = 0; + } + + // distribute a packet to a worker + Packet * distribute_packet( const int worker_id ) + { + Packet * ipacket = 0; + xlock( &imutex ); + ++icheck_counter; + while( ipacket_queues[worker_id].empty() && !eof ) + { + ++iwait_counter; + xwait( &iav_or_eof, &imutex ); + } + if( !ipacket_queues[worker_id].empty() ) + { + ipacket = ipacket_queues[worker_id].front(); + ipacket_queues[worker_id].pop(); + } + xunlock( &imutex ); + if( ipacket ) + { if( ipacket->data ) slot_tally.leave_slot(); } + else + { + // notify muxer when last worker exits + xlock( &omutex ); + if( --num_working == 0 ) xsignal( &oav_or_exit ); + xunlock( &omutex ); + } + return ipacket; + } + + // collect a packet from a worker + void collect_packet( Packet * const opacket, const int worker_id ) + { + xlock( &omutex ); + if( opacket->data ) + { + while( worker_id != deliver_worker_id && num_free <= 0 ) + xwait( &slot_av, &omutex ); + --num_free; + } + opacket_queues[worker_id].push( opacket ); + if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit ); + xunlock( &omutex ); + } + + // deliver a packet to muxer + // if packet data == 0, move to next queue and wait again + Packet * deliver_packet() + { + Packet * opacket = 0; + xlock( &omutex ); + ++ocheck_counter; + while( true ) + { + while( opacket_queues[deliver_worker_id].empty() && num_working > 0 ) + { + ++owait_counter; + xwait( &oav_or_exit, &omutex ); + } + if( opacket_queues[deliver_worker_id].empty() ) break; + opacket = opacket_queues[deliver_worker_id].front(); + opacket_queues[deliver_worker_id].pop(); + if( opacket->data ) + { + if( ++num_free == 1 ) xsignal( &slot_av ); + break; + } + if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; + xbroadcast( &slot_av ); // restart deliver_worker_id thread + delete opacket; opacket = 0; + } + xunlock( &omutex ); + return opacket; + } + + void finish() // splitter has no more packets to send + { + xlock( &imutex ); + eof = true; + xbroadcast( &iav_or_eof ); + xunlock( &imutex ); + } + + bool finished() // all packets delivered to muxer + { + if( !slot_tally.all_free() || + num_free != num_slots || !eof || num_working != 0 ) return false; + for( int i = 0; i < num_workers; ++i ) + if( !ipacket_queues[i].empty() ) return false; + for( int i = 0; i < num_workers; ++i ) + if( !opacket_queues[i].empty() ) return false; + return true; + } + }; + + +// Search forward from 'pos' for "LZIP" (Boyer-Moore algorithm) +// Return pos of found string or 'pos+size' if not found. +// +int find_magic( const uint8_t * const buffer, const int pos, const int size ) + { + const uint8_t table[256] = { + 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, + 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, + 4,4,4,4,4,4,4,4,4,1,4,4,3,4,4,4,4,4,4,4,4,4,4,4,4,4,2,4,4,4,4,4, + 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, + 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, + 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, + 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, + 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4 }; + + for( int i = pos; i <= pos + size - 4; i += table[buffer[i+3]] ) + if( buffer[i] == 'L' && buffer[i+1] == 'Z' && + buffer[i+2] == 'I' && buffer[i+3] == 'P' ) + return i; // magic string found + return pos + size; + } + + +struct Splitter_arg + { + Packet_courier * courier; + const Pretty_print * pp; + int infd; + }; + + + // split data from input file into chunks and pass them to + // courier for packaging and distribution to workers. +extern "C" void * dsplitter_s( void * arg ) + { + const Splitter_arg & tmp = *(Splitter_arg *)arg; + Packet_courier & courier = *tmp.courier; + const Pretty_print & pp = *tmp.pp; + const int infd = tmp.infd; + const int hsize = 6; // header size + const int tsize = 20; // trailer size + const int buffer_size = max_packet_size; + const int base_buffer_size = tsize + buffer_size + hsize; + uint8_t * const base_buffer = new( std::nothrow ) uint8_t[base_buffer_size]; + if( !base_buffer ) { pp( "Not enough memory" ); fatal(); } + uint8_t * const buffer = base_buffer + tsize; + + int size = readblock( infd, buffer, buffer_size + hsize ) - hsize; + bool at_stream_end = ( size < buffer_size ); + if( size != buffer_size && errno ) + { pp(); show_error( "Read error", errno ); fatal(); } + if( size <= tsize ) + { pp( "Error reading member header" ); fatal(); } + if( find_magic( buffer, 0, 4 ) != 0 ) + { pp( "Bad magic number (file not in lzip format)" ); fatal(); } + + unsigned long long partial_member_size = 0; + while( true ) + { + int pos = 0; + for( int newpos = 1; newpos <= size; ++newpos ) + { + newpos = find_magic( buffer, newpos, size + 4 - newpos ); + if( newpos <= size ) + { + unsigned long long member_size = 0; + for( int i = 1; i <= 8; ++i ) + { member_size <<= 8; member_size += base_buffer[tsize+newpos-i]; } + if( partial_member_size + newpos - pos == member_size ) + { // header found + uint8_t * const data = new( std::nothrow ) uint8_t[newpos - pos]; + if( !data ) { pp( "Not enough memory" ); fatal(); } + std::memcpy( data, buffer + pos, newpos - pos ); + courier.receive_packet( data, newpos - pos ); + courier.receive_packet( 0, 0 ); // end of member token + partial_member_size = 0; + pos = newpos; + } + } + } + + if( at_stream_end ) + { + uint8_t * data = new( std::nothrow ) uint8_t[size + hsize - pos]; + if( !data ) { pp( "Not enough memory" ); fatal(); } + std::memcpy( data, buffer + pos, size + hsize - pos ); + courier.receive_packet( data, size + hsize - pos ); + courier.receive_packet( 0, 0 ); // end of member token + break; + } + if( pos < buffer_size ) + { + partial_member_size += buffer_size - pos; + uint8_t * data = new( std::nothrow ) uint8_t[buffer_size - pos]; + if( !data ) { pp( "Not enough memory" ); fatal(); } + std::memcpy( data, buffer + pos, buffer_size - pos ); + courier.receive_packet( data, buffer_size - pos ); + } + std::memcpy( base_buffer, base_buffer + buffer_size, tsize + hsize ); + size = readblock( infd, buffer + hsize, buffer_size ); + at_stream_end = ( size < buffer_size ); + if( size != buffer_size && errno ) + { pp(); show_error( "Read error", errno ); fatal(); } + } + delete[] base_buffer; + courier.finish(); // no more packets to send + return 0; + } + + +struct Worker_arg + { + Packet_courier * courier; + const Pretty_print * pp; + int worker_id; + }; + + + // consume packets from courier, decompress their contents, and + // give the produced packets to courier. +extern "C" void * dworker_s( void * arg ) + { + const Worker_arg & tmp = *(Worker_arg *)arg; + Packet_courier & courier = *tmp.courier; + const Pretty_print & pp = *tmp.pp; + const int worker_id = tmp.worker_id; + + uint8_t * new_data = new( std::nothrow ) uint8_t[max_packet_size]; + LZ_Decoder * const decoder = LZ_decompress_open(); + if( !new_data || !decoder || LZ_decompress_errno( decoder ) != LZ_ok ) + { pp( "Not enough memory" ); fatal(); } + int new_pos = 0; + bool trailing_garbage_found = false; + + while( true ) + { + const Packet * const ipacket = courier.distribute_packet( worker_id ); + if( !ipacket ) break; // no more packets to process + if( !ipacket->data ) LZ_decompress_finish( decoder ); + + int written = 0; + while( !trailing_garbage_found ) + { + if( LZ_decompress_write_size( decoder ) > 0 && written < ipacket->size ) + { + const int wr = LZ_decompress_write( decoder, ipacket->data + written, + ipacket->size - written ); + if( wr < 0 ) internal_error( "library error (LZ_decompress_write)" ); + written += wr; + if( written > ipacket->size ) + internal_error( "ipacket size exceeded in worker" ); + } + while( !trailing_garbage_found ) // read and pack decompressed data + { + const int rd = LZ_decompress_read( decoder, new_data + new_pos, + max_packet_size - new_pos ); + if( rd < 0 ) + { + if( LZ_decompress_errno( decoder ) == LZ_header_error ) + trailing_garbage_found = true; + else + fatal( decompress_read_error( decoder, pp, worker_id ) ); + } + else new_pos += rd; + if( new_pos > max_packet_size ) + internal_error( "opacket size exceeded in worker" ); + if( new_pos == max_packet_size || trailing_garbage_found || + LZ_decompress_finished( decoder ) == 1 ) + { + if( new_pos > 0 ) // make data packet + { + Packet * opacket = new Packet; + opacket->data = new_data; + opacket->size = new_pos; + courier.collect_packet( opacket, worker_id ); + new_pos = 0; + new_data = new( std::nothrow ) uint8_t[max_packet_size]; + if( !new_data ) { pp( "Not enough memory" ); fatal(); } + } + if( trailing_garbage_found || + LZ_decompress_finished( decoder ) == 1 ) + { + LZ_decompress_reset( decoder ); // prepare for new ipacket + Packet * opacket = new Packet; // end of member token + opacket->data = 0; + opacket->size = 0; + courier.collect_packet( opacket, worker_id ); + break; + } + } + if( rd == 0 ) break; + } + if( !ipacket->data || written == ipacket->size ) break; + } + if( ipacket->data ) delete[] ipacket->data; + delete ipacket; + } + + delete[] new_data; + if( LZ_decompress_member_position( decoder ) != 0 ) + { pp( "Error, some data remains in decoder" ); fatal(); } + if( LZ_decompress_close( decoder ) < 0 ) + { pp( "LZ_decompress_close failed" ); fatal(); } + return 0; + } + + + // get from courier the processed and sorted packets, and write + // their contents to the output file. +void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) + { + while( true ) + { + Packet * opacket = courier.deliver_packet(); + if( !opacket ) break; // queue is empty. all workers exited + + out_size += opacket->size; + + if( outfd >= 0 ) + { + const int wr = writeblock( outfd, opacket->data, opacket->size ); + if( wr != opacket->size ) + { pp(); show_error( "Write error", errno ); fatal(); } + } + delete[] opacket->data; + delete opacket; + } + } + +} // end namespace + + + // init the courier, then start the splitter and the workers and + // call the muxer. +int dec_stream( const int num_workers, const int infd, const int outfd, + const Pretty_print & pp, const int debug_level, + const bool testing ) + { + const int slots_per_worker = 2; + const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ? + num_workers * slots_per_worker : INT_MAX ); + in_size = 0; + out_size = 0; + Packet_courier courier( num_workers, num_slots ); + + Splitter_arg splitter_arg; + splitter_arg.courier = &courier; + splitter_arg.pp = &pp; + splitter_arg.infd = infd; + + pthread_t splitter_thread; + int errcode = pthread_create( &splitter_thread, 0, dsplitter_s, &splitter_arg ); + if( errcode ) + { show_error( "Can't create splitter thread", errcode ); fatal(); } + + Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers]; + pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; + if( !worker_args || !worker_threads ) + { pp( "Not enough memory" ); fatal(); } + for( int i = 0; i < num_workers; ++i ) + { + worker_args[i].courier = &courier; + worker_args[i].pp = &pp; + worker_args[i].worker_id = i; + errcode = pthread_create( &worker_threads[i], 0, dworker_s, &worker_args[i] ); + if( errcode ) + { show_error( "Can't create worker threads", errcode ); fatal(); } + } + + muxer( courier, pp, outfd ); + + for( int i = num_workers - 1; i >= 0; --i ) + { + errcode = pthread_join( worker_threads[i], 0 ); + if( errcode ) + { show_error( "Can't join worker threads", errcode ); fatal(); } + } + delete[] worker_threads; + delete[] worker_args; + + errcode = pthread_join( splitter_thread, 0 ); + if( errcode ) + { show_error( "Can't join splitter thread", errcode ); fatal(); } + + if( verbosity >= 2 && out_size > 0 && in_size > 0 ) + std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, %5.2f%% saved. ", + (double)out_size / in_size, + ( 8.0 * in_size ) / out_size, + 100.0 * ( 1.0 - ( (double)in_size / out_size ) ) ); + if( verbosity >= 3 ) + std::fprintf( stderr, "decompressed size %9llu, size %9llu. ", + out_size, in_size ); + + if( verbosity >= 1 ) std::fprintf( stderr, testing ? "ok\n" : "done\n" ); + + if( debug_level & 1 ) + std::fprintf( stderr, + "any worker tried to consume from splitter %8u times\n" + "any worker had to wait %8u times\n" + "muxer tried to consume from workers %8u times\n" + "muxer had to wait %8u times\n", + courier.icheck_counter, + courier.iwait_counter, + courier.ocheck_counter, + courier.owait_counter ); + + if( !courier.finished() ) internal_error( "courier not finished" ); + return 0; + } diff --git a/decompress.cc b/decompress.cc index ef098ae..c861b4d 100644 --- a/decompress.cc +++ b/decompress.cc @@ -1,6 +1,6 @@ /* Plzip - A parallel compressor compatible with lzip Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,387 +25,164 @@ #include <cstdio> #include <cstdlib> #include <cstring> -#include <queue> #include <string> #include <vector> -#include <inttypes.h> #include <pthread.h> +#include <stdint.h> #include <unistd.h> +#include <sys/stat.h> #include <lzlib.h> -#include "plzip.h" +#include "lzip.h" +#include "file_index.h" -namespace { - -enum { max_packet_size = 1 << 20 }; -long long in_size = 0; -long long out_size = 0; - - -struct Packet // data block - { - uint8_t * data; // data == 0 means end of member - int size; // number of bytes in data (if any) - }; - - -class Packet_courier // moves packets around +// Returns the number of bytes really read. +// If (returned value < size) and (errno == 0), means EOF was reached. +// +int preadblock( const int fd, uint8_t * const buf, const int size, + const long long pos ) { -public: - unsigned long icheck_counter; - unsigned long iwait_counter; - unsigned long ocheck_counter; - unsigned long owait_counter; -private: - int receive_worker_id; // worker queue currently receiving packets - int deliver_worker_id; // worker queue currently delivering packets - Slot_tally slot_tally; // limits the number of input packets - std::vector< std::queue< Packet * > > ipacket_queues; - std::vector< std::queue< Packet * > > opacket_queues; - int num_working; // number of workers still running - const int num_workers; // number of workers - int num_free; // remaining free output slots - pthread_mutex_t imutex; - pthread_cond_t iav_or_eof; // input packet available or splitter done - pthread_mutex_t omutex; - pthread_cond_t oav_or_exit; // output packet available or all workers exited - pthread_cond_t slot_av; // free output slot available - bool eof; // splitter done - - Packet_courier( const Packet_courier & ); // declared as private - void operator=( const Packet_courier & ); // declared as private - -public: - Packet_courier( const int workers, const int slots ) - : icheck_counter( 0 ), iwait_counter( 0 ), - ocheck_counter( 0 ), owait_counter( 0 ), - receive_worker_id( 0 ), deliver_worker_id( 0 ), - slot_tally( slots ), ipacket_queues( workers ), - opacket_queues( workers ), num_working( workers ), - num_workers( workers ), num_free( 8 * slots ), eof( false ) - { - xinit( &imutex ); xinit( &iav_or_eof ); - xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av ); - } - - ~Packet_courier() - { - xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex ); - xdestroy( &iav_or_eof ); xdestroy( &imutex ); - } - - const Slot_tally & tally() const { return slot_tally; } - - // make a packet with data received from splitter - // if data == 0, move to next queue - void receive_packet( uint8_t * const data, const int size ) + int rest = size; + errno = 0; + while( rest > 0 ) { - Packet * ipacket = new Packet; - ipacket->data = data; - ipacket->size = size; - if( data != 0 ) - { in_size += size; slot_tally.get_slot(); } // wait for a free slot - xlock( &imutex ); - ipacket_queues[receive_worker_id].push( ipacket ); - xbroadcast( &iav_or_eof ); - xunlock( &imutex ); - if( data == 0 && ++receive_worker_id >= num_workers ) - receive_worker_id = 0; + const int n = pread( fd, buf + size - rest, rest, pos + size - rest ); + if( n > 0 ) rest -= n; + else if( n == 0 ) break; // EOF + else if( errno != EINTR && errno != EAGAIN ) break; + errno = 0; } - - // distribute a packet to a worker - Packet * distribute_packet( const int worker_id ) - { - Packet * ipacket = 0; - xlock( &imutex ); - ++icheck_counter; - while( ipacket_queues[worker_id].empty() && !eof ) - { - ++iwait_counter; - xwait( &iav_or_eof, &imutex ); - ++icheck_counter; - } - if( !ipacket_queues[worker_id].empty() ) - { - ipacket = ipacket_queues[worker_id].front(); - ipacket_queues[worker_id].pop(); - } - xunlock( &imutex ); - if( ipacket != 0 ) - { if( ipacket->data != 0 ) slot_tally.leave_slot(); } - else - { - // notify muxer when last worker exits - xlock( &omutex ); - if( --num_working == 0 ) xsignal( &oav_or_exit ); - xunlock( &omutex ); - } - return ipacket; - } - - // collect a packet from a worker - void collect_packet( Packet * const opacket, const int worker_id ) - { - xlock( &omutex ); - if( opacket->data != 0 ) - { - while( worker_id != deliver_worker_id && num_free <= 0 ) - xwait( &slot_av, &omutex ); - --num_free; - } - opacket_queues[worker_id].push( opacket ); - if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit ); - xunlock( &omutex ); - } - - // deliver a packet to muxer - // if packet data == 0, move to next queue and wait again - Packet * deliver_packet() - { - Packet * opacket = 0; - xlock( &omutex ); - ++ocheck_counter; - while( true ) - { - while( opacket_queues[deliver_worker_id].empty() && num_working > 0 ) - { - ++owait_counter; - xwait( &oav_or_exit, &omutex ); - ++ocheck_counter; - } - if( opacket_queues[deliver_worker_id].empty() ) break; - opacket = opacket_queues[deliver_worker_id].front(); - opacket_queues[deliver_worker_id].pop(); - if( opacket->data != 0 ) - { - if( ++num_free == 1 ) xsignal( &slot_av ); - break; - } - if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; - xbroadcast( &slot_av ); // restart deliver_worker_id thread - delete opacket; opacket = 0; - } - xunlock( &omutex ); - return opacket; - } - - void finish() // splitter has no more packets to send - { - xlock( &imutex ); - eof = true; - xbroadcast( &iav_or_eof ); - xunlock( &imutex ); - } - - bool finished() // all packets delivered to muxer - { - if( !slot_tally.all_free() || !eof || num_working != 0 ) return false; - for( int i = 0; i < num_workers; ++i ) - if( !ipacket_queues[i].empty() ) return false; - for( int i = 0; i < num_workers; ++i ) - if( !opacket_queues[i].empty() ) return false; - return true; - } - }; + return size - rest; + } -// Search forward from 'pos' for "LZIP" (Boyer-Moore algorithm) -// Return pos of found string or 'pos+size' if not found. +// Returns the number of bytes really written. +// If (returned value < size), it is always an error. // -int find_magic( const uint8_t * const buffer, const int pos, const int size ) throw() +int pwriteblock( const int fd, const uint8_t * const buf, const int size, + const long long pos ) { - const uint8_t table[256] = { - 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, - 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, - 4,4,4,4,4,4,4,4,4,1,4,4,3,4,4,4,4,4,4,4,4,4,4,4,4,4,2,4,4,4,4,4, - 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, - 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, - 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, - 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4, - 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4 }; - - for( int i = pos; i <= pos + size - 4; i += table[buffer[i+3]] ) - if( buffer[i] == 'L' && buffer[i+1] == 'Z' && - buffer[i+2] == 'I' && buffer[i+3] == 'P' ) - return i; // magic string found - return pos + size; + int rest = size; + errno = 0; + while( rest > 0 ) + { + const int n = pwrite( fd, buf + size - rest, rest, pos + size - rest ); + if( n > 0 ) rest -= n; + else if( n < 0 && errno != EINTR && errno != EAGAIN ) break; + errno = 0; + } + return size - rest; } -struct Splitter_arg - { - Packet_courier * courier; - const Pretty_print * pp; - int infd; - }; - - - // split data from input file into chunks and pass them to - // courier for packaging and distribution to workers. -extern "C" void * dsplitter( void * arg ) +int decompress_read_error( struct LZ_Decoder * const decoder, + const Pretty_print & pp, const int worker_id ) { - const Splitter_arg & tmp = *(Splitter_arg *)arg; - Packet_courier & courier = *tmp.courier; - const Pretty_print & pp = *tmp.pp; - const int infd = tmp.infd; - const int hsize = 6; // header size - const int tsize = 20; // trailer size - const int buffer_size = max_packet_size; - const int base_buffer_size = tsize + buffer_size + hsize; - uint8_t * const base_buffer = new( std::nothrow ) uint8_t[base_buffer_size]; - if( base_buffer == 0 ) { pp( "Not enough memory" ); fatal(); } - uint8_t * const buffer = base_buffer + tsize; - - int size = readblock( infd, buffer, buffer_size + hsize ) - hsize; - bool at_stream_end = ( size < buffer_size ); - if( size != buffer_size && errno ) - { pp(); show_error( "Read error", errno ); fatal(); } - if( size <= tsize || find_magic( buffer, 0, 4 ) != 0 ) - { pp( "Bad magic number (file not in lzip format)" ); fatal(); } - - long long partial_member_size = 0; - while( true ) - { - int pos = 0; - for( int newpos = 1; newpos <= size; ++newpos ) - { - newpos = find_magic( buffer, newpos, size + 4 - newpos ); - if( newpos <= size ) - { - long long member_size = 0; - for( int i = 1; i <= 8; ++i ) - { member_size <<= 8; member_size += base_buffer[tsize+newpos-i]; } - if( partial_member_size + newpos - pos == member_size ) - { // header found - uint8_t * const data = new( std::nothrow ) uint8_t[newpos - pos]; - if( data == 0 ) { pp( "Not enough memory" ); fatal(); } - std::memcpy( data, buffer + pos, newpos - pos ); - courier.receive_packet( data, newpos - pos ); - courier.receive_packet( 0, 0 ); // end of member token - partial_member_size = 0; - pos = newpos; - } - } - } - - if( at_stream_end ) - { - uint8_t * data = new( std::nothrow ) uint8_t[size + hsize - pos]; - if( data == 0 ) { pp( "Not enough memory" ); fatal(); } - std::memcpy( data, buffer + pos, size + hsize - pos ); - courier.receive_packet( data, size + hsize - pos ); - courier.receive_packet( 0, 0 ); // end of member token - break; - } - if( pos < buffer_size ) - { - partial_member_size += buffer_size - pos; - uint8_t * data = new( std::nothrow ) uint8_t[buffer_size - pos]; - if( data == 0 ) { pp( "Not enough memory" ); fatal(); } - std::memcpy( data, buffer + pos, buffer_size - pos ); - courier.receive_packet( data, buffer_size - pos ); - } - std::memcpy( base_buffer, base_buffer + buffer_size, tsize + hsize ); - size = readblock( infd, buffer + hsize, buffer_size ); - at_stream_end = ( size < buffer_size ); - if( size != buffer_size && errno ) - { pp(); show_error( "Read error", errno ); fatal(); } - } - delete[] base_buffer; - courier.finish(); // no more packets to send - return 0; + const LZ_Errno errcode = LZ_decompress_errno( decoder ); + pp(); + if( verbosity >= 0 ) + std::fprintf( stderr, "LZ_decompress_read error in worker %d: %s.\n", + worker_id, LZ_strerror( errcode ) ); + if( errcode == LZ_header_error || errcode == LZ_unexpected_eof || + errcode == LZ_data_error ) + return 2; + return 1; } +namespace { + struct Worker_arg { - Packet_courier * courier; + const File_index * file_index; const Pretty_print * pp; int worker_id; + int num_workers; + int infd; + int outfd; }; - // consume packets from courier, decompress their contents, and - // give the produced packets to courier. + // read members from file, decompress their contents, and + // write the produced data to file. extern "C" void * dworker( void * arg ) { const Worker_arg & tmp = *(Worker_arg *)arg; - Packet_courier & courier = *tmp.courier; + const File_index & file_index = *tmp.file_index; const Pretty_print & pp = *tmp.pp; const int worker_id = tmp.worker_id; - const int new_data_size = max_packet_size; + const int num_workers = tmp.num_workers; + const int infd = tmp.infd; + const int outfd = tmp.outfd; + const int buffer_size = 65536; - uint8_t * new_data = new( std::nothrow ) uint8_t[new_data_size]; + uint8_t * const ibuffer = new( std::nothrow ) uint8_t[buffer_size]; + uint8_t * const obuffer = new( std::nothrow ) uint8_t[buffer_size]; LZ_Decoder * const decoder = LZ_decompress_open(); - if( !new_data || !decoder || LZ_decompress_errno( decoder ) != LZ_ok ) + if( !ibuffer || !obuffer || !decoder || + LZ_decompress_errno( decoder ) != LZ_ok ) { pp( "Not enough memory" ); fatal(); } - int new_pos = 0; - while( true ) + for( int i = worker_id; i < file_index.members(); i += num_workers ) { - const Packet * const ipacket = courier.distribute_packet( worker_id ); - if( ipacket == 0 ) break; // no more packets to process - if( ipacket->data == 0 ) LZ_decompress_finish( decoder ); + long long data_pos = file_index.dblock( i ).pos(); + long long data_rest = file_index.dblock( i ).size(); + long long member_pos = file_index.mblock( i ).pos(); + long long member_rest = file_index.mblock( i ).size(); - int written = 0; - while( true ) + while( member_rest > 0 ) { - if( LZ_decompress_write_size( decoder ) > 0 && written < ipacket->size ) + while( LZ_decompress_write_size( decoder ) > 0 ) { - const int wr = LZ_decompress_write( decoder, ipacket->data + written, - ipacket->size - written ); - if( wr < 0 ) internal_error( "library error (LZ_decompress_write)" ); - written += wr; - if( written > ipacket->size ) - internal_error( "ipacket size exceeded in worker" ); + const int size = std::min( LZ_decompress_write_size( decoder ), + (int)std::min( (long long)buffer_size, member_rest ) ); + if( size > 0 ) + { + if( preadblock( infd, ibuffer, size, member_pos ) != size ) + { pp(); show_error( "Read error", errno ); fatal(); } + member_pos += size; + member_rest -= size; + if( LZ_decompress_write( decoder, ibuffer, size ) != size ) + internal_error( "library error (LZ_decompress_write)" ); + } + if( member_rest <= 0 ) { LZ_decompress_finish( decoder ); break; } } - while( true ) // read and pack decompressed data + while( true ) // write decompressed data to file { - const int rd = LZ_decompress_read( decoder, new_data + new_pos, - new_data_size - new_pos ); + const int rd = LZ_decompress_read( decoder, obuffer, buffer_size ); if( rd < 0 ) + fatal( decompress_read_error( decoder, pp, worker_id ) ); + if( rd > 0 && outfd >= 0 ) { - pp(); - if( verbosity >= 0 ) - std::fprintf( stderr, "LZ_decompress_read error in worker %d: %s.\n", - worker_id, LZ_strerror( LZ_decompress_errno( decoder ) ) ); - fatal(); - } - new_pos += rd; - if( new_pos > new_data_size ) - internal_error( "opacket size exceeded in worker" ); - if( new_pos == new_data_size || LZ_decompress_finished( decoder ) == 1 ) - { - if( new_pos > 0 ) // make data packet - { - Packet * opacket = new Packet; - opacket->data = new_data; - opacket->size = new_pos; - courier.collect_packet( opacket, worker_id ); - new_pos = 0; - new_data = new( std::nothrow ) uint8_t[new_data_size]; - if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); } - } - if( LZ_decompress_finished( decoder ) == 1 ) + const int wr = pwriteblock( outfd, obuffer, rd, data_pos ); + if( wr != rd ) { - LZ_decompress_reset( decoder ); // prepare for new ipacket - Packet * opacket = new Packet; // end of member token - opacket->data = 0; - opacket->size = 0; - courier.collect_packet( opacket, worker_id ); - break; + pp(); + if( verbosity >= 0 ) + std::fprintf( stderr, "Write error in worker %d: %s\n", + worker_id, std::strerror( errno ) ); + fatal(); } } + if( rd > 0 ) + { + data_pos += rd; + data_rest -= rd; + } + if( LZ_decompress_finished( decoder ) == 1 ) + { + if( data_rest != 0 ) + internal_error( "final data_rest != 0" ); + LZ_decompress_reset( decoder ); // prepare for new member + break; + } if( rd == 0 ) break; } - if( ipacket->data == 0 ) { delete ipacket; break; } - if( written == ipacket->size ) - { delete[] ipacket->data; delete ipacket; break; } } } - delete[] new_data; + delete[] obuffer; delete[] ibuffer; if( LZ_decompress_member_position( decoder ) != 0 ) { pp( "Error, some data remains in decoder" ); fatal(); } if( LZ_decompress_close( decoder ) < 0 ) @@ -413,112 +190,76 @@ extern "C" void * dworker( void * arg ) return 0; } +} // end namespace - // get from courier the processed and sorted packets, and write - // their contents to the output file. -void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) - { - while( true ) - { - Packet * opacket = courier.deliver_packet(); - if( opacket == 0 ) break; // queue is empty. all workers exited - out_size += opacket->size; + // start the workers and wait for them to finish. +int decompress( int num_workers, const int infd, const int outfd, + const Pretty_print & pp, const int debug_level, + const bool testing, const bool infd_isreg ) + { + if( !infd_isreg ) + return dec_stream( num_workers, infd, outfd, pp, debug_level, testing ); - if( outfd >= 0 ) - { - const int wr = writeblock( outfd, opacket->data, opacket->size ); - if( wr != opacket->size ) - { pp(); show_error( "Write error", errno ); fatal(); } - } - delete[] opacket->data; - delete opacket; + const File_index file_index( infd ); + if( file_index.retval() == 1 ) + { + lseek( infd, 0, SEEK_SET ); + return dec_stream( num_workers, infd, outfd, pp, debug_level, testing ); } - } - -} // end namespace + if( file_index.retval() != 0 ) + { show_error( file_index.error().c_str() ); return file_index.retval(); } + if( num_workers > file_index.members() ) + num_workers = file_index.members(); - // init the courier, then start the splitter and the workers and - // call the muxer. -int decompress( const int num_workers, const int infd, const int outfd, - const Pretty_print & pp, const int debug_level, - const bool testing ) - { - const int slots_per_worker = 2; - const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ? - num_workers * slots_per_worker : INT_MAX ); - in_size = 0; - out_size = 0; - Packet_courier courier( num_workers, num_slots ); - - Splitter_arg splitter_arg; - splitter_arg.courier = &courier; - splitter_arg.pp = &pp; - splitter_arg.infd = infd; - - pthread_t splitter_thread; - int errcode = pthread_create( &splitter_thread, 0, dsplitter, &splitter_arg ); - if( errcode ) - { show_error( "Can't create splitter thread", errcode ); fatal(); } + if( outfd >= 0 ) + { + struct stat st; + if( fstat( outfd, &st ) != 0 || !S_ISREG( st.st_mode ) || + lseek( outfd, 0, SEEK_CUR ) < 0 ) + return dec_stdout( num_workers, infd, outfd, pp, debug_level, file_index ); + } Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers]; pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; - if( worker_args == 0 || worker_threads == 0 ) + if( !worker_args || !worker_threads ) { pp( "Not enough memory" ); fatal(); } for( int i = 0; i < num_workers; ++i ) { - worker_args[i].courier = &courier; + worker_args[i].file_index = &file_index; worker_args[i].pp = &pp; worker_args[i].worker_id = i; - errcode = pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] ); + worker_args[i].num_workers = num_workers; + worker_args[i].infd = infd; + worker_args[i].outfd = outfd; + const int errcode = + pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] ); if( errcode ) { show_error( "Can't create worker threads", errcode ); fatal(); } } - muxer( courier, pp, outfd ); - for( int i = num_workers - 1; i >= 0; --i ) { - errcode = pthread_join( worker_threads[i], 0 ); + const int errcode = pthread_join( worker_threads[i], 0 ); if( errcode ) { show_error( "Can't join worker threads", errcode ); fatal(); } } - delete[] worker_threads; worker_threads = 0; - delete[] worker_args; worker_args = 0; - - errcode = pthread_join( splitter_thread, 0 ); - if( errcode ) - { show_error( "Can't join splitter thread", errcode ); fatal(); } + delete[] worker_threads; + delete[] worker_args; - if( verbosity >= 3 && out_size > 0 && in_size > 0 ) + const unsigned long long in_size = file_index.file_end(); + const unsigned long long out_size = file_index.data_end(); + if( verbosity >= 2 && out_size > 0 && in_size > 0 ) std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, %5.2f%% saved. ", (double)out_size / in_size, ( 8.0 * in_size ) / out_size, 100.0 * ( 1.0 - ( (double)in_size / out_size ) ) ); - if( verbosity >= 2 ) - std::fprintf( stderr, "decompressed size %9lld, size %9lld. ", + if( verbosity >= 3 ) + std::fprintf( stderr, "decompressed size %9llu, size %9llu. ", out_size, in_size ); - if( verbosity >= 1 ) - { if( testing ) std::fprintf( stderr, "ok\n" ); - else std::fprintf( stderr, "done\n" ); } - - if( debug_level & 1 ) - std::fprintf( stderr, - "splitter tried to send a packet %8lu times\n" - "splitter had to wait %8lu times\n" - "any worker tried to consume from splitter %8lu times\n" - "any worker had to wait %8lu times\n" - "muxer tried to consume from workers %8lu times\n" - "muxer had to wait %8lu times\n", - courier.tally().check_counter, - courier.tally().wait_counter, - courier.icheck_counter, - courier.iwait_counter, - courier.ocheck_counter, - courier.owait_counter ); - - if( !courier.finished() ) internal_error( "courier not finished" ); + if( verbosity >= 1 ) std::fprintf( stderr, testing ? "ok\n" : "done\n" ); + return 0; } diff --git a/doc/plzip.1 b/doc/plzip.1 index ec3fc36..2b1261b 100644 --- a/doc/plzip.1 +++ b/doc/plzip.1 @@ -1,5 +1,5 @@ .\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.37.1. -.TH PLZIP "1" "January 2012" "Plzip 0.8" "User Commands" +.TH PLZIP "1" "March 2013" "Plzip 1.0-rc1" "User Commands" .SH NAME Plzip \- reduces the size of files .SH SYNOPSIS @@ -37,7 +37,7 @@ keep (don't delete) input files set match length limit in bytes [36] .TP \fB\-n\fR, \fB\-\-threads=\fR<n> -set the number of (de)compression threads +set number of (de)compression threads [1] .TP \fB\-o\fR, \fB\-\-output=\fR<file> if reading stdin, place the output into <file> @@ -78,8 +78,8 @@ Plzip home page: http://www.nongnu.org/lzip/plzip.html .SH COPYRIGHT Copyright \(co 2009 Laszlo Ersek. .br -Copyright \(co 2012 Antonio Diaz Diaz. -Using Lzlib 1.3\-rc1 +Copyright \(co 2013 Antonio Diaz Diaz. +Using Lzlib 1.4\-rc2 License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html> .br This is free software: you are free to change and redistribute it. diff --git a/doc/plzip.info b/doc/plzip.info index 3a12bef..bf22e32 100644 --- a/doc/plzip.info +++ b/doc/plzip.info @@ -12,25 +12,25 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir) Plzip Manual ************ -This manual is for Plzip (version 0.8, 17 January 2012). +This manual is for Plzip (version 1.0-rc1, 8 March 2013). * Menu: -* Introduction:: Purpose and features of plzip -* Invoking Plzip:: Command line interface -* Program Design:: Internal structure of plzip -* File Format:: Detailed format of the compressed file -* Problems:: Reporting bugs -* Concept Index:: Index of concepts +* Introduction:: Purpose and features of plzip +* Program Design:: Internal structure of plzip +* Invoking Plzip:: Command line interface +* File Format:: Detailed format of the compressed file +* Problems:: Reporting bugs +* Concept Index:: Index of concepts - Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This manual is free documentation: you have unlimited permission to copy, distribute and modify it. -File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: Top +File: plzip.info, Node: Introduction, Next: Program Design, Prev: Top, Up: Top 1 Introduction ************** @@ -81,15 +81,45 @@ processed. Be aware, though, that the check occurs upon decompression, so it can only tell you that something is wrong. It can't help you recover the original uncompressed data. + WARNING! Even if plzip is bug-free, other causes may result in a +corrupt compressed file (bugs in the system libraries, memory errors, +etc). Therefore, if the data you are going to compress is important, +give the `--keep' option to plzip and do not remove the original file +until you verify the compressed file with a command like +`plzip -cd file.lz | cmp file -'. + Return values: 0 for a normal exit, 1 for environmental problems (file not found, invalid flags, I/O errors, etc), 2 to indicate a corrupt or invalid input file, 3 for an internal consistency error (eg, bug) which caused plzip to panic. -File: plzip.info, Node: Invoking Plzip, Next: Program Design, Prev: Introduction, Up: Top +File: plzip.info, Node: Program Design, Next: Invoking Plzip, Prev: Introduction, Up: Top + +2 Program Design +**************** + +For each input file, a splitter thread and several worker threads are +created, acting the main thread as muxer (multiplexer) thread. A "packet +courier" takes care of data transfers among threads and limits the +maximum number of data blocks (packets) being processed simultaneously. + + The splitter reads data blocks from the input file, and distributes +them to the workers. The workers (de)compress the blocks received from +the splitter. The muxer collects processed packets from the workers, and +writes them to the output file. + + When decompressing from a regular file, the splitter is removed and +the workers read directly from the input file. If the output file is +also a regular file, the muxer is also removed, and the workers write +directly to the output file. With these optimizations, decompression +speed of large files with many members is only limited by the number of +processors available and by I/O speed. + + +File: plzip.info, Node: Invoking Plzip, Next: File Format, Prev: Program Design, Up: Top -2 Invoking Plzip +3 Invoking Plzip **************** The format for running plzip is: @@ -149,7 +179,8 @@ The format for running plzip is: Set the number of worker threads. Valid values range from 1 to "as many as your system can support". If this option is not used, plzip tries to detect the number of processors in the system and - use it as default value. + use it as default value. `plzip --help' shows the system's default + value. `-o FILE' `--output=FILE' @@ -236,28 +267,17 @@ Z zettabyte (10^21) | Zi zebibyte (2^70) Y yottabyte (10^24) | Yi yobibyte (2^80) -File: plzip.info, Node: Program Design, Next: File Format, Prev: Invoking Plzip, Up: Top - -3 Program Design -**************** - -For each input file, a splitter thread and several worker threads are -created, acting the main thread as muxer (multiplexer) thread. A "packet -courier" takes care of data transfers among threads and limits the -maximum number of data blocks (packets) being processed simultaneously. - - The splitter reads data blocks from the input file, and distributes -them to the workers. The workers (de)compress the blocks received from -the splitter. The muxer collects processed packets from the workers, and -writes them to the output file. - - -File: plzip.info, Node: File Format, Next: Problems, Prev: Program Design, Up: Top +File: plzip.info, Node: File Format, Next: Problems, Prev: Invoking Plzip, Up: Top 4 File Format ************* -In the diagram below, a box like this: +Perfection is reached, not when there is no longer anything to add, but +when there is no longer anything to take away. +-- Antoine de Saint-Exupery + + + In the diagram below, a box like this: +---+ | | <-- the vertical bars might be missing +---+ @@ -286,15 +306,19 @@ additional information before, between, or after them. "LZIP". `VN (version number, 1 byte)' - Just in case something needs to be modified in the future. Valid - values are 0 and 1. Version 0 files are deprecated. They can - contain only one member and lack the `Member size' field. + Just in case something needs to be modified in the future. 1 for + now. `DS (coded dictionary size, 1 byte)' - Bits 4-0 contain the base 2 logarithm of the base dictionary size. - Bits 7-5 contain the number of "wedges" to substract from the base - dictionary size to obtain the dictionary size. The size of a wedge - is (base dictionary size / 16). + Lzip divides the distance between any two powers of 2 into 8 + equally spaced intervals, named "wedges". The dictionary size is + calculated by taking a power of 2 (the base size) and substracting + from it a number of wedges between 0 and 7. The size of a wedge is + (base_size / 16). + Bits 4-0 contain the base 2 logarithm of the base size (12 to 29). + Bits 7-5 contain the number of wedges (0 to 7) to substract from + the base size to obtain the dictionary size. + Example: 0xD3 = (2^19 - 6 * 2^15) = (512KiB - 6 * 32KiB) = 320KiB Valid values for dictionary size range from 4KiB to 512MiB. `Lzma stream' @@ -308,9 +332,9 @@ additional information before, between, or after them. Size of the uncompressed original data. `Member size (8 bytes)' - Total size of the member, including header and trailer. This - facilitates safe recovery of undamaged members from multi-member - files. + Total size of the member, including header and trailer. This field + acts as a distributed index, and facilitates safe recovery of + undamaged members from multi-member files. @@ -351,12 +375,12 @@ Concept Index Tag Table: Node: Top223 -Node: Introduction845 -Node: Invoking Plzip3641 -Node: Program Design8597 -Node: File Format9259 -Node: Problems11254 -Node: Concept Index11783 +Node: Introduction864 +Node: Program Design4030 +Node: Invoking Plzip5084 +Node: File Format10093 +Node: Problems12473 +Node: Concept Index13002 End Tag Table diff --git a/doc/plzip.texinfo b/doc/plzip.texinfo index c83d5a5..5e62234 100644 --- a/doc/plzip.texinfo +++ b/doc/plzip.texinfo @@ -6,8 +6,8 @@ @finalout @c %**end of header -@set UPDATED 17 January 2012 -@set VERSION 0.8 +@set UPDATED 8 March 2013 +@set VERSION 1.0-rc1 @dircategory Data Compression @direntry @@ -35,16 +35,16 @@ This manual is for Plzip (version @value{VERSION}, @value{UPDATED}). @menu -* Introduction:: Purpose and features of plzip -* Invoking Plzip:: Command line interface -* Program Design:: Internal structure of plzip -* File Format:: Detailed format of the compressed file -* Problems:: Reporting bugs -* Concept Index:: Index of concepts +* Introduction:: Purpose and features of plzip +* Program Design:: Internal structure of plzip +* Invoking Plzip:: Command line interface +* File Format:: Detailed format of the compressed file +* Problems:: Reporting bugs +* Concept Index:: Index of concepts @end menu @sp 1 -Copyright @copyright{} 2009, 2010, 2011, 2012 Antonio Diaz Diaz. +Copyright @copyright{} 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This manual is free documentation: you have unlimited permission to copy, distribute and modify it. @@ -102,12 +102,41 @@ the check occurs upon decompression, so it can only tell you that something is wrong. It can't help you recover the original uncompressed data. +WARNING! Even if plzip is bug-free, other causes may result in a corrupt +compressed file (bugs in the system libraries, memory errors, etc). +Therefore, if the data you are going to compress is important, give the +@samp{--keep} option to plzip and do not remove the original file until +you verify the compressed file with a command like +@w{@samp{plzip -cd file.lz | cmp file -}}. + Return values: 0 for a normal exit, 1 for environmental problems (file not found, invalid flags, I/O errors, etc), 2 to indicate a corrupt or invalid input file, 3 for an internal consistency error (eg, bug) which caused plzip to panic. +@node Program Design +@chapter Program Design +@cindex program design + +For each input file, a splitter thread and several worker threads are +created, acting the main thread as muxer (multiplexer) thread. A "packet +courier" takes care of data transfers among threads and limits the +maximum number of data blocks (packets) being processed simultaneously. + +The splitter reads data blocks from the input file, and distributes them +to the workers. The workers (de)compress the blocks received from the +splitter. The muxer collects processed packets from the workers, and +writes them to the output file. + +When decompressing from a regular file, the splitter is removed and the +workers read directly from the input file. If the output file is also a +regular file, the muxer is also removed, and the workers write directly +to the output file. With these optimizations, decompression speed of +large files with many members is only limited by the number of +processors available and by I/O speed. + + @node Invoking Plzip @chapter Invoking Plzip @cindex invoking @@ -173,7 +202,7 @@ usually give better compression ratios but longer compression times. Set the number of worker threads. Valid values range from 1 to "as many as your system can support". If this option is not used, plzip tries to detect the number of processors in the system and use it as default -value. +value. @w{@samp{plzip --help}} shows the system's default value. @item -o @var{file} @itemx --output=@var{file} @@ -261,25 +290,15 @@ Table of SI and binary prefixes (unit multipliers): @end multitable -@node Program Design -@chapter Program Design -@cindex program design - -For each input file, a splitter thread and several worker threads are -created, acting the main thread as muxer (multiplexer) thread. A "packet -courier" takes care of data transfers among threads and limits the -maximum number of data blocks (packets) being processed simultaneously. - -The splitter reads data blocks from the input file, and distributes them -to the workers. The workers (de)compress the blocks received from the -splitter. The muxer collects processed packets from the workers, and -writes them to the output file. - - @node File Format @chapter File Format @cindex file format +Perfection is reached, not when there is no longer anything to add, but +when there is no longer anything to take away.@* +--- Antoine de Saint-Exupery + +@sp 1 In the diagram below, a box like this: @verbatim +---+ @@ -315,15 +334,17 @@ All multibyte values are stored in little endian order. A four byte string, identifying the lzip format, with the value "LZIP". @item VN (version number, 1 byte) -Just in case something needs to be modified in the future. Valid values -are 0 and 1. Version 0 files are deprecated. They can contain only one -member and lack the @samp{Member size} field. +Just in case something needs to be modified in the future. 1 for now. @item DS (coded dictionary size, 1 byte) -Bits 4-0 contain the base 2 logarithm of the base dictionary size.@* -Bits 7-5 contain the number of "wedges" to substract from the base -dictionary size to obtain the dictionary size. The size of a wedge is -(base dictionary size / 16).@* +Lzip divides the distance between any two powers of 2 into 8 equally +spaced intervals, named "wedges". The dictionary size is calculated by +taking a power of 2 (the base size) and substracting from it a number of +wedges between 0 and 7. The size of a wedge is (base_size / 16).@* +Bits 4-0 contain the base 2 logarithm of the base size (12 to 29).@* +Bits 7-5 contain the number of wedges (0 to 7) to substract from the +base size to obtain the dictionary size.@* +Example: 0xD3 = (2^19 - 6 * 2^15) = (512KiB - 6 * 32KiB) = 320KiB@* Valid values for dictionary size range from 4KiB to 512MiB. @item Lzma stream @@ -337,8 +358,9 @@ CRC of the uncompressed original data. Size of the uncompressed original data. @item Member size (8 bytes) -Total size of the member, including header and trailer. This facilitates -safe recovery of undamaged members from multi-member files. +Total size of the member, including header and trailer. This field acts +as a distributed index, and facilitates safe recovery of undamaged +members from multi-member files. @end table diff --git a/file_index.cc b/file_index.cc new file mode 100644 index 0000000..5cdba46 --- /dev/null +++ b/file_index.cc @@ -0,0 +1,143 @@ +/* Plzip - A parallel compressor compatible with lzip + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <algorithm> +#include <cerrno> +#include <cstdio> +#include <cstring> +#include <string> +#include <vector> +#include <stdint.h> +#include <unistd.h> + +#include "lzip.h" +#include "file_index.h" + + +int seek_read( const int fd, uint8_t * const buf, const int size, + const long long pos ) + { + if( lseek( fd, pos, SEEK_SET ) == pos ) + return readblock( fd, buf, size ); + return 0; + } + + +const char * format_num( unsigned long long num, + unsigned long long limit = -1ULL, + const int set_prefix = 0 ) + { + const char * const si_prefix[8] = + { "k", "M", "G", "T", "P", "E", "Z", "Y" }; + const char * const binary_prefix[8] = + { "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi" }; + static bool si = true; + static char buf[32]; + + if( set_prefix ) si = ( set_prefix > 0 ); + const unsigned factor = ( si ? 1000 : 1024 ); + const char * const * prefix = ( si ? si_prefix : binary_prefix ); + const char * p = ""; + bool exact = ( num % factor == 0 ); + + for( int i = 0; i < 8 && ( num > limit || ( exact && num >= factor ) ); ++i ) + { num /= factor; if( num % factor != 0 ) exact = false; p = prefix[i]; } + snprintf( buf, sizeof buf, "%llu %s", num, p ); + return buf; + } + + +File_index::File_index( const int infd ) : retval_( 0 ) + { + const long long isize = lseek( infd, 0, SEEK_END ); + if( isize < 0 ) + { error_ = "Input file is not seekable :"; + error_ += std::strerror( errno ); retval_ = 1; return; } + if( isize > INT64_MAX ) + { error_ = "Input file is too long (2^63 bytes or more)."; + retval_ = 2; return; } + long long pos = isize; // always points to a header or EOF + File_header header; + File_trailer trailer; + + if( isize < min_member_size ) + { error_ = "Input file is too short."; retval_ = 2; return; } + if( seek_read( infd, header.data, File_header::size, 0 ) != File_header::size ) + { error_ = "Error reading member header :"; + error_ += std::strerror( errno ); retval_ = 1; return; } + if( !header.verify_magic() ) + { error_ = "Bad magic number (file not in lzip format)."; + retval_ = 2; return; } + if( !header.verify_version() ) + { error_ = "Version "; error_ += format_num( header.version() ); + error_ += "member format not supported."; retval_ = 2; return; } + + while( pos >= min_member_size ) + { + if( seek_read( infd, trailer.data, File_trailer::size, + pos - File_trailer::size ) != File_trailer::size ) + { error_ = "Error reading member trailer :"; + error_ += std::strerror( errno ); retval_ = 1; break; } + const long long member_size = trailer.member_size(); + if( member_size < min_member_size || member_size > pos ) + { + if( member_vector.size() == 0 ) // maybe trailing garbage + { --pos; continue; } + error_ = "Member size in trailer is corrupt at pos "; + error_ += format_num( pos - 8 ); retval_ = 2; break; + } + if( seek_read( infd, header.data, File_header::size, + pos - member_size ) != File_header::size ) + { error_ = "Error reading member header :"; + error_ += std::strerror( errno ); retval_ = 1; break; } + if( !header.verify_magic() || !header.verify_version() ) + { + if( member_vector.size() == 0 ) // maybe trailing garbage + { --pos; continue; } + error_ = "Bad header at pos "; + error_ += format_num( pos - member_size ); retval_ = 2; break; + } + if( member_vector.size() == 0 && isize - pos > File_header::size && + seek_read( infd, header.data, File_header::size, pos ) == File_header::size && + header.verify_magic() && header.verify_version() ) + { // last trailer is corrupt + error_ = "Member size in trailer is corrupt at pos "; + error_ += format_num( isize - 8 ); retval_ = 2; break; + } + pos -= member_size; + member_vector.push_back( Member( 0, trailer.data_size(), + pos, member_size ) ); + } + if( pos != 0 || member_vector.size() == 0 ) + { + member_vector.clear(); + if( retval_ == 0 ) { error_ = "Can't create file index."; retval_ = 2; } + return; + } + std::reverse( member_vector.begin(), member_vector.end() ); + for( unsigned i = 0; i < member_vector.size() - 1; ++i ) + { + const long long end = member_vector[i].dblock.end(); + if( end < 0 || end > INT64_MAX ) + { + member_vector.clear(); + error_ = "Data in input file is too long (2^63 bytes or more)."; + retval_ = 2; return; + } + member_vector[i+1].dblock.pos( end ); + } + } diff --git a/file_index.h b/file_index.h new file mode 100644 index 0000000..1dfbcf4 --- /dev/null +++ b/file_index.h @@ -0,0 +1,77 @@ +/* Plzip - A parallel compressor compatible with lzip + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef INT64_MAX +#define INT64_MAX 0x7FFFFFFFFFFFFFFFLL +#endif + + +class Block + { + long long pos_, size_; // pos + size <= INT64_MAX + +public: + Block( const long long p, const long long s ) : pos_( p ), size_( s ) {} + + long long pos() const { return pos_; } + long long size() const { return size_; } + long long end() const { return pos_ + size_; } + + void pos( const long long p ) { pos_ = p; } + void size( const long long s ) { size_ = s; } + + bool overlaps( const Block & b ) const + { return ( pos_ < b.end() && b.pos_ < end() ); } + void shift( Block & b ) { ++size_; ++b.pos_; --b.size_; } + }; + + +class File_index + { + struct Member + { + Block dblock, mblock; // data block, member block + + Member( const long long dp, const long long ds, + const long long mp, const long long ms ) + : dblock( dp, ds ), mblock( mp, ms ) {} + }; + + std::vector< Member > member_vector; + std::string error_; + int retval_; + +public: + File_index( const int infd ); + + const std::string & error() const { return error_; } + int retval() const { return retval_; } + + long long data_end() const + { if( member_vector.size() ) return member_vector.back().dblock.end(); + else return 0; } + + long long file_end() const + { if( member_vector.size() ) return member_vector.back().mblock.end(); + else return 0; } + + const Block & dblock( const int i ) const + { return member_vector[i].dblock; } + const Block & mblock( const int i ) const + { return member_vector[i].mblock; } + int members() const { return (int)member_vector.size(); } + }; @@ -0,0 +1,246 @@ +/* Plzip - A parallel compressor compatible with lzip + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +enum { + min_dictionary_bits = 12, + min_dictionary_size = 1 << min_dictionary_bits, + max_dictionary_bits = 29, + max_dictionary_size = 1 << max_dictionary_bits, + min_member_size = 36 }; + + +class Pretty_print + { + std::string name_; + const char * const stdin_name; + unsigned longest_name; + mutable bool first_post; + +public: + explicit Pretty_print( const std::vector< std::string > & filenames ) + : stdin_name( "(stdin)" ), longest_name( 0 ), first_post( false ) + { + const unsigned stdin_name_len = std::strlen( stdin_name ); + for( unsigned i = 0; i < filenames.size(); ++i ) + { + const std::string & s = filenames[i]; + const unsigned len = ( ( s == "-" ) ? stdin_name_len : s.size() ); + if( len > longest_name ) longest_name = len; + } + if( longest_name == 0 ) longest_name = stdin_name_len; + } + + void set_name( const std::string & filename ) + { + if( filename.size() && filename != "-" ) name_ = filename; + else name_ = stdin_name; + first_post = true; + } + + void reset() const { if( name_.size() ) first_post = true; } + const char * name() const { return name_.c_str(); } + void operator()( const char * const msg = 0 ) const; + }; + + +inline int real_bits( unsigned value ) + { + int bits = 0; + while( value > 0 ) { value >>= 1; ++bits; } + return bits; + } + + +const uint8_t magic_string[4] = { 0x4C, 0x5A, 0x49, 0x50 }; // "LZIP" + +struct File_header + { + uint8_t data[6]; // 0-3 magic bytes + // 4 version + // 5 coded_dict_size + enum { size = 6 }; + + void set_magic() { std::memcpy( data, magic_string, 4 ); data[4] = 1; } + bool verify_magic() const + { return ( std::memcmp( data, magic_string, 4 ) == 0 ); } + + uint8_t version() const { return data[4]; } + bool verify_version() const { return ( data[4] == 1 ); } + + unsigned dictionary_size() const + { + unsigned sz = ( 1 << ( data[5] & 0x1F ) ); + if( sz > min_dictionary_size ) + sz -= ( sz / 16 ) * ( ( data[5] >> 5 ) & 7 ); + return sz; + } + + bool dictionary_size( const int sz ) + { + if( sz >= min_dictionary_size && sz <= max_dictionary_size ) + { + data[5] = real_bits( sz - 1 ); + if( sz > min_dictionary_size ) + { + const int base_size = 1 << data[5]; + const int wedge = base_size / 16; + for( int i = 7; i >= 1; --i ) + if( base_size - ( i * wedge ) >= sz ) + { data[5] |= ( i << 5 ); break; } + } + return true; + } + return false; + } + }; + + +struct File_trailer + { + uint8_t data[20]; // 0-3 CRC32 of the uncompressed data + // 4-11 size of the uncompressed data + // 12-19 member size including header and trailer + + enum { size = 20 }; + + unsigned data_crc() const + { + unsigned tmp = 0; + for( int i = 3; i >= 0; --i ) { tmp <<= 8; tmp += data[i]; } + return tmp; + } + + void data_crc( unsigned crc ) + { for( int i = 0; i <= 3; ++i ) { data[i] = (uint8_t)crc; crc >>= 8; } } + + unsigned long long data_size() const + { + unsigned long long tmp = 0; + for( int i = 11; i >= 4; --i ) { tmp <<= 8; tmp += data[i]; } + return tmp; + } + + void data_size( unsigned long long sz ) + { + for( int i = 4; i <= 11; ++i ) { data[i] = (uint8_t)sz; sz >>= 8; } + } + + unsigned long long member_size() const + { + unsigned long long tmp = 0; + for( int i = 19; i >= 12; --i ) { tmp <<= 8; tmp += data[i]; } + return tmp; + } + + void member_size( unsigned long long sz ) + { + for( int i = 12; i <= 19; ++i ) { data[i] = (uint8_t)sz; sz >>= 8; } + } + }; + + +// defined in compress.cc +int readblock( const int fd, uint8_t * const buf, const int size ); +int writeblock( const int fd, const uint8_t * const buf, const int size ); +void xinit( pthread_mutex_t * const mutex ); +void xinit( pthread_cond_t * const cond ); +void xdestroy( pthread_mutex_t * const mutex ); +void xdestroy( pthread_cond_t * const cond ); +void xlock( pthread_mutex_t * const mutex ); +void xunlock( pthread_mutex_t * const mutex ); +void xwait( pthread_cond_t * const cond, pthread_mutex_t * const mutex ); +void xsignal( pthread_cond_t * const cond ); +void xbroadcast( pthread_cond_t * const cond ); +int compress( const int data_size, const int dictionary_size, + const int match_len_limit, int num_workers, + const int infd, const int outfd, + const Pretty_print & pp, const int debug_level ); + +// defined in file_index.cc +class File_index; + +// defined in dec_stdout.cc +int dec_stdout( const int num_workers, const int infd, const int outfd, + const Pretty_print & pp, const int debug_level, + const File_index & file_index ); + +// defined in dec_stream.cc +int dec_stream( const int num_workers, const int infd, const int outfd, + const Pretty_print & pp, const int debug_level, + const bool testing ); + +// defined in decompress.cc +int preadblock( const int fd, uint8_t * const buf, const int size, + const long long pos ); +int pwriteblock( const int fd, const uint8_t * const buf, const int size, + const long long pos ); +int decompress_read_error( struct LZ_Decoder * const decoder, + const Pretty_print & pp, const int worker_id ); +int decompress( int num_workers, const int infd, const int outfd, + const Pretty_print & pp, const int debug_level, + const bool testing, const bool infd_isreg ); + +// defined in main.cc +extern int verbosity; +void fatal( const int retval = 1 ); // terminate the program +void show_error( const char * const msg, const int errcode = 0, + const bool help = false ); +void internal_error( const char * const msg ); + + +class Slot_tally + { + const int num_slots; // total slots + int num_free; // remaining free slots + pthread_mutex_t mutex; + pthread_cond_t slot_av; // free slot available + + Slot_tally( const Slot_tally & ); // declared as private + void operator=( const Slot_tally & ); // declared as private + +public: + explicit Slot_tally( const int slots ) + : num_slots( slots ), num_free( slots ) + { xinit( &mutex ); xinit( &slot_av ); } + + ~Slot_tally() { xdestroy( &slot_av ); xdestroy( &mutex ); } + + bool all_free() { return ( num_free == num_slots ); } + + void get_slot() // wait for a free slot + { + xlock( &mutex ); + while( num_free <= 0 ) xwait( &slot_av, &mutex ); + --num_free; + xunlock( &mutex ); + } + + void leave_slot() // return a slot to the tally + { + xlock( &mutex ); + if( ++num_free == 1 ) xsignal( &slot_av ); // num_free was 0 + xunlock( &mutex ); + } + + void leave_slots( const int slots ) // return slots to the tally + { + xlock( &mutex ); + num_free += slots; + if( num_free == slots ) xsignal( &slot_av ); // num_free was 0 + xunlock( &mutex ); + } + }; @@ -1,6 +1,6 @@ /* Plzip - A parallel compressor compatible with lzip Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -34,36 +34,41 @@ #include <string> #include <vector> #include <fcntl.h> -#include <inttypes.h> #include <pthread.h> +#include <stdint.h> #include <unistd.h> #include <utime.h> #include <sys/stat.h> #include <lzlib.h> +#if defined(__MSVCRT__) +#include <io.h> +#define fchmod(x,y) 0 +#define fchown(x,y,z) 0 +#define strtoull std::strtoul +#define SIGHUP SIGTERM +#define S_ISSOCK(x) 0 +#define S_IRGRP 0 +#define S_IWGRP 0 +#define S_IROTH 0 +#define S_IWOTH 0 +#endif +#if defined(__OS2__) +#include <io.h> +#endif #include "arg_parser.h" -#include "plzip.h" +#include "lzip.h" #if CHAR_BIT != 8 #error "Environments where CHAR_BIT != 8 are not supported." #endif -#ifndef LLONG_MAX -#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL -#endif -#ifndef LLONG_MIN -#define LLONG_MIN (-LLONG_MAX - 1LL) -#endif -#ifndef ULLONG_MAX -#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL -#endif - namespace { const char * const Program_name = "Plzip"; const char * const program_name = "plzip"; -const char * const program_year = "2012"; +const char * const program_year = "2013"; const char * invocation_name = 0; #ifdef O_BINARY @@ -87,13 +92,15 @@ enum Mode { m_compress, m_decompress, m_test }; std::string output_filename; int outfd = -1; -mode_t outfd_mode = S_IRUSR | S_IWUSR; +const mode_t usr_rw = S_IRUSR | S_IWUSR; +const mode_t all_rw = usr_rw | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; +mode_t outfd_mode = usr_rw; bool delete_output_on_interrupt = false; pthread_t main_thread; pid_t main_thread_pid; -void show_help() throw() +void show_help( const long num_online ) { std::printf( "%s - A parallel compressor compatible with lzip.\n", Program_name ); std::printf( "\nUsage: %s [options] [files]\n", invocation_name ); @@ -107,7 +114,7 @@ void show_help() throw() " -F, --recompress force recompression of compressed files\n" " -k, --keep keep (don't delete) input files\n" " -m, --match-length=<bytes> set match length limit in bytes [36]\n" - " -n, --threads=<n> set the number of (de)compression threads\n" + " -n, --threads=<n> set number of (de)compression threads [%ld]\n" " -o, --output=<file> if reading stdin, place the output into <file>\n" " -q, --quiet suppress all messages\n" " -s, --dictionary-size=<bytes> set dictionary size limit in bytes [8MiB]\n" @@ -115,7 +122,7 @@ void show_help() throw() " -v, --verbose be verbose (a 2nd -v gives more)\n" " -1 .. -9 set compression level [default 6]\n" " --fast alias for -1\n" - " --best alias for -9\n" ); + " --best alias for -9\n", num_online ); if( verbosity > 0 ) { std::printf( " -D, --debug=<level> (0-1) print debug statistics to stderr\n" ); @@ -133,7 +140,7 @@ void show_help() throw() } -void show_version() throw() +void show_version() { std::printf( "%s %s\n", Program_name, PROGVERSION ); std::printf( "Copyright (C) 2009 Laszlo Ersek.\n" @@ -145,13 +152,13 @@ void show_version() throw() } -long long getnum( const char * const ptr, - const long long llimit = LLONG_MIN + 1, - const long long ulimit = LLONG_MAX ) throw() +unsigned long long getnum( const char * const ptr, + const unsigned long long llimit, + const unsigned long long ulimit ) { errno = 0; - char *tail; - long long result = strtoll( ptr, &tail, 0 ); + char * tail; + unsigned long long result = strtoull( ptr, &tail, 0 ); if( tail == ptr ) { show_error( "Bad or missing numerical argument.", 0, true ); @@ -186,7 +193,7 @@ long long getnum( const char * const ptr, } for( int i = 0; i < exponent; ++i ) { - if( LLONG_MAX / factor >= llabs( result ) ) result *= factor; + if( ulimit / factor >= result ) result *= factor; else { errno = ERANGE; break; } } } @@ -200,9 +207,9 @@ long long getnum( const char * const ptr, } -int get_dict_size( const char * const arg ) throw() +int get_dict_size( const char * const arg ) { - char *tail; + char * tail; int bits = std::strtol( arg, &tail, 0 ); if( bits >= LZ_min_dictionary_bits() && bits <= LZ_max_dictionary_bits() && *tail == 0 ) @@ -211,7 +218,7 @@ int get_dict_size( const char * const arg ) throw() } -int extension_index( const std::string & name ) throw() +int extension_index( const std::string & name ) { for( int i = 0; known_extensions[i].from; ++i ) { @@ -226,7 +233,7 @@ int extension_index( const std::string & name ) throw() int open_instream( const std::string & name, struct stat * const in_statsp, const Mode program_mode, const int eindex, - const bool recompress, const bool to_stdout ) throw() + const bool recompress, const bool to_stdout ) { int infd = -1; if( program_mode == m_compress && !recompress && eindex >= 0 ) @@ -248,7 +255,7 @@ int open_instream( const std::string & name, struct stat * const in_statsp, else { const int i = fstat( infd, in_statsp ); - const mode_t & mode = in_statsp->st_mode; + const mode_t mode = in_statsp->st_mode; const bool can_read = ( i == 0 && ( S_ISBLK( mode ) || S_ISCHR( mode ) || S_ISFIFO( mode ) || S_ISSOCK( mode ) ) ); @@ -268,14 +275,14 @@ int open_instream( const std::string & name, struct stat * const in_statsp, } -void set_c_outname( const std::string & name ) throw() +void set_c_outname( const std::string & name ) { output_filename = name; output_filename += known_extensions[0].from; } -void set_d_outname( const std::string & name, const int i ) throw() +void set_d_outname( const std::string & name, const int i ) { if( i >= 0 ) { @@ -294,7 +301,7 @@ void set_d_outname( const std::string & name, const int i ) throw() } -bool open_outstream( const bool force ) throw() +bool open_outstream( const bool force ) { int flags = O_CREAT | O_WRONLY | o_binary; if( force ) flags |= O_TRUNC; else flags |= O_EXCL; @@ -313,7 +320,7 @@ bool open_outstream( const bool force ) throw() } -bool check_tty( const int infd, const Mode program_mode ) throw() +bool check_tty( const int infd, const Mode program_mode ) { if( program_mode == m_compress && outfd >= 0 && isatty( outfd ) ) { @@ -330,7 +337,7 @@ bool check_tty( const int infd, const Mode program_mode ) throw() } -void cleanup_and_fail( const int retval ) throw() +void cleanup_and_fail( const int retval ) { if( delete_output_on_interrupt ) { @@ -372,17 +379,17 @@ void close_and_set_permissions( const struct stat * const in_statsp ) } -extern "C" void signal_handler( int sig ) throw() +extern "C" void signal_handler( int sig ) { if( !pthread_equal( pthread_self(), main_thread ) ) kill( main_thread_pid, sig ); - if( sig != SIGUSR1 ) + if( sig != SIGUSR1 && sig != SIGUSR2 ) show_error( "Control-C or similar caught, quitting." ); - cleanup_and_fail( 1 ); + cleanup_and_fail( ( sig != SIGUSR2 ) ? 1 : 2 ); } -void set_signals() throw() +void set_signals() { std::signal( SIGHUP, signal_handler ); std::signal( SIGINT, signal_handler ); @@ -399,10 +406,11 @@ int verbosity = 0; // since they all call common helper functions that call fatal() in case // of an error. // -void fatal() { signal_handler( SIGUSR1 ); } +void fatal( const int retval ) + { signal_handler( ( retval != 2 ) ? SIGUSR1 : SIGUSR2 ); } -void Pretty_print::operator()( const char * const msg ) const throw() +void Pretty_print::operator()( const char * const msg ) const { if( verbosity >= 0 ) { @@ -410,7 +418,7 @@ void Pretty_print::operator()( const char * const msg ) const throw() { first_post = false; std::fprintf( stderr, " %s: ", name_.c_str() ); - for( unsigned int i = 0; i < longest_name - name_.size(); ++i ) + for( unsigned i = 0; i < longest_name - name_.size(); ++i ) std::fprintf( stderr, " " ); if( !msg ) std::fflush( stderr ); } @@ -419,7 +427,7 @@ void Pretty_print::operator()( const char * const msg ) const throw() } -void show_error( const char * const msg, const int errcode, const bool help ) throw() +void show_error( const char * const msg, const int errcode, const bool help ) { if( verbosity >= 0 ) { @@ -430,14 +438,14 @@ void show_error( const char * const msg, const int errcode, const bool help ) th std::fprintf( stderr, ": %s", std::strerror( errcode ) ); std::fprintf( stderr, "\n" ); } - if( help && invocation_name && invocation_name[0] ) + if( help ) std::fprintf( stderr, "Try '%s --help' for more information.\n", invocation_name ); } } -void internal_error( const char * const msg ) throw() +void internal_error( const char * const msg ) { if( verbosity >= 0 ) std::fprintf( stderr, "%s: internal error: %s.\n", program_name, msg ); @@ -445,43 +453,6 @@ void internal_error( const char * const msg ) throw() } -// Returns the number of bytes really read. -// If (returned value < size) and (errno == 0), means EOF was reached. -// -int readblock( const int fd, uint8_t * const buf, const int size ) throw() - { - int rest = size; - errno = 0; - while( rest > 0 ) - { - errno = 0; - const int n = read( fd, buf + size - rest, rest ); - if( n > 0 ) rest -= n; - else if( n == 0 ) break; - else if( errno != EINTR && errno != EAGAIN ) break; - } - return ( rest > 0 ) ? size - rest : size; - } - - -// Returns the number of bytes really written. -// If (returned value < size), it is always an error. -// -int writeblock( const int fd, const uint8_t * const buf, const int size ) throw() - { - int rest = size; - errno = 0; - while( rest > 0 ) - { - errno = 0; - const int n = write( fd, buf + size - rest, rest ); - if( n > 0 ) rest -= n; - else if( n < 0 && errno != EINTR && errno != EAGAIN ) break; - } - return ( rest > 0 ) ? size - rest : size; - } - - int main( const int argc, const char * const argv[] ) { // Mapping from gzip/bzip2 style 1..9 compression modes @@ -499,6 +470,9 @@ int main( const int argc, const char * const argv[] ) { 3 << 23, 132 }, // -8 { 1 << 25, 273 } }; // -9 Lzma_options encoder_options = option_mapping[6]; // default = "-6" + std::string input_filename; + std::string default_output_filename; + std::vector< std::string > filenames; int data_size = 0; int debug_level = 0; int infd = -1; @@ -508,9 +482,6 @@ int main( const int argc, const char * const argv[] ) bool keep_input_files = false; bool recompress = false; bool to_stdout = false; - std::string input_filename; - std::string default_output_filename; - std::vector< std::string > filenames; invocation_name = argv[0]; main_thread = pthread_self(); main_thread_pid = getpid(); @@ -518,6 +489,7 @@ int main( const int argc, const char * const argv[] ) if( LZ_version()[0] != LZ_version_string[0] ) internal_error( "bad library version" ); + const long num_online = std::max( 1L, sysconf( _SC_NPROCESSORS_ONLN ) ); long max_workers = sysconf( _SC_THREAD_THREADS_MAX ); if( max_workers < 1 || max_workers > INT_MAX / (int)sizeof (pthread_t) ) max_workers = INT_MAX / sizeof (pthread_t); @@ -566,7 +538,8 @@ int main( const int argc, const char * const argv[] ) const char * const arg = parser.argument( argind ).c_str(); switch( code ) { - case '0': case '1': case '2': case '3': case '4': + case '0': + case '1': case '2': case '3': case '4': case '5': case '6': case '7': case '8': case '9': encoder_options = option_mapping[code-'0']; break; case 'b': break; @@ -577,7 +550,7 @@ int main( const int argc, const char * const argv[] ) case 'D': debug_level = getnum( arg, 0, 3 ); break; case 'f': force = true; break; case 'F': recompress = true; break; - case 'h': show_help(); return 0; + case 'h': show_help( num_online ); return 0; case 'k': keep_input_files = true; break; case 'm': encoder_options.match_len_limit = getnum( arg, LZ_min_match_len_limit(), @@ -595,9 +568,9 @@ int main( const int argc, const char * const argv[] ) } } // end process options -#if defined(__OS2__) - _fsetmode( stdin, "b" ); - _fsetmode( stdout, "b" ); +#if defined(__MSVCRT__) || defined(__OS2__) + setmode( STDIN_FILENO, O_BINARY ); + setmode( STDOUT_FILENO, O_BINARY ); #endif if( program_mode == m_test ) @@ -609,17 +582,13 @@ int main( const int argc, const char * const argv[] ) encoder_options.dictionary_size = std::max( data_size, LZ_min_dictionary_size() ); if( num_workers <= 0 ) - { - long num_online = sysconf( _SC_NPROCESSORS_ONLN ); - if( num_online <= 0 ) num_online = 1; num_workers = std::min( num_online, max_workers ); - } bool filenames_given = false; for( ; argind < parser.arguments(); ++argind ) { - if( parser.argument( argind ) != "-" ) filenames_given = true; filenames.push_back( parser.argument( argind ) ); + if( filenames.back() != "-" ) filenames_given = true; } if( filenames.empty() ) filenames.push_back("-"); @@ -627,11 +596,12 @@ int main( const int argc, const char * const argv[] ) ( filenames_given || default_output_filename.size() ) ) set_signals(); std::signal( SIGUSR1, signal_handler ); + std::signal( SIGUSR2, signal_handler ); Pretty_print pp( filenames ); int retval = 0; - for( unsigned int i = 0; i < filenames.size(); ++i ) + for( unsigned i = 0; i < filenames.size(); ++i ) { struct stat in_stats; output_filename.clear(); @@ -649,10 +619,10 @@ int main( const int argc, const char * const argv[] ) if( program_mode == m_compress ) set_c_outname( default_output_filename ); else output_filename = default_output_filename; - outfd_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; + outfd_mode = all_rw; if( !open_outstream( force ) ) { - if( outfd == -1 && retval < 1 ) retval = 1; + if( retval < 1 ) retval = 1; close( infd ); infd = -1; continue; } @@ -674,10 +644,10 @@ int main( const int argc, const char * const argv[] ) if( program_mode == m_compress ) set_c_outname( input_filename ); else set_d_outname( input_filename, eindex ); - outfd_mode = S_IRUSR | S_IWUSR; + outfd_mode = usr_rw; if( !open_outstream( force ) ) { - if( outfd == -1 && retval < 1 ) retval = 1; + if( retval < 1 ) retval = 1; close( infd ); infd = -1; continue; } @@ -690,16 +660,17 @@ int main( const int argc, const char * const argv[] ) if( output_filename.size() && !to_stdout && program_mode != m_test ) delete_output_on_interrupt = true; const struct stat * const in_statsp = input_filename.size() ? &in_stats : 0; + const bool infd_isreg = in_statsp && S_ISREG( in_statsp->st_mode ); pp.set_name( input_filename ); if( verbosity >= 1 ) pp(); - int tmp = 0; + int tmp; if( program_mode == m_compress ) tmp = compress( data_size, encoder_options.dictionary_size, encoder_options.match_len_limit, num_workers, infd, outfd, pp, debug_level ); else tmp = decompress( num_workers, infd, outfd, pp, debug_level, - program_mode == m_test ); + program_mode == m_test, infd_isreg ); if( tmp > retval ) retval = tmp; if( tmp && program_mode != m_test ) cleanup_and_fail( retval ); diff --git a/plzip.h b/plzip.h deleted file mode 100644 index b12bbbd..0000000 --- a/plzip.h +++ /dev/null @@ -1,131 +0,0 @@ -/* Plzip - A parallel compressor compatible with lzip - Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -class Pretty_print - { - const char * const stdin_name; - unsigned int longest_name; - std::string name_; - mutable bool first_post; - -public: - explicit Pretty_print( const std::vector< std::string > & filenames ) - : stdin_name( "(stdin)" ), longest_name( 0 ), first_post( false ) - { - const unsigned int stdin_name_len = std::strlen( stdin_name ); - for( unsigned int i = 0; i < filenames.size(); ++i ) - { - const std::string & s = filenames[i]; - const unsigned int len = ( ( s == "-" ) ? stdin_name_len : s.size() ); - if( len > longest_name ) longest_name = len; - } - if( longest_name == 0 ) longest_name = stdin_name_len; - } - - void set_name( const std::string & filename ) - { - if( filename.size() && filename != "-" ) name_ = filename; - else name_ = stdin_name; - first_post = true; - } - - void reset() const throw() { if( name_.size() ) first_post = true; } - const char * name() const throw() { return name_.c_str(); } - void operator()( const char * const msg = 0 ) const throw(); - }; - - -/*--------------------- Defined in compress.cc ---------------------*/ - -void xinit( pthread_mutex_t * const mutex ); -void xinit( pthread_cond_t * const cond ); -void xdestroy( pthread_mutex_t * const mutex ); -void xdestroy( pthread_cond_t * const cond ); -void xlock( pthread_mutex_t * const mutex ); -void xunlock( pthread_mutex_t * const mutex ); -void xwait( pthread_cond_t * const cond, pthread_mutex_t * const mutex ); -void xsignal( pthread_cond_t * const cond ); -void xbroadcast( pthread_cond_t * const cond ); - - -class Slot_tally - { -public: - unsigned long check_counter; - unsigned long wait_counter; -private: - const int num_slots; // total slots - int num_free; // remaining free slots - pthread_mutex_t mutex; - pthread_cond_t slot_av; // free slot available - - Slot_tally( const Slot_tally & ); // declared as private - void operator=( const Slot_tally & ); // declared as private - -public: - explicit Slot_tally( const int slots ) - : check_counter( 0 ), wait_counter( 0 ), - num_slots( slots ), num_free( slots ) - { xinit( &mutex ); xinit( &slot_av ); } - - ~Slot_tally() { xdestroy( &slot_av ); xdestroy( &mutex ); } - - bool all_free() { return ( num_free == num_slots ); } - - void get_slot() // wait for a free slot - { - xlock( &mutex ); - ++check_counter; - while( num_free <= 0 ) - { ++wait_counter; xwait( &slot_av, &mutex ); ++check_counter; } - --num_free; - xunlock( &mutex ); - } - - void leave_slot() // return a slot to the tally - { - xlock( &mutex ); - if( ++num_free == 1 ) xsignal( &slot_av ); - xunlock( &mutex ); - } - }; - - -int compress( const int data_size, const int dictionary_size, - const int match_len_limit, const int num_workers, - const int infd, const int outfd, - const Pretty_print & pp, const int debug_level ); - - -/*-------------------- Defined in decompress.cc --------------------*/ - -int decompress( const int num_workers, const int infd, const int outfd, - const Pretty_print & pp, const int debug_level, - const bool testing ); - - -/*----------------------- Defined in main.cc -----------------------*/ - -extern int verbosity; - -void fatal(); // terminate the program - -void show_error( const char * const msg, const int errcode = 0, const bool help = false ) throw(); -void internal_error( const char * const msg ) throw(); -int readblock( const int fd, uint8_t * const buf, const int size ) throw(); -int writeblock( const int fd, const uint8_t * const buf, const int size ) throw(); diff --git a/testsuite/check.sh b/testsuite/check.sh index d4f919a..a044738 100755 --- a/testsuite/check.sh +++ b/testsuite/check.sh @@ -1,6 +1,6 @@ #! /bin/sh # check script for Plzip - A parallel compressor compatible with lzip -# Copyright (C) 2009, 2010 Antonio Diaz Diaz. +# Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. # # This script is free software: you have unlimited permission # to copy, distribute and modify it. @@ -22,27 +22,36 @@ mkdir tmp cd "${objdir}"/tmp cat "${testdir}"/test.txt > in || framework_failure -cat in in in in > in4 || framework_failure +in_lz="${testdir}"/test.txt.lz fail=0 printf "testing plzip-%s..." "$2" -"${LZIP}" -t "${testdir}"/test_v0.lz || fail=1 -printf . -"${LZIP}" -cd "${testdir}"/test_v0.lz > copy || fail=1 +"${LZIP}" -t "${in_lz}" || fail=1 +"${LZIP}" -cd "${in_lz}" > copy || fail=1 cmp in copy || fail=1 printf . -"${LZIP}" -t "${testdir}"/test_v1.lz || fail=1 -printf . -"${LZIP}" -cd "${testdir}"/test_v1.lz > copy || fail=1 +"${LZIP}" -cfq "${in_lz}" > out +if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi +"${LZIP}" -cF "${in_lz}" > out || fail=1 +"${LZIP}" -cd out | "${LZIP}" -d > copy || fail=1 cmp in copy || fail=1 printf . +"${LZIP}" -cqs-1 in > out +if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi +"${LZIP}" -cqs0 in > out +if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi +"${LZIP}" -cqs4095 in > out +if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi +"${LZIP}" -cqm274 in > out +if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi + for i in s4Ki 0 1 2 3 4 5 6 7 8 9 ; do "${LZIP}" -k -$i in || fail=1 mv -f in.lz copy.lz || fail=1 -# printf "garbage" >> copy.lz || fail=1 + printf "garbage" >> copy.lz || fail=1 "${LZIP}" -df copy.lz || fail=1 cmp in copy || fail=1 printf . @@ -50,7 +59,7 @@ done for i in s4Ki 0 1 2 3 4 5 6 7 8 9 ; do "${LZIP}" -c -$i in > out || fail=1 -# printf "g" >> out || fail=1 + printf "g" >> out || fail=1 "${LZIP}" -cd out > copy || fail=1 cmp in copy || fail=1 printf . @@ -58,6 +67,7 @@ done for i in s4Ki 0 1 2 3 4 5 6 7 8 9 ; do "${LZIP}" -$i < in > out || fail=1 + printf "garbage" >> out || fail=1 "${LZIP}" -d < out > copy || fail=1 cmp in copy || fail=1 printf . @@ -65,6 +75,7 @@ done for i in s4Ki 0 1 2 3 4 5 6 7 8 9 ; do "${LZIP}" -f -$i -o out < in || fail=1 + printf "g" >> out.lz || fail=1 "${LZIP}" -df -o copy < out.lz || fail=1 cmp in copy || fail=1 printf . @@ -75,13 +86,49 @@ done cmp in anyothername.out || fail=1 printf . +cat in in in in > in4 || framework_failure +for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ; do + "${LZIP}" -c -s4Ki -B8Ki -n$i in4 > out4.lz || fail=1 + printf "g" >> out4.lz || fail=1 + "${LZIP}" -cd -n$i out4.lz > copy4 || fail=1 + cmp in4 copy4 || fail=1 + "${LZIP}" -d -n$i out4.lz || fail=1 + cmp in4 out4 || fail=1 + rm -f out4 + printf . +done + for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ; do "${LZIP}" -s4Ki -B8Ki -n$i < in4 > out4 || fail=1 + printf "g" >> out4 || fail=1 "${LZIP}" -d -n$i < out4 > copy4 || fail=1 cmp in4 copy4 || fail=1 printf . done +cat "${in_lz}" > ingin.lz || framework_failure +printf "g" >> ingin.lz || framework_failure +cat "${in_lz}" >> ingin.lz || framework_failure +"${LZIP}" -tq ingin.lz +if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi +"${LZIP}" -cdq ingin.lz > out +if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi +"${LZIP}" -t < ingin.lz || fail=1 +printf . +"${LZIP}" -d < ingin.lz > copy || fail=1 +cmp in copy || fail=1 +printf . + +dd if="${in_lz}" bs=1024 count=10 > trunc.lz 2> /dev/null || framework_failure +"${LZIP}" -tq trunc.lz +if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi +"${LZIP}" -cdq trunc.lz > out +if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi +"${LZIP}" -tq < trunc.lz +if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi +"${LZIP}" -dq < trunc.lz > out +if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi + echo if [ ${fail} = 0 ] ; then echo "tests completed successfully." diff --git a/testsuite/test.txt.lz b/testsuite/test.txt.lz Binary files differnew file mode 100644 index 0000000..4db881a --- /dev/null +++ b/testsuite/test.txt.lz diff --git a/testsuite/test_v0.lz b/testsuite/test_v0.lz Binary files differdeleted file mode 100644 index a09b1e8..0000000 --- a/testsuite/test_v0.lz +++ /dev/null diff --git a/testsuite/test_v1.lz b/testsuite/test_v1.lz Binary files differdeleted file mode 100644 index f1c79eb..0000000 --- a/testsuite/test_v1.lz +++ /dev/null |