diff options
Diffstat (limited to 'compress.cc')
-rw-r--r-- | compress.cc | 184 |
1 files changed, 110 insertions, 74 deletions
diff --git a/compress.cc b/compress.cc index cf0135a..c4428ea 100644 --- a/compress.cc +++ b/compress.cc @@ -1,6 +1,6 @@ /* Plzip - A parallel compressor compatible with lzip Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. + Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -28,22 +28,53 @@ #include <queue> #include <string> #include <vector> -#include <inttypes.h> #include <pthread.h> +#include <stdint.h> #include <unistd.h> #include <lzlib.h> -#include "plzip.h" +#include "lzip.h" #ifndef LLONG_MAX #define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL #endif -#ifndef LLONG_MIN -#define LLONG_MIN (-LLONG_MAX - 1LL) -#endif -#ifndef ULLONG_MAX -#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL -#endif + + +// Returns the number of bytes really read. +// If (returned value < size) and (errno == 0), means EOF was reached. +// +int readblock( const int fd, uint8_t * const buf, const int size ) + { + int rest = size; + errno = 0; + while( rest > 0 ) + { + const int n = read( fd, buf + size - rest, rest ); + if( n > 0 ) rest -= n; + else if( n == 0 ) break; // EOF + else if( errno != EINTR && errno != EAGAIN ) break; + errno = 0; + } + return size - rest; + } + + +// Returns the number of bytes really written. +// If (returned value < size), it is always an error. +// +int writeblock( const int fd, const uint8_t * const buf, const int size ) + { + int rest = size; + errno = 0; + while( rest > 0 ) + { + const int n = write( fd, buf + size - rest, rest ); + if( n > 0 ) rest -= n; + else if( n < 0 && errno != EINTR && errno != EAGAIN ) break; + errno = 0; + } + return size - rest; + } void xinit( pthread_mutex_t * const mutex ) @@ -109,13 +140,14 @@ void xbroadcast( pthread_cond_t * const cond ) namespace { -long long in_size = 0; -long long out_size = 0; +unsigned long long in_size = 0; +unsigned long long out_size = 0; +const char * const mem_msg = "Not enough memory. Try a smaller dictionary size"; struct Packet // data block with a serial number { - unsigned long long id; // serial number assigned as received + unsigned id; // serial number assigned as received uint8_t * data; int size; // number of bytes in data (if any) }; @@ -124,16 +156,16 @@ struct Packet // data block with a serial number class Packet_courier // moves packets around { public: - unsigned long icheck_counter; - unsigned long iwait_counter; - unsigned long ocheck_counter; - unsigned long owait_counter; + unsigned icheck_counter; + unsigned iwait_counter; + unsigned ocheck_counter; + unsigned owait_counter; private: - unsigned long long receive_id; // id assigned to next packet received - unsigned long long deliver_id; // id of next packet to be delivered + unsigned receive_id; // id assigned to next packet received + unsigned deliver_id; // id of next packet to be delivered Slot_tally slot_tally; // limits the number of input packets std::queue< Packet * > packet_queue; - std::vector< Packet * > circular_buffer; + std::vector< const Packet * > circular_buffer; int num_working; // number of workers still running const int num_slots; // max packets in circulation pthread_mutex_t imutex; @@ -163,12 +195,10 @@ public: xdestroy( &iav_or_eof ); xdestroy( &imutex ); } - const Slot_tally & tally() const { return slot_tally; } - // make a packet with data received from splitter void receive_packet( uint8_t * const data, const int size ) { - Packet * ipacket = new Packet; + Packet * const ipacket = new Packet; ipacket->id = receive_id++; ipacket->data = data; ipacket->size = size; @@ -189,7 +219,6 @@ public: { ++iwait_counter; xwait( &iav_or_eof, &imutex ); - ++icheck_counter; } if( !packet_queue.empty() ) { @@ -197,7 +226,7 @@ public: packet_queue.pop(); } xunlock( &imutex ); - if( ipacket == 0 ) + if( !ipacket ) { // notify muxer when last worker exits xlock( &omutex ); @@ -208,36 +237,43 @@ public: } // collect a packet from a worker - void collect_packet( Packet * const opacket ) + void collect_packet( const Packet * const opacket ) { + const int i = opacket->id%num_slots; xlock( &omutex ); // id collision shouldn't happen - if( circular_buffer[opacket->id%num_slots] != 0 ) + if( circular_buffer[i] != 0 ) internal_error( "id collision in collect_packet" ); // merge packet into circular buffer - circular_buffer[opacket->id%num_slots] = opacket; + circular_buffer[i] = opacket; if( opacket->id == deliver_id ) xsignal( &oav_or_exit ); xunlock( &omutex ); } - // deliver a packet to muxer - Packet * deliver_packet() + // deliver packets to muxer + void deliver_packets( std::vector< const Packet * > & packet_vector ) { xlock( &omutex ); ++ocheck_counter; - while( circular_buffer[deliver_id%num_slots] == 0 && num_working > 0 ) + int i = deliver_id % num_slots; + while( circular_buffer[i] == 0 && num_working > 0 ) { ++owait_counter; xwait( &oav_or_exit, &omutex ); - ++ocheck_counter; } - Packet * opacket = circular_buffer[deliver_id%num_slots]; - circular_buffer[deliver_id%num_slots] = 0; - ++deliver_id; + packet_vector.clear(); + while( true ) + { + const Packet * const opacket = circular_buffer[i]; + if( !opacket ) break; + packet_vector.push_back( opacket ); + circular_buffer[i] = 0; + ++deliver_id; + i = deliver_id % num_slots; + } xunlock( &omutex ); - if( opacket != 0 ) - slot_tally.leave_slot(); // return a slot to the tally - return opacket; + if( packet_vector.size() ) // return slots to the tally + slot_tally.leave_slots( packet_vector.size() ); } void finish() // splitter has no more packets to send @@ -281,12 +317,12 @@ extern "C" void * csplitter( void * arg ) for( bool first_post = true; ; first_post = false ) { uint8_t * const data = new( std::nothrow ) uint8_t[data_size]; - if( data == 0 ) { pp( "Not enough memory" ); fatal(); } + if( !data ) { pp( mem_msg ); fatal(); } const int size = readblock( infd, data, data_size ); if( size != data_size && errno ) { pp(); show_error( "Read error", errno ); fatal(); } - if( size > 0 || first_post ) // first packet can be empty + if( size > 0 || first_post ) // first packet may be empty { in_size += size; courier.receive_packet( data, size ); @@ -325,11 +361,11 @@ extern "C" void * cworker( void * arg ) while( true ) { Packet * const packet = courier.distribute_packet(); - if( packet == 0 ) break; // no more packets to process + if( !packet ) break; // no more packets to process const int max_compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 ); uint8_t * const new_data = new( std::nothrow ) uint8_t[max_compr_size]; - if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); } + if( !new_data ) { pp( mem_msg ); fatal(); } const int dict_size = std::max( LZ_min_dictionary_size(), std::min( dictionary_size, packet->size ) ); LZ_Encoder * const encoder = @@ -337,14 +373,14 @@ extern "C" void * cworker( void * arg ) if( !encoder || LZ_compress_errno( encoder ) != LZ_ok ) { if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error ) - pp( "Not enough memory. Try a smaller dictionary size" ); + pp( mem_msg ); else internal_error( "invalid argument to encoder" ); fatal(); } int written = 0; - int new_size = 0; + int new_pos = 0; while( true ) { if( LZ_compress_write_size( encoder ) > 0 ) @@ -359,8 +395,8 @@ extern "C" void * cworker( void * arg ) if( written >= packet->size ) { delete[] packet->data; LZ_compress_finish( encoder ); } } - const int rd = LZ_compress_read( encoder, new_data + new_size, - max_compr_size - new_size ); + const int rd = LZ_compress_read( encoder, new_data + new_pos, + max_compr_size - new_pos ); if( rd < 0 ) { pp(); @@ -369,8 +405,8 @@ extern "C" void * cworker( void * arg ) LZ_strerror( LZ_compress_errno( encoder ) ) ); fatal(); } - new_size += rd; - if( new_size > max_compr_size ) + new_pos += rd; + if( new_pos > max_compr_size ) internal_error( "packet size exceeded in worker" ); if( LZ_compress_finished( encoder ) == 1 ) break; } @@ -379,7 +415,7 @@ extern "C" void * cworker( void * arg ) { pp( "LZ_compress_close failed" ); fatal(); } packet->data = new_data; - packet->size = new_size; + packet->size = new_pos; courier.collect_packet( packet ); } return 0; @@ -390,21 +426,26 @@ extern "C" void * cworker( void * arg ) // their contents to the output file. void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) { + std::vector< const Packet * > packet_vector; while( true ) { - Packet * opacket = courier.deliver_packet(); - if( opacket == 0 ) break; // queue is empty. all workers exited - - out_size += opacket->size; + courier.deliver_packets( packet_vector ); + if( packet_vector.size() == 0 ) break; // all workers exited - if( outfd >= 0 ) + for( unsigned i = 0; i < packet_vector.size(); ++i ) { - const int wr = writeblock( outfd, opacket->data, opacket->size ); - if( wr != opacket->size ) - { pp(); show_error( "Write error", errno ); fatal(); } + const Packet * const opacket = packet_vector[i]; + out_size += opacket->size; + + if( outfd >= 0 ) + { + const int wr = writeblock( outfd, opacket->data, opacket->size ); + if( wr != opacket->size ) + { pp(); show_error( "Write error", errno ); fatal(); } + } + delete[] opacket->data; + delete opacket; } - delete[] opacket->data; - delete opacket; } } @@ -419,11 +460,11 @@ int compress( const int data_size, const int dictionary_size, const Pretty_print & pp, const int debug_level ) { const int slots_per_worker = 2; - const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ? - num_workers * slots_per_worker : INT_MAX ); + const int num_slots = + ( ( num_workers > 1 ) ? num_workers * slots_per_worker : 1 ); in_size = 0; out_size = 0; - Packet_courier courier( num_workers, num_slots - 1 ); + Packet_courier courier( num_workers, num_slots ); Splitter_arg splitter_arg; splitter_arg.courier = &courier; @@ -443,8 +484,7 @@ int compress( const int data_size, const int dictionary_size, worker_arg.match_len_limit = match_len_limit; pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; - if( worker_threads == 0 ) - { pp( "Not enough memory" ); fatal(); } + if( !worker_threads ) { pp( mem_msg ); fatal(); } for( int i = 0; i < num_workers; ++i ) { errcode = pthread_create( worker_threads + i, 0, cworker, &worker_arg ); @@ -460,7 +500,7 @@ int compress( const int data_size, const int dictionary_size, if( errcode ) { show_error( "Can't join worker threads", errcode ); fatal(); } } - delete[] worker_threads; worker_threads = 0; + delete[] worker_threads; errcode = pthread_join( splitter_thread, 0 ); if( errcode ) @@ -468,11 +508,11 @@ int compress( const int data_size, const int dictionary_size, if( verbosity >= 1 ) { - if( in_size <= 0 || out_size <= 0 ) + if( in_size == 0 || out_size == 0 ) std::fprintf( stderr, " no data compressed.\n" ); else std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, " - "%5.2f%% saved, %lld in, %lld out.\n", + "%5.2f%% saved, %llu in, %llu out.\n", (double)in_size / out_size, ( 8.0 * out_size ) / in_size, 100.0 * ( 1.0 - ( (double)out_size / in_size ) ), @@ -481,14 +521,10 @@ int compress( const int data_size, const int dictionary_size, if( debug_level & 1 ) std::fprintf( stderr, - "splitter tried to send a packet %8lu times\n" - "splitter had to wait %8lu times\n" - "any worker tried to consume from splitter %8lu times\n" - "any worker had to wait %8lu times\n" - "muxer tried to consume from workers %8lu times\n" - "muxer had to wait %8lu times\n", - courier.tally().check_counter, - courier.tally().wait_counter, + "any worker tried to consume from splitter %8u times\n" + "any worker had to wait %8u times\n" + "muxer tried to consume from workers %8u times\n" + "muxer had to wait %8u times\n", courier.icheck_counter, courier.iwait_counter, courier.ocheck_counter, |