diff options
Diffstat (limited to 'dec_stream.cc')
-rw-r--r-- | dec_stream.cc | 64 |
1 files changed, 34 insertions, 30 deletions
diff --git a/dec_stream.cc b/dec_stream.cc index f22f790..64eaa30 100644 --- a/dec_stream.cc +++ b/dec_stream.cc @@ -47,6 +47,8 @@ struct Packet // data block { uint8_t * data; // data == 0 means end of member int size; // number of bytes in data (if any) + explicit Packet( uint8_t * const d = 0, const int s = 0 ) + : data( d ), size( s ) {} }; @@ -102,9 +104,7 @@ public: // if data == 0, move to next queue void receive_packet( uint8_t * const data, const int size ) { - Packet * ipacket = new Packet; - ipacket->data = data; - ipacket->size = size; + Packet * const ipacket = new Packet( data, size ); if( data ) { in_size += size; slot_tally.get_slot(); } // wait for a free slot xlock( &imutex ); @@ -185,6 +185,13 @@ public: return opacket; } + void add_out_size( const unsigned long long partial_out_size ) + { + xlock( &omutex ); + out_size += partial_out_size; + xunlock( &omutex ); + } + void finish() // splitter has no more packets to send { xlock( &imutex ); @@ -269,6 +276,7 @@ extern "C" void * dsplitter_s( void * arg ) header.version() ); } cleanup_and_fail( 2 ); } + show_header( header.dictionary_size() ); unsigned long long partial_member_size = 0; while( true ) @@ -337,22 +345,25 @@ struct Worker_arg Packet_courier * courier; const Pretty_print * pp; int worker_id; + bool testing; }; - // consume packets from courier, decompress their contents, and - // give the produced packets to courier. + // consume packets from courier, decompress their contents and, + // if not testing, give the produced packets to courier. extern "C" void * dworker_s( void * arg ) { const Worker_arg & tmp = *(Worker_arg *)arg; Packet_courier & courier = *tmp.courier; const Pretty_print & pp = *tmp.pp; const int worker_id = tmp.worker_id; + const bool testing = tmp.testing; uint8_t * new_data = new( std::nothrow ) uint8_t[max_packet_size]; LZ_Decoder * const decoder = LZ_decompress_open(); if( !new_data || !decoder || LZ_decompress_errno( decoder ) != LZ_ok ) { pp( "Not enough memory." ); cleanup_and_fail(); } + unsigned long long partial_out_size = 0; int new_pos = 0; bool trailing_garbage_found = false; @@ -391,24 +402,21 @@ extern "C" void * dworker_s( void * arg ) if( new_pos == max_packet_size || trailing_garbage_found || LZ_decompress_finished( decoder ) == 1 ) { - if( new_pos > 0 ) // make data packet + if( !testing && new_pos > 0 ) // make data packet { - Packet * opacket = new Packet; - opacket->data = new_data; - opacket->size = new_pos; + Packet * const opacket = new Packet( new_data, new_pos ); courier.collect_packet( opacket, worker_id ); - new_pos = 0; new_data = new( std::nothrow ) uint8_t[max_packet_size]; if( !new_data ) { pp( "Not enough memory." ); cleanup_and_fail(); } } + partial_out_size += new_pos; + new_pos = 0; if( trailing_garbage_found || LZ_decompress_finished( decoder ) == 1 ) { + if( !testing ) // end of member token + courier.collect_packet( new Packet, worker_id ); LZ_decompress_reset( decoder ); // prepare for new member - Packet * opacket = new Packet; // end of member token - opacket->data = 0; - opacket->size = 0; - courier.collect_packet( opacket, worker_id ); break; } } @@ -421,6 +429,7 @@ extern "C" void * dworker_s( void * arg ) } delete[] new_data; + courier.add_out_size( partial_out_size ); if( LZ_decompress_member_position( decoder ) != 0 ) { pp( "Error, some data remains in decoder." ); cleanup_and_fail(); } if( LZ_decompress_close( decoder ) < 0 ) @@ -435,17 +444,12 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) { while( true ) { - Packet * opacket = courier.deliver_packet(); + Packet * const opacket = courier.deliver_packet(); if( !opacket ) break; // queue is empty. all workers exited - out_size += opacket->size; - - if( outfd >= 0 ) - { - const int wr = writeblock( outfd, opacket->data, opacket->size ); - if( wr != opacket->size ) - { pp(); show_error( "Write error", errno ); cleanup_and_fail(); } - } + const int wr = writeblock( outfd, opacket->data, opacket->size ); + if( wr != opacket->size ) + { pp(); show_error( "Write error", errno ); cleanup_and_fail(); } delete[] opacket->data; delete opacket; } @@ -454,11 +458,10 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) } // end namespace - // init the courier, then start the splitter and the workers and - // call the muxer. + // init the courier, then start the splitter and the workers and, + // if not testing, call the muxer. int dec_stream( const int num_workers, const int infd, const int outfd, - const Pretty_print & pp, const int debug_level, - const bool testing ) + const Pretty_print & pp, const int debug_level ) { const int in_slots_per_worker = 2; const int out_slots = 32; @@ -487,12 +490,13 @@ int dec_stream( const int num_workers, const int infd, const int outfd, worker_args[i].courier = &courier; worker_args[i].pp = &pp; worker_args[i].worker_id = i; + worker_args[i].testing = ( outfd < 0 ); errcode = pthread_create( &worker_threads[i], 0, dworker_s, &worker_args[i] ); if( errcode ) { show_error( "Can't create worker threads", errcode ); cleanup_and_fail(); } } - muxer( courier, pp, outfd ); + if( outfd >= 0 ) muxer( courier, pp, outfd ); for( int i = num_workers - 1; i >= 0; --i ) { @@ -512,11 +516,11 @@ int dec_stream( const int num_workers, const int infd, const int outfd, (double)out_size / in_size, ( 8.0 * in_size ) / out_size, 100.0 * ( 1.0 - ( (double)in_size / out_size ) ) ); - if( verbosity >= 3 ) + if( verbosity >= 4 ) std::fprintf( stderr, "decompressed size %9llu, size %9llu. ", out_size, in_size ); - if( verbosity >= 1 ) std::fprintf( stderr, testing ? "ok\n" : "done\n" ); + if( verbosity >= 1 ) std::fprintf( stderr, (outfd < 0) ? "ok\n" : "done\n" ); if( debug_level & 1 ) std::fprintf( stderr, |