summaryrefslogtreecommitdiffstats
path: root/decode_lz.cc
diff options
context:
space:
mode:
Diffstat (limited to 'decode_lz.cc')
-rw-r--r--decode_lz.cc107
1 files changed, 56 insertions, 51 deletions
diff --git a/decode_lz.cc b/decode_lz.cc
index 867ffa5..b8763f3 100644
--- a/decode_lz.cc
+++ b/decode_lz.cc
@@ -22,15 +22,15 @@
#include <cstdio>
#include <queue>
#include <pthread.h>
-#include <stdint.h> // for lzlib.h
+#include <stdint.h> // for lzlib.h
#include <unistd.h>
#include <utime.h>
#include <sys/stat.h>
#if !defined __FreeBSD__ && !defined __OpenBSD__ && !defined __NetBSD__ && \
!defined __DragonFly__ && !defined __APPLE__ && !defined __OS2__
-#include <sys/sysmacros.h> // for major, minor, makedev
+#include <sys/sysmacros.h> // for major, minor, makedev
#else
-#include <sys/types.h> // for major, minor, makedev
+#include <sys/types.h> // for major, minor, makedev
#endif
#include <lzlib.h>
@@ -73,8 +73,8 @@ public:
unsigned owait_counter;
private:
long error_member_id; // first lzip member with error/misalign/eoa/eof
- int deliver_worker_id; // worker queue currently delivering packets
- int master_worker_id; // worker in charge if error/misalign/eoa/eof
+ int deliver_id; // worker queue currently delivering packets
+ int master_id; // worker in charge if error/misalign/eoa/eof
std::vector< std::queue< const Packet * > > opacket_queues;
int num_working; // number of workers still running
const int num_workers; // number of workers
@@ -90,11 +90,10 @@ private:
public:
Packet_courier( const int workers, const int slots )
- : ocheck_counter( 0 ), owait_counter( 0 ),
- error_member_id( -1 ), deliver_worker_id( 0 ), master_worker_id( -1 ),
- opacket_queues( workers ), num_working( workers ),
- num_workers( workers ), out_slots( slots ), slot_av( workers ),
- eoa_found_( false )
+ : ocheck_counter( 0 ), owait_counter( 0 ), error_member_id( -1 ),
+ deliver_id( 0 ), master_id( -1 ), opacket_queues( workers ),
+ num_working( workers ), num_workers( workers ),
+ out_slots( slots ), slot_av( workers ), eoa_found_( false )
{
xinit_mutex( &omutex ); xinit_cond( &oav_or_exit );
for( unsigned i = 0; i < slot_av.size(); ++i ) xinit_cond( &slot_av[i] );
@@ -111,22 +110,22 @@ public:
bool eoa_found() const { return eoa_found_; }
void report_eoa() { eoa_found_ = true; }
- bool mastership_granted() const { return master_worker_id >= 0; }
+ bool mastership_granted() const { return master_id >= 0; }
bool request_mastership( const long member_id, const int worker_id )
{
xlock( &omutex );
if( mastership_granted() ) // already granted
- { xunlock( &omutex ); return ( master_worker_id == worker_id ); }
+ { xunlock( &omutex ); return master_id == worker_id; }
if( error_member_id < 0 || error_member_id > member_id )
error_member_id = member_id;
- while( !mastership_granted() && ( worker_id != deliver_worker_id ||
- !opacket_queues[deliver_worker_id].empty() ) )
+ while( !mastership_granted() && ( worker_id != deliver_id ||
+ !opacket_queues[deliver_id].empty() ) )
xwait( &check_master, &omutex );
- if( !mastership_granted() && worker_id == deliver_worker_id &&
- opacket_queues[deliver_worker_id].empty() )
+ if( !mastership_granted() && worker_id == deliver_id &&
+ opacket_queues[deliver_id].empty() )
{
- master_worker_id = worker_id; // grant mastership
+ master_id = worker_id; // grant mastership
for( int i = 0; i < num_workers; ++i ) // delete all packets
while( !opacket_queues[i].empty() )
opacket_queues[i].pop();
@@ -154,46 +153,47 @@ public:
{
const Packet * const opacket = new Packet( member_id, msg, status, errcode );
xlock( &omutex );
- if( ( mastership_granted() && master_worker_id != worker_id ) ||
+ if( ( mastership_granted() && master_id != worker_id ) ||
( error_member_id >= 0 && error_member_id < opacket->member_id ) )
{ xunlock( &omutex ); delete opacket; return false; } // reject packet
while( opacket_queues[worker_id].size() >= out_slots )
xwait( &slot_av[worker_id], &omutex );
opacket_queues[worker_id].push( opacket );
- if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
+ if( worker_id == deliver_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
return true;
}
- /* Deliver a packet to muxer.
+ /* Deliver packets to muxer.
If packet.status == Packet::member_done, move to next queue.
- If packet.line.empty(), wait again (empty lzip member). */
- const Packet * deliver_packet()
+ If packet.line.empty(), wait again (empty lzip member or -q). */
+ void deliver_packets( std::vector< const Packet * > & opacket_vector )
{
- const Packet * opacket = 0;
+ opacket_vector.clear();
xlock( &omutex );
++ocheck_counter;
- while( true )
- {
- while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
+ do {
+ while( opacket_queues[deliver_id].empty() && num_working > 0 )
{
++owait_counter;
if( !mastership_granted() && error_member_id >= 0 )
xbroadcast( &check_master ); // mastership requested not yet granted
xwait( &oav_or_exit, &omutex );
}
- if( opacket_queues[deliver_worker_id].empty() ) break;
- opacket = opacket_queues[deliver_worker_id].front();
- opacket_queues[deliver_worker_id].pop();
- if( opacket_queues[deliver_worker_id].size() + 1 == out_slots )
- xsignal( &slot_av[deliver_worker_id] );
- if( opacket->status == Packet::member_done && !mastership_granted() )
- { if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; }
- if( !opacket->line.empty() ) break;
- delete opacket; opacket = 0;
+ while( !opacket_queues[deliver_id].empty() )
+ {
+ const Packet * opacket = opacket_queues[deliver_id].front();
+ opacket_queues[deliver_id].pop();
+ if( opacket_queues[deliver_id].size() + 1 == out_slots )
+ xsignal( &slot_av[deliver_id] );
+ if( opacket->status == Packet::member_done && !mastership_granted() )
+ { if( ++deliver_id >= num_workers ) deliver_id = 0; }
+ if( !opacket->line.empty() ) opacket_vector.push_back( opacket );
+ else delete opacket;
+ }
}
+ while( opacket_vector.empty() && num_working > 0 );
xunlock( &omutex );
- return opacket;
}
bool finished() // all packets delivered to muxer
@@ -401,7 +401,7 @@ Trival extract_member_lz( const Cl_options & cl_opts,
case tf_directory:
{
struct stat st;
- bool exists = ( stat( filename, &st ) == 0 );
+ bool exists = stat( filename, &st ) == 0;
if( exists && !S_ISDIR( st.st_mode ) )
{ exists = false; std::remove( filename ); }
if( !exists && mkdir( filename, mode ) != 0 && errno != EEXIST )
@@ -446,7 +446,7 @@ Trival extract_member_lz( const Cl_options & cl_opts,
typeflag );
}
- const bool islink = ( typeflag == tf_link || typeflag == tf_symlink );
+ const bool islink = typeflag == tf_link || typeflag == tf_symlink;
errno = 0;
if( !islink &&
( !uid_gid_in_range( extended.get_uid(), extended.get_gid() ) ||
@@ -489,7 +489,7 @@ Trival extract_member_lz( const Cl_options & cl_opts,
}
const int wsize = ( rest >= bufsize ) ? bufsize : rest;
if( outfd >= 0 && writeblock( outfd, buf, wsize ) != wsize )
- { format_file_error( rbuf, filename, werr_msg, errno );
+ { format_file_error( rbuf, filename, wr_err_msg, errno );
return Trival( rbuf(), 0, 1 ); }
rest -= wsize;
}
@@ -672,24 +672,29 @@ done:
*/
void muxer( const char * const archive_namep, Packet_courier & courier )
{
+ std::vector< const Packet * > opacket_vector;
int retval = 0;
while( retval == 0 )
{
- const Packet * const opacket = courier.deliver_packet();
- if( !opacket ) break; // queue is empty. all workers exited
+ courier.deliver_packets( opacket_vector );
+ if( opacket_vector.empty() ) break; // queue is empty. all workers exited
- switch( opacket->status )
+ for( unsigned i = 0; i < opacket_vector.size(); ++i )
{
- case Packet::error1:
- case Packet::error2:
- show_file_error( archive_namep, opacket->line.c_str(), opacket->errcode );
- retval = ( opacket->status == Packet::error1 ) ? 1 : 2; break;
- case Packet::prefix: show_error( opacket->line.c_str() ); break;
- case Packet::diag: std::fputs( opacket->line.c_str(), stderr ); break;
- default: if( opacket->line.size() )
- { std::fputs( opacket->line.c_str(), stdout ); std::fflush( stdout ); }
+ const Packet * const opacket = opacket_vector[i];
+ switch( opacket->status )
+ {
+ case Packet::error1:
+ case Packet::error2:
+ show_file_error( archive_namep, opacket->line.c_str(), opacket->errcode );
+ retval = ( opacket->status == Packet::error1 ) ? 1 : 2; break;
+ case Packet::prefix: show_error( opacket->line.c_str() ); break;
+ case Packet::diag: std::fputs( opacket->line.c_str(), stderr ); break;
+ default: if( opacket->line.size() )
+ { std::fputs( opacket->line.c_str(), stdout ); std::fflush( stdout ); }
+ }
+ delete opacket;
}
- delete opacket;
}
if( retval == 0 && !courier.eoa_found() ) // no worker found EOA blocks
{ show_file_error( archive_namep, end_msg ); retval = 2; }