summaryrefslogtreecommitdiffstats
path: root/create_lz.cc
diff options
context:
space:
mode:
Diffstat (limited to 'create_lz.cc')
-rw-r--r--create_lz.cc88
1 files changed, 44 insertions, 44 deletions
diff --git a/create_lz.cc b/create_lz.cc
index 5436bf5..b390290 100644
--- a/create_lz.cc
+++ b/create_lz.cc
@@ -22,7 +22,7 @@
#include <cstdio>
#include <queue>
#include <pthread.h>
-#include <stdint.h> // for lzlib.h
+#include <stdint.h> // for lzlib.h
#include <unistd.h>
#include <sys/stat.h>
#include <ftw.h>
@@ -60,7 +60,7 @@ public:
~Slot_tally() { xdestroy_cond( &slot_av ); xdestroy_mutex( &mutex ); }
- bool all_free() { return ( num_free == num_slots ); }
+ bool all_free() { return num_free == num_slots; }
void get_slot() // wait for a free slot
{
@@ -94,8 +94,8 @@ struct Ipacket // filename, file size and headers
struct Opacket // compressed data to be written to the archive
{
- const uint8_t * const data; // data == 0 means end of lzip member
- const int size; // number of bytes in data (if any)
+ const uint8_t * data; // data == 0 means end of lzip member
+ int size; // number of bytes in data (if any)
Opacket() : data( 0 ), size( 0 ) {}
Opacket( uint8_t * const d, const int s ) : data( d ), size( s ) {}
@@ -110,11 +110,11 @@ public:
unsigned ocheck_counter;
unsigned owait_counter;
private:
- int receive_worker_id; // worker queue currently receiving packets
- int deliver_worker_id; // worker queue currently delivering packets
+ int receive_id; // worker queue currently receiving packets
+ int deliver_id; // worker queue currently delivering packets
Slot_tally slot_tally; // limits the number of input packets
std::vector< std::queue< const Ipacket * > > ipacket_queues;
- std::vector< std::queue< const Opacket * > > opacket_queues;
+ std::vector< std::queue< Opacket > > opacket_queues;
int num_working; // number of workers still running
const int num_workers; // number of workers
const unsigned out_slots; // max output packets per queue
@@ -132,11 +132,10 @@ public:
Packet_courier( const int workers, const int in_slots, const int oslots )
: icheck_counter( 0 ), iwait_counter( 0 ),
ocheck_counter( 0 ), owait_counter( 0 ),
- receive_worker_id( 0 ), deliver_worker_id( 0 ),
- slot_tally( in_slots ), ipacket_queues( workers ),
- opacket_queues( workers ), num_working( workers ),
- num_workers( workers ), out_slots( oslots ), slot_av( workers ),
- eof( false )
+ receive_id( 0 ), deliver_id( 0 ), slot_tally( in_slots ),
+ ipacket_queues( workers ), opacket_queues( workers ),
+ num_working( workers ), num_workers( workers ),
+ out_slots( oslots ), slot_av( workers ), eof( false )
{
xinit_mutex( &imutex ); xinit_cond( &iav_or_eof );
xinit_mutex( &omutex ); xinit_cond( &oav_or_exit );
@@ -157,9 +156,9 @@ public:
if( !ipacket->filename.empty() )
slot_tally.get_slot(); // wait for a free slot
xlock( &imutex );
- ipacket_queues[receive_worker_id].push( ipacket );
- if( ipacket->filename.empty() && ++receive_worker_id >= num_workers )
- receive_worker_id = 0;
+ ipacket_queues[receive_id].push( ipacket );
+ if( ipacket->filename.empty() && ++receive_id >= num_workers )
+ receive_id = 0;
xbroadcast( &iav_or_eof );
xunlock( &imutex );
}
@@ -194,44 +193,41 @@ public:
}
// collect an opacket from a worker
- void collect_packet( const Opacket * const opacket, const int worker_id )
+ void collect_packet( const Opacket & opacket, const int worker_id )
{
xlock( &omutex );
- if( opacket->data )
+ if( opacket.data )
{
while( opacket_queues[worker_id].size() >= out_slots )
xwait( &slot_av[worker_id], &omutex );
}
opacket_queues[worker_id].push( opacket );
- if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
+ if( worker_id == deliver_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
- /* Deliver an opacket to muxer.
- If opacket data == 0, move to next queue and wait again. */
- const Opacket * deliver_packet()
+ /* Deliver opackets to muxer.
+ If opacket.data == 0, skip opacket and move to next queue. */
+ void deliver_packets( std::vector< Opacket > & opacket_vector )
{
- const Opacket * opacket = 0;
+ opacket_vector.clear();
xlock( &omutex );
++ocheck_counter;
- while( true )
- {
- while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
+ do {
+ while( opacket_queues[deliver_id].empty() && num_working > 0 )
+ { ++owait_counter; xwait( &oav_or_exit, &omutex ); }
+ while( !opacket_queues[deliver_id].empty() )
{
- ++owait_counter;
- xwait( &oav_or_exit, &omutex );
+ Opacket opacket = opacket_queues[deliver_id].front();
+ opacket_queues[deliver_id].pop();
+ if( opacket_queues[deliver_id].size() + 1 == out_slots )
+ xsignal( &slot_av[deliver_id] );
+ if( opacket.data ) opacket_vector.push_back( opacket );
+ else if( ++deliver_id >= num_workers ) deliver_id = 0;
}
- if( opacket_queues[deliver_worker_id].empty() ) break;
- opacket = opacket_queues[deliver_worker_id].front();
- opacket_queues[deliver_worker_id].pop();
- if( opacket_queues[deliver_worker_id].size() + 1 == out_slots )
- xsignal( &slot_av[deliver_worker_id] );
- if( opacket->data ) break;
- if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0;
- delete opacket; opacket = 0;
}
+ while( opacket_vector.empty() && num_working > 0 );
xunlock( &omutex );
- return opacket;
}
void finish() // grouper has no more packets to send
@@ -371,7 +367,7 @@ void loop_encode( const uint8_t * const ibuf, const int isize,
{
if( opos > max_packet_size )
internal_error( "opacket size exceeded in worker." );
- courier.collect_packet( new Opacket( obuf, opos ), worker_id );
+ courier.collect_packet( Opacket( obuf, opos ), worker_id );
opos = 0; obuf = new( std::nothrow ) uint8_t[max_packet_size];
if( !obuf ) { show_error( mem_msg2 ); exit_fail_mt(); }
if( LZ_compress_finished( encoder ) == 1 )
@@ -421,7 +417,7 @@ extern "C" void * cworker( void * arg )
{
if( !flushed ) // this lzip member is not empty
loop_encode( 0, 0, data, opos, courier, encoder, worker_id, true );
- courier.collect_packet( new Opacket, worker_id ); // end of member token
+ courier.collect_packet( Opacket(), worker_id ); // end of member token
flushed = true; delete ipacket; continue;
}
@@ -501,15 +497,19 @@ extern "C" void * cworker( void * arg )
*/
void muxer( Packet_courier & courier, const int outfd )
{
+ std::vector< Opacket > opacket_vector;
while( true )
{
- const Opacket * const opacket = courier.deliver_packet();
- if( !opacket ) break; // queue is empty. all workers exited
+ courier.deliver_packets( opacket_vector );
+ if( opacket_vector.empty() ) break; // queue is empty. all workers exited
- if( !writeblock_wrapper( outfd, opacket->data, opacket->size ) )
- exit_fail_mt();
- delete[] opacket->data;
- delete opacket;
+ for( unsigned i = 0; i < opacket_vector.size(); ++i )
+ {
+ Opacket & opacket = opacket_vector[i];
+ if( !writeblock_wrapper( outfd, opacket.data, opacket.size ) )
+ exit_fail_mt();
+ delete[] opacket.data;
+ }
}
}