From 25ff7222c94a48b16ce76f161b6e2ae6efc54c5a Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 7 Nov 2015 16:39:51 +0100 Subject: Adding upstream version 1.3~pre1. Signed-off-by: Daniel Baumann --- compress.cc | 47 +++++++++++++++++++++++------------------------ 1 file changed, 23 insertions(+), 24 deletions(-) (limited to 'compress.cc') diff --git a/compress.cc b/compress.cc index d47d4c9..74f8aa1 100644 --- a/compress.cc +++ b/compress.cc @@ -156,9 +156,11 @@ const char * const mem_msg = "Not enough memory. Try a smaller dictionary size"; struct Packet // data block with a serial number { - unsigned id; // serial number assigned as received uint8_t * data; int size; // number of bytes in data (if any) + unsigned id; // serial number assigned as received + Packet( uint8_t * const d, const int s, const unsigned i ) + : data( d ), size( s ), id( i ) {} }; @@ -207,10 +209,7 @@ public: // make a packet with data received from splitter void receive_packet( uint8_t * const data, const int size ) { - Packet * const ipacket = new Packet; - ipacket->id = receive_id++; - ipacket->data = data; - ipacket->size = size; + Packet * const ipacket = new Packet( data, size, receive_id++ ); slot_tally.get_slot(); // wait for a free slot xlock( &imutex ); packet_queue.push( ipacket ); @@ -310,6 +309,7 @@ struct Splitter_arg const Pretty_print * pp; int infd; int data_size; + int offset; }; @@ -322,12 +322,13 @@ extern "C" void * csplitter( void * arg ) const Pretty_print & pp = *tmp.pp; const int infd = tmp.infd; const int data_size = tmp.data_size; + const int offset = tmp.offset; for( bool first_post = true; ; first_post = false ) { - uint8_t * const data = new( std::nothrow ) uint8_t[data_size]; + uint8_t * const data = new( std::nothrow ) uint8_t[offset+data_size]; if( !data ) { pp( mem_msg ); cleanup_and_fail(); } - const int size = readblock( infd, data, data_size ); + const int size = readblock( infd, data + offset, data_size ); if( size != data_size && errno ) { pp(); show_error( "Read error", errno ); cleanup_and_fail(); } @@ -354,6 +355,7 @@ struct Worker_arg const Pretty_print * pp; int dictionary_size; int match_len_limit; + int offset; }; @@ -366,15 +368,13 @@ extern "C" void * cworker( void * arg ) const Pretty_print & pp = *tmp.pp; const int dictionary_size = tmp.dictionary_size; const int match_len_limit = tmp.match_len_limit; + const int offset = tmp.offset; while( true ) { Packet * const packet = courier.distribute_packet(); if( !packet ) break; // no more packets to process - const int max_compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 ); - uint8_t * const new_data = new( std::nothrow ) uint8_t[max_compr_size]; - if( !new_data ) { pp( mem_msg ); cleanup_and_fail(); } const int dict_size = std::max( LZ_min_dictionary_size(), std::min( dictionary_size, packet->size ) ); LZ_Encoder * const encoder = @@ -396,16 +396,16 @@ extern "C" void * cworker( void * arg ) { if( written < packet->size ) { - const int wr = LZ_compress_write( encoder, packet->data + written, + const int wr = LZ_compress_write( encoder, + packet->data + offset + written, packet->size - written ); if( wr < 0 ) internal_error( "library error (LZ_compress_write)" ); written += wr; } - if( written >= packet->size ) - { delete[] packet->data; LZ_compress_finish( encoder ); } + if( written >= packet->size ) LZ_compress_finish( encoder ); } - const int rd = LZ_compress_read( encoder, new_data + new_pos, - max_compr_size - new_pos ); + const int rd = LZ_compress_read( encoder, packet->data + new_pos, + offset + written - new_pos ); if( rd < 0 ) { pp(); @@ -415,7 +415,7 @@ extern "C" void * cworker( void * arg ) cleanup_and_fail(); } new_pos += rd; - if( new_pos > max_compr_size ) + if( new_pos >= offset + written ) internal_error( "packet size exceeded in worker" ); if( LZ_compress_finished( encoder ) == 1 ) break; } @@ -423,8 +423,7 @@ extern "C" void * cworker( void * arg ) if( LZ_compress_close( encoder ) < 0 ) { pp( "LZ_compress_close failed." ); cleanup_and_fail(); } - if( verbosity >= 2 && packet->size > 0 ) show_progress( packet->size ); - packet->data = new_data; + if( packet->size > 0 ) show_progress( packet->size ); packet->size = new_pos; courier.collect_packet( packet ); } @@ -447,12 +446,9 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) const Packet * const opacket = packet_vector[i]; out_size += opacket->size; - if( outfd >= 0 ) - { - const int wr = writeblock( outfd, opacket->data, opacket->size ); - if( wr != opacket->size ) - { pp(); show_error( "Write error", errno ); cleanup_and_fail(); } - } + const int wr = writeblock( outfd, opacket->data, opacket->size ); + if( wr != opacket->size ) + { pp(); show_error( "Write error", errno ); cleanup_and_fail(); } delete[] opacket->data; delete opacket; } @@ -469,6 +465,7 @@ int compress( const int data_size, const int dictionary_size, const int infd, const int outfd, const Pretty_print & pp, const int debug_level ) { + const int offset = data_size / 8; const int slots_per_worker = 2; const int num_slots = ( ( num_workers > 1 ) ? num_workers * slots_per_worker : 1 ); @@ -481,6 +478,7 @@ int compress( const int data_size, const int dictionary_size, splitter_arg.pp = &pp; splitter_arg.infd = infd; splitter_arg.data_size = data_size; + splitter_arg.offset = offset; pthread_t splitter_thread; int errcode = pthread_create( &splitter_thread, 0, csplitter, &splitter_arg ); @@ -492,6 +490,7 @@ int compress( const int data_size, const int dictionary_size, worker_arg.pp = &pp; worker_arg.dictionary_size = dictionary_size; worker_arg.match_len_limit = match_len_limit; + worker_arg.offset = offset; pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; if( !worker_threads ) { pp( mem_msg ); cleanup_and_fail(); } -- cgit v1.2.3