summaryrefslogtreecommitdiffstats
path: root/create_lz.cc
diff options
context:
space:
mode:
Diffstat (limited to 'create_lz.cc')
-rw-r--r--create_lz.cc60
1 files changed, 49 insertions, 11 deletions
diff --git a/create_lz.cc b/create_lz.cc
index 48c6a3a..797427b 100644
--- a/create_lz.cc
+++ b/create_lz.cc
@@ -45,6 +45,42 @@ Packet_courier * courierp = 0; // local vars needed by add_member
unsigned long long partial_data_size = 0; // size of current block
+class Slot_tally
+ {
+ const int num_slots; // total slots
+ int num_free; // remaining free slots
+ pthread_mutex_t mutex;
+ pthread_cond_t slot_av; // slot available
+
+ Slot_tally( const Slot_tally & ); // declared as private
+ void operator=( const Slot_tally & ); // declared as private
+
+public:
+ explicit Slot_tally( const int slots )
+ : num_slots( slots ), num_free( slots )
+ { xinit_mutex( &mutex ); xinit_cond( &slot_av ); }
+
+ ~Slot_tally() { xdestroy_cond( &slot_av ); xdestroy_mutex( &mutex ); }
+
+ bool all_free() { return ( num_free == num_slots ); }
+
+ void get_slot() // wait for a free slot
+ {
+ xlock( &mutex );
+ while( num_free <= 0 ) xwait( &slot_av, &mutex );
+ --num_free;
+ xunlock( &mutex );
+ }
+
+ void leave_slot() // return a slot to the tally
+ {
+ xlock( &mutex );
+ if( ++num_free == 1 ) xsignal( &slot_av ); // num_free was 0
+ xunlock( &mutex );
+ }
+ };
+
+
struct Ipacket // filename, file size and headers
{
const unsigned long long file_size;
@@ -458,7 +494,7 @@ extern "C" void * cworker( void * arg )
/* Get from courier the processed and sorted packets, and write
their contents to the output archive. */
-bool muxer( Packet_courier & courier, const char * const archive_name,
+void muxer( Packet_courier & courier, const char * const archive_name,
const int outfd )
{
while( true )
@@ -466,13 +502,12 @@ bool muxer( Packet_courier & courier, const char * const archive_name,
const Opacket * const opacket = courier.deliver_packet();
if( !opacket ) break; // queue is empty. all workers exited
- const int wr = writeblock( outfd, opacket->data, opacket->size );
- if( wr != opacket->size )
- { show_file_error( archive_name, "Write error", errno ); return false; }
+ if( writeblock( outfd, opacket->data, opacket->size ) != opacket->size )
+ { show_file_error( archive_name, "Write error", errno );
+ cleanup_and_fail(); }
delete[] opacket->data;
delete opacket;
}
- return true;
}
} // end namespace
@@ -488,6 +523,8 @@ int encode_lz( const char * const archive_name, const Arg_parser & parser,
num_workers * in_slots : INT_MAX;
const int out_slots = 64;
+ /* If an error happens after any threads have been started, exit must be
+ called before courier goes out of scope. */
Packet_courier courier( num_workers, total_in_slots, out_slots );
courierp = &courier; // needed by add_member
@@ -498,11 +535,12 @@ int encode_lz( const char * const archive_name, const Arg_parser & parser,
pthread_t grouper_thread;
int errcode = pthread_create( &grouper_thread, 0, grouper, &grouper_arg );
if( errcode )
- { show_error( "Can't create grouper thread", errcode ); return 1; }
+ { show_error( "Can't create grouper thread", errcode ); cleanup_and_fail(); }
Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
- if( !worker_args || !worker_threads ) { show_error( mem_msg ); return 1; }
+ if( !worker_args || !worker_threads )
+ { show_error( mem_msg ); cleanup_and_fail(); }
for( int i = 0; i < num_workers; ++i )
{
worker_args[i].courier = &courier;
@@ -511,23 +549,23 @@ int encode_lz( const char * const archive_name, const Arg_parser & parser,
worker_args[i].worker_id = i;
errcode = pthread_create( &worker_threads[i], 0, cworker, &worker_args[i] );
if( errcode )
- { show_error( "Can't create worker threads", errcode ); return 1; }
+ { show_error( "Can't create worker threads", errcode ); cleanup_and_fail(); }
}
- if( !muxer( courier, archive_name, outfd ) ) return 1;
+ muxer( courier, archive_name, outfd );
for( int i = num_workers - 1; i >= 0; --i )
{
errcode = pthread_join( worker_threads[i], 0 );
if( errcode )
- { show_error( "Can't join worker threads", errcode ); return 1; }
+ { show_error( "Can't join worker threads", errcode ); cleanup_and_fail(); }
}
delete[] worker_threads;
delete[] worker_args;
errcode = pthread_join( grouper_thread, 0 );
if( errcode )
- { show_error( "Can't join grouper thread", errcode ); return 1; }
+ { show_error( "Can't join grouper thread", errcode ); cleanup_and_fail(); }
// write End-Of-Archive records
int retval = 0;