summaryrefslogtreecommitdiffstats
path: root/decode_lz.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-14 12:57:29 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-14 12:57:29 +0000
commit29146f385a524ad6a4b1b127cc3d9641a8fe0adc (patch)
tree1caea11496a3d9e0333cdf649d9f9be6d5a67b78 /decode_lz.cc
parentInitial commit. (diff)
downloadtarlz-29146f385a524ad6a4b1b127cc3d9641a8fe0adc.tar.xz
tarlz-29146f385a524ad6a4b1b127cc3d9641a8fe0adc.zip
Adding upstream version 0.25.upstream/0.25upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'decode_lz.cc')
-rw-r--r--decode_lz.cc765
1 files changed, 765 insertions, 0 deletions
diff --git a/decode_lz.cc b/decode_lz.cc
new file mode 100644
index 0000000..867ffa5
--- /dev/null
+++ b/decode_lz.cc
@@ -0,0 +1,765 @@
+/* Tarlz - Archiver with multimember lzip compression
+ Copyright (C) 2013-2024 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#define _FILE_OFFSET_BITS 64
+
+#include <algorithm>
+#include <cerrno>
+#include <cstdio>
+#include <queue>
+#include <pthread.h>
+#include <stdint.h> // for lzlib.h
+#include <unistd.h>
+#include <utime.h>
+#include <sys/stat.h>
+#if !defined __FreeBSD__ && !defined __OpenBSD__ && !defined __NetBSD__ && \
+ !defined __DragonFly__ && !defined __APPLE__ && !defined __OS2__
+#include <sys/sysmacros.h> // for major, minor, makedev
+#else
+#include <sys/types.h> // for major, minor, makedev
+#endif
+#include <lzlib.h>
+
+#include "tarlz.h"
+#include "arg_parser.h"
+#include "lzip_index.h"
+#include "archive_reader.h"
+#include "common_mutex.h"
+#include "decode.h"
+
+/* When a problem is detected by any worker:
+ - the worker requests mastership and returns.
+ - the courier discards new packets received or collected.
+ - the other workers return.
+ - the muxer drains the queue and returns. */
+
+namespace {
+
+const char * const other_msg = "Other worker found an error.";
+
+/* line is preformatted and newline terminated except for prefix, error.
+ ok with an empty line is a no-op. */
+struct Packet // member name and metadata or error message
+ {
+ enum Status { ok, member_done, diag, prefix, error1, error2 };
+
+ long member_id; // lzip member containing the header of this tar member
+ std::string line; // member name and metadata ready to print, if any
+ Status status; // diagnostics and errors go to stderr
+ int errcode; // for error
+ Packet( const long i, const char * const msg, const Status s, const int e )
+ : member_id( i ), line( msg ), status( s ), errcode( e ) {}
+ };
+
+
+class Packet_courier // moves packets around
+ {
+public:
+ unsigned ocheck_counter;
+ unsigned owait_counter;
+private:
+ long error_member_id; // first lzip member with error/misalign/eoa/eof
+ int deliver_worker_id; // worker queue currently delivering packets
+ int master_worker_id; // worker in charge if error/misalign/eoa/eof
+ std::vector< std::queue< const Packet * > > opacket_queues;
+ int num_working; // number of workers still running
+ const int num_workers; // number of workers
+ const unsigned out_slots; // max output packets per queue
+ pthread_mutex_t omutex;
+ pthread_cond_t oav_or_exit; // output packet available or all workers exited
+ std::vector< pthread_cond_t > slot_av; // output slot available
+ pthread_cond_t check_master;
+ bool eoa_found_; // EOA blocks found
+
+ Packet_courier( const Packet_courier & ); // declared as private
+ void operator=( const Packet_courier & ); // declared as private
+
+public:
+ Packet_courier( const int workers, const int slots )
+ : ocheck_counter( 0 ), owait_counter( 0 ),
+ error_member_id( -1 ), deliver_worker_id( 0 ), master_worker_id( -1 ),
+ opacket_queues( workers ), num_working( workers ),
+ num_workers( workers ), out_slots( slots ), slot_av( workers ),
+ eoa_found_( false )
+ {
+ xinit_mutex( &omutex ); xinit_cond( &oav_or_exit );
+ for( unsigned i = 0; i < slot_av.size(); ++i ) xinit_cond( &slot_av[i] );
+ xinit_cond( &check_master );
+ }
+
+ ~Packet_courier()
+ {
+ xdestroy_cond( &check_master );
+ for( unsigned i = 0; i < slot_av.size(); ++i ) xdestroy_cond( &slot_av[i] );
+ xdestroy_cond( &oav_or_exit ); xdestroy_mutex( &omutex );
+ }
+
+ bool eoa_found() const { return eoa_found_; }
+ void report_eoa() { eoa_found_ = true; }
+
+ bool mastership_granted() const { return master_worker_id >= 0; }
+
+ bool request_mastership( const long member_id, const int worker_id )
+ {
+ xlock( &omutex );
+ if( mastership_granted() ) // already granted
+ { xunlock( &omutex ); return ( master_worker_id == worker_id ); }
+ if( error_member_id < 0 || error_member_id > member_id )
+ error_member_id = member_id;
+ while( !mastership_granted() && ( worker_id != deliver_worker_id ||
+ !opacket_queues[deliver_worker_id].empty() ) )
+ xwait( &check_master, &omutex );
+ if( !mastership_granted() && worker_id == deliver_worker_id &&
+ opacket_queues[deliver_worker_id].empty() )
+ {
+ master_worker_id = worker_id; // grant mastership
+ for( int i = 0; i < num_workers; ++i ) // delete all packets
+ while( !opacket_queues[i].empty() )
+ opacket_queues[i].pop();
+ xbroadcast( &check_master );
+ xunlock( &omutex );
+ return true;
+ }
+ xunlock( &omutex );
+ return false; // mastership granted to another worker
+ }
+
+ void worker_finished()
+ {
+ // notify muxer when last worker exits
+ xlock( &omutex );
+ if( --num_working == 0 ) xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ }
+
+ /* Collect a packet from a worker.
+ If a packet is rejected, the worker must terminate. */
+ bool collect_packet( const long member_id, const int worker_id,
+ const char * const msg, const Packet::Status status,
+ const int errcode = 0 )
+ {
+ const Packet * const opacket = new Packet( member_id, msg, status, errcode );
+ xlock( &omutex );
+ if( ( mastership_granted() && master_worker_id != worker_id ) ||
+ ( error_member_id >= 0 && error_member_id < opacket->member_id ) )
+ { xunlock( &omutex ); delete opacket; return false; } // reject packet
+ while( opacket_queues[worker_id].size() >= out_slots )
+ xwait( &slot_av[worker_id], &omutex );
+ opacket_queues[worker_id].push( opacket );
+ if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ return true;
+ }
+
+ /* Deliver a packet to muxer.
+ If packet.status == Packet::member_done, move to next queue.
+ If packet.line.empty(), wait again (empty lzip member). */
+ const Packet * deliver_packet()
+ {
+ const Packet * opacket = 0;
+ xlock( &omutex );
+ ++ocheck_counter;
+ while( true )
+ {
+ while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
+ {
+ ++owait_counter;
+ if( !mastership_granted() && error_member_id >= 0 )
+ xbroadcast( &check_master ); // mastership requested not yet granted
+ xwait( &oav_or_exit, &omutex );
+ }
+ if( opacket_queues[deliver_worker_id].empty() ) break;
+ opacket = opacket_queues[deliver_worker_id].front();
+ opacket_queues[deliver_worker_id].pop();
+ if( opacket_queues[deliver_worker_id].size() + 1 == out_slots )
+ xsignal( &slot_av[deliver_worker_id] );
+ if( opacket->status == Packet::member_done && !mastership_granted() )
+ { if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; }
+ if( !opacket->line.empty() ) break;
+ delete opacket; opacket = 0;
+ }
+ xunlock( &omutex );
+ return opacket;
+ }
+
+ bool finished() // all packets delivered to muxer
+ {
+ if( num_working != 0 ) return false;
+ for( int i = 0; i < num_workers; ++i )
+ if( !opacket_queues[i].empty() ) return false;
+ return true;
+ }
+ };
+
+
+// prevent two threads from extracting the same file at the same time
+class Name_monitor
+ {
+ std::vector< unsigned > crc_vector;
+ std::vector< std::string > name_vector;
+ pthread_mutex_t mutex;
+
+public:
+ Name_monitor( const int num_workers )
+ : crc_vector( num_workers ), name_vector( num_workers )
+ { if( num_workers > 0 ) xinit_mutex( &mutex ); }
+
+ bool reserve_name( const unsigned worker_id, const std::string & filename )
+ {
+ // compare the CRCs of the names; compare the names if the CRCs collide
+ const unsigned crc =
+ crc32c.compute_crc( (const uint8_t *)filename.c_str(), filename.size() );
+ xlock( &mutex );
+ for( unsigned i = 0; i < crc_vector.size(); ++i )
+ if( crc_vector[i] == crc && crc != 0 && i != worker_id &&
+ name_vector[i] == filename )
+ { xunlock( &mutex ); return false; } // filename already reserved
+ crc_vector[worker_id] = crc; name_vector[worker_id] = filename;
+ xunlock( &mutex );
+ return true;
+ }
+ };
+
+
+struct Trival // triple result value
+ {
+ const char * msg;
+ int errcode;
+ int retval;
+ explicit Trival( const char * const s = 0, const int e = 0, const int r = 0 )
+ : msg( s ), errcode( e ), retval( r ) {}
+ };
+
+
+Trival skip_member_lz( Archive_reader_i & ar, Packet_courier & courier,
+ const Extended & extended, const long member_id,
+ const int worker_id, const Typeflag typeflag )
+ {
+ if( data_may_follow( typeflag ) )
+ { const int ret = ar.skip_member( extended );
+ if( ret != 0 ) return Trival( ar.e_msg(), ar.e_code(), ret ); }
+ if( ar.at_member_end() &&
+ !courier.collect_packet( member_id, worker_id, "", Packet::member_done ) )
+ return Trival( other_msg, 0, 1);
+ return Trival();
+ }
+
+
+Trival compare_member_lz( const Cl_options & cl_opts,
+ Archive_reader_i & ar, Packet_courier & courier,
+ const Extended & extended, const Tar_header header,
+ Resizable_buffer & rbuf, const long member_id,
+ const int worker_id )
+ {
+ if( verbosity < 1 ) rbuf()[0] = 0;
+ else if( !format_member_name( extended, header, rbuf, verbosity > 1 ) )
+ return Trival( mem_msg, 0, 1 );
+ std::string estr, ostr;
+ const bool stat_differs =
+ !compare_file_type( estr, ostr, cl_opts, extended, header );
+ if( ( rbuf()[0] && !courier.collect_packet( member_id, worker_id, rbuf(),
+ Packet::ok ) ) ||
+ ( estr.size() && !courier.collect_packet( member_id, worker_id,
+ estr.c_str(), Packet::diag ) ) ||
+ ( ostr.size() && !courier.collect_packet( member_id, worker_id,
+ ostr.c_str(), Packet::ok ) ) ||
+ ( extended.file_size() <= 0 && ar.at_member_end() &&
+ !courier.collect_packet( member_id, worker_id, "", Packet::member_done ) ) )
+ return Trival( other_msg, 0, 1 );
+ if( extended.file_size() <= 0 ) return Trival();
+ const Typeflag typeflag = (Typeflag)header[typeflag_o];
+ if( ( typeflag != tf_regular && typeflag != tf_hiperf ) || stat_differs )
+ return skip_member_lz( ar, courier, extended, member_id, worker_id, typeflag );
+ // else compare file contents
+ const char * const filename = extended.path().c_str();
+ const int infd2 = open_instream( filename );
+ if( infd2 < 0 ) { set_error_status( 1 );
+ return skip_member_lz( ar, courier, extended, member_id, worker_id, typeflag ); }
+ const int ret = compare_file_contents( estr, ostr, ar, extended.file_size(),
+ filename, infd2 );
+ if( ret != 0 ) return Trival( ar.e_msg(), ar.e_code(), ret );
+ if( ( estr.size() && !courier.collect_packet( member_id, worker_id,
+ estr.c_str(), Packet::diag ) ) ||
+ ( ostr.size() && !courier.collect_packet( member_id, worker_id,
+ ostr.c_str(), Packet::ok ) ) ||
+ ( ar.at_member_end() &&
+ !courier.collect_packet( member_id, worker_id, "", Packet::member_done ) ) )
+ return Trival( other_msg, 0, 1 );
+ return Trival();
+ }
+
+
+Trival list_member_lz( Archive_reader_i & ar, Packet_courier & courier,
+ const Extended & extended, const Tar_header header,
+ Resizable_buffer & rbuf, const long member_id,
+ const int worker_id )
+ {
+ if( verbosity < 0 ) rbuf()[0] = 0;
+ else if( !format_member_name( extended, header, rbuf, verbosity > 0 ) )
+ return Trival( mem_msg, 0, 1 );
+ const int ret = data_may_follow( (Typeflag)header[typeflag_o] ) ?
+ ar.skip_member( extended ) : 0; // print name even on read error
+ if( !courier.collect_packet( member_id, worker_id, rbuf(),
+ ar.at_member_end() ? Packet::member_done : Packet::ok ) )
+ return Trival( other_msg, 0, 1 );
+ if( ret != 0 ) return Trival( ar.e_msg(), ar.e_code(), ret );
+ return Trival();
+ }
+
+
+Trival extract_member_lz( const Cl_options & cl_opts,
+ Archive_reader_i & ar, Packet_courier & courier,
+ const Extended & extended, const Tar_header header,
+ Resizable_buffer & rbuf, const long member_id,
+ const int worker_id, Name_monitor & name_monitor )
+ {
+ const char * const filename = extended.path().c_str();
+ const Typeflag typeflag = (Typeflag)header[typeflag_o];
+ if( contains_dotdot( filename ) )
+ {
+ if( format_file_error( rbuf, filename, dotdot_msg ) &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ return skip_member_lz( ar, courier, extended, member_id, worker_id, typeflag );
+ }
+ // skip member if another copy is already being extracted by another thread
+ if( !name_monitor.reserve_name( worker_id, extended.path() ) )
+ {
+ if( verbosity >= 3 && format_file_error( rbuf, filename,
+ "Is being extracted by another thread, skipping." ) &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ return skip_member_lz( ar, courier, extended, member_id, worker_id, typeflag );
+ }
+ mode_t mode = parse_octal( header + mode_o, mode_l ); // 12 bits
+ if( geteuid() != 0 && !cl_opts.preserve_permissions ) mode &= ~get_umask();
+ int outfd = -1;
+
+ if( verbosity >= 1 )
+ {
+ if( !format_member_name( extended, header, rbuf, verbosity > 1 ) )
+ return Trival( mem_msg, 0, 1 );
+ if( !courier.collect_packet( member_id, worker_id, rbuf(), Packet::ok ) )
+ return Trival( other_msg, 0, 1 );
+ }
+ /* Remove file before extraction to prevent following links.
+ Don't remove an empty dir because other thread may need it. */
+ if( typeflag != tf_directory ) std::remove( filename );
+ if( !make_dirs( filename ) )
+ {
+ if( format_file_error( rbuf, filename, intdir_msg, errno ) &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ set_error_status( 1 );
+ return skip_member_lz( ar, courier, extended, member_id, worker_id, typeflag );
+ }
+
+ switch( typeflag )
+ {
+ case tf_regular:
+ case tf_hiperf:
+ outfd = open_outstream( filename, true, &rbuf );
+ if( outfd < 0 )
+ {
+ if( verbosity >= 0 &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ set_error_status( 1 );
+ return skip_member_lz( ar, courier, extended, member_id, worker_id,
+ typeflag );
+ }
+ break;
+ case tf_link:
+ case tf_symlink:
+ {
+ const char * const linkname = extended.linkpath().c_str();
+ const bool hard = typeflag == tf_link;
+ if( ( hard && link( linkname, filename ) != 0 ) ||
+ ( !hard && symlink( linkname, filename ) != 0 ) )
+ {
+ if( format_error( rbuf, errno, cantln_msg, hard ? "" : "sym",
+ linkname, filename ) &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ set_error_status( 1 );
+ }
+ } break;
+ case tf_directory:
+ {
+ struct stat st;
+ bool exists = ( stat( filename, &st ) == 0 );
+ if( exists && !S_ISDIR( st.st_mode ) )
+ { exists = false; std::remove( filename ); }
+ if( !exists && mkdir( filename, mode ) != 0 && errno != EEXIST )
+ {
+ if( format_file_error( rbuf, filename, mkdir_msg, errno ) &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ set_error_status( 1 );
+ }
+ } break;
+ case tf_chardev:
+ case tf_blockdev:
+ {
+ const unsigned dev =
+ makedev( parse_octal( header + devmajor_o, devmajor_l ),
+ parse_octal( header + devminor_o, devminor_l ) );
+ const int dmode = ( typeflag == tf_chardev ? S_IFCHR : S_IFBLK ) | mode;
+ if( mknod( filename, dmode, dev ) != 0 )
+ {
+ if( format_file_error( rbuf, filename, mknod_msg, errno ) &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ set_error_status( 1 );
+ }
+ break;
+ }
+ case tf_fifo:
+ if( mkfifo( filename, mode ) != 0 )
+ {
+ if( format_file_error( rbuf, filename, mkfifo_msg, errno ) &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ set_error_status( 1 );
+ }
+ break;
+ default:
+ if( format_error( rbuf, 0, uftype_msg, filename, typeflag ) &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ set_error_status( 2 );
+ return skip_member_lz( ar, courier, extended, member_id, worker_id,
+ typeflag );
+ }
+
+ const bool islink = ( typeflag == tf_link || typeflag == tf_symlink );
+ errno = 0;
+ if( !islink &&
+ ( !uid_gid_in_range( extended.get_uid(), extended.get_gid() ) ||
+ chown( filename, extended.get_uid(), extended.get_gid() ) != 0 ) )
+ {
+ if( outfd >= 0 ) mode &= ~( S_ISUID | S_ISGID | S_ISVTX );
+ // chown in many cases returns with EPERM, which can be safely ignored.
+ if( errno != EPERM && errno != EINVAL )
+ {
+ if( format_file_error( rbuf, filename, chown_msg, errno ) &&
+ !courier.collect_packet( member_id, worker_id, rbuf(), Packet::diag ) )
+ return Trival( other_msg, 0, 1 );
+ set_error_status( 1 );
+ }
+ }
+
+ if( outfd >= 0 ) fchmod( outfd, mode ); // ignore errors
+
+ if( data_may_follow( typeflag ) )
+ {
+ const int bufsize = 32 * header_size;
+ uint8_t buf[bufsize];
+ long long rest = extended.file_size();
+ const int rem = rest % header_size;
+ const int padding = rem ? header_size - rem : 0;
+ while( rest > 0 )
+ {
+ const int rsize = ( rest >= bufsize ) ? bufsize : rest + padding;
+ const int ret = ar.read( buf, rsize );
+ if( ret != 0 )
+ {
+ if( outfd >= 0 )
+ {
+ if( cl_opts.keep_damaged )
+ { writeblock( outfd, buf, std::min( rest, (long long)ar.e_size() ) );
+ close( outfd ); }
+ else { close( outfd ); std::remove( filename ); }
+ }
+ return Trival( ar.e_msg(), ar.e_code(), ret );
+ }
+ const int wsize = ( rest >= bufsize ) ? bufsize : rest;
+ if( outfd >= 0 && writeblock( outfd, buf, wsize ) != wsize )
+ { format_file_error( rbuf, filename, werr_msg, errno );
+ return Trival( rbuf(), 0, 1 ); }
+ rest -= wsize;
+ }
+ }
+ if( outfd >= 0 && close( outfd ) != 0 )
+ { format_file_error( rbuf, filename, eclosf_msg, errno );
+ return Trival( rbuf(), 0, 1 ); }
+ if( !islink )
+ {
+ struct utimbuf t;
+ t.actime = extended.atime().sec();
+ t.modtime = extended.mtime().sec();
+ utime( filename, &t ); // ignore errors
+ }
+ if( ar.at_member_end() &&
+ !courier.collect_packet( member_id, worker_id, "", Packet::member_done ) )
+ return Trival( other_msg, 0, 1 );
+ return Trival();
+ }
+
+
+struct Worker_arg
+ {
+ const Cl_options * cl_opts;
+ const Archive_descriptor * ad;
+ Packet_courier * courier;
+ Name_monitor * name_monitor;
+ std::vector< char > * name_pending;
+ int worker_id;
+ int num_workers;
+ };
+
+
+/* Read lzip members from archive, decode their tar members, and give the
+ packets produced to courier.
+*/
+extern "C" void * dworker( void * arg )
+ {
+ const Worker_arg & tmp = *(const Worker_arg *)arg;
+ const Cl_options & cl_opts = *tmp.cl_opts;
+ const Archive_descriptor & ad = *tmp.ad;
+ Packet_courier & courier = *tmp.courier;
+ Name_monitor & name_monitor = *tmp.name_monitor;
+ std::vector< char > & name_pending = *tmp.name_pending;
+ const int worker_id = tmp.worker_id;
+ const int num_workers = tmp.num_workers;
+
+ bool master = false;
+ Resizable_buffer rbuf;
+ Archive_reader_i ar( ad ); // 1 of N parallel readers
+ if( !rbuf.size() || ar.fatal() )
+ { if( courier.request_mastership( worker_id, worker_id ) )
+ courier.collect_packet( worker_id, worker_id, mem_msg, Packet::error1 );
+ goto done; }
+
+ for( long i = worker_id; !master && i < ad.lzip_index.members(); i += num_workers )
+ {
+ if( ad.lzip_index.dblock( i ).size() <= 0 ) // empty lzip member
+ {
+ if( courier.collect_packet( i, worker_id, "", Packet::member_done ) )
+ continue; else break;
+ }
+
+ long long data_end = ad.lzip_index.dblock( i ).end();
+ Extended extended; // metadata from extended records
+ bool prev_extended = false; // prev header was extended
+ ar.set_member( i ); // prepare for new member
+ while( true ) // process one tar header per iteration
+ {
+ if( ar.data_pos() >= data_end ) // dblock.end or udata_size
+ {
+ if( ar.data_pos() == data_end && !prev_extended ) break;
+ // member end exceeded or ends in extended, process rest of file
+ if( !courier.request_mastership( i, worker_id ) ) goto done;
+ master = true;
+ if( data_end >= ad.lzip_index.udata_size() )
+ { courier.collect_packet( i, worker_id, end_msg, Packet::error2 );
+ goto done; }
+ data_end = ad.lzip_index.udata_size();
+ if( ar.data_pos() == data_end && !prev_extended ) break;
+ }
+ Tar_header header;
+ const int ret = ar.read( header, header_size );
+ if( ret != 0 )
+ { if( courier.request_mastership( i, worker_id ) )
+ courier.collect_packet( i, worker_id, ar.e_msg(),
+ ( ret == 1 ) ? Packet::error1 : Packet::error2, ar.e_code() );
+ goto done; }
+ if( !check_ustar_chksum( header ) ) // error or EOA
+ {
+ if( !courier.request_mastership( i, worker_id ) ) goto done;
+ if( block_is_zero( header, header_size ) ) // EOA
+ {
+ if( !prev_extended || cl_opts.permissive ) courier.report_eoa();
+ else courier.collect_packet( i, worker_id, fv_msg1, Packet::error2 );
+ goto done;
+ }
+ courier.collect_packet( i, worker_id, ( ar.data_pos() > header_size ) ?
+ bad_hdr_msg : posix_lz_msg, Packet::error2 );
+ goto done;
+ }
+
+ const Typeflag typeflag = (Typeflag)header[typeflag_o];
+ if( typeflag == tf_global )
+ {
+ const char * msg = 0; int ret = 2;
+ Extended dummy; // global headers are parsed and ignored
+ if( prev_extended && !cl_opts.permissive ) msg = fv_msg2;
+ else if( ( ret = ar.parse_records( dummy, header, rbuf, gblrec_msg,
+ true ) ) != 0 ) msg = ar.e_msg();
+ else
+ {
+ if( ar.data_pos() == data_end && // end of lzip member or EOF
+ !courier.collect_packet( i, worker_id, "", Packet::member_done ) )
+ goto done;
+ continue;
+ }
+ if( courier.request_mastership( i, worker_id ) )
+ courier.collect_packet( i, worker_id, msg, ( ret == 1 ) ?
+ Packet::error1 : Packet::error2 );
+ goto done;
+ }
+ if( typeflag == tf_extended )
+ {
+ const char * msg = 0; int ret = 2;
+ if( prev_extended && !cl_opts.permissive ) msg = fv_msg3;
+ else if( ( ret = ar.parse_records( extended, header, rbuf, extrec_msg,
+ cl_opts.permissive ) ) != 0 ) msg = ar.e_msg();
+ else if( !extended.crc_present() && cl_opts.missing_crc )
+ { msg = miscrc_msg; ret = 2; }
+ else { prev_extended = true; continue; }
+ if( courier.request_mastership( i, worker_id ) )
+ courier.collect_packet( i, worker_id, msg, ( ret == 1 ) ?
+ Packet::error1 : Packet::error2 );
+ goto done;
+ }
+ prev_extended = false;
+
+ extended.fill_from_ustar( header ); // copy metadata from header
+
+ /* Skip members with an empty name in the ustar header. If there is an
+ extended header in a previous lzip member, its worker will request
+ mastership. Else the ustar-only unnamed member will be ignored. */
+ Trival trival;
+ if( check_skip_filename( cl_opts, name_pending, extended.path().c_str() ) )
+ trival = skip_member_lz( ar, courier, extended, i, worker_id, typeflag );
+ else
+ {
+ std::string rpmsg;
+ if( print_removed_prefix( extended.removed_prefix, &rpmsg ) &&
+ !courier.collect_packet( i, worker_id, rpmsg.c_str(), Packet::prefix ) )
+ { trival = Trival( other_msg, 0, 1 ); goto fatal; }
+ if( cl_opts.program_mode == m_list )
+ trival = list_member_lz( ar, courier, extended, header, rbuf, i, worker_id );
+ else if( extended.path().empty() )
+ trival = skip_member_lz( ar, courier, extended, i, worker_id, typeflag );
+ else if( cl_opts.program_mode == m_diff )
+ trival = compare_member_lz( cl_opts, ar, courier, extended, header,
+ rbuf, i, worker_id );
+ else trival = extract_member_lz( cl_opts, ar, courier, extended, header,
+ rbuf, i, worker_id, name_monitor );
+ }
+ if( trival.retval ) // fatal error
+fatal: { if( courier.request_mastership( i, worker_id ) )
+ courier.collect_packet( i, worker_id, trival.msg,
+ ( trival.retval == 1 ) ? Packet::error1 : Packet::error2,
+ trival.errcode );
+ goto done; }
+ extended.reset();
+ }
+ }
+done:
+ courier.worker_finished();
+ return 0;
+ }
+
+
+/* Get from courier the processed and sorted packets, and print
+ the member lines on stdout or the diagnostics and errors on stderr.
+*/
+void muxer( const char * const archive_namep, Packet_courier & courier )
+ {
+ int retval = 0;
+ while( retval == 0 )
+ {
+ const Packet * const opacket = courier.deliver_packet();
+ if( !opacket ) break; // queue is empty. all workers exited
+
+ switch( opacket->status )
+ {
+ case Packet::error1:
+ case Packet::error2:
+ show_file_error( archive_namep, opacket->line.c_str(), opacket->errcode );
+ retval = ( opacket->status == Packet::error1 ) ? 1 : 2; break;
+ case Packet::prefix: show_error( opacket->line.c_str() ); break;
+ case Packet::diag: std::fputs( opacket->line.c_str(), stderr ); break;
+ default: if( opacket->line.size() )
+ { std::fputs( opacket->line.c_str(), stdout ); std::fflush( stdout ); }
+ }
+ delete opacket;
+ }
+ if( retval == 0 && !courier.eoa_found() ) // no worker found EOA blocks
+ { show_file_error( archive_namep, end_msg ); retval = 2; }
+ if( retval ) exit_fail_mt( retval );
+ }
+
+} // end namespace
+
+
+// init the courier, then start the workers and call the muxer.
+int decode_lz( const Cl_options & cl_opts, const Archive_descriptor & ad,
+ std::vector< char > & name_pending )
+ {
+ const int out_slots = 65536; // max small files (<=512B) in 64 MiB
+ const int num_workers = // limited to number of members
+ std::min( (long)cl_opts.num_workers, ad.lzip_index.members() );
+ if( cl_opts.program_mode == m_extract ) get_umask(); // cache the umask
+ Name_monitor
+ name_monitor( ( cl_opts.program_mode == m_extract ) ? num_workers : 0 );
+
+ /* If an error happens after any threads have been started, exit must be
+ called before courier goes out of scope. */
+ Packet_courier courier( num_workers, out_slots );
+
+ Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
+ pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
+ if( !worker_args || !worker_threads ) { show_error( mem_msg ); return 1; }
+ for( int i = 0; i < num_workers; ++i )
+ {
+ worker_args[i].cl_opts = &cl_opts;
+ worker_args[i].ad = &ad;
+ worker_args[i].courier = &courier;
+ worker_args[i].name_monitor = &name_monitor;
+ worker_args[i].name_pending = &name_pending;
+ worker_args[i].worker_id = i;
+ worker_args[i].num_workers = num_workers;
+ const int errcode =
+ pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] );
+ if( errcode )
+ { show_error( "Can't create worker threads", errcode ); exit_fail_mt(); }
+ }
+
+ muxer( ad.namep, courier );
+
+ for( int i = num_workers - 1; i >= 0; --i )
+ {
+ const int errcode = pthread_join( worker_threads[i], 0 );
+ if( errcode )
+ { show_error( "Can't join worker threads", errcode ); exit_fail_mt(); }
+ }
+ delete[] worker_threads;
+ delete[] worker_args;
+
+ int retval = 0;
+ if( close( ad.infd ) != 0 )
+ { show_file_error( ad.namep, eclosa_msg, errno ); retval = 1; }
+
+ if( retval == 0 )
+ for( int i = 0; i < cl_opts.parser.arguments(); ++i )
+ if( nonempty_arg( cl_opts.parser, i ) && name_pending[i] )
+ { show_file_error( cl_opts.parser.argument( i ).c_str(), nfound_msg );
+ retval = 1; }
+
+ if( cl_opts.debug_level & 1 )
+ std::fprintf( stderr,
+ "muxer tried to consume from workers %8u times\n"
+ "muxer had to wait %8u times\n",
+ courier.ocheck_counter,
+ courier.owait_counter );
+
+ if( !courier.finished() ) internal_error( conofin_msg );
+ return final_exit_status( retval, cl_opts.program_mode != m_diff );
+ }