diff options
Diffstat (limited to '')
-rw-r--r-- | decode_lz.cc | 536 |
1 files changed, 536 insertions, 0 deletions
diff --git a/decode_lz.cc b/decode_lz.cc new file mode 100644 index 0000000..71c699b --- /dev/null +++ b/decode_lz.cc @@ -0,0 +1,536 @@ +/* Tarlz - Archiver with multimember lzip compression + Copyright (C) 2013-2020 Antonio Diaz Diaz. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#define _FILE_OFFSET_BITS 64 + +#include <algorithm> +#include <cerrno> +#include <climits> +#include <cstdio> +#include <cstdlib> +#include <cstring> +#include <queue> +#include <string> +#include <vector> +#include <pthread.h> +#include <stdint.h> +#include <unistd.h> +#include <sys/stat.h> +#include <lzlib.h> + +#include "arg_parser.h" +#include "tarlz.h" +#include "lzip_index.h" +#include "archive_reader.h" + +/* When a problem is detected by any worker: + - the worker requests mastership and returns. + - the courier discards new packets received or collected. + - the other workers return. + - the muxer drains the queue and returns. */ + +/* Returns the number of bytes really read. + If (returned value < size) and (errno == 0), means EOF was reached. +*/ +int preadblock( const int fd, uint8_t * const buf, const int size, + const long long pos ) + { + int sz = 0; + errno = 0; + while( sz < size ) + { + const int n = pread( fd, buf + sz, size - sz, pos + sz ); + if( n > 0 ) sz += n; + else if( n == 0 ) break; // EOF + else if( errno != EINTR ) break; + errno = 0; + } + return sz; + } + + +namespace { + +/* Returns the number of bytes really written. + If (returned value < size), it is always an error. +*//* +int pwriteblock( const int fd, const uint8_t * const buf, const int size, + const long long pos ) + { + int sz = 0; + errno = 0; + while( sz < size ) + { + const int n = pwrite( fd, buf + sz, size - sz, pos + sz ); + if( n > 0 ) sz += n; + else if( n < 0 && errno != EINTR ) break; + errno = 0; + } + return sz; + } +*/ + +const char * const other_msg = "Other worker found an error."; + +struct Packet // member name and metadata or error message + { + enum Status { ok, member_done, diag, error }; + long member_id; // lzip member containing the header of this tar member + std::string line; // member name and metadata ready to print, if any + Status status; // diagnostics and errors go to stderr + Packet( const long i, const char * const msg, const Status s = ok ) + : member_id( i ), line( msg ), status( s ) {} + }; + + +class Packet_courier // moves packets around + { +public: + unsigned ocheck_counter; + unsigned owait_counter; +private: + long error_member_id; // first lzip member with error/misalign/eof + int deliver_worker_id; // worker queue currently delivering packets + int master_worker_id; // worker in charge if error/misalignment/eof + std::vector< std::queue< const Packet * > > opacket_queues; + int num_working; // number of workers still running + const int num_workers; // number of workers + const unsigned out_slots; // max output packets per queue + pthread_mutex_t omutex; + pthread_cond_t oav_or_exit; // output packet available or all workers exited + std::vector< pthread_cond_t > slot_av; // output slot available + pthread_cond_t check_master; + bool eof_found_; + + Packet_courier( const Packet_courier & ); // declared as private + void operator=( const Packet_courier & ); // declared as private + +public: + Packet_courier( const int workers, const int slots ) + : ocheck_counter( 0 ), owait_counter( 0 ), + error_member_id( -1 ), deliver_worker_id( 0 ), master_worker_id( -1 ), + opacket_queues( workers ), num_working( workers ), + num_workers( workers ), out_slots( slots ), slot_av( workers ), + eof_found_( false ) + { + xinit_mutex( &omutex ); xinit_cond( &oav_or_exit ); + for( unsigned i = 0; i < slot_av.size(); ++i ) xinit_cond( &slot_av[i] ); + xinit_cond( &check_master ); + } + + ~Packet_courier() + { + xdestroy_cond( &check_master ); + for( unsigned i = 0; i < slot_av.size(); ++i ) xdestroy_cond( &slot_av[i] ); + xdestroy_cond( &oav_or_exit ); xdestroy_mutex( &omutex ); + } + + bool eof_found() const { return eof_found_; } + void report_eof() { eof_found_ = true; } + + bool mastership_granted() const { return master_worker_id >= 0; } + + bool request_mastership( const long member_id, const int worker_id ) + { + xlock( &omutex ); + if( mastership_granted() ) // already granted + { xunlock( &omutex ); return ( master_worker_id == worker_id ); } + if( error_member_id < 0 || error_member_id > member_id ) + error_member_id = member_id; + while( !mastership_granted() && ( worker_id != deliver_worker_id || + !opacket_queues[deliver_worker_id].empty() ) ) + xwait( &check_master, &omutex ); + if( !mastership_granted() && worker_id == deliver_worker_id && + opacket_queues[deliver_worker_id].empty() ) + { + master_worker_id = worker_id; // grant mastership + for( int i = 0; i < num_workers; ++i ) // delete all packets + while( !opacket_queues[i].empty() ) + opacket_queues[i].pop(); + xbroadcast( &check_master ); + xunlock( &omutex ); + return true; + } + xunlock( &omutex ); + return false; // mastership granted to another worker + } + + void worker_finished() + { + // notify muxer when last worker exits + xlock( &omutex ); + if( --num_working == 0 ) xsignal( &oav_or_exit ); + xunlock( &omutex ); + } + + /* Collect a packet from a worker. + If a packet is rejected, the worker must terminate. */ + bool collect_packet( const long member_id, const int worker_id, + const char * const msg, + const Packet::Status status = Packet::ok ) + { + const Packet * const opacket = new Packet( member_id, msg, status ); + xlock( &omutex ); + if( ( mastership_granted() && master_worker_id != worker_id ) || + ( error_member_id >= 0 && error_member_id < opacket->member_id ) ) + { xunlock( &omutex ); delete opacket; return false; } // reject packet + while( opacket_queues[worker_id].size() >= out_slots ) + xwait( &slot_av[worker_id], &omutex ); + opacket_queues[worker_id].push( opacket ); + if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit ); + xunlock( &omutex ); + return true; + } + + /* Deliver a packet to muxer. + If packet.status == Packet::member_done, move to next queue. + If packet.line.empty(), wait again (empty lzip member). */ + const Packet * deliver_packet() + { + const Packet * opacket = 0; + xlock( &omutex ); + ++ocheck_counter; + while( true ) + { + while( opacket_queues[deliver_worker_id].empty() && num_working > 0 ) + { + ++owait_counter; + if( !mastership_granted() && error_member_id >= 0 ) + xbroadcast( &check_master ); // mastership requested not yet granted + xwait( &oav_or_exit, &omutex ); + } + if( opacket_queues[deliver_worker_id].empty() ) break; + opacket = opacket_queues[deliver_worker_id].front(); + opacket_queues[deliver_worker_id].pop(); + if( opacket_queues[deliver_worker_id].size() + 1 == out_slots ) + xsignal( &slot_av[deliver_worker_id] ); + if( opacket->status == Packet::member_done && !mastership_granted() ) + { if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; } + if( !opacket->line.empty() ) break; + delete opacket; opacket = 0; + } + xunlock( &omutex ); + return opacket; + } + + bool finished() // all packets delivered to muxer + { + if( num_working != 0 ) return false; + for( int i = 0; i < num_workers; ++i ) + if( !opacket_queues[i].empty() ) return false; + return true; + } + }; + + +const char * skip_member_lz( Archive_reader_i & ar, Packet_courier & courier, + const Extended & extended, const long member_id, + const int worker_id ) + { + if( ar.skip_member( extended ) != 0 ) return ar.e_msg(); + if( !courier.collect_packet( member_id, worker_id, "", + ar.at_member_end() ? Packet::member_done : Packet::ok ) ) + return other_msg; + return 0; + } + + +const char * compare_member_lz( const Cl_options & cl_opts, + Archive_reader_i & ar, Packet_courier & courier, + const Extended & extended, const Tar_header header, + Resizable_buffer & rbuf, const long member_id, + const int worker_id ) + { + if( verbosity < 1 ) rbuf()[0] = 0; + else if( !format_member_name( extended, header, rbuf, verbosity > 1 ) ) + return mem_msg; + std::string estr, ostr; + const bool stat_differs = + !compare_file_type( estr, ostr, cl_opts, extended, header ); + if( ( rbuf()[0] && !courier.collect_packet( member_id, worker_id, rbuf(), + Packet::ok ) ) || + ( estr.size() && !courier.collect_packet( member_id, worker_id, + estr.c_str(), Packet::diag ) ) || + ( ostr.size() && !courier.collect_packet( member_id, worker_id, + ostr.c_str(), Packet::ok ) ) || + ( extended.file_size() <= 0 && ar.at_member_end() && + !courier.collect_packet( member_id, worker_id, "", Packet::member_done ) ) ) + return other_msg; + if( extended.file_size() <= 0 ) return 0; + const Typeflag typeflag = (Typeflag)header[typeflag_o]; + if( ( typeflag != tf_regular && typeflag != tf_hiperf ) || stat_differs ) + return skip_member_lz( ar, courier, extended, member_id, worker_id ); + // else compare file contents + const char * const filename = extended.path().c_str(); + const int infd2 = open_instream( filename ); + if( infd2 < 0 ) { set_error_status( 1 ); + return skip_member_lz( ar, courier, extended, member_id, worker_id ); } + int retval = compare_file_contents( estr, ostr, ar, extended.file_size(), + filename, infd2 ); + if( retval ) return ar.e_msg(); + if( ( estr.size() && !courier.collect_packet( member_id, worker_id, + estr.c_str(), Packet::diag ) ) || + ( ostr.size() && !courier.collect_packet( member_id, worker_id, + ostr.c_str(), Packet::ok ) ) || + ( ar.at_member_end() && + !courier.collect_packet( member_id, worker_id, "", Packet::member_done ) ) ) + return other_msg; + return 0; + } + + +const char * list_member_lz( Archive_reader_i & ar, Packet_courier & courier, + const Extended & extended, const Tar_header header, + Resizable_buffer & rbuf, const long member_id, + const int worker_id ) + { + if( verbosity < 0 ) rbuf()[0] = 0; + else if( !format_member_name( extended, header, rbuf, verbosity > 0 ) ) + return mem_msg; + const int ret = ar.skip_member( extended ); // print name even on error + if( !courier.collect_packet( member_id, worker_id, rbuf(), + ar.at_member_end() ? Packet::member_done : Packet::ok ) ) + return other_msg; + if( ret != 0 ) return ar.e_msg(); + return 0; + } + + +struct Worker_arg + { + const Cl_options * cl_opts; + const Archive_descriptor * ad; + Packet_courier * courier; + std::vector< char > * name_pending; + int worker_id; + int num_workers; + }; + + +/* Read lzip members from archive, decode their tar members, and give the + packets produced to courier. +*/ +extern "C" void * dworker( void * arg ) + { + const Worker_arg & tmp = *(const Worker_arg *)arg; + const Cl_options & cl_opts = *tmp.cl_opts; + const Archive_descriptor & ad = *tmp.ad; + Packet_courier & courier = *tmp.courier; + std::vector< char > & name_pending = *tmp.name_pending; + const int worker_id = tmp.worker_id; + const int num_workers = tmp.num_workers; + + bool master = false; + Resizable_buffer rbuf; + Archive_reader_i ar( ad ); // 1 of N parallel readers + if( !rbuf.size() || ar.fatal() ) + { if( courier.request_mastership( worker_id, worker_id ) ) + courier.collect_packet( worker_id, worker_id, mem_msg, Packet::error ); + goto done; } + + for( long i = worker_id; !master && i < ad.lzip_index.members(); i += num_workers ) + { + if( ad.lzip_index.dblock( i ).size() <= 0 ) // empty lzip member + { + if( courier.collect_packet( i, worker_id, "", Packet::member_done ) ) + continue; else break; + } + + long long data_end = ad.lzip_index.dblock( i ).end(); + Extended extended; // metadata from extended records + bool prev_extended = false; // prev header was extended + ar.set_member( i ); // prepare for new member + while( true ) // process one tar header per iteration + { + if( ar.data_pos() >= data_end ) // dblock.end or udata_size + { + if( ar.data_pos() == data_end && !prev_extended ) break; + // member end exceeded or ends in extended, process rest of file + if( !courier.request_mastership( i, worker_id ) ) goto done; + master = true; + if( data_end >= ad.lzip_index.udata_size() ) + { courier.collect_packet( i, worker_id, end_msg, Packet::error ); + goto done; } + data_end = ad.lzip_index.udata_size(); + if( ar.data_pos() == data_end && !prev_extended ) break; + } + Tar_header header; + const int ret = ar.read( header, header_size ); + if( ret != 0 ) + { if( courier.request_mastership( i, worker_id ) ) + courier.collect_packet( i, worker_id, ar.e_msg(), Packet::error ); + goto done; } + if( !verify_ustar_chksum( header ) ) + { + if( !courier.request_mastership( i, worker_id ) ) goto done; + if( block_is_zero( header, header_size ) ) // EOF + { + if( !prev_extended || cl_opts.permissive ) courier.report_eof(); + else courier.collect_packet( i, worker_id, fv_msg1, Packet::error ); + goto done; + } + courier.collect_packet( i, worker_id, ( ar.data_pos() > header_size ) ? + bad_hdr_msg : posix_lz_msg, Packet::error ); + goto done; + } + + const Typeflag typeflag = (Typeflag)header[typeflag_o]; + if( typeflag == tf_global ) + { + const char * msg = 0; + Extended dummy; // global headers are parsed and ignored + if( prev_extended && !cl_opts.permissive ) msg = fv_msg2; + else if( ar.parse_records( dummy, header, rbuf, true ) != 0 ) + msg = gblrec_msg; + else + { + if( ar.data_pos() == data_end && // end of lzip member or EOF + !courier.collect_packet( i, worker_id, "", Packet::member_done ) ) + goto done; + continue; + } + if( courier.request_mastership( i, worker_id ) ) + courier.collect_packet( i, worker_id, msg, Packet::error ); + goto done; + } + if( typeflag == tf_extended ) + { + const char * msg = 0; + if( prev_extended && !cl_opts.permissive ) msg = fv_msg3; + else if( ar.parse_records( extended, header, rbuf, + cl_opts.permissive ) != 0 ) msg = extrec_msg; + else if( !extended.crc_present() && cl_opts.missing_crc ) + msg = mcrc_msg; + else { prev_extended = true; continue; } + if( courier.request_mastership( i, worker_id ) ) + courier.collect_packet( i, worker_id, msg, Packet::error ); + goto done; + } + prev_extended = false; + + extended.fill_from_ustar( header ); // copy metadata from header + + const char * msg; + if( check_skip_filename( cl_opts, name_pending, extended.path().c_str() ) ) + msg = skip_member_lz( ar, courier, extended, i, worker_id ); + else if( cl_opts.program_mode == m_list ) + msg = list_member_lz( ar, courier, extended, header, rbuf, i, worker_id ); + else msg = compare_member_lz( cl_opts, ar, courier, extended, header, + rbuf, i, worker_id ); + if( msg ) + { if( courier.request_mastership( i, worker_id ) ) + courier.collect_packet( i, worker_id, msg, Packet::error ); + goto done; } + extended.reset(); + } + } +done: + courier.worker_finished(); + return 0; + } + + +/* Get from courier the processed and sorted packets, and print + the member lines on stdout or the diagnostics and errors on stderr. +*/ +void muxer( const char * const archive_namep, Packet_courier & courier ) + { + bool error = false; + while( !error ) + { + const Packet * const opacket = courier.deliver_packet(); + if( !opacket ) break; // queue is empty. all workers exited + + switch( opacket->status ) + { + case Packet::error: + show_file_error( archive_namep, opacket->line.c_str() ); + error = true; break; + case Packet::diag: std::fputs( opacket->line.c_str(), stderr ); break; + default: if( opacket->line.size() ) + { std::fputs( opacket->line.c_str(), stdout ); std::fflush( stdout ); } + } + delete opacket; + } + if( !error && !courier.eof_found() ) // no worker found EOF blocks + { show_file_error( archive_namep, end_msg ); error = true; } + if( error ) cleanup_and_fail( 2 ); + } + +} // end namespace + + +// init the courier, then start the workers and call the muxer. +int decode_lz( const Cl_options & cl_opts, const Archive_descriptor & ad, + std::vector< char > & name_pending ) + { + const int out_slots = 65536; // max small files (<=512B) in 64 MiB + const int num_workers = // limited to number of members + std::min( (long)cl_opts.num_workers, ad.lzip_index.members() ); + + /* If an error happens after any threads have been started, exit must be + called before courier goes out of scope. */ + Packet_courier courier( num_workers, out_slots ); + + Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers]; + pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; + if( !worker_args || !worker_threads ) { show_error( mem_msg ); return 1; } + for( int i = 0; i < num_workers; ++i ) + { + worker_args[i].cl_opts = &cl_opts; + worker_args[i].ad = &ad; + worker_args[i].courier = &courier; + worker_args[i].name_pending = &name_pending; + worker_args[i].worker_id = i; + worker_args[i].num_workers = num_workers; + const int errcode = + pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] ); + if( errcode ) + { show_error( "Can't create worker threads", errcode ); cleanup_and_fail(); } + } + + muxer( ad.namep, courier ); + + for( int i = num_workers - 1; i >= 0; --i ) + { + const int errcode = pthread_join( worker_threads[i], 0 ); + if( errcode ) + { show_error( "Can't join worker threads", errcode ); cleanup_and_fail(); } + } + delete[] worker_threads; + delete[] worker_args; + + int retval = 0; + if( close( ad.infd ) != 0 ) + { show_file_error( ad.namep, "Error closing archive", errno ); retval = 1; } + + if( retval == 0 ) + for( int i = 0; i < cl_opts.parser.arguments(); ++i ) + if( nonempty_arg( cl_opts.parser, i ) && name_pending[i] ) + { show_file_error( cl_opts.parser.argument( i ).c_str(), + "Not found in archive." ); retval = 1; } + + if( cl_opts.debug_level & 1 ) + std::fprintf( stderr, + "muxer tried to consume from workers %8u times\n" + "muxer had to wait %8u times\n", + courier.ocheck_counter, + courier.owait_counter ); + + if( !courier.finished() ) internal_error( "courier not finished." ); + return final_exit_status( retval, cl_opts.program_mode != m_diff ); + } |