diff options
Diffstat (limited to 'create_lz.cc')
-rw-r--r-- | create_lz.cc | 60 |
1 files changed, 49 insertions, 11 deletions
diff --git a/create_lz.cc b/create_lz.cc index 48c6a3a..797427b 100644 --- a/create_lz.cc +++ b/create_lz.cc @@ -45,6 +45,42 @@ Packet_courier * courierp = 0; // local vars needed by add_member unsigned long long partial_data_size = 0; // size of current block +class Slot_tally + { + const int num_slots; // total slots + int num_free; // remaining free slots + pthread_mutex_t mutex; + pthread_cond_t slot_av; // slot available + + Slot_tally( const Slot_tally & ); // declared as private + void operator=( const Slot_tally & ); // declared as private + +public: + explicit Slot_tally( const int slots ) + : num_slots( slots ), num_free( slots ) + { xinit_mutex( &mutex ); xinit_cond( &slot_av ); } + + ~Slot_tally() { xdestroy_cond( &slot_av ); xdestroy_mutex( &mutex ); } + + bool all_free() { return ( num_free == num_slots ); } + + void get_slot() // wait for a free slot + { + xlock( &mutex ); + while( num_free <= 0 ) xwait( &slot_av, &mutex ); + --num_free; + xunlock( &mutex ); + } + + void leave_slot() // return a slot to the tally + { + xlock( &mutex ); + if( ++num_free == 1 ) xsignal( &slot_av ); // num_free was 0 + xunlock( &mutex ); + } + }; + + struct Ipacket // filename, file size and headers { const unsigned long long file_size; @@ -458,7 +494,7 @@ extern "C" void * cworker( void * arg ) /* Get from courier the processed and sorted packets, and write their contents to the output archive. */ -bool muxer( Packet_courier & courier, const char * const archive_name, +void muxer( Packet_courier & courier, const char * const archive_name, const int outfd ) { while( true ) @@ -466,13 +502,12 @@ bool muxer( Packet_courier & courier, const char * const archive_name, const Opacket * const opacket = courier.deliver_packet(); if( !opacket ) break; // queue is empty. all workers exited - const int wr = writeblock( outfd, opacket->data, opacket->size ); - if( wr != opacket->size ) - { show_file_error( archive_name, "Write error", errno ); return false; } + if( writeblock( outfd, opacket->data, opacket->size ) != opacket->size ) + { show_file_error( archive_name, "Write error", errno ); + cleanup_and_fail(); } delete[] opacket->data; delete opacket; } - return true; } } // end namespace @@ -488,6 +523,8 @@ int encode_lz( const char * const archive_name, const Arg_parser & parser, num_workers * in_slots : INT_MAX; const int out_slots = 64; + /* If an error happens after any threads have been started, exit must be + called before courier goes out of scope. */ Packet_courier courier( num_workers, total_in_slots, out_slots ); courierp = &courier; // needed by add_member @@ -498,11 +535,12 @@ int encode_lz( const char * const archive_name, const Arg_parser & parser, pthread_t grouper_thread; int errcode = pthread_create( &grouper_thread, 0, grouper, &grouper_arg ); if( errcode ) - { show_error( "Can't create grouper thread", errcode ); return 1; } + { show_error( "Can't create grouper thread", errcode ); cleanup_and_fail(); } Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers]; pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; - if( !worker_args || !worker_threads ) { show_error( mem_msg ); return 1; } + if( !worker_args || !worker_threads ) + { show_error( mem_msg ); cleanup_and_fail(); } for( int i = 0; i < num_workers; ++i ) { worker_args[i].courier = &courier; @@ -511,23 +549,23 @@ int encode_lz( const char * const archive_name, const Arg_parser & parser, worker_args[i].worker_id = i; errcode = pthread_create( &worker_threads[i], 0, cworker, &worker_args[i] ); if( errcode ) - { show_error( "Can't create worker threads", errcode ); return 1; } + { show_error( "Can't create worker threads", errcode ); cleanup_and_fail(); } } - if( !muxer( courier, archive_name, outfd ) ) return 1; + muxer( courier, archive_name, outfd ); for( int i = num_workers - 1; i >= 0; --i ) { errcode = pthread_join( worker_threads[i], 0 ); if( errcode ) - { show_error( "Can't join worker threads", errcode ); return 1; } + { show_error( "Can't join worker threads", errcode ); cleanup_and_fail(); } } delete[] worker_threads; delete[] worker_args; errcode = pthread_join( grouper_thread, 0 ); if( errcode ) - { show_error( "Can't join grouper thread", errcode ); return 1; } + { show_error( "Can't join grouper thread", errcode ); cleanup_and_fail(); } // write End-Of-Archive records int retval = 0; |