diff options
Diffstat (limited to '')
-rw-r--r-- | compress.cc | 237 |
1 files changed, 122 insertions, 115 deletions
diff --git a/compress.cc b/compress.cc index af36f95..d8e2536 100644 --- a/compress.cc +++ b/compress.cc @@ -1,19 +1,19 @@ -/* Plzip - Massively parallel implementation of lzip - Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009-2019 Antonio Diaz Diaz. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. +/* Plzip - Massively parallel implementation of lzip + Copyright (C) 2009 Laszlo Ersek. + Copyright (C) 2009-2021 Antonio Diaz Diaz. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. */ #define _FILE_OFFSET_BITS 64 @@ -27,7 +27,6 @@ #include <cstring> #include <string> #include <vector> -#include <pthread.h> #include <stdint.h> #include <unistd.h> #include <lzlib.h> @@ -39,9 +38,9 @@ #endif -// Returns the number of bytes really read. -// If (returned value < size) and (errno == 0), means EOF was reached. -// +/* Returns the number of bytes really read. + If (returned value < size) and (errno == 0), means EOF was reached. +*/ int readblock( const int fd, uint8_t * const buf, const int size ) { int sz = 0; @@ -58,9 +57,9 @@ int readblock( const int fd, uint8_t * const buf, const int size ) } -// Returns the number of bytes really written. -// If (returned value < size), it is always an error. -// +/* Returns the number of bytes really written. + If (returned value < size), it is always an error. +*/ int writeblock( const int fd, const uint8_t * const buf, const int size ) { int sz = 0; @@ -150,7 +149,7 @@ namespace { unsigned long long in_size = 0; unsigned long long out_size = 0; -const char * const mem_msg = "Not enough memory. Try a smaller dictionary size"; +const char * const mem_msg2 = "Not enough memory. Try a smaller dictionary size."; struct Packet // data block with a serial number @@ -235,8 +234,7 @@ public: xunlock( &imutex ); if( !ipacket ) // EOF { - // notify muxer when last worker exits - xlock( &omutex ); + xlock( &omutex ); // notify muxer when last worker exits if( --num_working == 0 ) xsignal( &oav_or_exit ); xunlock( &omutex ); } @@ -284,12 +282,16 @@ public: void return_empty_packet() // return a slot to the tally { slot_tally.leave_slot(); } - void finish() // splitter has no more packets to send + void finish( const int workers_spared ) { - xlock( &imutex ); + xlock( &imutex ); // splitter has no more packets to send eof = true; xbroadcast( &iav_or_eof ); xunlock( &imutex ); + xlock( &omutex ); // notify muxer if all workers have exited + num_working -= workers_spared; + if( num_working <= 0 ) xsignal( &oav_or_exit ); + xunlock( &omutex ); } bool finished() // all packets delivered to muxer @@ -303,52 +305,6 @@ public: }; -struct Splitter_arg - { - Packet_courier * courier; - const Pretty_print * pp; - int infd; - int data_size; - int offset; - }; - - - // split data from input file into chunks and pass them to - // courier for packaging and distribution to workers. -extern "C" void * csplitter( void * arg ) - { - const Splitter_arg & tmp = *(const Splitter_arg *)arg; - Packet_courier & courier = *tmp.courier; - const Pretty_print & pp = *tmp.pp; - const int infd = tmp.infd; - const int data_size = tmp.data_size; - const int offset = tmp.offset; - - for( bool first_post = true; ; first_post = false ) - { - uint8_t * const data = new( std::nothrow ) uint8_t[offset+data_size]; - if( !data ) { pp( mem_msg ); cleanup_and_fail(); } - const int size = readblock( infd, data + offset, data_size ); - if( size != data_size && errno ) - { pp(); show_error( "Read error", errno ); cleanup_and_fail(); } - - if( size > 0 || first_post ) // first packet may be empty - { - in_size += size; - courier.receive_packet( data, size ); - if( size < data_size ) break; // EOF - } - else - { - delete[] data; - break; - } - } - courier.finish(); // no more packets to send - return 0; - } - - struct Worker_arg { Packet_courier * courier; @@ -358,9 +314,18 @@ struct Worker_arg int offset; }; +struct Splitter_arg + { + struct Worker_arg worker_arg; + pthread_t * worker_threads; + int infd; + int data_size; + int num_workers; // returned by splitter to main thread + }; + - // get packets from courier, replace their contents, and return - // them to courier. +/* Get packets from courier, replace their contents, and return them to + courier. */ extern "C" void * cworker( void * arg ) { const Worker_arg & tmp = *(const Worker_arg *)arg; @@ -386,7 +351,7 @@ extern "C" void * cworker( void * arg ) if( !encoder || LZ_compress_errno( encoder ) != LZ_ok ) { if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error ) - pp( mem_msg ); + pp( mem_msg2 ); else internal_error( "invalid argument to encoder." ); cleanup_and_fail(); @@ -435,8 +400,57 @@ extern "C" void * cworker( void * arg ) } - // get from courier the processed and sorted packets, and write - // their contents to the output file. +/* Split data from input file into chunks and pass them to courier for + packaging and distribution to workers. + Start a worker per packet up to a maximum of num_workers. +*/ +extern "C" void * csplitter( void * arg ) + { + Splitter_arg & tmp = *(Splitter_arg *)arg; + Packet_courier & courier = *tmp.worker_arg.courier; + const Pretty_print & pp = *tmp.worker_arg.pp; + pthread_t * const worker_threads = tmp.worker_threads; + const int offset = tmp.worker_arg.offset; + const int infd = tmp.infd; + const int data_size = tmp.data_size; + int i = 0; // number of workers started + + for( bool first_post = true; ; first_post = false ) + { + uint8_t * const data = new( std::nothrow ) uint8_t[offset+data_size]; + if( !data ) { pp( mem_msg2 ); cleanup_and_fail(); } + const int size = readblock( infd, data + offset, data_size ); + if( size != data_size && errno ) + { pp(); show_error( "Read error", errno ); cleanup_and_fail(); } + + if( size > 0 || first_post ) // first packet may be empty + { + in_size += size; + courier.receive_packet( data, size ); + if( i < tmp.num_workers ) // start a new worker + { + const int errcode = + pthread_create( &worker_threads[i++], 0, cworker, &tmp.worker_arg ); + if( errcode ) { show_error( "Can't create worker threads", errcode ); + cleanup_and_fail(); } + } + if( size < data_size ) break; // EOF + } + else + { + delete[] data; + break; + } + } + courier.finish( tmp.num_workers - i ); // no more packets to send + tmp.num_workers = i; + return 0; + } + + +/* Get from courier the processed and sorted packets, and write their + contents to the output file. +*/ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) { std::vector< const Packet * > packet_vector; @@ -450,8 +464,7 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) const Packet * const opacket = packet_vector[i]; out_size += opacket->size; - const int wr = writeblock( outfd, opacket->data, opacket->size ); - if( wr != opacket->size ) + if( writeblock( outfd, opacket->data, opacket->size ) != opacket->size ) { pp(); show_error( "Write error", errno ); cleanup_and_fail(); } delete[] opacket->data; courier.return_empty_packet(); @@ -462,8 +475,8 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) } // end namespace - // init the courier, then start the splitter and the workers and - // call the muxer. +/* Init the courier, then start the splitter and the workers and call the + muxer. */ int compress( const unsigned long long cfile_size, const int data_size, const int dictionary_size, const int match_len_limit, const int num_workers, @@ -478,50 +491,44 @@ int compress( const unsigned long long cfile_size, out_size = 0; Packet_courier courier( num_workers, num_slots ); + if( debug_level & 2 ) std::fputs( "compress.\n", stderr ); + + pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; + if( !worker_threads ) { pp( mem_msg ); return 1; } + Splitter_arg splitter_arg; - splitter_arg.courier = &courier; - splitter_arg.pp = &pp; + splitter_arg.worker_arg.courier = &courier; + splitter_arg.worker_arg.pp = &pp; + splitter_arg.worker_arg.dictionary_size = dictionary_size; + splitter_arg.worker_arg.match_len_limit = match_len_limit; + splitter_arg.worker_arg.offset = offset; + splitter_arg.worker_threads = worker_threads; splitter_arg.infd = infd; splitter_arg.data_size = data_size; - splitter_arg.offset = offset; + splitter_arg.num_workers = num_workers; pthread_t splitter_thread; int errcode = pthread_create( &splitter_thread, 0, csplitter, &splitter_arg ); if( errcode ) - { show_error( "Can't create splitter thread", errcode ); cleanup_and_fail(); } + { show_error( "Can't create splitter thread", errcode ); + delete[] worker_threads; return 1; } if( verbosity >= 1 ) pp(); show_progress( 0, cfile_size, &pp ); // init - Worker_arg worker_arg; - worker_arg.courier = &courier; - worker_arg.pp = &pp; - worker_arg.dictionary_size = dictionary_size; - worker_arg.match_len_limit = match_len_limit; - worker_arg.offset = offset; - - pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; - if( !worker_threads ) { pp( mem_msg ); cleanup_and_fail(); } - for( int i = 0; i < num_workers; ++i ) - { - errcode = pthread_create( worker_threads + i, 0, cworker, &worker_arg ); - if( errcode ) - { show_error( "Can't create worker threads", errcode ); cleanup_and_fail(); } - } - muxer( courier, pp, outfd ); - for( int i = num_workers - 1; i >= 0; --i ) - { + errcode = pthread_join( splitter_thread, 0 ); + if( errcode ) { show_error( "Can't join splitter thread", errcode ); + cleanup_and_fail(); } + + for( int i = splitter_arg.num_workers; --i >= 0; ) + { // join only the workers started errcode = pthread_join( worker_threads[i], 0 ); - if( errcode ) - { show_error( "Can't join worker threads", errcode ); cleanup_and_fail(); } + if( errcode ) { show_error( "Can't join worker threads", errcode ); + cleanup_and_fail(); } } delete[] worker_threads; - errcode = pthread_join( splitter_thread, 0 ); - if( errcode ) - { show_error( "Can't join splitter thread", errcode ); cleanup_and_fail(); } - if( verbosity >= 1 ) { if( in_size == 0 || out_size == 0 ) @@ -537,14 +544,14 @@ int compress( const unsigned long long cfile_size, if( debug_level & 1 ) std::fprintf( stderr, + "workers started %8u\n" "any worker tried to consume from splitter %8u times\n" "any worker had to wait %8u times\n" "muxer tried to consume from workers %8u times\n" "muxer had to wait %8u times\n", - courier.icheck_counter, - courier.iwait_counter, - courier.ocheck_counter, - courier.owait_counter ); + splitter_arg.num_workers, + courier.icheck_counter, courier.iwait_counter, + courier.ocheck_counter, courier.owait_counter ); if( !courier.finished() ) internal_error( "courier not finished." ); return 0; |