summaryrefslogtreecommitdiffstats
path: root/dec_stdout.cc
diff options
context:
space:
mode:
Diffstat (limited to 'dec_stdout.cc')
-rw-r--r--dec_stdout.cc42
1 files changed, 20 insertions, 22 deletions
diff --git a/dec_stdout.cc b/dec_stdout.cc
index fda4e9e..6c750c6 100644
--- a/dec_stdout.cc
+++ b/dec_stdout.cc
@@ -1,6 +1,6 @@
/* Plzip - Parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
+ Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -59,11 +59,10 @@ private:
std::vector< std::queue< Packet * > > opacket_queues;
int num_working; // number of workers still running
const int num_workers; // number of workers
- const int num_slots; // max output packets in circulation
- int num_free; // remaining free output slots
+ const unsigned out_slots; // max output packets per queue
pthread_mutex_t omutex;
pthread_cond_t oav_or_exit; // output packet available or all workers exited
- pthread_cond_t slot_av; // free output slot available
+ std::vector< pthread_cond_t > slot_av; // output slot available
Packet_courier( const Packet_courier & ); // declared as private
void operator=( const Packet_courier & ); // declared as private
@@ -73,11 +72,17 @@ public:
: ocheck_counter( 0 ), owait_counter( 0 ),
deliver_worker_id( 0 ),
opacket_queues( workers ), num_working( workers ),
- num_workers( workers ), num_slots( 8 * slots ), num_free( num_slots )
- { xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av ); }
+ num_workers( workers ), out_slots( slots ), slot_av( workers )
+ {
+ xinit( &omutex ); xinit( &oav_or_exit );
+ for( unsigned i = 0; i < slot_av.size(); ++i ) xinit( &slot_av[i] );
+ }
~Packet_courier()
- { xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex ); }
+ {
+ for( unsigned i = 0; i < slot_av.size(); ++i ) xdestroy( &slot_av[i] );
+ xdestroy( &oav_or_exit ); xdestroy( &omutex );
+ }
void worker_finished()
{
@@ -93,9 +98,8 @@ public:
xlock( &omutex );
if( opacket->data )
{
- while( worker_id != deliver_worker_id && num_free <= 0 )
- xwait( &slot_av, &omutex );
- --num_free;
+ while( opacket_queues[worker_id].size() >= out_slots )
+ xwait( &slot_av[worker_id], &omutex );
}
opacket_queues[worker_id].push( opacket );
if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
@@ -119,13 +123,10 @@ public:
if( opacket_queues[deliver_worker_id].empty() ) break;
opacket = opacket_queues[deliver_worker_id].front();
opacket_queues[deliver_worker_id].pop();
- if( opacket->data )
- {
- if( ++num_free == 1 ) xsignal( &slot_av );
- break;
- }
+ if( opacket_queues[deliver_worker_id].size() + 1 == out_slots )
+ xsignal( &slot_av[deliver_worker_id] );
+ if( opacket->data ) break;
if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0;
- xbroadcast( &slot_av ); // restart deliver_worker_id thread
delete opacket; opacket = 0;
}
xunlock( &omutex );
@@ -134,7 +135,7 @@ public:
bool finished() // all packets delivered to muxer
{
- if( num_free != num_slots || num_working != 0 ) return false;
+ if( num_working != 0 ) return false;
for( int i = 0; i < num_workers; ++i )
if( !opacket_queues[i].empty() ) return false;
return true;
@@ -271,11 +272,8 @@ int dec_stdout( const int num_workers, const int infd, const int outfd,
const Pretty_print & pp, const int debug_level,
const File_index & file_index )
{
- const int slots_per_worker = 2;
- const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ?
- num_workers * slots_per_worker : INT_MAX );
-
- Packet_courier courier( num_workers, num_slots );
+ const int out_slots = 32;
+ Packet_courier courier( num_workers, out_slots );
Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];