summary refs log tree commit diff stats
path: root/compress.cc
diff options
context:
space:
mode:
Diffstat (limited to 'compress.cc')
-rw-r--r-- compress.cc 103
1 files changed, 56 insertions, 47 deletions
diff --git a/compress.cc b/compress.cc
index 5bcd999..beae59e 100644
--- a/compress.cc
+++ b/compress.cc
@@ -1,6 +1,6 @@
/* Plzip - Parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009-2017 Antonio Diaz Diaz.
+ Copyright (C) 2009-2018 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -159,8 +159,9 @@ struct Packet // data block with a serial number
uint8_t * data;
int size; // number of bytes in data (if any)
unsigned id; // serial number assigned as received
- Packet( uint8_t * const d, const int s, const unsigned i )
- : data( d ), size( s ), id( i ) {}
+ Packet() : data( 0 ), size( 0 ), id( 0 ) {}
+ void init( uint8_t * const d, const int s, const unsigned i )
+ { data = d; size = s; id = i; }
};
@@ -173,10 +174,11 @@ public:
unsigned owait_counter;
private:
unsigned receive_id; // id assigned to next packet received
+ unsigned distrib_id; // id of next packet to be distributed
unsigned deliver_id; // id of next packet to be delivered
Slot_tally slot_tally; // limits the number of input packets
- std::queue< Packet * > packet_queue;
- std::vector< const Packet * > circular_buffer;
+ std::vector< Packet > circular_ibuffer;
+ std::vector< const Packet * > circular_obuffer;
int num_working; // number of workers still running
const int num_slots; // max packets in circulation
pthread_mutex_t imutex;
@@ -192,8 +194,9 @@ public:
Packet_courier( const int workers, const int slots )
: icheck_counter( 0 ), iwait_counter( 0 ),
ocheck_counter( 0 ), owait_counter( 0 ),
- receive_id( 0 ), deliver_id( 0 ),
- slot_tally( slots ), circular_buffer( slots, (Packet *) 0 ),
+ receive_id( 0 ), distrib_id( 0 ), deliver_id( 0 ),
+ slot_tally( slots ), circular_ibuffer( slots ),
+ circular_obuffer( slots, (Packet *) 0 ),
num_working( workers ), num_slots( slots ), eof( false )
{
xinit_mutex( &imutex ); xinit_cond( &iav_or_eof );
@@ -206,13 +209,13 @@ public:
xdestroy_cond( &iav_or_eof ); xdestroy_mutex( &imutex );
}
- // make a packet with data received from splitter
+ // fill a packet with data received from splitter
void receive_packet( uint8_t * const data, const int size )
{
- Packet * const ipacket = new Packet( data, size, receive_id++ );
slot_tally.get_slot(); // wait for a free slot
xlock( &imutex );
- packet_queue.push( ipacket );
+ circular_ibuffer[receive_id % num_slots].init( data, size, receive_id );
+ ++receive_id;
xsignal( &iav_or_eof );
xunlock( &imutex );
}
@@ -223,18 +226,15 @@ public:
Packet * ipacket = 0;
xlock( &imutex );
++icheck_counter;
- while( packet_queue.empty() && !eof )
+ while( receive_id == distrib_id && !eof ) // no packets to distribute
{
++iwait_counter;
xwait( &iav_or_eof, &imutex );
}
- if( !packet_queue.empty() )
- {
- ipacket = packet_queue.front();
- packet_queue.pop();
- }
+ if( receive_id != distrib_id )
+ { ipacket = &circular_ibuffer[distrib_id % num_slots]; ++distrib_id; }
xunlock( &imutex );
- if( !ipacket )
+ if( !ipacket ) // EOF
{
// notify muxer when last worker exits
xlock( &omutex );
@@ -250,10 +250,10 @@ public:
const int i = opacket->id % num_slots;
xlock( &omutex );
// id collision shouldn't happen
- if( circular_buffer[i] != 0 )
+ if( circular_obuffer[i] != 0 )
internal_error( "id collision in collect_packet." );
// merge packet into circular buffer
- circular_buffer[i] = opacket;
+ circular_obuffer[i] = opacket;
if( opacket->id == deliver_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
@@ -264,7 +264,7 @@ public:
xlock( &omutex );
++ocheck_counter;
int i = deliver_id % num_slots;
- while( circular_buffer[i] == 0 && num_working > 0 )
+ while( circular_obuffer[i] == 0 && num_working > 0 )
{
++owait_counter;
xwait( &oav_or_exit, &omutex );
@@ -272,18 +272,19 @@ public:
packet_vector.clear();
while( true )
{
- const Packet * const opacket = circular_buffer[i];
+ const Packet * const opacket = circular_obuffer[i];
if( !opacket ) break;
packet_vector.push_back( opacket );
- circular_buffer[i] = 0;
+ circular_obuffer[i] = 0;
++deliver_id;
i = deliver_id % num_slots;
}
xunlock( &omutex );
- if( packet_vector.size() ) // return slots to the tally
- slot_tally.leave_slots( packet_vector.size() );
}
+ void return_empty_packet() // return a slot to the tally
+ { slot_tally.leave_slot(); }
+
void finish() // splitter has no more packets to send
{
xlock( &imutex );
@@ -294,10 +295,10 @@ public:
bool finished() // all packets delivered to muxer
{
- if( !slot_tally.all_free() || !eof || !packet_queue.empty() ||
+ if( !slot_tally.all_free() || !eof || receive_id != distrib_id ||
num_working != 0 ) return false;
for( int i = 0; i < num_slots; ++i )
- if( circular_buffer[i] != 0 ) return false;
+ if( circular_obuffer[i] != 0 ) return false;
return true;
}
};
@@ -369,26 +370,32 @@ extern "C" void * cworker( void * arg )
const int dictionary_size = tmp.dictionary_size;
const int match_len_limit = tmp.match_len_limit;
const int offset = tmp.offset;
+ LZ_Encoder * encoder = 0;
while( true )
{
Packet * const packet = courier.distribute_packet();
if( !packet ) break; // no more packets to process
- const bool fast = dictionary_size == 65535 && match_len_limit == 16;
- const int dict_size = fast ? dictionary_size :
- std::max( std::min( dictionary_size, packet->size ),
- LZ_min_dictionary_size() );
- LZ_Encoder * const encoder =
- LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
- if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
+ if( !encoder )
{
- if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error )
- pp( mem_msg );
- else
- internal_error( "invalid argument to encoder." );
- cleanup_and_fail();
+ const bool fast = dictionary_size == 65535 && match_len_limit == 16;
+ const int dict_size = fast ? dictionary_size :
+ std::max( std::min( dictionary_size, packet->size ),
+ LZ_min_dictionary_size() );
+ encoder = LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
+ if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
+ {
+ if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error )
+ pp( mem_msg );
+ else
+ internal_error( "invalid argument to encoder." );
+ cleanup_and_fail();
+ }
}
+ else
+ if( LZ_compress_restart_member( encoder, LLONG_MAX ) < 0 )
+ { pp( "LZ_compress_restart_member failed." ); cleanup_and_fail(); }
int written = 0;
int new_pos = 0;
@@ -422,13 +429,12 @@ extern "C" void * cworker( void * arg )
if( LZ_compress_finished( encoder ) == 1 ) break;
}
- if( LZ_compress_close( encoder ) < 0 )
- { pp( "LZ_compress_close failed." ); cleanup_and_fail(); }
-
if( packet->size > 0 ) show_progress( packet->size );
packet->size = new_pos;
courier.collect_packet( packet );
}
+ if( encoder && LZ_compress_close( encoder ) < 0 )
+ { pp( "LZ_compress_close failed." ); cleanup_and_fail(); }
return 0;
}
@@ -452,7 +458,7 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
if( wr != opacket->size )
{ pp(); show_error( "Write error", errno ); cleanup_and_fail(); }
delete[] opacket->data;
- delete opacket;
+ courier.return_empty_packet();
}
}
}
@@ -462,7 +468,8 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
// init the courier, then start the splitter and the workers and
// call the muxer.
-int compress( const int data_size, const int dictionary_size,
+int compress( const unsigned long long cfile_size,
+ const int data_size, const int dictionary_size,
const int match_len_limit, const int num_workers,
const int infd, const int outfd,
const Pretty_print & pp, const int debug_level )
@@ -486,6 +493,8 @@ int compress( const int data_size, const int dictionary_size,
int errcode = pthread_create( &splitter_thread, 0, csplitter, &splitter_arg );
if( errcode )
{ show_error( "Can't create splitter thread", errcode ); cleanup_and_fail(); }
+ if( verbosity >= 1 ) pp();
+ show_progress( 0, cfile_size, &pp ); // init
Worker_arg worker_arg;
worker_arg.courier = &courier;
@@ -522,11 +531,11 @@ int compress( const int data_size, const int dictionary_size,
if( in_size == 0 || out_size == 0 )
std::fputs( " no data compressed.\n", stderr );
else
- std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
- "%5.2f%% saved, %llu in, %llu out.\n",
+ std::fprintf( stderr, "%6.3f:1, %5.2f%% ratio, %5.2f%% saved, "
+ "%llu in, %llu out.\n",
(double)in_size / out_size,
- ( 8.0 * out_size ) / in_size,
- 100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
+ ( 100.0 * out_size ) / in_size,
+ 100.0 - ( ( 100.0 * out_size ) / in_size ),
in_size, out_size );
}