/* Plzip - A parallel version of the lzip data compressor
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _FILE_OFFSET_BITS 64
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "main.h"
#include "plzip.h"
/* Fallback definitions of the long long limits, used only when the
   included headers did not already define them. The values assume a
   64-bit two's complement long long. */
#ifndef LLONG_MAX
#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
#endif
#ifndef LLONG_MIN
#define LLONG_MIN (-LLONG_MAX - 1LL)
#endif
#ifndef ULLONG_MAX
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
#endif
namespace {
/* Byte counters reported by muxer() when verbosity >= 1. */
long long in_size = 0; /* Bytes read from infd; accumulated by splitter(). */
long long out_size = 0; /* Compressed bytes produced; accumulated by muxer_loop(). */
/* Allocation hooks. muxer() points them either at malloc/free or at the
   tracing wrappers trace_malloc/trace_free, depending on debug_level. */
void *(*mallocf)(size_t size);
void (*freef)(void *ptr);
/* Tracing replacement for malloc().
   Allocates 'size' bytes, logs the request and the resulting pointer to
   stderr, and returns the pointer (0 on failure).
   If the allocation failed, the errno value left by malloc() is restored
   after logging, so that the fprintf() call cannot clobber it. */
void * trace_malloc(size_t size)
{
  void * const block = malloc(size);
  const bool failed = ( block == 0 );
  const int malloc_errno = failed ? errno : 0;

  fprintf(stderr, "malloc(%lu) == %p\n", (long unsigned)size, block);

  if( failed ) errno = malloc_errno;
  return block;
}
/* Tracing replacement for free().
   Logs the pointer to stderr, then releases it. */
void trace_free(void *ptr)
{
  // Log before releasing, so the trace line precedes the actual free().
  std::fprintf( stderr, "free(%p)\n", ptr );
  free( ptr );
}
/* Allocate 'size' bytes through the currently selected allocation hook.
   Never returns 0: a failed allocation is reported through fail() together
   with the current errno. */
void * xalloc(size_t size)
{
  void * const block = mallocf( size );
  if( !block ) fail("(*mallocf)()", errno);
  return block;
}
/* One block of uncompressed input, handed from the splitter to a worker.
   Instances are over-allocated: muxer() computes the allocation size as
   sizeof(S2w_blk) + sizeof_plain - 1, so plain[] really holds sizeof_plain
   bytes even though it is declared with a single element. */
struct S2w_blk /* Splitter to workers. */
{
unsigned long long id; /* Block serial number as read from infd. */
S2w_blk *next; /* Next in queue. */
size_t loaded; /* # of bytes in plain, may be 0 for 1st. */
unsigned char plain[1]; /* Data read from infd, allocated: sizeof_plain. */
};
/* Splitter-to-workers queue: a singly linked FIFO of loaded input blocks.
   The splitter appends at 'tail'; workers remove from 'head'. All members
   are accessed with 'av_or_eof' locked. */
struct S2w_q
{
  Cond av_or_eof;	/* New block available or splitter done. */
  S2w_blk *tail;	/* Splitter will append here. */
  S2w_blk *head;	/* Next ready worker shall compress this. */
  int eof;		/* Splitter done. */
};
/* Put a splitter-to-workers queue into its initial state: condition
   initialized, no blocks queued, end of input not yet reached. */
void
s2w_q_init(S2w_q *s2w_q)
{
  S2w_q & q = *s2w_q;

  xinit(&q.av_or_eof);
  q.head = 0;
  q.tail = 0;
  q.eof = 0;
}
/* Tear down a splitter-to-workers queue.
   The asserts check that the splitter has flagged end of input and that
   no block is left queued before the condition is destroyed. */
void
s2w_q_uninit(S2w_q *s2w_q)
{
assert(0 != s2w_q->eof);
assert(0 == s2w_q->head);
assert(0 == s2w_q->tail);
xdestroy(&s2w_q->av_or_eof);
}
/* One block of compressed output, handed from a worker to the muxer.
   Instances are over-allocated: muxer() computes the allocation size as
   sizeof(W2m_blk) + sizeof_compr - 1, so compr[] really holds sizeof_compr
   bytes even though it is declared with a single element. */
struct W2m_blk /* Workers to muxer data block. */
{
unsigned long long id; /* Block index as read from infd. */
W2m_blk *next; /* Next block in list (unordered). */
int produced; /* Number of bytes in compr. */
unsigned char compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */
};
/* Workers-to-muxer queue: an unordered list of compressed blocks.
   Workers push at 'head'; the muxer detaches the whole list at once.
   All members are accessed with 'av_or_exit' locked. */
struct W2m_q
{
Cond av_or_exit; /* New block available or all workers exited. */
/* Id of the next block the muxer must write. Set by muxer_loop(); a worker
   signals av_or_exit only when the block it pushes carries this id. */
unsigned long long needed; /* Block needed for resuming writing. */
W2m_blk *head; /* Block list (unordered). */
/* Starts at num_workers; each worker decrements it once when it exits. */
unsigned working; /* Number of workers still running. */
};
/* Put a workers-to-muxer queue into its initial state: condition
   initialized, no blocks pending, block 0 needed next, and 'num_workers'
   workers counted as running. 'num_workers' must be positive. */
void
w2m_q_init(W2m_q *w2m_q, int num_workers)
{
  assert(0 < num_workers);

  W2m_q & q = *w2m_q;
  xinit(&q.av_or_exit);
  q.head = 0;
  q.needed = 0;
  q.working = num_workers;
}
/* Tear down a workers-to-muxer queue.
   The asserts check that every worker has exited and that no compressed
   block is left pending before the condition is destroyed. */
void
w2m_q_uninit(W2m_q *w2m_q)
{
assert(0 == w2m_q->working);
assert(0 == w2m_q->head);
xdestroy(&w2m_q->av_or_exit);
}
// Muxer to splitter queue.
// Counts the slots available for blocks in flight: the splitter takes one
// slot before loading a block (waiting on 'av' while none is free) and the
// muxer gives one back for each block it has written out.
// 'num_free' is accessed with 'av' locked.
struct M2s_q
{
  Cond av;		// Free slot available
  int num_free;		// Number of free slots

