summary refs log tree commit diff stats
path: root/compress.cc
diff options
context:
space:
mode:
Diffstat (limited to 'compress.cc')
-rw-r--r-- compress.cc 184
1 files changed, 110 insertions, 74 deletions
diff --git a/compress.cc b/compress.cc
index cf0135a..c4428ea 100644
--- a/compress.cc
+++ b/compress.cc
@@ -1,6 +1,6 @@
/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -28,22 +28,53 @@
#include <queue>
#include <string>
#include <vector>
-#include <inttypes.h>
#include <pthread.h>
+#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>
-#include "plzip.h"
+#include "lzip.h"
#ifndef LLONG_MAX
#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
#endif
-#ifndef LLONG_MIN
-#define LLONG_MIN (-LLONG_MAX - 1LL)
-#endif
-#ifndef ULLONG_MAX
-#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
-#endif
+
+
+// Returns the number of bytes really read.
+// If (returned value < size) and (errno == 0), means EOF was reached.
+//
+int readblock( const int fd, uint8_t * const buf, const int size )
+ {
+ int rest = size;
+ errno = 0;
+ while( rest > 0 )
+ {
+ const int n = read( fd, buf + size - rest, rest );
+ if( n > 0 ) rest -= n;
+ else if( n == 0 ) break; // EOF
+ else if( errno != EINTR && errno != EAGAIN ) break;
+ errno = 0;
+ }
+ return size - rest;
+ }
+
+
+// Returns the number of bytes really written.
+// If (returned value < size), it is always an error.
+//
+int writeblock( const int fd, const uint8_t * const buf, const int size )
+ {
+ int rest = size;
+ errno = 0;
+ while( rest > 0 )
+ {
+ const int n = write( fd, buf + size - rest, rest );
+ if( n > 0 ) rest -= n;
+ else if( n < 0 && errno != EINTR && errno != EAGAIN ) break;
+ errno = 0;
+ }
+ return size - rest;
+ }
void xinit( pthread_mutex_t * const mutex )
@@ -109,13 +140,14 @@ void xbroadcast( pthread_cond_t * const cond )
namespace {
-long long in_size = 0;
-long long out_size = 0;
+unsigned long long in_size = 0;
+unsigned long long out_size = 0;
+const char * const mem_msg = "Not enough memory. Try a smaller dictionary size";
struct Packet // data block with a serial number
{
- unsigned long long id; // serial number assigned as received
+ unsigned id; // serial number assigned as received
uint8_t * data;
int size; // number of bytes in data (if any)
};
@@ -124,16 +156,16 @@ struct Packet // data block with a serial number
class Packet_courier // moves packets around
{
public:
- unsigned long icheck_counter;
- unsigned long iwait_counter;
- unsigned long ocheck_counter;
- unsigned long owait_counter;
+ unsigned icheck_counter;
+ unsigned iwait_counter;
+ unsigned ocheck_counter;
+ unsigned owait_counter;
private:
- unsigned long long receive_id; // id assigned to next packet received
- unsigned long long deliver_id; // id of next packet to be delivered
+ unsigned receive_id; // id assigned to next packet received
+ unsigned deliver_id; // id of next packet to be delivered
Slot_tally slot_tally; // limits the number of input packets
std::queue< Packet * > packet_queue;
- std::vector< Packet * > circular_buffer;
+ std::vector< const Packet * > circular_buffer;
int num_working; // number of workers still running
const int num_slots; // max packets in circulation
pthread_mutex_t imutex;
@@ -163,12 +195,10 @@ public:
xdestroy( &iav_or_eof ); xdestroy( &imutex );
}
- const Slot_tally & tally() const { return slot_tally; }
-
// make a packet with data received from splitter
void receive_packet( uint8_t * const data, const int size )
{
- Packet * ipacket = new Packet;
+ Packet * const ipacket = new Packet;
ipacket->id = receive_id++;
ipacket->data = data;
ipacket->size = size;
@@ -189,7 +219,6 @@ public:
{
++iwait_counter;
xwait( &iav_or_eof, &imutex );
- ++icheck_counter;
}
if( !packet_queue.empty() )
{
@@ -197,7 +226,7 @@ public:
packet_queue.pop();
}
xunlock( &imutex );
- if( ipacket == 0 )
+ if( !ipacket )
{
// notify muxer when last worker exits
xlock( &omutex );
@@ -208,36 +237,43 @@ public:
}
// collect a packet from a worker
- void collect_packet( Packet * const opacket )
+ void collect_packet( const Packet * const opacket )
{
+ const int i = opacket->id%num_slots;
xlock( &omutex );
// id collision shouldn't happen
- if( circular_buffer[opacket->id%num_slots] != 0 )
+ if( circular_buffer[i] != 0 )
internal_error( "id collision in collect_packet" );
// merge packet into circular buffer
- circular_buffer[opacket->id%num_slots] = opacket;
+ circular_buffer[i] = opacket;
if( opacket->id == deliver_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
- // deliver a packet to muxer
- Packet * deliver_packet()
+ // deliver packets to muxer
+ void deliver_packets( std::vector< const Packet * > & packet_vector )
{
xlock( &omutex );
++ocheck_counter;
- while( circular_buffer[deliver_id%num_slots] == 0 && num_working > 0 )
+ int i = deliver_id % num_slots;
+ while( circular_buffer[i] == 0 && num_working > 0 )
{
++owait_counter;
xwait( &oav_or_exit, &omutex );
- ++ocheck_counter;
}
- Packet * opacket = circular_buffer[deliver_id%num_slots];
- circular_buffer[deliver_id%num_slots] = 0;
- ++deliver_id;
+ packet_vector.clear();
+ while( true )
+ {
+ const Packet * const opacket = circular_buffer[i];
+ if( !opacket ) break;
+ packet_vector.push_back( opacket );
+ circular_buffer[i] = 0;
+ ++deliver_id;
+ i = deliver_id % num_slots;
+ }
xunlock( &omutex );
- if( opacket != 0 )
- slot_tally.leave_slot(); // return a slot to the tally
- return opacket;
+ if( packet_vector.size() ) // return slots to the tally
+ slot_tally.leave_slots( packet_vector.size() );
}
void finish() // splitter has no more packets to send
@@ -281,12 +317,12 @@ extern "C" void * csplitter( void * arg )
for( bool first_post = true; ; first_post = false )
{
uint8_t * const data = new( std::nothrow ) uint8_t[data_size];
- if( data == 0 ) { pp( "Not enough memory" ); fatal(); }
+ if( !data ) { pp( mem_msg ); fatal(); }
const int size = readblock( infd, data, data_size );
if( size != data_size && errno )
{ pp(); show_error( "Read error", errno ); fatal(); }
- if( size > 0 || first_post ) // first packet can be empty
+ if( size > 0 || first_post ) // first packet may be empty
{
in_size += size;
courier.receive_packet( data, size );
@@ -325,11 +361,11 @@ extern "C" void * cworker( void * arg )
while( true )
{
Packet * const packet = courier.distribute_packet();
- if( packet == 0 ) break; // no more packets to process
+ if( !packet ) break; // no more packets to process
const int max_compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 );
uint8_t * const new_data = new( std::nothrow ) uint8_t[max_compr_size];
- if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); }
+ if( !new_data ) { pp( mem_msg ); fatal(); }
const int dict_size = std::max( LZ_min_dictionary_size(),
std::min( dictionary_size, packet->size ) );
LZ_Encoder * const encoder =
@@ -337,14 +373,14 @@ extern "C" void * cworker( void * arg )
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
{
if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error )
- pp( "Not enough memory. Try a smaller dictionary size" );
+ pp( mem_msg );
else
internal_error( "invalid argument to encoder" );
fatal();
}
int written = 0;
- int new_size = 0;
+ int new_pos = 0;
while( true )
{
if( LZ_compress_write_size( encoder ) > 0 )
@@ -359,8 +395,8 @@ extern "C" void * cworker( void * arg )
if( written >= packet->size )
{ delete[] packet->data; LZ_compress_finish( encoder ); }
}
- const int rd = LZ_compress_read( encoder, new_data + new_size,
- max_compr_size - new_size );
+ const int rd = LZ_compress_read( encoder, new_data + new_pos,
+ max_compr_size - new_pos );
if( rd < 0 )
{
pp();
@@ -369,8 +405,8 @@ extern "C" void * cworker( void * arg )
LZ_strerror( LZ_compress_errno( encoder ) ) );
fatal();
}
- new_size += rd;
- if( new_size > max_compr_size )
+ new_pos += rd;
+ if( new_pos > max_compr_size )
internal_error( "packet size exceeded in worker" );
if( LZ_compress_finished( encoder ) == 1 ) break;
}
@@ -379,7 +415,7 @@ extern "C" void * cworker( void * arg )
{ pp( "LZ_compress_close failed" ); fatal(); }
packet->data = new_data;
- packet->size = new_size;
+ packet->size = new_pos;
courier.collect_packet( packet );
}
return 0;
@@ -390,21 +426,26 @@ extern "C" void * cworker( void * arg )
// their contents to the output file.
void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
{
+ std::vector< const Packet * > packet_vector;
while( true )
{
- Packet * opacket = courier.deliver_packet();
- if( opacket == 0 ) break; // queue is empty. all workers exited
-
- out_size += opacket->size;
+ courier.deliver_packets( packet_vector );
+ if( packet_vector.size() == 0 ) break; // all workers exited
- if( outfd >= 0 )
+ for( unsigned i = 0; i < packet_vector.size(); ++i )
{
- const int wr = writeblock( outfd, opacket->data, opacket->size );
- if( wr != opacket->size )
- { pp(); show_error( "Write error", errno ); fatal(); }
+ const Packet * const opacket = packet_vector[i];
+ out_size += opacket->size;
+
+ if( outfd >= 0 )
+ {
+ const int wr = writeblock( outfd, opacket->data, opacket->size );
+ if( wr != opacket->size )
+ { pp(); show_error( "Write error", errno ); fatal(); }
+ }
+ delete[] opacket->data;
+ delete opacket;
}
- delete[] opacket->data;
- delete opacket;
}
}
@@ -419,11 +460,11 @@ int compress( const int data_size, const int dictionary_size,
const Pretty_print & pp, const int debug_level )
{
const int slots_per_worker = 2;
- const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ?
- num_workers * slots_per_worker : INT_MAX );
+ const int num_slots =
+ ( ( num_workers > 1 ) ? num_workers * slots_per_worker : 1 );
in_size = 0;
out_size = 0;
- Packet_courier courier( num_workers, num_slots - 1 );
+ Packet_courier courier( num_workers, num_slots );
Splitter_arg splitter_arg;
splitter_arg.courier = &courier;
@@ -443,8 +484,7 @@ int compress( const int data_size, const int dictionary_size,
worker_arg.match_len_limit = match_len_limit;
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
- if( worker_threads == 0 )
- { pp( "Not enough memory" ); fatal(); }
+ if( !worker_threads ) { pp( mem_msg ); fatal(); }
for( int i = 0; i < num_workers; ++i )
{
errcode = pthread_create( worker_threads + i, 0, cworker, &worker_arg );
@@ -460,7 +500,7 @@ int compress( const int data_size, const int dictionary_size,
if( errcode )
{ show_error( "Can't join worker threads", errcode ); fatal(); }
}
- delete[] worker_threads; worker_threads = 0;
+ delete[] worker_threads;
errcode = pthread_join( splitter_thread, 0 );
if( errcode )
@@ -468,11 +508,11 @@ int compress( const int data_size, const int dictionary_size,
if( verbosity >= 1 )
{
- if( in_size <= 0 || out_size <= 0 )
+ if( in_size == 0 || out_size == 0 )
std::fprintf( stderr, " no data compressed.\n" );
else
std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
- "%5.2f%% saved, %lld in, %lld out.\n",
+ "%5.2f%% saved, %llu in, %llu out.\n",
(double)in_size / out_size,
( 8.0 * out_size ) / in_size,
100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
@@ -481,14 +521,10 @@ int compress( const int data_size, const int dictionary_size,
if( debug_level & 1 )
std::fprintf( stderr,
- "splitter tried to send a packet %8lu times\n"
- "splitter had to wait %8lu times\n"
- "any worker tried to consume from splitter %8lu times\n"
- "any worker had to wait %8lu times\n"
- "muxer tried to consume from workers %8lu times\n"
- "muxer had to wait %8lu times\n",
- courier.tally().check_counter,
- courier.tally().wait_counter,
+ "any worker tried to consume from splitter %8u times\n"
+ "any worker had to wait %8u times\n"
+ "muxer tried to consume from workers %8u times\n"
+ "muxer had to wait %8u times\n",
courier.icheck_counter,
courier.iwait_counter,
courier.ocheck_counter,