diff options
Diffstat (limited to 'decode_lz.cc')
-rw-r--r-- | decode_lz.cc | 107 |
1 files changed, 56 insertions, 51 deletions
diff --git a/decode_lz.cc b/decode_lz.cc index 867ffa5..b8763f3 100644 --- a/decode_lz.cc +++ b/decode_lz.cc @@ -22,15 +22,15 @@ #include <cstdio> #include <queue> #include <pthread.h> -#include <stdint.h> // for lzlib.h +#include <stdint.h> // for lzlib.h #include <unistd.h> #include <utime.h> #include <sys/stat.h> #if !defined __FreeBSD__ && !defined __OpenBSD__ && !defined __NetBSD__ && \ !defined __DragonFly__ && !defined __APPLE__ && !defined __OS2__ -#include <sys/sysmacros.h> // for major, minor, makedev +#include <sys/sysmacros.h> // for major, minor, makedev #else -#include <sys/types.h> // for major, minor, makedev +#include <sys/types.h> // for major, minor, makedev #endif #include <lzlib.h> @@ -73,8 +73,8 @@ public: unsigned owait_counter; private: long error_member_id; // first lzip member with error/misalign/eoa/eof - int deliver_worker_id; // worker queue currently delivering packets - int master_worker_id; // worker in charge if error/misalign/eoa/eof + int deliver_id; // worker queue currently delivering packets + int master_id; // worker in charge if error/misalign/eoa/eof std::vector< std::queue< const Packet * > > opacket_queues; int num_working; // number of workers still running const int num_workers; // number of workers @@ -90,11 +90,10 @@ private: public: Packet_courier( const int workers, const int slots ) - : ocheck_counter( 0 ), owait_counter( 0 ), - error_member_id( -1 ), deliver_worker_id( 0 ), master_worker_id( -1 ), - opacket_queues( workers ), num_working( workers ), - num_workers( workers ), out_slots( slots ), slot_av( workers ), - eoa_found_( false ) + : ocheck_counter( 0 ), owait_counter( 0 ), error_member_id( -1 ), + deliver_id( 0 ), master_id( -1 ), opacket_queues( workers ), + num_working( workers ), num_workers( workers ), + out_slots( slots ), slot_av( workers ), eoa_found_( false ) { xinit_mutex( &omutex ); xinit_cond( &oav_or_exit ); for( unsigned i = 0; i < slot_av.size(); ++i ) xinit_cond( &slot_av[i] ); @@ -111,22 +110,22 @@ public: bool eoa_found() const { return eoa_found_; } void report_eoa() { eoa_found_ = true; } - bool mastership_granted() const { return master_worker_id >= 0; } + bool mastership_granted() const { return master_id >= 0; } bool request_mastership( const long member_id, const int worker_id ) { xlock( &omutex ); if( mastership_granted() ) // already granted - { xunlock( &omutex ); return ( master_worker_id == worker_id ); } + { xunlock( &omutex ); return master_id == worker_id; } if( error_member_id < 0 || error_member_id > member_id ) error_member_id = member_id; - while( !mastership_granted() && ( worker_id != deliver_worker_id || - !opacket_queues[deliver_worker_id].empty() ) ) + while( !mastership_granted() && ( worker_id != deliver_id || + !opacket_queues[deliver_id].empty() ) ) xwait( &check_master, &omutex ); - if( !mastership_granted() && worker_id == deliver_worker_id && - opacket_queues[deliver_worker_id].empty() ) + if( !mastership_granted() && worker_id == deliver_id && + opacket_queues[deliver_id].empty() ) { - master_worker_id = worker_id; // grant mastership + master_id = worker_id; // grant mastership for( int i = 0; i < num_workers; ++i ) // delete all packets while( !opacket_queues[i].empty() ) opacket_queues[i].pop(); @@ -154,46 +153,47 @@ public: { const Packet * const opacket = new Packet( member_id, msg, status, errcode ); xlock( &omutex ); - if( ( mastership_granted() && master_worker_id != worker_id ) || + if( ( mastership_granted() && master_id != worker_id ) || ( error_member_id >= 0 && error_member_id < opacket->member_id ) ) { xunlock( &omutex ); delete opacket; return false; } // reject packet while( opacket_queues[worker_id].size() >= out_slots ) xwait( &slot_av[worker_id], &omutex ); opacket_queues[worker_id].push( opacket ); - if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit ); + if( worker_id == deliver_id ) xsignal( &oav_or_exit ); xunlock( &omutex ); return true; } - /* Deliver a packet to muxer. + /* Deliver packets to muxer. If packet.status == Packet::member_done, move to next queue. - If packet.line.empty(), wait again (empty lzip member). */ - const Packet * deliver_packet() + If packet.line.empty(), wait again (empty lzip member or -q). */ + void deliver_packets( std::vector< const Packet * > & opacket_vector ) { - const Packet * opacket = 0; + opacket_vector.clear(); xlock( &omutex ); ++ocheck_counter; - while( true ) - { - while( opacket_queues[deliver_worker_id].empty() && num_working > 0 ) + do { + while( opacket_queues[deliver_id].empty() && num_working > 0 ) { ++owait_counter; if( !mastership_granted() && error_member_id >= 0 ) xbroadcast( &check_master ); // mastership requested not yet granted xwait( &oav_or_exit, &omutex ); } - if( opacket_queues[deliver_worker_id].empty() ) break; - opacket = opacket_queues[deliver_worker_id].front(); - opacket_queues[deliver_worker_id].pop(); - if( opacket_queues[deliver_worker_id].size() + 1 == out_slots ) - xsignal( &slot_av[deliver_worker_id] ); - if( opacket->status == Packet::member_done && !mastership_granted() ) - { if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; } - if( !opacket->line.empty() ) break; - delete opacket; opacket = 0; + while( !opacket_queues[deliver_id].empty() ) + { + const Packet * opacket = opacket_queues[deliver_id].front(); + opacket_queues[deliver_id].pop(); + if( opacket_queues[deliver_id].size() + 1 == out_slots ) + xsignal( &slot_av[deliver_id] ); + if( opacket->status == Packet::member_done && !mastership_granted() ) + { if( ++deliver_id >= num_workers ) deliver_id = 0; } + if( !opacket->line.empty() ) opacket_vector.push_back( opacket ); + else delete opacket; + } } + while( opacket_vector.empty() && num_working > 0 ); xunlock( &omutex ); - return opacket; } bool finished() // all packets delivered to muxer @@ -401,7 +401,7 @@ Trival extract_member_lz( const Cl_options & cl_opts, case tf_directory: { struct stat st; - bool exists = ( stat( filename, &st ) == 0 ); + bool exists = stat( filename, &st ) == 0; if( exists && !S_ISDIR( st.st_mode ) ) { exists = false; std::remove( filename ); } if( !exists && mkdir( filename, mode ) != 0 && errno != EEXIST ) @@ -446,7 +446,7 @@ Trival extract_member_lz( const Cl_options & cl_opts, typeflag ); } - const bool islink = ( typeflag == tf_link || typeflag == tf_symlink ); + const bool islink = typeflag == tf_link || typeflag == tf_symlink; errno = 0; if( !islink && ( !uid_gid_in_range( extended.get_uid(), extended.get_gid() ) || @@ -489,7 +489,7 @@ Trival extract_member_lz( const Cl_options & cl_opts, } const int wsize = ( rest >= bufsize ) ? bufsize : rest; if( outfd >= 0 && writeblock( outfd, buf, wsize ) != wsize ) - { format_file_error( rbuf, filename, werr_msg, errno ); + { format_file_error( rbuf, filename, wr_err_msg, errno ); return Trival( rbuf(), 0, 1 ); } rest -= wsize; } @@ -672,24 +672,29 @@ done: */ void muxer( const char * const archive_namep, Packet_courier & courier ) { + std::vector< const Packet * > opacket_vector; int retval = 0; while( retval == 0 ) { - const Packet * const opacket = courier.deliver_packet(); - if( !opacket ) break; // queue is empty. all workers exited + courier.deliver_packets( opacket_vector ); + if( opacket_vector.empty() ) break; // queue is empty. all workers exited - switch( opacket->status ) + for( unsigned i = 0; i < opacket_vector.size(); ++i ) { - case Packet::error1: - case Packet::error2: - show_file_error( archive_namep, opacket->line.c_str(), opacket->errcode ); - retval = ( opacket->status == Packet::error1 ) ? 1 : 2; break; - case Packet::prefix: show_error( opacket->line.c_str() ); break; - case Packet::diag: std::fputs( opacket->line.c_str(), stderr ); break; - default: if( opacket->line.size() ) - { std::fputs( opacket->line.c_str(), stdout ); std::fflush( stdout ); } + const Packet * const opacket = opacket_vector[i]; + switch( opacket->status ) + { + case Packet::error1: + case Packet::error2: + show_file_error( archive_namep, opacket->line.c_str(), opacket->errcode ); + retval = ( opacket->status == Packet::error1 ) ? 1 : 2; break; + case Packet::prefix: show_error( opacket->line.c_str() ); break; + case Packet::diag: std::fputs( opacket->line.c_str(), stderr ); break; + default: if( opacket->line.size() ) + { std::fputs( opacket->line.c_str(), stdout ); std::fflush( stdout ); } + } + delete opacket; } - delete opacket; } if( retval == 0 && !courier.eoa_found() ) // no worker found EOA blocks { show_file_error( archive_namep, end_msg ); retval = 2; } |