  // Start with 'slots' free slots. Explicit, so that an int is never
  // silently converted into a queue.
  explicit M2s_q( const int slots ) : num_free( slots ) { xinit(&av); }

  ~M2s_q() { xdestroy(&av); }

private:
  // Copying is disabled (declared, never defined): a copy would make the
  // destructor run xdestroy() twice on a copy of the same condition.
  M2s_q( const M2s_q & );
  M2s_q & operator=( const M2s_q & );
};
/* Parameters handed to the splitter thread; filled in by muxer(). */
struct Splitter_arg
{
M2s_q *m2s_q; /* Free-slot counter shared with the muxer. */
S2w_q *s2w_q; /* Queue on which loaded blocks are published. */
int infd; /* File descriptor the input is read from. */
int sizeof_plain; /* Number of bytes requested from infd per block. */
size_t sizeof_S2w_blk; /* Allocation size of one S2w_blk. */
};
void * splitter( void * arg )
{
const Splitter_arg & tmp = *(Splitter_arg *)arg;
M2s_q *m2s_q = tmp.m2s_q;
S2w_q *s2w_q = tmp.s2w_q;
const int infd = tmp.infd;
const int sizeof_plain = tmp.sizeof_plain;
const size_t sizeof_s2w_blk = tmp.sizeof_S2w_blk;
for( unsigned long long id = 0; ; ++id )
{
/* Grab a free slot. */
xlock_pred(&m2s_q->av);
while( m2s_q->num_free == 0 ) xwait(&m2s_q->av);
--m2s_q->num_free;
xunlock(&m2s_q->av);
S2w_blk * s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);
/* Fill block. */
const int rd = readblock( infd, (char *)s2w_blk->plain, sizeof_plain );
if( rd != sizeof_plain && errno ) fail("read()", errno);
if( rd == 0 && id != 0 )
{
/* EOF on first read, but not for first input block. */
(*freef)(s2w_blk);
xlock(&m2s_q->av);
++m2s_q->num_free;
xunlock(&m2s_q->av);
}
else
{
s2w_blk->id = id;
s2w_blk->next = 0;
s2w_blk->loaded = rd;
in_size += rd;
}
xlock(&s2w_q->av_or_eof);
if( s2w_q->head == 0 ) xbroadcast(&s2w_q->av_or_eof);
if( rd > 0 || id == 0 )
{
if( s2w_q->tail == 0 ) s2w_q->head = s2w_blk;
else s2w_q->tail->next = s2w_blk;
s2w_q->tail = s2w_blk;
}
s2w_q->eof = ( rd == 0 );
xunlock(&s2w_q->av_or_eof);
if( rd <= 0 ) break;
}
return 0;
}
/* Move the compressed data currently available in the encoder 'lz' to the
   end of w2m_blk->compr and advance w2m_blk->produced accordingly.
   'sizeof_compr' is the capacity of compr; there must still be room left.
   A failing LZ_compress_read() is reported and is fatal. */
