diff options
-rw-r--r-- | ChangeLog | 10 | ||||
-rw-r--r-- | Makefile.in | 4 | ||||
-rw-r--r-- | NEWS | 13 | ||||
-rwxr-xr-x | configure | 4 | ||||
-rw-r--r-- | doc/plzip.1 | 5 | ||||
-rw-r--r-- | doc/plzip.info | 26 | ||||
-rw-r--r-- | doc/plzip.texinfo | 20 | ||||
-rw-r--r-- | main.cc | 445 | ||||
-rw-r--r-- | main.h | 77 | ||||
-rw-r--r-- | plzip.cc | 641 | ||||
-rw-r--r-- | plzip.h | 26 |
11 files changed, 456 insertions, 815 deletions
@@ -1,3 +1,13 @@ +2010-01-24 Antonio Diaz Diaz <ant_diaz@teleline.es> + + * Version 0.3 released. + * Implemented option "--data-size". + * Output file is now removed if plzip is interrupted. + * This version automatically chooses the smallest possible + dictionary size for each member during compression, saving + memory during decompression. + * main.cc: New constant "o_binary". + 2010-01-17 Antonio Diaz Diaz <ant_diaz@teleline.es> * Version 0.2 released. diff --git a/Makefile.in b/Makefile.in index f1267e2..6eea4bf 100644 --- a/Makefile.in +++ b/Makefile.in @@ -30,8 +30,8 @@ main.o : main.cc $(objs) : Makefile arg_parser.o : arg_parser.h -main.o : arg_parser.h main.h plzip.h -plzip.o : main.h plzip.h +main.o : arg_parser.h plzip.h +plzip.o : plzip.h doc : info man @@ -1,4 +1,11 @@ -Changes in version 0.2: +Changes in version 0.3: -Lzip options "--dictionary-size" and "--match-length" have been -implemented. +New option "--data-size" has been added. + +Output file is now removed if plzip is interrupted. + +This version automatically chooses the smallest possible dictionary size +for each member during compression, saving memory during decompression. + +Regular files are now open in binary mode in non-POSIX platforms +defining the O_BINARY macro. @@ -5,12 +5,12 @@ # This configure script is free software: you have unlimited permission # to copy, distribute and modify it. # -# Date of this version: 2010-01-17 +# Date of this version: 2010-01-24 args= no_create= pkgname=plzip -pkgversion=0.2 +pkgversion=0.3 progname=plzip srctrigger=plzip.h diff --git a/doc/plzip.1 b/doc/plzip.1 index 0fefe38..feb0aa1 100644 --- a/doc/plzip.1 +++ b/doc/plzip.1 @@ -1,5 +1,5 @@ .\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36. -.TH PLZIP "1" "January 2010" "Plzip 0.2" "User Commands" +.TH PLZIP "1" "January 2010" "Plzip 0.3" "User Commands" .SH NAME Plzip \- data compressor based on the LZMA algorithm .SH SYNOPSIS @@ -15,6 +15,9 @@ display this help and exit \fB\-V\fR, \fB\-\-version\fR output version information and exit .TP +\fB\-B\fR, \fB\-\-data\-size=\fR<n> +set input data block size in bytes +.TP \fB\-c\fR, \fB\-\-stdout\fR send output to standard output .TP diff --git a/doc/plzip.info b/doc/plzip.info index ca7bda4..9a48fcb 100644 --- a/doc/plzip.info +++ b/doc/plzip.info @@ -12,7 +12,7 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir) Plzip Manual ************ -This manual is for Plzip (version 0.2, 17 January 2010). +This manual is for Plzip (version 0.3, 24 January 2010). * Menu: @@ -34,9 +34,11 @@ File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: T 1 Introduction ************** -Plzip is a parallel version of the lzip data compressor. Currently only -compression is performed in parallel. Parallel decompression is planned -to be implemented later. +Plzip is a parallel version of the lzip data compressor. The files +produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is +intended for faster compression/decompression of big files on +multiprocessor machines. Currently only compression is performed in +parallel. Parallel decompression is planned to be implemented later. Lzip is a lossless data compressor based on the LZMA algorithm, with very safe integrity checking and a user interface similar to the one of @@ -104,6 +106,14 @@ The format for running plzip is: `-V' Print the version number of plzip on the standard output and exit. +`--data-size=SIZE' +`-B' + Set the input data block size in bytes. The input file will be + divided in chunks of this size before compression is performed. + Valid values range from 100kB to 1GiB. Default value is two times + the dictionary size. It is a waste of memory to choose a data size + smaller than the dictionary size. + `--stdout' `-c' Compress or decompress to standard output. Needed when reading @@ -295,9 +305,9 @@ Concept Index Tag Table: Node: Top227 Node: Introduction750 -Node: Invoking Plzip3402 -Node: File Format6747 -Node: Problems8703 -Node: Concept Index9232 +Node: Invoking Plzip3571 +Node: File Format7260 +Node: Problems9216 +Node: Concept Index9745 End Tag Table diff --git a/doc/plzip.texinfo b/doc/plzip.texinfo index 7712600..04bf822 100644 --- a/doc/plzip.texinfo +++ b/doc/plzip.texinfo @@ -5,8 +5,8 @@ @finalout @c %**end of header -@set UPDATED 17 January 2010 -@set VERSION 0.2 +@set UPDATED 24 January 2010 +@set VERSION 0.3 @dircategory Data Compression @direntry @@ -50,9 +50,11 @@ to copy, distribute and modify it. @chapter Introduction @cindex introduction -Plzip is a parallel version of the lzip data compressor. Currently only -compression is performed in parallel. Parallel decompression is planned -to be implemented later. +Plzip is a parallel version of the lzip data compressor. The files +produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is +intended for faster compression/decompression of big files on +multiprocessor machines. Currently only compression is performed in +parallel. Parallel decompression is planned to be implemented later. Lzip is a lossless data compressor based on the LZMA algorithm, with very safe integrity checking and a user interface similar to the one of @@ -127,6 +129,14 @@ Print an informative help message describing the options and exit. @itemx -V Print the version number of plzip on the standard output and exit. +@item --data-size=@var{size} +@itemx -B +Set the input data block size in bytes. The input file will be divided +in chunks of this size before compression is performed. Valid values +range from 100kB to 1GiB. Default value is two times the dictionary +size. It is a waste of memory to choose a data size smaller than the +dictionary size. + @item --stdout @itemx -c Compress or decompress to standard output. Needed when reading from a @@ -25,20 +25,17 @@ #define _FILE_OFFSET_BITS 64 #include <algorithm> -#include <cassert> #include <cerrno> #include <climits> #include <csignal> -#include <cstdarg> -#include <cstddef> #include <cstdio> #include <cstdlib> #include <cstring> #include <string> #include <vector> #include <fcntl.h> -#include <stdint.h> #include <pthread.h> +#include <stdint.h> #include <unistd.h> #include <utime.h> #include <sys/stat.h> @@ -49,7 +46,6 @@ #endif #include "arg_parser.h" -#include "main.h" #include "plzip.h" #ifndef LLONG_MAX @@ -72,6 +68,12 @@ const char * const Program_name = "Plzip"; const char * const program_name = "plzip"; const char * const program_year = "2010"; +#ifdef O_BINARY +const int o_binary = O_BINARY; +#else +const int o_binary = 0; +#endif + struct { const char * from; const char * to; } const known_extensions[] = { { ".lz", "" }, { ".tlz", ".tar" }, @@ -88,6 +90,8 @@ enum Mode { m_compress = 0, m_decompress, m_test }; std::string output_filename; int outhandle = -1; bool delete_output_on_interrupt = false; +pthread_t main_thread; +pid_t main_thread_pid; class Pretty_print { @@ -132,6 +136,7 @@ void show_help() throw() std::printf( " -h, --help display this help and exit\n" ); std::printf( " -V, --version output version information and exit\n" ); // std::printf( " -b, --member-size=<n> set member size limit in bytes\n" ); + std::printf( " -B, --data-size=<n> set input data block size in bytes\n" ); std::printf( " -c, --stdout send output to standard output\n" ); std::printf( " -d, --decompress decompress\n" ); std::printf( " -f, --force overwrite existing output files\n" ); @@ -267,7 +272,7 @@ int open_instream( const std::string & name, struct stat * in_statsp, } else { - inhandle = open( name.c_str(), O_RDONLY ); + inhandle = open( name.c_str(), O_RDONLY | o_binary ); if( inhandle < 0 ) { if( verbosity >= 0 ) @@ -324,9 +329,11 @@ void set_d_outname( const std::string & name, const int i ) throw() bool open_outstream( const bool force ) throw() { if( force ) - outhandle = open( output_filename.c_str(), O_CREAT | O_TRUNC | O_WRONLY, + outhandle = open( output_filename.c_str(), + O_CREAT | O_TRUNC | O_WRONLY | o_binary, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH ); - else outhandle = open( output_filename.c_str(), O_CREAT | O_EXCL | O_WRONLY, + else outhandle = open( output_filename.c_str(), + O_CREAT | O_EXCL | O_WRONLY | o_binary, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH ); if( outhandle < 0 ) { @@ -412,14 +419,13 @@ int do_decompress( LZ_Decoder * const decoder, const int inhandle, const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size; uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size]; - if( verbosity >= 1 ) pp(); while( true ) { int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size ); if( in_size > 0 ) { const int max_in_size = in_size; - in_size = readblock( inhandle, (char *)in_buffer, max_in_size ); + in_size = readblock( inhandle, in_buffer, max_in_size ); if( in_size != max_in_size && errno ) { pp(); show_error( "read error", errno ); return 1; } if( in_size == 0 ) LZ_decompress_finish( decoder ); @@ -458,7 +464,7 @@ int do_decompress( LZ_Decoder * const decoder, const int inhandle, } else if( out_size > 0 && outhandle >= 0 ) { - const int wr = writeblock( outhandle, (char *)out_buffer, out_size ); + const int wr = writeblock( outhandle, out_buffer, out_size ); if( wr != out_size ) { pp(); show_error( "write error", errno ); return 1; } } @@ -490,9 +496,12 @@ int decompress( const int inhandle, const Pretty_print & pp, } -extern "C" void signal_handler( int ) throw() +extern "C" void signal_handler( int sig ) throw() { - show_error( "Control-C or similar caught, quitting." ); + if( !pthread_equal( pthread_self(), main_thread ) ) + kill( main_thread_pid, sig ); + if( sig != SIGUSR1 ) + show_error( "Control-C or similar caught, quitting." ); cleanup_and_fail( 1 ); } @@ -502,6 +511,7 @@ void set_signals() throw() signal( SIGHUP, signal_handler ); signal( SIGINT, signal_handler ); signal( SIGTERM, signal_handler ); + signal( SIGUSR1, signal_handler ); } } // end namespace @@ -551,372 +561,16 @@ void internal_error( const char * msg ) } -/* Private stuff needed by fatal(). */ -static pthread_t main_thread; - -static pid_t pid; - - -/* Public utility variables and functions. */ - -/* - This can be called from any thread, main thread or sub-threads alike, since - they all call common helper functions that call fatal() in case of an error. -*/ -void fatal() -{ - if( pthread_equal(pthread_self(), main_thread) ) - cleanup_and_fail( 1 ); - else - { - if( 0 == kill(pid, SIGUSR1) ) - pthread_exit(0); - } - _exit( 1 ); -} - - -void -fail(const char *fmt, int err, ...) -{ - va_list args; - - /* Locking stderr should also protect strerror(). */ - flockfile(stderr); - (void)fprintf(stderr, "%s: ", program_name); - - va_start(args, err); - (void)vfprintf(stderr, fmt, args); - va_end(args); - - (void)fprintf(stderr, ": %s\n", strerror(err)); - funlockfile(stderr); - /* Stream stderr is never fully buffered originally. */ - fatal(); -} - - -void -xinit(Cond *cond) -{ - pthread_mutexattr_t attr; - - int ret = pthread_mutexattr_init(&attr); - if( ret != 0 ) { - fail("pthread_mutexattr_init()", ret); - } - - ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); - if( ret != 0 ) { - fail("pthread_mutexattr_settype()", ret); - } - - ret = pthread_mutex_init(&cond->lock, &attr); - if( ret != 0 ) { - fail("pthread_mutex_init()", ret); - } - - ret = pthread_mutexattr_destroy(&attr); - if( ret != 0 ) { - fail("pthread_mutexattr_destroy()", ret); - } - - ret = pthread_cond_init(&cond->cond, 0); - if( ret != 0 ) { - fail("pthread_cond_init()", ret); - } - - cond->ccount = 0; - cond->wcount = 0; -} - - -void -xdestroy(Cond *cond) -{ - int ret = pthread_cond_destroy(&cond->cond); - if( ret != 0 ) { - fail("pthread_cond_destroy()", ret); - } - - ret = pthread_mutex_destroy(&cond->lock); - if( ret != 0 ) { - fail("pthread_mutex_destroy()", ret); - } -} - - -void -xlock(Cond *cond) -{ - int ret = pthread_mutex_lock(&cond->lock); - if( ret != 0 ) { - fail("pthread_mutex_lock()", ret); - } -} - - -void -xlock_pred(Cond *cond) -{ - xlock(cond); - ++cond->ccount; -} - - -void -xunlock(Cond *cond) -{ - int ret = pthread_mutex_unlock(&cond->lock); - if( ret != 0 ) { - fail("pthread_mutex_unlock()", ret); - } -} - - -void -xwait(Cond *cond) -{ - ++cond->wcount; - int ret = pthread_cond_wait(&cond->cond, &cond->lock); - if( ret != 0 ) { - fail("pthread_cond_wait()", ret); - } - ++cond->ccount; -} - - -void -xsignal(Cond *cond) -{ - int ret = pthread_cond_signal(&cond->cond); - if( ret != 0 ) { - fail("pthread_cond_signal()", ret); - } -} - - -void -xbroadcast(Cond *cond) -{ - int ret = pthread_cond_broadcast(&cond->cond); - if( ret != 0 ) { - fail("pthread_cond_broadcast()", ret); - } -} - - -void -xcreate(pthread_t *thread, void *(*routine)(void *), void *arg) -{ - int ret = pthread_create(thread, 0, routine, arg); - if( ret != 0 ) { - fail("pthread_create()", ret); - } -} - - -void -xjoin(pthread_t thread) -{ - int ret = pthread_join(thread, 0); - if( ret != 0 ) { - fail("pthread_join()", ret); - } -} - - -void -xraise(int sig) -{ - if( -1 == kill(pid, sig) ) { - fail("kill()", errno); - } -} - - -/* Private stuff part 2. */ - - -static void -xsigemptyset(sigset_t *set) -{ - if( -1 == sigemptyset(set) ) { - fail("sigemptyset()", errno); - } -} - - -static void -xsigaddset(sigset_t *set, int signo) -{ - if( -1 == sigaddset(set, signo) ) { - fail("sigaddset()", errno); - } -} - - -static void -xsigmask(int how, const sigset_t *set, sigset_t *oset) -{ - int ret = pthread_sigmask(how, set, oset); - if( ret != 0 ) { - fail("pthread_sigmask()", ret); - } -} - - -static void -xsigaction(int sig, void (*handler)(int)) -{ - struct sigaction act; - - act.sa_handler = handler; - xsigemptyset(&act.sa_mask); - act.sa_flags = 0; - - if( -1 == sigaction(sig, &act, 0) ) { - fail("sigaction()", errno); - } -} - - -enum Caught_sig { CS_INT = 1, CS_TERM, CS_USR1, CS_USR2 }; - -static volatile sig_atomic_t caught_sig; - - -extern "C" void sighandler( int sig ) - { - /* sig_atomic_t is nowhere required to be able to hold signal values. */ - switch( sig ) - { - case SIGINT : caught_sig = CS_INT; break; - case SIGTERM: caught_sig = CS_TERM; break; - case SIGUSR1: caught_sig = CS_USR1; break; - case SIGUSR2: caught_sig = CS_USR2; break; - default: internal_error( "caught signal not in set" ); - } - } - - -static void compress( const lzma_options & encoder_options, const int num_workers, - int debug_level, int num_slots, int infd, int outfd, - const Pretty_print & pp, const sigset_t *unblocked ) - { - /* - We could wait for signals with either sigwait() or sigsuspend(). SUSv2 - states about sigwait() that its effect on signal actions is unspecified. - SUSv3 still claims the same. - - The SUSv2 description of sigsuspend() talks about both the thread and the - whole process being suspended until a signal arrives, although thread - suspension seems much more likely from the wording. They note that they - filed a clarification request for this. SUSv3 cleans this up and chooses - thread suspension which was more logical anyway. - - I favor sigsuspend() because I need to re-raise SIGTERM and SIGINT, and - unspecified action behavior with sigwait() seems messy. - - 13-OCT-2009 lacos - */ - - if( verbosity >= 1 ) pp(); - - Muxer_arg muxer_arg; - muxer_arg.dictionary_size = encoder_options.dictionary_size; - muxer_arg.match_len_limit = encoder_options.match_len_limit; - muxer_arg.num_workers = num_workers; - muxer_arg.num_slots = num_slots; - muxer_arg.debug_level = debug_level; - muxer_arg.infd = infd; - muxer_arg.outfd = outfd; - - pthread_t muxer_thread; - xcreate(&muxer_thread, muxer, &muxer_arg); - - /* Unblock signals, wait for them, then block them again. */ - { - int ret = sigsuspend(unblocked); - assert(-1 == ret && EINTR == errno); - } - - switch( caught_sig ) { - case CS_INT: - case CS_TERM: // FIXME remove output file - { - int sig; - sigset_t mask; - - sig = (CS_INT == caught_sig) ? SIGINT : SIGTERM; - /* - We might have inherited a SIG_IGN from the parent, but that would - make no sense here. 24-OCT-2009 lacos - */ - xsigaction(sig, SIG_DFL); - xraise(sig); - - xsigemptyset(&mask); - xsigaddset(&mask, sig); - xsigmask(SIG_UNBLOCK, &mask, 0); - } - /* - We shouldn't reach this point, but if we do for some reason, fall - through. - */ - - case CS_USR1: - /* Error from a non-main thread via fatal(). */ - fatal(); - - case CS_USR2: - /* Muxer thread joined other sub-threads and finished successfully. */ - break; - - default: - assert(0); - } - - xjoin(muxer_thread); - } - - -static void -sigs_mod(int block_n_catch, sigset_t *oset) - { - void (*handler)(int); - - if( block_n_catch ) { - sigset_t mask; - - xsigemptyset(&mask); - xsigaddset(&mask, SIGINT); - xsigaddset(&mask, SIGTERM); - xsigaddset(&mask, SIGUSR1); - xsigaddset(&mask, SIGUSR2); - xsigmask(SIG_BLOCK, &mask, oset); - - handler = sighandler; - } - else { - handler = SIG_DFL; - } - - xsigaction(SIGINT, handler); - xsigaction(SIGTERM, handler); - xsigaction(SIGUSR1, handler); - xsigaction(SIGUSR2, handler); - - if( !block_n_catch ) { - xsigmask(SIG_SETMASK, oset, 0); - } - } +// This can be called from any thread, main thread or sub-threads alike, since +// they all call common helper functions that call fatal() in case of an error. +// +void fatal() { signal_handler( SIGUSR1 ); } // Returns the number of bytes really read. // If (returned value < size) and (errno == 0), means EOF was reached. // -int readblock( const int fd, char * buf, const int size ) throw() +int readblock( const int fd, uint8_t * buf, const int size ) throw() { int rest = size; errno = 0; @@ -935,7 +589,7 @@ int readblock( const int fd, char * buf, const int size ) throw() // Returns the number of bytes really written. // If (returned value < size), it is always an error. // -int writeblock( const int fd, const char * buf, const int size ) throw() +int writeblock( const int fd, const uint8_t * buf, const int size ) throw() { int rest = size; errno = 0; @@ -966,6 +620,7 @@ int main( const int argc, const char * argv[] ) { 1 << 24, 163 }, // -8 { 1 << 25, 273 } }; // -9 lzma_options encoder_options = option_mapping[5]; // default = "-6" + int data_size = 0; int debug_level = 0; int inhandle = -1; int num_workers = 0; // Start this many worker threads @@ -977,22 +632,18 @@ int main( const int argc, const char * argv[] ) std::string default_output_filename; std::vector< std::string > filenames; invocation_name = argv[0]; + main_thread = pthread_self(); + main_thread_pid = getpid(); if( LZ_version()[0] != LZ_version_string[0] ) internal_error( "bad library version" ); - main_thread = pthread_self(); - pid = getpid(); - - xsigaction(SIGPIPE, SIG_IGN); - xsigaction(SIGXFSZ, SIG_IGN); - const int slots_per_worker = 2; long max_workers = sysconf( _SC_THREAD_THREADS_MAX ); if( max_workers < 1 || max_workers > INT_MAX / slots_per_worker ) max_workers = INT_MAX / slots_per_worker; - if( max_workers > INT_MAX / (int)sizeof( pthread_t ) ) - max_workers = INT_MAX / sizeof( pthread_t ); + if( max_workers > INT_MAX / (int)sizeof (pthread_t) ) + max_workers = INT_MAX / sizeof (pthread_t); const Arg_parser::Option options[] = { @@ -1006,6 +657,7 @@ int main( const int argc, const char * argv[] ) { '8', 0, Arg_parser::no }, { '9', "best", Arg_parser::no }, { 'b', "member-size", Arg_parser::yes }, + { 'B', "data-size", Arg_parser::yes }, { 'c', "stdout", Arg_parser::no }, { 'd', "decompress", Arg_parser::no }, { 'D', "debug", Arg_parser::yes }, @@ -1040,6 +692,8 @@ int main( const int argc, const char * argv[] ) case '7': case '8': case '9': encoder_options = option_mapping[code-'1']; break; case 'b': break; + case 'B': data_size = getnum( arg, 0, 100000, + 2 * LZ_max_dictionary_size() ); break; case 'c': to_stdout = true; break; case 'd': program_mode = m_decompress; break; case 'D': debug_level = getnum( arg, 0, 0, 3 ); @@ -1048,8 +702,8 @@ int main( const int argc, const char * argv[] ) case 'h': show_help(); return 0; case 'k': keep_input_files = true; break; case 'm': encoder_options.match_len_limit = - getnum( arg, 0, LZ_min_match_len_limit(), - LZ_max_match_len_limit() ); break; + getnum( arg, 0, LZ_min_match_len_limit(), + LZ_max_match_len_limit() ); break; case 'o': default_output_filename = arg; break; case 'n': num_workers = getnum( arg, 0, 1, max_workers ); break; case 'q': verbosity = -1; break; @@ -1063,13 +717,16 @@ int main( const int argc, const char * argv[] ) } } + if( data_size <= 0 ) + data_size = 2 * std::max( 65536, encoder_options.dictionary_size ); + if( num_workers <= 0 ) { long num_online = sysconf( _SC_NPROCESSORS_ONLN ); - if( num_online <= 0 ) num_online = 2; + if( num_online <= 0 ) num_online = 1; num_workers = std::min( num_online, max_workers ); } - const int num_slots = num_workers * slots_per_worker; + const int num_slots = std::max( 1, ( num_workers * slots_per_worker ) - 1 ); bool filenames_given = false; for( ; argind < parser.arguments(); ++argind ) @@ -1079,7 +736,9 @@ int main( const int argc, const char * argv[] ) } if( filenames.empty() ) filenames.push_back("-"); - if( filenames_given && program_mode != m_compress ) set_signals(); + if( !to_stdout && program_mode != m_test && + ( filenames_given || default_output_filename.size() ) ) + set_signals(); Pretty_print pp( filenames ); if( program_mode == m_test ) @@ -1144,14 +803,12 @@ int main( const int argc, const char * argv[] ) delete_output_on_interrupt = true; const struct stat * const in_statsp = input_filename.size() ? &in_stats : 0; pp.set_name( input_filename ); + if( verbosity >= 1 ) pp(); int tmp = 0; if( program_mode == m_compress ) - { - sigset_t unblocked; - sigs_mod(1, &unblocked); - compress( encoder_options, num_workers, debug_level, num_slots, inhandle, outhandle, pp, &unblocked ); - sigs_mod(0, &unblocked); - } + tmp = compress( data_size, encoder_options.dictionary_size, + encoder_options.match_len_limit, num_workers, + num_slots, inhandle, outhandle, debug_level ); else tmp = decompress( inhandle, pp, program_mode == m_test ); if( tmp > retval ) retval = tmp; @@ -1,77 +0,0 @@ -/* Plzip - A parallel version of the lzip data compressor - Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009, 2010 Antonio Diaz Diaz. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw(); -int readblock( const int fd, char * buf, const int size ) throw(); -int writeblock( const int fd, const char * buf, const int size ) throw(); - - -struct Cond -{ - pthread_mutex_t lock; /* Lock this to protect shared resource. */ - pthread_cond_t cond; /* Trigger this if predicate becomes true. */ - long unsigned ccount, /* Increment this when checking predicate. */ - wcount; /* Increment this when waiting is necessary. */ -}; - - -/* Terminate the process. */ -void fatal(); - -/* Format operation and append resolved error, then call fatal(). */ -void -fail(const char *fmt, int err, ...) -#ifdef __GNUC__ -__attribute__((format(printf, 1, 3))) -#endif -; - -/* If these primitives fail, they call fail(), which in turn calls fatal(). */ - -void -xinit(Cond *cond); - -void -xdestroy(Cond *cond); - -void -xlock(Cond *cond); - -void -xlock_pred(Cond *cond); - -void -xunlock(Cond *cond); - -void -xwait(Cond *cond); - -void -xsignal(Cond *cond); - -void -xbroadcast(Cond *cond); - -void -xcreate(pthread_t *thread, void *(*routine)(void *), void *arg); - -void -xjoin(pthread_t thread); - -void -xraise(int sig); @@ -26,11 +26,11 @@ #include <cstdio> #include <cstdlib> #include <vector> +#include <pthread.h> #include <stdint.h> #include <unistd.h> #include <lzlib.h> -#include "main.h" #include "plzip.h" #ifndef LLONG_MAX @@ -49,282 +49,304 @@ namespace { long long in_size = 0; long long out_size = 0; -void *(*mallocf)(size_t size); -void (*freef)(void *ptr); +void *(*mallocf)( size_t size ); +void (*freef)( void *ptr ); -void * trace_malloc(size_t size) -{ +void * trace_malloc( size_t size ) + { int save_errno = 0; - void * ret = malloc(size); + void * ret = malloc( size ); if( ret == 0 ) save_errno = errno; - fprintf(stderr, "malloc(%lu) == %p\n", (long unsigned)size, ret); + std::fprintf( stderr, "malloc(%lu) == %p\n", (unsigned long)size, ret ); if( ret == 0 ) errno = save_errno; return ret; -} + } -void trace_free(void *ptr) -{ - fprintf(stderr, "free(%p)\n", ptr); - free(ptr); -} +void trace_free( void *ptr ) + { + std::fprintf( stderr, "free(%p)\n", ptr ); + free( ptr ); + } -void * xalloc(size_t size) -{ - void *ret = (*mallocf)(size); - if( 0 == ret ) fail("(*mallocf)()", errno); +void * xalloc( size_t size ) + { + void *ret = (*mallocf)( size ); + if( ret == 0 ) { show_error( "not enough memory", errno ); fatal(); } return ret; -} - - -struct S2w_blk /* Splitter to workers. */ -{ - unsigned long long id; /* Block serial number as read from infd. */ - S2w_blk *next; /* Next in queue. */ - size_t loaded; /* # of bytes in plain, may be 0 for 1st. */ - unsigned char plain[1]; /* Data read from infd, allocated: sizeof_plain. */ -}; - - -struct S2w_q -{ - Cond av_or_eof; /* New block available or splitter done. */ - S2w_blk *tail, /* Splitter will append here. */ - *head; /* Next ready worker shall compress this. */ - int eof; /* Splitter done. */ -}; - - -void -s2w_q_init(S2w_q *s2w_q) -{ - xinit(&s2w_q->av_or_eof); - s2w_q->tail = 0; - s2w_q->head = 0; - s2w_q->eof = 0; -} - - -void -s2w_q_uninit(S2w_q *s2w_q) -{ - assert(0 != s2w_q->eof); - assert(0 == s2w_q->head); - assert(0 == s2w_q->tail); - xdestroy(&s2w_q->av_or_eof); -} - - -struct W2m_blk /* Workers to muxer data block. */ -{ - unsigned long long id; /* Block index as read from infd. */ - W2m_blk *next; /* Next block in list (unordered). */ - int produced; /* Number of bytes in compr. */ - unsigned char compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */ -}; - - -struct W2m_q -{ - Cond av_or_exit; /* New block available or all workers exited. */ - unsigned long long needed; /* Block needed for resuming writing. */ - W2m_blk *head; /* Block list (unordered). */ - unsigned working; /* Number of workers still running. */ -}; - - -void -w2m_q_init(W2m_q *w2m_q, int num_workers) -{ - assert(0 < num_workers); - xinit(&w2m_q->av_or_exit); - w2m_q->needed = 0; - w2m_q->head = 0; - w2m_q->working = num_workers; -} - - -void -w2m_q_uninit(W2m_q *w2m_q) -{ - assert(0 == w2m_q->working); - assert(0 == w2m_q->head); - xdestroy(&w2m_q->av_or_exit); -} - - -struct M2s_q // Muxer to splitter queue + } + + +void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex ) { - Cond av; // Free slot available - int num_free; // Number of free slots + int ret = pthread_mutex_init( mutex, 0 ); + if( ret != 0 ) { show_error( "pthread_mutex_init", ret ); fatal(); } - M2s_q( const int slots ) - { - xinit(&av); - num_free = slots; - } + ret = pthread_cond_init( cond, 0 ); + if( ret != 0 ) { show_error( "pthread_cond_init", ret ); fatal(); } + } + + +void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex ) + { + int ret = pthread_cond_destroy( cond ); + if( ret != 0 ) { show_error( "pthread_cond_destroy", ret ); fatal(); } + + ret = pthread_mutex_destroy( mutex ); + if( ret != 0 ) { show_error( "pthread_mutex_destroy", ret ); fatal(); } + } + + +void xlock( pthread_mutex_t * mutex ) + { + int ret = pthread_mutex_lock( mutex ); + if( ret != 0 ) { show_error( "pthread_mutex_lock", ret ); fatal(); } + } + + +void xunlock( pthread_mutex_t * mutex ) + { + int ret = pthread_mutex_unlock( mutex ); + if( ret != 0 ) { show_error( "pthread_mutex_unlock", ret ); fatal(); } + } + + +void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex ) + { + int ret = pthread_cond_wait( cond, mutex ); + if( ret != 0 ) { show_error( "pthread_cond_wait", ret ); fatal(); } + } + + +void xsignal( pthread_cond_t * cond ) + { + int ret = pthread_cond_signal( cond ); + if( ret != 0 ) { show_error( "pthread_cond_signal", ret ); fatal(); } + } + + +void xbroadcast( pthread_cond_t * cond ) + { + int ret = pthread_cond_broadcast( cond ); + if( ret != 0 ) { show_error( "pthread_cond_broadcast", ret ); fatal(); } + } + + +void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg ) + { + int ret = pthread_create( thread, 0, routine, arg ); + if( ret != 0 ) { show_error( "pthread_create", ret ); fatal(); } + } + + +void xjoin( pthread_t thread ) + { + int ret = pthread_join( thread, 0 ); + if( ret != 0 ) { show_error( "pthread_join", ret ); fatal(); } + } + + +struct Slot_tally // Synchronizes splitter to muxer + { + unsigned long check_counter; + unsigned long wait_counter; + int num_free; // Number of free slots + pthread_mutex_t mutex; + pthread_cond_t slot_av; // Free slot available + + Slot_tally( const int slots ) + : check_counter( 0 ), wait_counter( 0 ), num_free( slots ) + { xinit( &slot_av, &mutex ); } + + ~Slot_tally() { xdestroy( &slot_av, &mutex ); } + }; + + +struct S2w_blk // Splitter to worker data block + { + unsigned long long id; // Block serial number as read from infd + S2w_blk *next; // Next in queue + int loaded; // # of bytes in plain, may be 0 for 1st + uint8_t plain[1]; // Data read from infd, allocated: data_size + }; + + +struct S2w_queue + { + S2w_blk * head; // Next ready worker shall compress this + S2w_blk * tail; // Splitter will append here + unsigned long check_counter; + unsigned long wait_counter; + pthread_mutex_t mutex; + pthread_cond_t av_or_eof; // New block available or splitter done + bool eof; // Splitter done + + S2w_queue() + : head( 0 ), tail( 0 ), check_counter( 0 ), wait_counter( 0 ), eof( false ) + { xinit( &av_or_eof, &mutex ); } + + ~S2w_queue() { xdestroy( &av_or_eof, &mutex ); } + }; - ~M2s_q() { xdestroy(&av); } + +struct W2m_blk // Worker to muxer data block + { + unsigned long long id; // Block index as read from infd + W2m_blk *next; // Next block in list (unordered) + int produced; // Number of bytes in compr + uint8_t compr[1]; // Data to write to outfd, alloc.: compr_size + }; + + +struct W2m_queue + { + unsigned long long needed_id; // Block needed for resuming writing + W2m_blk *head; // Block list (unordered) + unsigned long check_counter; + unsigned long wait_counter; + int num_working; // Number of workers still running + pthread_mutex_t mutex; + pthread_cond_t av_or_exit; // New block available or all workers exited + + W2m_queue( const int num_workers ) + : needed_id( 0 ), head( 0 ), check_counter( 0 ), wait_counter( 0 ), + num_working( num_workers ) + { xinit( &av_or_exit, &mutex ); } + + ~W2m_queue() { xdestroy( &av_or_exit, &mutex ); } }; struct Splitter_arg { - M2s_q *m2s_q; - S2w_q *s2w_q; + Slot_tally * slot_tally; + S2w_queue * s2w_queue; int infd; - int sizeof_plain; - size_t sizeof_S2w_blk; + int data_size; + int s2w_blk_size; }; void * splitter( void * arg ) { const Splitter_arg & tmp = *(Splitter_arg *)arg; - M2s_q *m2s_q = tmp.m2s_q; - S2w_q *s2w_q = tmp.s2w_q; + Slot_tally & slot_tally = *tmp.slot_tally; + S2w_queue & s2w_queue = *tmp.s2w_queue; const int infd = tmp.infd; - const int sizeof_plain = tmp.sizeof_plain; - const size_t sizeof_s2w_blk = tmp.sizeof_S2w_blk; + const int data_size = tmp.data_size; + const int s2w_blk_size = tmp.s2w_blk_size; for( unsigned long long id = 0; ; ++id ) { - /* Grab a free slot. */ - xlock_pred(&m2s_q->av); - while( m2s_q->num_free == 0 ) xwait(&m2s_q->av); - --m2s_q->num_free; - xunlock(&m2s_q->av); - S2w_blk * s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk); - - /* Fill block. */ - const int rd = readblock( infd, (char *)s2w_blk->plain, sizeof_plain ); - if( rd != sizeof_plain && errno ) fail("read()", errno); - - if( rd == 0 && id != 0 ) - { - /* EOF on first read, but not for first input block. */ - (*freef)(s2w_blk); - xlock(&m2s_q->av); - ++m2s_q->num_free; - xunlock(&m2s_q->av); - } - else + S2w_blk * s2w_blk = (S2w_blk *)xalloc( s2w_blk_size ); + + // Fill block + const int rd = readblock( infd, s2w_blk->plain, data_size ); + if( rd != data_size && errno ) { show_error( "read", errno ); fatal(); } + + if( rd > 0 || id == 0 ) // first block can be empty { s2w_blk->id = id; s2w_blk->next = 0; s2w_blk->loaded = rd; in_size += rd; + xlock( &slot_tally.mutex ); // Grab a free slot + ++slot_tally.check_counter; + while( slot_tally.num_free == 0 ) + { + ++slot_tally.wait_counter; + xwait( &slot_tally.slot_av, &slot_tally.mutex ); + } + --slot_tally.num_free; + xunlock( &slot_tally.mutex ); } + else + { (*freef)( s2w_blk ); s2w_blk = 0; } - xlock(&s2w_q->av_or_eof); - if( s2w_q->head == 0 ) xbroadcast(&s2w_q->av_or_eof); - - if( rd > 0 || id == 0 ) + xlock( &s2w_queue.mutex ); + if( s2w_blk != 0 ) { - if( s2w_q->tail == 0 ) s2w_q->head = s2w_blk; - else s2w_q->tail->next = s2w_blk; - s2w_q->tail = s2w_blk; + if( s2w_queue.tail == 0 ) s2w_queue.head = s2w_blk; + else s2w_queue.tail->next = s2w_blk; + s2w_queue.tail = s2w_blk; + xsignal( &s2w_queue.av_or_eof ); } - s2w_q->eof = ( rd == 0 ); - xunlock(&s2w_q->av_or_eof); + else + { + s2w_queue.eof = true; + xbroadcast( &s2w_queue.av_or_eof ); + } + xunlock( &s2w_queue.mutex ); - if( rd <= 0 ) break; + if( s2w_blk == 0 ) break; } return 0; } -void work_lz_rd(W2m_blk *w2m_blk, const int sizeof_compr, LZ_Encoder * lz) -{ - int rd; - - assert(w2m_blk->produced < sizeof_compr); - rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced, - sizeof_compr - w2m_blk->produced); - if( -1 == rd ) { - show_error( "LZ_compress_read() failed." ); - fatal(); - } - w2m_blk->produced += rd; -} - - void work_compr( const int dictionary_size, const int match_len_limit, - S2w_blk *s2w_blk, W2m_q *w2m_q, - const int sizeof_compr, const size_t sizeof_w2m_blk ) -{ - W2m_blk *w2m_blk; - - assert(0 < s2w_blk->loaded || 0 == s2w_blk->id); - - w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk); - - /* Single member compression. Settings like with lzip -6. */ + const S2w_blk & s2w_blk, W2m_queue & w2m_queue, + const int compr_size, const int w2m_blk_size ) { - LZ_Encoder * lz; - size_t written; + assert( s2w_blk.loaded > 0 || s2w_blk.id == 0 ); - lz = LZ_compress_open( dictionary_size, match_len_limit, LLONG_MAX ); - if( LZ_ok != LZ_compress_errno(lz) ) { - show_error( "LZ_compress_open() failed." ); - fatal(); - } + W2m_blk * w2m_blk = (W2m_blk *)xalloc( w2m_blk_size ); - written = 0; - w2m_blk->produced = 0; - while( written < s2w_blk->loaded ) { - int wr; + const int dict_size = std::max( LZ_min_dictionary_size(), + std::min( dictionary_size, s2w_blk.loaded ) ); + LZ_Encoder * const encoder = + LZ_compress_open( dict_size, match_len_limit, LLONG_MAX ); + if( !encoder || LZ_compress_errno( encoder ) != LZ_ok ) + { show_error( "LZ_compress_open failed." ); fatal(); } - wr = LZ_compress_write(lz, s2w_blk->plain + written, - s2w_blk->loaded - written); - if( -1 == wr ) { - show_error( "LZ_compress_write() failed." ); - fatal(); + int written = 0; + w2m_blk->produced = 0; + while( true ) + { + if( LZ_compress_write_size( encoder ) > 0 ) + { + if( written < s2w_blk.loaded ) + { + const int wr = LZ_compress_write( encoder, s2w_blk.plain + written, + s2w_blk.loaded - written ); + if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); } + written += wr; + } + if( written >= s2w_blk.loaded ) LZ_compress_finish( encoder ); } - written += (size_t)wr; - - work_lz_rd(w2m_blk, sizeof_compr, lz); + assert( w2m_blk->produced < compr_size ); + const int rd = LZ_compress_read( encoder, w2m_blk->compr + w2m_blk->produced, + compr_size - w2m_blk->produced ); + if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); } + w2m_blk->produced += rd; + if( LZ_compress_finished( encoder ) == 1 ) break; } - if( -1 == LZ_compress_finish(lz) ) { - show_error( "LZ_compress_finish() failed." ); - fatal(); - } + if( LZ_compress_close( encoder ) < 0 ) + { show_error( "LZ_compress_close failed." ); fatal(); } - while( !LZ_compress_finished(lz) ) { - work_lz_rd(w2m_blk, sizeof_compr, lz); - } + w2m_blk->id = s2w_blk.id; - if( -1 == LZ_compress_close(lz) ) { - show_error( "LZ_compress_close() failed." ); - fatal(); - } + // Push block to muxer queue + xlock( &w2m_queue.mutex ); + w2m_blk->next = w2m_queue.head; + w2m_queue.head = w2m_blk; + if( w2m_blk->id == w2m_queue.needed_id ) xsignal( &w2m_queue.av_or_exit ); + xunlock( &w2m_queue.mutex ); } - w2m_blk->id = s2w_blk->id; - - /* Push block to muxer. */ - xlock(&w2m_q->av_or_exit); - w2m_blk->next = w2m_q->head; - w2m_q->head = w2m_blk; - if( w2m_blk->id == w2m_q->needed ) { - xsignal(&w2m_q->av_or_exit); - } - xunlock(&w2m_q->av_or_exit); -} - struct Worker_arg { int dictionary_size; int match_len_limit; - S2w_q *s2w_q; - W2m_q *w2m_q; - int sizeof_compr; - size_t sizeof_W2m_blk; + S2w_queue * s2w_queue; + W2m_queue * w2m_queue; + int compr_size; + int w2m_blk_size; }; @@ -333,64 +355,68 @@ void * worker( void * arg ) const Worker_arg & tmp = *(Worker_arg *)arg; const int dictionary_size = tmp.dictionary_size; const int match_len_limit = tmp.match_len_limit; - S2w_q *s2w_q = tmp.s2w_q; - W2m_q *w2m_q = tmp.w2m_q; - const int sizeof_compr = tmp.sizeof_compr; - const size_t sizeof_w2m_blk = tmp.sizeof_W2m_blk; + S2w_queue & s2w_queue = *tmp.s2w_queue; + W2m_queue & w2m_queue = *tmp.w2m_queue; + const int compr_size = tmp.compr_size; + const int w2m_blk_size = tmp.w2m_blk_size; while( true ) { S2w_blk *s2w_blk; - /* Grab a block to work on. */ - xlock_pred(&s2w_q->av_or_eof); - while( 0 == s2w_q->head && !s2w_q->eof ) { - xwait(&s2w_q->av_or_eof); - } - if( 0 == s2w_q->head ) { - /* No blocks available and splitter exited. */ - xunlock(&s2w_q->av_or_eof); + // Grab a block to work on + xlock( &s2w_queue.mutex ); + ++s2w_queue.check_counter; + while( s2w_queue.head == 0 && !s2w_queue.eof ) + { + ++s2w_queue.wait_counter; + xwait( &s2w_queue.av_or_eof, &s2w_queue.mutex ); + } + if( s2w_queue.head == 0 ) // No blocks available and splitter exited + { + xunlock( &s2w_queue.mutex ); break; + } + s2w_blk = s2w_queue.head; + s2w_queue.head = s2w_blk->next; + if( s2w_queue.head == 0 ) s2w_queue.tail = 0; + xunlock( &s2w_queue.mutex ); + + work_compr( dictionary_size, match_len_limit, *s2w_blk, w2m_queue, + compr_size, w2m_blk_size ); + (*freef)( s2w_blk ); } - s2w_blk = s2w_q->head; - s2w_q->head = s2w_blk->next; - if( 0 == s2w_q->head ) { - s2w_q->tail = 0; - } - xunlock(&s2w_q->av_or_eof); - work_compr( dictionary_size, match_len_limit, s2w_blk, w2m_q, - sizeof_compr, sizeof_w2m_blk ); - (*freef)(s2w_blk); - } - - /* Notify muxer when last worker exits. */ - xlock(&w2m_q->av_or_exit); - if( 0 == --w2m_q->working && 0 == w2m_q->head ) { - xsignal(&w2m_q->av_or_exit); - } - xunlock(&w2m_q->av_or_exit); + // Notify muxer when last worker exits + xlock( &w2m_queue.mutex ); + if( --w2m_queue.num_working == 0 && w2m_queue.head == 0 ) + xsignal( &w2m_queue.av_or_exit ); + xunlock( &w2m_queue.mutex ); return 0; } -void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outfd ) +void muxer( Slot_tally & slot_tally, W2m_queue & w2m_queue, + const int num_slots, const int outfd ) { unsigned long long needed_id = 0; std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 ); - xlock_pred(&w2m_q->av_or_exit); + xlock( &w2m_queue.mutex ); while( true ) { - /* Grab all available compressed blocks in one step. */ - while( w2m_q->head == 0 && w2m_q->working > 0 ) - xwait(&w2m_q->av_or_exit); - - if( w2m_q->head == 0 ) break; // queue is empty. all workers exited + // Grab all available compressed blocks in one step + ++w2m_queue.check_counter; + while( w2m_queue.head == 0 && w2m_queue.num_working > 0 ) + { + ++w2m_queue.wait_counter; + xwait( &w2m_queue.av_or_exit, &w2m_queue.mutex ); + } + if( w2m_queue.head == 0 ) break; // queue is empty. all workers exited - W2m_blk * w2m_blk = w2m_q->head; - w2m_q->head = 0; - xunlock(&w2m_q->av_or_exit); + W2m_blk * w2m_blk = w2m_queue.head; + w2m_queue.head = 0; + xunlock( &w2m_queue.mutex ); // Merge blocks fetched this time into circular buffer do { @@ -405,30 +431,31 @@ void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outf // Write out initial continuous sequence of reordered blocks while( true ) { - W2m_blk * needed_w2m_blk = circular_buffer[needed_id%num_slots]; - if( needed_w2m_blk == 0 ) break; + w2m_blk = circular_buffer[needed_id%num_slots]; + if( w2m_blk == 0 ) break; - out_size += needed_w2m_blk->produced; + out_size += w2m_blk->produced; if( outfd >= 0 ) { - const int wr = writeblock( outfd, (char *)needed_w2m_blk->compr, needed_w2m_blk->produced ); - if( wr != needed_w2m_blk->produced ) fail("write()", errno); + const int wr = writeblock( outfd, w2m_blk->compr, w2m_blk->produced ); + if( wr != w2m_blk->produced ) + { show_error( "write", errno ); fatal(); } } circular_buffer[needed_id%num_slots] = 0; ++needed_id; - xlock(&m2s_q->av); - if( 0 == m2s_q->num_free++ ) xsignal(&m2s_q->av); - xunlock(&m2s_q->av); + xlock( &slot_tally.mutex ); + if( slot_tally.num_free++ == 0 ) xsignal( &slot_tally.slot_av ); + xunlock( &slot_tally.mutex ); - (*freef)(needed_w2m_blk); + (*freef)( w2m_blk ); } - xlock_pred(&w2m_q->av_or_exit); - w2m_q->needed = needed_id; + xlock( &w2m_queue.mutex ); + w2m_queue.needed_id = needed_id; } - xunlock(&w2m_q->av_or_exit); + xunlock( &w2m_queue.mutex ); for( int i = 0; i < num_slots; ++i ) if( circular_buffer[i] != 0 ) @@ -438,57 +465,49 @@ void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outf } // end namespace -void * muxer( void * arg ) +int compress( const int data_size, const int dictionary_size, + const int match_len_limit, const int num_workers, + const int num_slots, const int infd, const int outfd, + const int debug_level ) { - const Muxer_arg & tmp = *(Muxer_arg *)arg; - const int dictionary_size = tmp.dictionary_size; - const int match_len_limit = tmp.match_len_limit; - const int num_workers = tmp.num_workers; - const int num_slots = tmp.num_slots; - const int debug_level = tmp.debug_level; - const int infd = tmp.infd; - const int outfd = tmp.outfd; - S2w_q s2w_q; - W2m_q w2m_q; - if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; } else { mallocf = malloc; freef = free; } - s2w_q_init(&s2w_q); - w2m_q_init(&w2m_q, num_workers); - M2s_q m2s_q( num_slots ); - + Slot_tally slot_tally( num_slots ); + S2w_queue s2w_queue; + W2m_queue w2m_queue( num_workers ); Splitter_arg splitter_arg; - splitter_arg.m2s_q = &m2s_q; - splitter_arg.s2w_q = &s2w_q; + splitter_arg.slot_tally = &slot_tally; + splitter_arg.s2w_queue = &s2w_queue; splitter_arg.infd = infd; - splitter_arg.sizeof_plain = 2 * std::max( 65536, dictionary_size ); - splitter_arg.sizeof_S2w_blk = sizeof(S2w_blk) + splitter_arg.sizeof_plain - 1; + splitter_arg.data_size = data_size; + splitter_arg.s2w_blk_size = sizeof (S2w_blk) + data_size - 1; pthread_t splitter_thread; - xcreate(&splitter_thread, splitter, &splitter_arg); + xcreate( &splitter_thread, splitter, &splitter_arg ); Worker_arg worker_arg; worker_arg.dictionary_size = dictionary_size; worker_arg.match_len_limit = match_len_limit; - worker_arg.s2w_q = &s2w_q; - worker_arg.w2m_q = &w2m_q; - worker_arg.sizeof_compr = 6 + 20 + ( ( splitter_arg.sizeof_plain / 8 ) * 9 ); - worker_arg.sizeof_W2m_blk = sizeof(W2m_blk) + worker_arg.sizeof_compr - 1; + worker_arg.s2w_queue = &s2w_queue; + worker_arg.w2m_queue = &w2m_queue; + worker_arg.compr_size = 6 + 20 + ( ( data_size / 8 ) * 9 ); + worker_arg.w2m_blk_size = sizeof (W2m_blk) + worker_arg.compr_size - 1; pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; - if( worker_threads == 0 ) fail("not enough memory.", errno); + if( worker_threads == 0 ) + { show_error( "not enough memory.", errno ); fatal(); } for( int i = 0; i < num_workers; ++i ) - xcreate(&worker_threads[i], worker, &worker_arg); + xcreate( &worker_threads[i], worker, &worker_arg ); - muxer_loop( &w2m_q, &m2s_q, num_slots, outfd ); + muxer( slot_tally, w2m_queue, num_slots, outfd ); for( int i = num_workers - 1; i >= 0; --i ) xjoin(worker_threads[i]); delete[] worker_threads; worker_threads = 0; - xjoin(splitter_thread); + xjoin( splitter_thread ); if( verbosity >= 1 ) { @@ -503,27 +522,27 @@ void * muxer( void * arg ) in_size, out_size ); } - const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1; - if( ( debug_level & 1 ) && 0 > fprintf(stderr, + const int FW = ( sizeof (unsigned long) * 8 ) / 3 + 1; + if( debug_level & 1 ) + std::fprintf( stderr, "any worker tried to consume from splitter: %*lu\n" "any worker stalled : %*lu\n" "muxer tried to consume from workers : %*lu\n" "muxer stalled : %*lu\n" - "splitter tried to consume from muxer : %*lu\n" + "splitter tried to fill a block : %*lu\n" "splitter stalled : %*lu\n", - FW, s2w_q.av_or_eof.ccount, - FW, s2w_q.av_or_eof.wcount, - FW, w2m_q.av_or_exit.ccount, - FW, w2m_q.av_or_exit.wcount, - FW, m2s_q.av.ccount, - FW, m2s_q.av.wcount) ) - { - fatal(); - } - - assert( m2s_q.num_free == num_slots ); - w2m_q_uninit(&w2m_q); - s2w_q_uninit(&s2w_q); - xraise(SIGUSR2); + FW, s2w_queue.check_counter, + FW, s2w_queue.wait_counter, + FW, w2m_queue.check_counter, + FW, w2m_queue.wait_counter, + FW, slot_tally.check_counter, + FW, slot_tally.wait_counter ); + + assert( slot_tally.num_free == num_slots ); + assert( s2w_queue.eof ); + assert( s2w_queue.head == 0 ); + assert( s2w_queue.tail == 0 ); + assert( w2m_queue.num_working == 0 ); + assert( w2m_queue.head == 0 ); return 0; } @@ -16,17 +16,19 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -struct Muxer_arg - { - int dictionary_size; - int match_len_limit; - int num_workers; - int num_slots; - int debug_level; - int infd; - int outfd; - }; - -void * muxer( void * arg ); +int compress( const int data_size, const int dictionary_size, + const int match_len_limit, const int num_workers, + const int num_slots, const int infd, const int outfd, + const int debug_level ); + + +/*----------------------- Defined in main.cc -----------------------*/ + +void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw(); +int readblock( const int fd, uint8_t * buf, const int size ) throw(); +int writeblock( const int fd, const uint8_t * buf, const int size ) throw(); + + +void fatal(); // Terminate the process extern int verbosity; |