From e07b6bef9e2cc970899d6e83c3a78245a915ae81 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 7 Nov 2015 16:36:52 +0100 Subject: Adding upstream version 1.2~pre1. Signed-off-by: Daniel Baumann --- dec_stream.cc | 49 +++++++++++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 26 deletions(-) (limited to 'dec_stream.cc') diff --git a/dec_stream.cc b/dec_stream.cc index 64dcce3..2897002 100644 --- a/dec_stream.cc +++ b/dec_stream.cc @@ -1,6 +1,6 @@ /* Plzip - Parallel compressor compatible with lzip Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz. + Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Antonio Diaz Diaz. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -65,35 +65,36 @@ private: std::vector< std::queue< Packet * > > opacket_queues; int num_working; // number of workers still running const int num_workers; // number of workers - const int num_slots; // max output packets in circulation - int num_free; // remaining free output slots + const unsigned out_slots; // max output packets per queue pthread_mutex_t imutex; pthread_cond_t iav_or_eof; // input packet available or splitter done pthread_mutex_t omutex; pthread_cond_t oav_or_exit; // output packet available or all workers exited - pthread_cond_t slot_av; // free output slot available - bool eof; // splitter done + std::vector< pthread_cond_t > slot_av; // output slot available + bool eof; // splitter done Packet_courier( const Packet_courier & ); // declared as private void operator=( const Packet_courier & ); // declared as private public: - Packet_courier( const int workers, const int slots ) + Packet_courier( const int workers, const int in_slots, const int oslots ) : icheck_counter( 0 ), iwait_counter( 0 ), ocheck_counter( 0 ), owait_counter( 0 ), receive_worker_id( 0 ), deliver_worker_id( 0 ), - slot_tally( slots ), ipacket_queues( workers ), + slot_tally( in_slots ), ipacket_queues( workers ), opacket_queues( workers ), num_working( workers ), - num_workers( workers ), num_slots( 8 * slots ), num_free( num_slots ), + num_workers( workers ), out_slots( oslots ), slot_av( workers ), eof( false ) { xinit( &imutex ); xinit( &iav_or_eof ); - xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av ); + xinit( &omutex ); xinit( &oav_or_exit ); + for( unsigned i = 0; i < slot_av.size(); ++i ) xinit( &slot_av[i] ); } ~Packet_courier() { - xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex ); + for( unsigned i = 0; i < slot_av.size(); ++i ) xdestroy( &slot_av[i] ); + xdestroy( &oav_or_exit ); xdestroy( &omutex ); xdestroy( &iav_or_eof ); xdestroy( &imutex ); } @@ -149,9 +150,8 @@ public: xlock( &omutex ); if( opacket->data ) { - while( worker_id != deliver_worker_id && num_free <= 0 ) - xwait( &slot_av, &omutex ); - --num_free; + while( opacket_queues[worker_id].size() >= out_slots ) + xwait( &slot_av[worker_id], &omutex ); } opacket_queues[worker_id].push( opacket ); if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit ); @@ -175,13 +175,10 @@ public: if( opacket_queues[deliver_worker_id].empty() ) break; opacket = opacket_queues[deliver_worker_id].front(); opacket_queues[deliver_worker_id].pop(); - if( opacket->data ) - { - if( ++num_free == 1 ) xsignal( &slot_av ); - break; - } + if( opacket_queues[deliver_worker_id].size() + 1 == out_slots ) + xsignal( &slot_av[deliver_worker_id] ); + if( opacket->data ) break; if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; - xbroadcast( &slot_av ); // restart deliver_worker_id thread delete opacket; opacket = 0; } xunlock( &omutex ); @@ -198,8 +195,7 @@ public: bool finished() // all packets delivered to muxer { - if( !slot_tally.all_free() || - num_free != num_slots || !eof || num_working != 0 ) return false; + if( !slot_tally.all_free() || !eof || num_working != 0 ) return false; for( int i = 0; i < num_workers; ++i ) if( !ipacket_queues[i].empty() ) return false; for( int i = 0; i < num_workers; ++i ) @@ -408,7 +404,7 @@ extern "C" void * dworker_s( void * arg ) if( trailing_garbage_found || LZ_decompress_finished( decoder ) == 1 ) { - LZ_decompress_reset( decoder ); // prepare for new ipacket + LZ_decompress_reset( decoder ); // prepare for new member Packet * opacket = new Packet; // end of member token opacket->data = 0; opacket->size = 0; @@ -464,12 +460,13 @@ int dec_stream( const int num_workers, const int infd, const int outfd, const Pretty_print & pp, const int debug_level, const bool testing ) { - const int slots_per_worker = 2; - const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ? - num_workers * slots_per_worker : INT_MAX ); + const int in_slots_per_worker = 2; + const int out_slots = 32; + const int in_slots = ( INT_MAX / num_workers >= in_slots_per_worker ) ? + num_workers * in_slots_per_worker : INT_MAX; in_size = 0; out_size = 0; - Packet_courier courier( num_workers, num_slots ); + Packet_courier courier( num_workers, in_slots, out_slots ); Splitter_arg splitter_arg; splitter_arg.courier = &courier; -- cgit v1.2.3