void work_lz_rd(W2m_blk *w2m_blk, const int sizeof_compr, LZ_Encoder * lz)
{
  assert(w2m_blk->produced < sizeof_compr);

  unsigned char * const dest = w2m_blk->compr + w2m_blk->produced;
  const int room = sizeof_compr - w2m_blk->produced;
  const int rd = LZ_compress_read(lz, dest, room);
  if( rd == -1 )
    {
    show_error( "LZ_compress_read() failed." );
    fatal();
    }
  w2m_blk->produced += rd;
}
void work_compr( const int dictionary_size, const int match_len_limit,
S2w_blk *s2w_blk, W2m_q *w2m_q,
const int sizeof_compr, const size_t sizeof_w2m_blk )
{
W2m_blk *w2m_blk;
assert(0 < s2w_blk->loaded || 0 == s2w_blk->id);
w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk);
/* Single member compression. Settings like with lzip -6. */
{
LZ_Encoder * lz;
size_t written;
lz = LZ_compress_open( dictionary_size, match_len_limit, LLONG_MAX );
if( LZ_ok != LZ_compress_errno(lz) ) {
show_error( "LZ_compress_open() failed." );
fatal();
}
written = 0;
w2m_blk->produced = 0;
while( written < s2w_blk->loaded ) {
int wr;
wr = LZ_compress_write(lz, s2w_blk->plain + written,
s2w_blk->loaded - written);
if( -1 == wr ) {
show_error( "LZ_compress_write() failed." );
fatal();
}
written += (size_t)wr;
work_lz_rd(w2m_blk, sizeof_compr, lz);
}
if( -1 == LZ_compress_finish(lz) ) {
show_error( "LZ_compress_finish() failed." );
fatal();
}
while( !LZ_compress_finished(lz) ) {
work_lz_rd(w2m_blk, sizeof_compr, lz);
}
if( -1 == LZ_compress_close(lz) ) {
show_error( "LZ_compress_close() failed." );
fatal();
}
}
w2m_blk->id = s2w_blk->id;
/* Push block to muxer. */
xlock(&w2m_q->av_or_exit);
w2m_blk->next = w2m_q->head;
w2m_q->head = w2m_blk;
if( w2m_blk->id == w2m_q->needed ) {
xsignal(&w2m_q->av_or_exit);
}
xunlock(&w2m_q->av_or_exit);
}
/* Parameters shared by all worker threads; filled in by muxer(). */
struct Worker_arg
{
int dictionary_size; /* Passed to LZ_compress_open(). */
int match_len_limit; /* Passed to LZ_compress_open(). */
S2w_q *s2w_q; /* Queue the workers take input blocks from. */
W2m_q *w2m_q; /* Queue the workers push compressed blocks to. */
int sizeof_compr; /* Capacity of the compressed data buffer of a block. */
size_t sizeof_W2m_blk; /* Allocation size of one W2m_blk. */
};
void * worker( void * arg )
{
const Worker_arg & tmp = *(Worker_arg *)arg;
const int dictionary_size = tmp.dictionary_size;
const int match_len_limit = tmp.match_len_limit;
S2w_q *s2w_q = tmp.s2w_q;
W2m_q *w2m_q = tmp.w2m_q;
const int sizeof_compr = tmp.sizeof_compr;
const size_t sizeof_w2m_blk = tmp.sizeof_W2m_blk;
while( true )
{
S2w_blk *s2w_blk;
/* Grab a block to work on. */
xlock_pred(&s2w_q->av_or_eof);
while( 0 == s2w_q->head && !s2w_q->eof ) {
xwait(&s2w_q->av_or_eof);
}
if( 0 == s2w_q->head ) {
/* No blocks available and splitter exited. */
xunlock(&s2w_q->av_or_eof);
break;
}
s2w_blk = s2w_q->head;
s2w_q->head = s2w_blk->next;
if( 0 == s2w_q->head ) {
s2w_q->tail = 0;
}
xunlock(&s2w_q->av_or_eof);
work_compr( dictionary_size, match_len_limit, s2w_blk, w2m_q,
sizeof_compr, sizeof_w2m_blk );
(*freef)(s2w_blk);
}
/* Notify muxer when last worker exits. */
xlock(&w2m_q->av_or_exit);
if( 0 == --w2m_q->working && 0 == w2m_q->head ) {
xsignal(&w2m_q->av_or_exit);
}
xunlock(&w2m_q->av_or_exit);
return 0;
}
/* Collect compressed blocks from the workers and write them to 'outfd'
   in block id order.
   w2m_q: queue the workers push compressed blocks to.
   m2s_q: free-slot counter; one slot is returned per block written.
   num_slots: size of the reordering buffer.
   outfd: output file descriptor; nothing is written if it is negative,
          but out_size is still updated.
   Returns when the queue is empty and all workers have exited. Write
   errors are reported through fail(). */
