diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2020-08-08 17:10:15 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2020-08-08 17:10:15 +0000 |
commit | 3c320348b5b78d2a9923527f9eac089eb1e2778d (patch) | |
tree | d17322c3a703eb4683e689482c3dbd3041970180 /list_lz.cc | |
parent | Adding upstream version 0.16. (diff) | |
download | tarlz-upstream/0.17.tar.xz tarlz-upstream/0.17.zip |
Adding upstream version 0.17.upstream/0.17
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | list_lz.cc | 586 |
1 files changed, 0 insertions, 586 deletions
diff --git a/list_lz.cc b/list_lz.cc deleted file mode 100644 index bdfad36..0000000 --- a/list_lz.cc +++ /dev/null @@ -1,586 +0,0 @@ -/* Tarlz - Archiver with multimember lzip compression - Copyright (C) 2013-2019 Antonio Diaz Diaz. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#define _FILE_OFFSET_BITS 64 - -#include <algorithm> -#include <cerrno> -#include <climits> -#include <cstdio> -#include <cstdlib> -#include <cstring> -#include <queue> -#include <string> -#include <vector> -#include <pthread.h> -#include <stdint.h> -#include <unistd.h> -#include <lzlib.h> - -#include "arg_parser.h" -#include "lzip_index.h" -#include "tarlz.h" - - -namespace { - -// Returns the number of bytes really read. -// If (returned value < size) and (errno == 0), means EOF was reached. -// -int preadblock( const int fd, uint8_t * const buf, const int size, - const long long pos ) - { - int sz = 0; - errno = 0; - while( sz < size ) - { - const int n = pread( fd, buf + sz, size - sz, pos + sz ); - if( n > 0 ) sz += n; - else if( n == 0 ) break; // EOF - else if( errno != EINTR ) break; - errno = 0; - } - return sz; - } - -/* -// Returns the number of bytes really written. -// If (returned value < size), it is always an error. -// -int pwriteblock( const int fd, const uint8_t * const buf, const int size, - const long long pos ) - { - int sz = 0; - errno = 0; - while( sz < size ) - { - const int n = pwrite( fd, buf + sz, size - sz, pos + sz ); - if( n > 0 ) sz += n; - else if( n < 0 && errno != EINTR ) break; - errno = 0; - } - return sz; - } -*/ - -struct Packet // member name and metadata or error message - { - enum Status { ok, member_done, error }; - long member_id; // lzip member containing the header of this tar member - std::string line; // member name and metadata ready to print, if any - Status status; - Packet( const long i, const char * const msg, const Status s = ok ) - : member_id( i ), line( msg ), status( s ) {} - }; - - -class Packet_courier // moves packets around - { -public: - unsigned ocheck_counter; - unsigned owait_counter; -private: - long error_member_id; // first lzip member with error/misalign/eof - int deliver_worker_id; // worker queue currently delivering packets - int master_worker_id; // worker in charge if error/misalignment/eof - std::vector< std::queue< const Packet * > > opacket_queues; - int num_working; // number of workers still running - const int num_workers; // number of workers - const unsigned out_slots; // max output packets per queue - pthread_mutex_t omutex; - pthread_cond_t oav_or_exit; // output packet available or all workers exited - std::vector< pthread_cond_t > slot_av; // output slot available - pthread_cond_t check_master; - bool eof_found_; - - Packet_courier( const Packet_courier & ); // declared as private - void operator=( const Packet_courier & ); // declared as private - -public: - Packet_courier( const int workers, const int slots ) - : ocheck_counter( 0 ), owait_counter( 0 ), - error_member_id( -1 ), deliver_worker_id( 0 ), master_worker_id( -1 ), - opacket_queues( workers ), num_working( workers ), - num_workers( workers ), out_slots( slots ), slot_av( workers ), - eof_found_( false ) - { - xinit_mutex( &omutex ); xinit_cond( &oav_or_exit ); - for( unsigned i = 0; i < slot_av.size(); ++i ) xinit_cond( &slot_av[i] ); - xinit_cond( &check_master ); - } - - ~Packet_courier() - { - xdestroy_cond( &check_master ); - for( unsigned i = 0; i < slot_av.size(); ++i ) xdestroy_cond( &slot_av[i] ); - xdestroy_cond( &oav_or_exit ); xdestroy_mutex( &omutex ); - } - - bool eof_found() const { return eof_found_; } - void report_eof() { eof_found_ = true; } - - bool mastership_granted() const { return master_worker_id >= 0; } - - bool request_mastership( const long member_id, const int worker_id ) - { - xlock( &omutex ); - if( mastership_granted() ) // already granted - { xunlock( &omutex ); return ( master_worker_id == worker_id ); } - if( error_member_id < 0 || error_member_id > member_id ) - error_member_id = member_id; - while( !mastership_granted() && ( worker_id != deliver_worker_id || - !opacket_queues[deliver_worker_id].empty() ) ) - xwait( &check_master, &omutex ); - if( !mastership_granted() && worker_id == deliver_worker_id && - opacket_queues[deliver_worker_id].empty() ) - { - master_worker_id = worker_id; // grant mastership - for( int i = 0; i < num_workers; ++i ) // delete all packets - while( !opacket_queues[i].empty() ) - opacket_queues[i].pop(); - xbroadcast( &check_master ); - xunlock( &omutex ); - return true; - } - xunlock( &omutex ); - return false; // mastership granted to another worker - } - - void worker_finished() - { - // notify muxer when last worker exits - xlock( &omutex ); - if( --num_working == 0 ) xsignal( &oav_or_exit ); - xunlock( &omutex ); - } - - /* Collect a packet from a worker. - If a packet is rejected, the worker must terminate. */ - bool collect_packet( const int worker_id, const long member_id, - const char * const msg, - const Packet::Status status = Packet::ok ) - { - const Packet * const opacket = new Packet( member_id, msg, status ); - xlock( &omutex ); - if( ( mastership_granted() && master_worker_id != worker_id ) || - ( error_member_id >= 0 && error_member_id < opacket->member_id ) ) - { xunlock( &omutex ); delete opacket; return false; } // reject packet - while( opacket_queues[worker_id].size() >= out_slots ) - xwait( &slot_av[worker_id], &omutex ); - opacket_queues[worker_id].push( opacket ); - if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit ); - xunlock( &omutex ); - return true; - } - - /* Deliver a packet to muxer. - If packet.status == Packet::member_done, move to next queue. - If packet.line.empty(), wait again (empty lzip member). */ - const Packet * deliver_packet() - { - const Packet * opacket = 0; - xlock( &omutex ); - ++ocheck_counter; - while( true ) - { - while( opacket_queues[deliver_worker_id].empty() && num_working > 0 ) - { - ++owait_counter; - if( !mastership_granted() && error_member_id >= 0 ) - xbroadcast( &check_master ); // mastership requested not yet granted - xwait( &oav_or_exit, &omutex ); - } - if( opacket_queues[deliver_worker_id].empty() ) break; - opacket = opacket_queues[deliver_worker_id].front(); - opacket_queues[deliver_worker_id].pop(); - if( opacket_queues[deliver_worker_id].size() + 1 == out_slots ) - xsignal( &slot_av[deliver_worker_id] ); - if( opacket->status == Packet::member_done && !mastership_granted() ) - { if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; } - if( !opacket->line.empty() ) break; - delete opacket; opacket = 0; - } - xunlock( &omutex ); - return opacket; - } - - bool finished() // all packets delivered to muxer - { - if( num_working != 0 ) return false; - for( int i = 0; i < num_workers; ++i ) - if( !opacket_queues[i].empty() ) return false; - return true; - } - }; - - -int list_member_lz( LZ_Decoder * const decoder, const int infd, - long long & file_pos, const long long member_end, - const long long cdata_size, long long & data_pos, - const long long mdata_end, Packet_courier & courier, - const Extended & extended, const Tar_header header, - Resizable_buffer & rbuf, const long member_id, - const int worker_id, const char ** msg, const bool skip ) - { - long long rest = extended.file_size(); - const int rem = rest % header_size; - if( rem ) rest += header_size - rem; // padding - const long long data_rest = mdata_end - ( data_pos + rest ); - - if( verbosity < 0 || skip ) rbuf()[0] = 0; - else if( !format_member_name( extended, header, rbuf, verbosity > 0 ) ) - { *msg = mem_msg; return 1; } - if( !courier.collect_packet( worker_id, member_id, rbuf(), - data_rest ? Packet::ok : Packet::member_done ) ) - { *msg = "other worker found an error"; return 1; } - if( data_rest ) - return skip_member_lz( decoder, infd, file_pos, member_end, cdata_size, - data_pos, rest, msg ); - data_pos = mdata_end; - return 0; - } - - -struct Worker_arg - { - const Lzip_index * lzip_index; - Packet_courier * courier; - const Arg_parser * parser; - std::vector< char > * name_pending; - int worker_id; - int num_workers; - int infd; - int filenames; - bool missing_crc; - bool permissive; - }; - - -/* Read lzip members from archive, list their tar members, and give the - packets produced to courier. */ -extern "C" void * tworker( void * arg ) - { - const Worker_arg & tmp = *(const Worker_arg *)arg; - const Lzip_index & lzip_index = *tmp.lzip_index; - Packet_courier & courier = *tmp.courier; - const Arg_parser & parser = *tmp.parser; - std::vector< char > & name_pending = *tmp.name_pending; - const int worker_id = tmp.worker_id; - const int num_workers = tmp.num_workers; - const int infd = tmp.infd; - const int filenames = tmp.filenames; - const int missing_crc = tmp.missing_crc; - const bool permissive = tmp.permissive; - - Resizable_buffer rbuf; - LZ_Decoder * const decoder = LZ_decompress_open(); - if( !rbuf.size() || !decoder || LZ_decompress_errno( decoder ) != LZ_ok ) - { show_error( mem_msg ); cleanup_and_fail(); } - - const long long cdata_size = lzip_index.cdata_size(); - bool master = false; - for( long i = worker_id; !master && i < lzip_index.members(); i += num_workers ) - { - long long data_pos = lzip_index.dblock( i ).pos(); - const long long mdata_end = lzip_index.dblock( i ).end(); - long long data_end = mdata_end; - long long file_pos = lzip_index.mblock( i ).pos(); - const long long member_end = lzip_index.mblock( i ).end(); - if( data_pos >= data_end ) // empty lzip member - { - if( courier.collect_packet( worker_id, i, "", Packet::member_done ) ) - continue; else break; - } - - Extended extended; // metadata from extended records - bool prev_extended = false; // prev header was extended - LZ_decompress_reset( decoder ); // prepare for new member - while( true ) // process one tar header per iteration - { - if( data_pos >= data_end ) - { - if( data_pos == data_end && !prev_extended ) break; - // member end exceeded or ends in extended, process rest of file - if( !courier.request_mastership( i, worker_id ) ) goto done; - master = true; - if( data_end < lzip_index.udata_size() ) - data_end = lzip_index.udata_size(); - else - { courier.collect_packet( worker_id, i, end_msg, Packet::error ); - goto done; } - } - Tar_header header; - const char * msg = 0; - const int ret = archive_read_lz( decoder, infd, file_pos, member_end, - cdata_size, header, header_size, &msg ); - if( ret != 0 ) - { - if( !courier.request_mastership( i, worker_id ) ) goto done; - master = true; - courier.collect_packet( worker_id, i, msg, Packet::error ); - goto done; - } - data_pos += header_size; - if( !verify_ustar_chksum( header ) ) - { - if( !courier.request_mastership( i, worker_id ) ) goto done; - master = true; - if( block_is_zero( header, header_size ) ) // EOF - { - if( !prev_extended || permissive ) courier.report_eof(); - else courier.collect_packet( worker_id, i, fv_msg1, Packet::error ); - goto done; - } - courier.collect_packet( worker_id, i, ( data_pos > header_size ) ? - bad_hdr_msg : posix_lz_msg, Packet::error ); - goto done; - } - - const Typeflag typeflag = (Typeflag)header[typeflag_o]; - if( typeflag == tf_global ) - { - if( prev_extended && !permissive ) - { courier.collect_packet( worker_id, i, fv_msg2, Packet::error ); - goto done; } - Extended dummy; // global headers are parsed and ignored - if( parse_records_lz( decoder, infd, file_pos, member_end, cdata_size, - data_pos, dummy, header, rbuf, &msg, true ) == 0 ) - { - if( data_pos == data_end && // end of lzip member - !courier.collect_packet( worker_id, i, "", Packet::member_done ) ) - goto done; - continue; - } - if( courier.request_mastership( i, worker_id ) ) - courier.collect_packet( worker_id, i, msg ? msg : gblrec_msg, - Packet::error ); - goto done; - } - if( typeflag == tf_extended ) - { - int ret = 0; - if( prev_extended && !permissive ) { msg = fv_msg3; ret = 2; } - else ret = parse_records_lz( decoder, infd, file_pos, member_end, - cdata_size, data_pos, extended, header, - rbuf, &msg, permissive ); - if( ret == 0 && !extended.crc_present() && missing_crc ) - { msg = mcrc_msg; ret = 2; } - if( ret == 0 ) { prev_extended = true; continue; } - if( courier.request_mastership( i, worker_id ) ) - courier.collect_packet( worker_id, i, msg ? msg : extrec_msg, - Packet::error ); - goto done; - } - prev_extended = false; - - extended.fill_from_ustar( header ); // copy metadata from header - - const bool skip = check_skip_filename( parser, name_pending, - extended.path().c_str(), filenames ); - - if( list_member_lz( decoder, infd, file_pos, member_end, cdata_size, - data_pos, mdata_end, courier, extended, - header, rbuf, i, worker_id, &msg, skip ) != 0 ) - { courier.collect_packet( worker_id, i, msg, Packet::error ); - goto done; } - extended.reset(); - } - } -done: - if( LZ_decompress_close( decoder ) < 0 ) - courier.collect_packet( worker_id, lzip_index.members(), - "LZ_decompress_close failed.", Packet::error ); - courier.worker_finished(); - return 0; - } - - -/* Get from courier the processed and sorted packets, and print - the member lines on stdout or the diagnostics on stderr. */ -void muxer( const char * const archive_namep, Packet_courier & courier ) - { - while( true ) - { - const Packet * const opacket = courier.deliver_packet(); - if( !opacket ) break; // queue is empty. all workers exited - - if( opacket->status == Packet::error ) - { show_file_error( archive_namep, opacket->line.c_str() ); - cleanup_and_fail( 2 ); } - if( opacket->line.size() ) - { std::fputs( opacket->line.c_str(), stdout ); std::fflush( stdout ); } - delete opacket; - } - if( !courier.eof_found() ) // no worker found EOF blocks - { show_file_error( archive_namep, end_msg ); cleanup_and_fail( 2 ); } - } - -} // end namespace - - -/* Read 'size' decompressed bytes from the archive. - Return value: 0 = OK, 1 = damaged member, 2 = fatal error. */ -int archive_read_lz( LZ_Decoder * const decoder, const int infd, - long long & file_pos, const long long member_end, - const long long cdata_size, uint8_t * const buf, - const int size, const char ** msg ) - { - int sz = 0; - - while( sz < size ) - { - const int rd = LZ_decompress_read( decoder, buf + sz, size - sz ); - if( rd < 0 ) - { *msg = LZ_strerror( LZ_decompress_errno( decoder ) ); return 1; } - if( rd == 0 && LZ_decompress_finished( decoder ) == 1 ) - { *msg = end_msg; return 2; } - sz += rd; - if( sz < size && LZ_decompress_write_size( decoder ) > 0 ) - { - const long long ibuf_size = 16384; // try 65536 - uint8_t ibuf[ibuf_size]; - const long long rest = ( file_pos < member_end ) ? - member_end - file_pos : cdata_size - file_pos; - const int rsize = std::min( LZ_decompress_write_size( decoder ), - (int)std::min( ibuf_size, rest ) ); - if( rsize <= 0 ) LZ_decompress_finish( decoder ); - else - { - const int rd = preadblock( infd, ibuf, rsize, file_pos ); - if( LZ_decompress_write( decoder, ibuf, rd ) != rd ) - internal_error( "library error (LZ_decompress_write)." ); - file_pos += rd; - if( rd < rsize ) - { - LZ_decompress_finish( decoder ); - if( errno ) { *msg = "Error reading archive"; return 2; } - } - } - } - } - return 0; - } - - -int parse_records_lz( LZ_Decoder * const decoder, const int infd, - long long & file_pos, const long long member_end, - const long long cdata_size, long long & data_pos, - Extended & extended, const Tar_header header, - Resizable_buffer & rbuf, const char ** msg, - const bool permissive ) - { - const long long edsize = parse_octal( header + size_o, size_l ); - const long long bufsize = round_up( edsize ); - if( edsize <= 0 || edsize >= 1LL << 33 || bufsize >= INT_MAX ) - return 1; // overflow or no extended data - if( !rbuf.resize( bufsize ) ) return 1; // extended records buffer - int retval = archive_read_lz( decoder, infd, file_pos, member_end, - cdata_size, (uint8_t *)rbuf(), bufsize, msg ); - if( retval == 0 ) - { if( extended.parse( rbuf(), edsize, permissive ) ) data_pos += bufsize; - else retval = 1; } - return retval; - } - - -int skip_member_lz( LZ_Decoder * const decoder, const int infd, - long long & file_pos, const long long member_end, - const long long cdata_size, long long & data_pos, - long long rest, const char ** msg ) - { - const int bufsize = 32 * header_size; - uint8_t buf[bufsize]; - while( rest > 0 ) // skip tar member - { - const int rsize = ( rest >= bufsize ) ? bufsize : rest; - const int ret = archive_read_lz( decoder, infd, file_pos, member_end, - cdata_size, buf, rsize, msg ); - if( ret != 0 ) return ret; - data_pos += rsize; - rest -= rsize; - } - return 0; - } - - - // init the courier, then start the workers and call the muxer. -int list_lz( const char * const archive_namep, const Arg_parser & parser, - std::vector< char > & name_pending, const Lzip_index & lzip_index, - const int filenames, const int debug_level, const int infd, - const int num_workers, const bool missing_crc, - const bool permissive ) - { - const int out_slots = 65536; // max small files (<=512B) in 64 MiB - - /* If an error happens after any threads have been started, exit must be - called before courier goes out of scope. */ - Packet_courier courier( num_workers, out_slots ); - - Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers]; - pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; - if( !worker_args || !worker_threads ) { show_error( mem_msg ); return 1; } - for( int i = 0; i < num_workers; ++i ) - { - worker_args[i].lzip_index = &lzip_index; - worker_args[i].courier = &courier; - worker_args[i].parser = &parser; - worker_args[i].name_pending = &name_pending; - worker_args[i].worker_id = i; - worker_args[i].num_workers = num_workers; - worker_args[i].infd = infd; - worker_args[i].filenames = filenames; - worker_args[i].missing_crc = missing_crc; - worker_args[i].permissive = permissive; - const int errcode = - pthread_create( &worker_threads[i], 0, tworker, &worker_args[i] ); - if( errcode ) - { show_error( "Can't create worker threads", errcode ); cleanup_and_fail(); } - } - - muxer( archive_namep, courier ); - - for( int i = num_workers - 1; i >= 0; --i ) - { - const int errcode = pthread_join( worker_threads[i], 0 ); - if( errcode ) - { show_error( "Can't join worker threads", errcode ); cleanup_and_fail(); } - } - delete[] worker_threads; - delete[] worker_args; - - int retval = 0; - if( close( infd ) != 0 ) - { show_file_error( archive_namep, "Error closing archive", errno ); - retval = 1; } - - if( retval == 0 ) for( int i = 0; i < parser.arguments(); ++i ) - if( !parser.code( i ) && parser.argument( i ).size() && name_pending[i] ) - { - show_file_error( parser.argument( i ).c_str(), "Not found in archive." ); - retval = 1; - } - - if( debug_level & 1 ) - std::fprintf( stderr, - "muxer tried to consume from workers %8u times\n" - "muxer had to wait %8u times\n", - courier.ocheck_counter, - courier.owait_counter ); - - if( !courier.finished() ) internal_error( "courier not finished." ); - return retval; - } |