Diffstat
-rw-r--r--  decode_lz.cc  536
1 file changed, 536 insertions(+), 0 deletions(-)
diff --git a/decode_lz.cc b/decode_lz.cc
new file mode 100644
index 0000000..71c699b
--- /dev/null
+++ b/decode_lz.cc
@@ -0,0 +1,536 @@
+/* Tarlz - Archiver with multimember lzip compression
+ Copyright (C) 2013-2020 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#define _FILE_OFFSET_BITS 64
+
+#include <algorithm>
+#include <cerrno>
+#include <climits>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <queue>
+#include <string>
+#include <vector>
+#include <pthread.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <lzlib.h>
+
+#include "arg_parser.h"
+#include "tarlz.h"
+#include "lzip_index.h"
+#include "archive_reader.h"
+
+/* When a problem is detected by any worker:
+ - the worker requests mastership and returns.
+ - the courier discards new packets received or collected.
+ - the other workers return.
+ - the muxer drains the queue and returns. */
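+
+/* Work is distributed round-robin: worker 'worker_id' decodes lzip members
+   worker_id, worker_id + num_workers, worker_id + 2 * num_workers, ...
+   (see the loop in dworker below), while the muxer prints the resulting
+   packets in archive order. */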
+
+/* Returns the number of bytes really read.
+   If (returned value < size) and (errno == 0), it means that EOF was reached.
+*/
+int preadblock( const int fd, uint8_t * const buf, const int size,
+ const long long pos )
+ {
+ int sz = 0;
+ errno = 0;
+ while( sz < size )
+ {
+ const int n = pread( fd, buf + sz, size - sz, pos + sz );
+ if( n > 0 ) sz += n;
+ else if( n == 0 ) break; // EOF
+ else if( errno != EINTR ) break;
+ errno = 0;
+ }
+ return sz;
+ }
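+
+/* Illustrative sketch (not part of tarlz): how a caller can use the
+   convention above to tell a short read at EOF from a real read error.
+   The helper name 'describe_read' is just for the example.
+static const char * describe_read( const int fd, uint8_t * const buf,
+                                   const int size, const long long pos )
+  {
+  const int rd = preadblock( fd, buf, size, pos );
+  if( rd == size ) return "full read";
+  if( errno == 0 ) return "EOF reached before 'size' bytes";
+  return "read error";		// errno identifies the error
+  }
+*/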
+
+
+namespace {
+
+/* Returns the number of bytes really written.
+ If (returned value < size), it is always an error.
+*//*
+int pwriteblock( const int fd, const uint8_t * const buf, const int size,
+ const long long pos )
+ {
+ int sz = 0;
+ errno = 0;
+ while( sz < size )
+ {
+ const int n = pwrite( fd, buf + sz, size - sz, pos + sz );
+ if( n > 0 ) sz += n;
+ else if( n < 0 && errno != EINTR ) break;
+ errno = 0;
+ }
+ return sz;
+ }
+*/
+
+const char * const other_msg = "Other worker found an error.";
+
+struct Packet // member name and metadata or error message
+ {
+ enum Status { ok, member_done, diag, error };
+ long member_id; // lzip member containing the header of this tar member
+ std::string line; // member name and metadata ready to print, if any
+ Status status; // diagnostics and errors go to stderr
+ Packet( const long i, const char * const msg, const Status s = ok )
+ : member_id( i ), line( msg ), status( s ) {}
+ };
+
+
+class Packet_courier // moves packets around
+ {
+public:
+ unsigned ocheck_counter;
+ unsigned owait_counter;
+private:
+ long error_member_id; // first lzip member with error/misalign/eof
+ int deliver_worker_id; // worker queue currently delivering packets
+ int master_worker_id; // worker in charge if error/misalignment/eof
+ std::vector< std::queue< const Packet * > > opacket_queues;
+ int num_working; // number of workers still running
+ const int num_workers; // number of workers
+ const unsigned out_slots; // max output packets per queue
+ pthread_mutex_t omutex;
+ pthread_cond_t oav_or_exit; // output packet available or all workers exited
+ std::vector< pthread_cond_t > slot_av; // output slot available
+ pthread_cond_t check_master;
+ bool eof_found_;
+
+ Packet_courier( const Packet_courier & ); // declared as private
+ void operator=( const Packet_courier & ); // declared as private
+
+public:
+ Packet_courier( const int workers, const int slots )
+ : ocheck_counter( 0 ), owait_counter( 0 ),
+ error_member_id( -1 ), deliver_worker_id( 0 ), master_worker_id( -1 ),
+ opacket_queues( workers ), num_working( workers ),
+ num_workers( workers ), out_slots( slots ), slot_av( workers ),
+ eof_found_( false )
+ {
+ xinit_mutex( &omutex ); xinit_cond( &oav_or_exit );
+ for( unsigned i = 0; i < slot_av.size(); ++i ) xinit_cond( &slot_av[i] );
+ xinit_cond( &check_master );
+ }
+
+ ~Packet_courier()
+ {
+ xdestroy_cond( &check_master );
+ for( unsigned i = 0; i < slot_av.size(); ++i ) xdestroy_cond( &slot_av[i] );
+ xdestroy_cond( &oav_or_exit ); xdestroy_mutex( &omutex );
+ }
+
+ bool eof_found() const { return eof_found_; }
+ void report_eof() { eof_found_ = true; }
+
+ bool mastership_granted() const { return master_worker_id >= 0; }
+
+ bool request_mastership( const long member_id, const int worker_id )
+ {
+ xlock( &omutex );
+ if( mastership_granted() ) // already granted
+ { xunlock( &omutex ); return ( master_worker_id == worker_id ); }
+ if( error_member_id < 0 || error_member_id > member_id )
+ error_member_id = member_id;
+ while( !mastership_granted() && ( worker_id != deliver_worker_id ||
+ !opacket_queues[deliver_worker_id].empty() ) )
+ xwait( &check_master, &omutex );
+ if( !mastership_granted() && worker_id == deliver_worker_id &&
+ opacket_queues[deliver_worker_id].empty() )
+ {
+ master_worker_id = worker_id; // grant mastership
+ for( int i = 0; i < num_workers; ++i ) // delete all packets
+ while( !opacket_queues[i].empty() )
+ opacket_queues[i].pop();
+ xbroadcast( &check_master );
+ xunlock( &omutex );
+ return true;
+ }
+ xunlock( &omutex );
+ return false; // mastership granted to another worker
+ }
+
+ void worker_finished()
+ {
+ // notify muxer when last worker exits
+ xlock( &omutex );
+ if( --num_working == 0 ) xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ }
+
+ /* Collect a packet from a worker.
+ If a packet is rejected, the worker must terminate. */
+ bool collect_packet( const long member_id, const int worker_id,
+ const char * const msg,
+ const Packet::Status status = Packet::ok )
+ {
+ const Packet * const opacket = new Packet( member_id, msg, status );
+ xlock( &omutex );
+ if( ( mastership_granted() && master_worker_id != worker_id ) ||
+ ( error_member_id >= 0 && error_member_id < opacket->member_id ) )
+ { xunlock( &omutex ); delete opacket; return false; } // reject packet
+ while( opacket_queues[worker_id].size() >= out_slots )
+ xwait( &slot_av[worker_id], &omutex );
+ opacket_queues[worker_id].push( opacket );
+ if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ return true;
+ }
+
+ /* Deliver a packet to muxer.
+ If packet.status == Packet::member_done, move to next queue.
+ If packet.line.empty(), wait again (empty lzip member). */
+ const Packet * deliver_packet()
+ {
+ const Packet * opacket = 0;
+ xlock( &omutex );
+ ++ocheck_counter;
+ while( true )
+ {
+ while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
+ {
+ ++owait_counter;
+ if( !mastership_granted() && error_member_id >= 0 )
+          xbroadcast( &check_master ); // mastership requested but not yet granted
+ xwait( &oav_or_exit, &omutex );
+ }
+ if( opacket_queues[deliver_worker_id].empty() ) break;
+ opacket = opacket_queues[deliver_worker_id].front();
+ opacket_queues[deliver_worker_id].pop();
+ if( opacket_queues[deliver_worker_id].size() + 1 == out_slots )
+ xsignal( &slot_av[deliver_worker_id] );
+ if( opacket->status == Packet::member_done && !mastership_granted() )
+ { if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; }
+ if( !opacket->line.empty() ) break;
+ delete opacket; opacket = 0;
+ }
+ xunlock( &omutex );
+ return opacket;
+ }
+
+ bool finished() // all packets delivered to muxer
+ {
+ if( num_working != 0 ) return false;
+ for( int i = 0; i < num_workers; ++i )
+ if( !opacket_queues[i].empty() ) return false;
+ return true;
+ }
+ };
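+
+/* Illustrative sketch (not part of tarlz): the intended handshake between a
+   worker thread and the muxer. A worker queues one packet per output line
+   (listing line or diagnostic) plus an empty 'member_done' packet at each
+   lzip member boundary; the muxer pops packets in archive order and stops
+   when all workers have finished. 'courier', 'member_id', 'worker_id' and
+   'line_to_print' are assumed to be supplied by the caller.
+
+  // worker side
+  courier.collect_packet( member_id, worker_id, line_to_print, Packet::ok );
+  courier.collect_packet( member_id, worker_id, "", Packet::member_done );
+  courier.worker_finished();
+
+  // muxer side
+  while( const Packet * const opacket = courier.deliver_packet() )
+    { std::fputs( opacket->line.c_str(), stdout ); delete opacket; }
+*/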
+
+
+const char * skip_member_lz( Archive_reader_i & ar, Packet_courier & courier,
+ const Extended & extended, const long member_id,
+ const int worker_id )
+ {
+ if( ar.skip_member( extended ) != 0 ) return ar.e_msg();
+ if( !courier.collect_packet( member_id, worker_id, "",
+ ar.at_member_end() ? Packet::member_done : Packet::ok ) )
+ return other_msg;
+ return 0;
+ }
+
+
+const char * compare_member_lz( const Cl_options & cl_opts,
+ Archive_reader_i & ar, Packet_courier & courier,
+ const Extended & extended, const Tar_header header,
+ Resizable_buffer & rbuf, const long member_id,
+ const int worker_id )
+ {
+ if( verbosity < 1 ) rbuf()[0] = 0;
+ else if( !format_member_name( extended, header, rbuf, verbosity > 1 ) )
+ return mem_msg;
+ std::string estr, ostr;
+ const bool stat_differs =
+ !compare_file_type( estr, ostr, cl_opts, extended, header );
+ if( ( rbuf()[0] && !courier.collect_packet( member_id, worker_id, rbuf(),
+ Packet::ok ) ) ||
+ ( estr.size() && !courier.collect_packet( member_id, worker_id,
+ estr.c_str(), Packet::diag ) ) ||
+ ( ostr.size() && !courier.collect_packet( member_id, worker_id,
+ ostr.c_str(), Packet::ok ) ) ||
+ ( extended.file_size() <= 0 && ar.at_member_end() &&
+ !courier.collect_packet( member_id, worker_id, "", Packet::member_done ) ) )
+ return other_msg;
+ if( extended.file_size() <= 0 ) return 0;
+ const Typeflag typeflag = (Typeflag)header[typeflag_o];
+ if( ( typeflag != tf_regular && typeflag != tf_hiperf ) || stat_differs )
+ return skip_member_lz( ar, courier, extended, member_id, worker_id );
+ // else compare file contents
+ const char * const filename = extended.path().c_str();
+ const int infd2 = open_instream( filename );
+ if( infd2 < 0 ) { set_error_status( 1 );
+ return skip_member_lz( ar, courier, extended, member_id, worker_id ); }
+ int retval = compare_file_contents( estr, ostr, ar, extended.file_size(),
+ filename, infd2 );
+ if( retval ) return ar.e_msg();
+ if( ( estr.size() && !courier.collect_packet( member_id, worker_id,
+ estr.c_str(), Packet::diag ) ) ||
+ ( ostr.size() && !courier.collect_packet( member_id, worker_id,
+ ostr.c_str(), Packet::ok ) ) ||
+ ( ar.at_member_end() &&
+ !courier.collect_packet( member_id, worker_id, "", Packet::member_done ) ) )
+ return other_msg;
+ return 0;
+ }
+
+
+const char * list_member_lz( Archive_reader_i & ar, Packet_courier & courier,
+ const Extended & extended, const Tar_header header,
+ Resizable_buffer & rbuf, const long member_id,
+ const int worker_id )
+ {
+ if( verbosity < 0 ) rbuf()[0] = 0;
+ else if( !format_member_name( extended, header, rbuf, verbosity > 0 ) )
+ return mem_msg;
+ const int ret = ar.skip_member( extended ); // print name even on error
+ if( !courier.collect_packet( member_id, worker_id, rbuf(),
+ ar.at_member_end() ? Packet::member_done : Packet::ok ) )
+ return other_msg;
+ if( ret != 0 ) return ar.e_msg();
+ return 0;
+ }
+
+
+struct Worker_arg
+ {
+ const Cl_options * cl_opts;
+ const Archive_descriptor * ad;
+ Packet_courier * courier;
+ std::vector< char > * name_pending;
+ int worker_id;
+ int num_workers;
+ };
+
+
+/* Read lzip members from archive, decode their tar members, and give the
+ packets produced to courier.
+*/
+extern "C" void * dworker( void * arg )
+ {
+ const Worker_arg & tmp = *(const Worker_arg *)arg;
+ const Cl_options & cl_opts = *tmp.cl_opts;
+ const Archive_descriptor & ad = *tmp.ad;
+ Packet_courier & courier = *tmp.courier;
+ std::vector< char > & name_pending = *tmp.name_pending;
+ const int worker_id = tmp.worker_id;
+ const int num_workers = tmp.num_workers;
+
+ bool master = false;
+ Resizable_buffer rbuf;
+ Archive_reader_i ar( ad ); // 1 of N parallel readers
+ if( !rbuf.size() || ar.fatal() )
+ { if( courier.request_mastership( worker_id, worker_id ) )
+ courier.collect_packet( worker_id, worker_id, mem_msg, Packet::error );
+ goto done; }
+
+ for( long i = worker_id; !master && i < ad.lzip_index.members(); i += num_workers )
+ {
+ if( ad.lzip_index.dblock( i ).size() <= 0 ) // empty lzip member
+ {
+ if( courier.collect_packet( i, worker_id, "", Packet::member_done ) )
+ continue; else break;
+ }
+
+ long long data_end = ad.lzip_index.dblock( i ).end();
+ Extended extended; // metadata from extended records
+ bool prev_extended = false; // prev header was extended
+ ar.set_member( i ); // prepare for new member
+ while( true ) // process one tar header per iteration
+ {
+ if( ar.data_pos() >= data_end ) // dblock.end or udata_size
+ {
+ if( ar.data_pos() == data_end && !prev_extended ) break;
+        // member end exceeded or member ends with an extended header; process the rest of the file
+ if( !courier.request_mastership( i, worker_id ) ) goto done;
+ master = true;
+ if( data_end >= ad.lzip_index.udata_size() )
+ { courier.collect_packet( i, worker_id, end_msg, Packet::error );
+ goto done; }
+ data_end = ad.lzip_index.udata_size();
+ if( ar.data_pos() == data_end && !prev_extended ) break;
+ }
+ Tar_header header;
+ const int ret = ar.read( header, header_size );
+ if( ret != 0 )
+ { if( courier.request_mastership( i, worker_id ) )
+ courier.collect_packet( i, worker_id, ar.e_msg(), Packet::error );
+ goto done; }
+ if( !verify_ustar_chksum( header ) )
+ {
+ if( !courier.request_mastership( i, worker_id ) ) goto done;
+ if( block_is_zero( header, header_size ) ) // EOF
+ {
+ if( !prev_extended || cl_opts.permissive ) courier.report_eof();
+ else courier.collect_packet( i, worker_id, fv_msg1, Packet::error );
+ goto done;
+ }
+ courier.collect_packet( i, worker_id, ( ar.data_pos() > header_size ) ?
+ bad_hdr_msg : posix_lz_msg, Packet::error );
+ goto done;
+ }
+
+ const Typeflag typeflag = (Typeflag)header[typeflag_o];
+ if( typeflag == tf_global )
+ {
+ const char * msg = 0;
+ Extended dummy; // global headers are parsed and ignored
+ if( prev_extended && !cl_opts.permissive ) msg = fv_msg2;
+ else if( ar.parse_records( dummy, header, rbuf, true ) != 0 )
+ msg = gblrec_msg;
+ else
+ {
+ if( ar.data_pos() == data_end && // end of lzip member or EOF
+ !courier.collect_packet( i, worker_id, "", Packet::member_done ) )
+ goto done;
+ continue;
+ }
+ if( courier.request_mastership( i, worker_id ) )
+ courier.collect_packet( i, worker_id, msg, Packet::error );
+ goto done;
+ }
+ if( typeflag == tf_extended )
+ {
+ const char * msg = 0;
+ if( prev_extended && !cl_opts.permissive ) msg = fv_msg3;
+ else if( ar.parse_records( extended, header, rbuf,
+ cl_opts.permissive ) != 0 ) msg = extrec_msg;
+ else if( !extended.crc_present() && cl_opts.missing_crc )
+ msg = mcrc_msg;
+ else { prev_extended = true; continue; }
+ if( courier.request_mastership( i, worker_id ) )
+ courier.collect_packet( i, worker_id, msg, Packet::error );
+ goto done;
+ }
+ prev_extended = false;
+
+ extended.fill_from_ustar( header ); // copy metadata from header
+
+ const char * msg;
+ if( check_skip_filename( cl_opts, name_pending, extended.path().c_str() ) )
+ msg = skip_member_lz( ar, courier, extended, i, worker_id );
+ else if( cl_opts.program_mode == m_list )
+ msg = list_member_lz( ar, courier, extended, header, rbuf, i, worker_id );
+ else msg = compare_member_lz( cl_opts, ar, courier, extended, header,
+ rbuf, i, worker_id );
+ if( msg )
+ { if( courier.request_mastership( i, worker_id ) )
+ courier.collect_packet( i, worker_id, msg, Packet::error );
+ goto done; }
+ extended.reset();
+ }
+ }
+done:
+ courier.worker_finished();
+ return 0;
+ }
+
+
+/* Get from courier the processed and sorted packets, and print
+ the member lines on stdout or the diagnostics and errors on stderr.
+*/
+void muxer( const char * const archive_namep, Packet_courier & courier )
+ {
+ bool error = false;
+ while( !error )
+ {
+ const Packet * const opacket = courier.deliver_packet();
+ if( !opacket ) break; // queue is empty. all workers exited
+
+ switch( opacket->status )
+ {
+ case Packet::error:
+ show_file_error( archive_namep, opacket->line.c_str() );
+ error = true; break;
+ case Packet::diag: std::fputs( opacket->line.c_str(), stderr ); break;
+ default: if( opacket->line.size() )
+ { std::fputs( opacket->line.c_str(), stdout ); std::fflush( stdout ); }
+ }
+ delete opacket;
+ }
+ if( !error && !courier.eof_found() ) // no worker found EOF blocks
+ { show_file_error( archive_namep, end_msg ); error = true; }
+ if( error ) cleanup_and_fail( 2 );
+ }
+
+} // end namespace
+
+
+// init the courier, then start the workers and call the muxer.
+int decode_lz( const Cl_options & cl_opts, const Archive_descriptor & ad,
+ std::vector< char > & name_pending )
+ {
+ const int out_slots = 65536; // max small files (<=512B) in 64 MiB
+ const int num_workers = // limited to number of members
+ std::min( (long)cl_opts.num_workers, ad.lzip_index.members() );
+
+ /* If an error happens after any threads have been started, exit must be
+ called before courier goes out of scope. */
+ Packet_courier courier( num_workers, out_slots );
+
+ Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
+ pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
+ if( !worker_args || !worker_threads ) { show_error( mem_msg ); return 1; }
+ for( int i = 0; i < num_workers; ++i )
+ {
+ worker_args[i].cl_opts = &cl_opts;
+ worker_args[i].ad = &ad;
+ worker_args[i].courier = &courier;
+ worker_args[i].name_pending = &name_pending;
+ worker_args[i].worker_id = i;
+ worker_args[i].num_workers = num_workers;
+ const int errcode =
+ pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] );
+ if( errcode )
+ { show_error( "Can't create worker threads", errcode ); cleanup_and_fail(); }
+ }
+
+ muxer( ad.namep, courier );
+
+ for( int i = num_workers - 1; i >= 0; --i )
+ {
+ const int errcode = pthread_join( worker_threads[i], 0 );
+ if( errcode )
+ { show_error( "Can't join worker threads", errcode ); cleanup_and_fail(); }
+ }
+ delete[] worker_threads;
+ delete[] worker_args;
+
+ int retval = 0;
+ if( close( ad.infd ) != 0 )
+ { show_file_error( ad.namep, "Error closing archive", errno ); retval = 1; }
+
+ if( retval == 0 )
+ for( int i = 0; i < cl_opts.parser.arguments(); ++i )
+ if( nonempty_arg( cl_opts.parser, i ) && name_pending[i] )
+ { show_file_error( cl_opts.parser.argument( i ).c_str(),
+ "Not found in archive." ); retval = 1; }
+
+ if( cl_opts.debug_level & 1 )
+ std::fprintf( stderr,
+ "muxer tried to consume from workers %8u times\n"
+ "muxer had to wait %8u times\n",
+ courier.ocheck_counter,
+ courier.owait_counter );
+
+ if( !courier.finished() ) internal_error( "courier not finished." );
+ return final_exit_status( retval, cl_opts.program_mode != m_diff );
+ }