void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outfd )
{
  /* Blocks waiting to be written, indexed by id modulo num_slots. */
  std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 );
  unsigned long long needed_id = 0;	/* id of the next block to write */

  xlock_pred(&w2m_q->av_or_exit);
  for( ; ; )
    {
    /* Sleep until there is something to fetch or no worker is left. */
    while( w2m_q->working > 0 && w2m_q->head == 0 )
      xwait(&w2m_q->av_or_exit);

    /* Detach the whole list in one step. */
    W2m_blk * w2m_blk = w2m_q->head;
    if( w2m_blk == 0 ) break;		// queue is empty. all workers exited
    w2m_q->head = 0;
    xunlock(&w2m_q->av_or_exit);

    // File every fetched block under its slot in the circular buffer
    while( w2m_blk != 0 )
      {
      // id collision shouldn't happen
      assert( circular_buffer[w2m_blk->id%num_slots] == 0 );
      W2m_blk * const following = w2m_blk->next;
      w2m_blk->next = 0;
      circular_buffer[w2m_blk->id%num_slots] = w2m_blk;
      w2m_blk = following;
      }

    // Write out the run of consecutive blocks starting at needed_id
    W2m_blk * ready;
    while( ( ready = circular_buffer[needed_id%num_slots] ) != 0 )
      {
      out_size += ready->produced;
      if( outfd >= 0 )
        {
        const int wr = writeblock( outfd, (char *)ready->compr, ready->produced );
        if( wr != ready->produced ) fail("write()", errno);
        }
      circular_buffer[needed_id%num_slots] = 0;
      ++needed_id;

      // Return the slot; the splitter is signalled only if none was free
      xlock(&m2s_q->av);
      const bool none_was_free = ( m2s_q->num_free == 0 );
      ++m2s_q->num_free;
      if( none_was_free ) xsignal(&m2s_q->av);
      xunlock(&m2s_q->av);

      (*freef)(ready);
      }

    // Tell the workers which block is awaited before waiting again
    xlock_pred(&w2m_q->av_or_exit);
    w2m_q->needed = needed_id;
    }
  xunlock(&w2m_q->av_or_exit);

  // Every fetched block must have been written by now
  for( std::vector< W2m_blk * >::size_type i = 0; i < circular_buffer.size(); ++i )
    if( circular_buffer[i] != 0 )
      { show_error( "circular buffer not empty" ); fatal(); }
}
} // end namespace
/* Thread function of the muxer; drives one complete compression run.
   'arg' points to a Muxer_arg supplying the compression settings, the
   number of workers and slots, the debug level and the input and output
   file descriptors.
   Selects the allocation hooks, sets up the three queues, starts the
   splitter and the workers, runs muxer_loop() in this thread, joins all
   the threads, prints the statistics requested through verbosity and
   debug_level, and finally raises SIGUSR2 (presumably to tell the main
   thread that compression is complete -- confirm against the caller).
   Always returns 0. */
void * muxer( void * arg )
{
  const Muxer_arg & tmp = *(Muxer_arg *)arg;
  const int dictionary_size = tmp.dictionary_size;
  const int match_len_limit = tmp.match_len_limit;
  const int num_workers = tmp.num_workers;
  const int num_slots = tmp.num_slots;
  const int debug_level = tmp.debug_level;
  const int infd = tmp.infd;
  const int outfd = tmp.outfd;
  S2w_q s2w_q;
  W2m_q w2m_q;

  /* Bit 1 of debug_level turns on tracing of every allocation. */
  if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; }
  else { mallocf = malloc; freef = free; }

  s2w_q_init(&s2w_q);
  w2m_q_init(&w2m_q, num_workers);
  M2s_q m2s_q( num_slots );

  /* Input blocks hold twice the dictionary size, but at least 128 KiB. */
  Splitter_arg splitter_arg;
  splitter_arg.m2s_q = &m2s_q;
  splitter_arg.s2w_q = &s2w_q;
  splitter_arg.infd = infd;
  splitter_arg.sizeof_plain = 2 * std::max( 65536, dictionary_size );
  splitter_arg.sizeof_S2w_blk = sizeof(S2w_blk) + splitter_arg.sizeof_plain - 1;

  pthread_t splitter_thread;
  xcreate(&splitter_thread, splitter, &splitter_arg);

  /* The compressed buffer is sized at 9/8 of the input block plus 26
     bytes, leaving room for data that does not compress. */
  Worker_arg worker_arg;
  worker_arg.dictionary_size = dictionary_size;
  worker_arg.match_len_limit = match_len_limit;
  worker_arg.s2w_q = &s2w_q;
  worker_arg.w2m_q = &w2m_q;
  worker_arg.sizeof_compr = 6 + 20 + ( ( splitter_arg.sizeof_plain / 8 ) * 9 );
  worker_arg.sizeof_W2m_blk = sizeof(W2m_blk) + worker_arg.sizeof_compr - 1;

  pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
  /* A nothrow new is not specified to set errno, so report ENOMEM
     explicitly instead of whatever value errno happens to hold. */
  if( worker_threads == 0 ) fail("not enough memory.", ENOMEM);
  for( int i = 0; i < num_workers; ++i )
    xcreate(&worker_threads[i], worker, &worker_arg);

  muxer_loop( &w2m_q, &m2s_q, num_slots, outfd );

  for( int i = num_workers - 1; i >= 0; --i )
    xjoin(worker_threads[i]);
  delete[] worker_threads; worker_threads = 0;

  xjoin(splitter_thread);

  if( verbosity >= 1 )
    {
    if( in_size <= 0 || out_size <= 0 )
      std::fprintf( stderr, "no data compressed.\n" );
    else
      std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
                            "%5.2f%% saved, %lld in, %lld out.\n",
                    (double)in_size / out_size,
                    ( 8.0 * out_size ) / in_size,
                    100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
                    in_size, out_size );
    }

  /* Bit 0 of debug_level prints the counters kept in each condition.
     FW is a field width large enough for any unsigned long in decimal. */
  const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1;
  if( ( debug_level & 1 ) && 0 > fprintf(stderr,
        "any worker tried to consume from splitter: %*lu\n"
        "any worker stalled : %*lu\n"
        "muxer tried to consume from workers : %*lu\n"
        "muxer stalled : %*lu\n"
        "splitter tried to consume from muxer : %*lu\n"
        "splitter stalled : %*lu\n",
        FW, s2w_q.av_or_eof.ccount,
        FW, s2w_q.av_or_eof.wcount,
        FW, w2m_q.av_or_exit.ccount,
        FW, w2m_q.av_or_exit.wcount,
        FW, m2s_q.av.ccount,
        FW, m2s_q.av.wcount) )
    {
    fatal();
    }

  /* All slots must have been returned once everything is written. */
  assert( m2s_q.num_free == num_slots );
  w2m_q_uninit(&w2m_q);
  s2w_q_uninit(&s2w_q);

  xraise(SIGUSR2);
  return 0;
}