summaryrefslogtreecommitdiffstats
path: root/plzip.cc
diff options
context:
space:
mode:
Diffstat (limited to 'plzip.cc')
-rw-r--r--plzip.cc684
1 files changed, 272 insertions, 412 deletions
diff --git a/plzip.cc b/plzip.cc
index f110437..7ac8144 100644
--- a/plzip.cc
+++ b/plzip.cc
@@ -1,6 +1,6 @@
/* Plzip - A parallel version of the lzip data compressor
Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009 Antonio Diaz Diaz.
+ Copyright (C) 2009, 2010 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -18,27 +18,74 @@
#define _FILE_OFFSET_BITS 64
+#include <algorithm>
#include <cassert>
#include <cerrno>
#include <climits>
#include <csignal>
#include <cstdio>
#include <cstdlib>
+#include <vector>
#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>
#include "main.h"
#include "plzip.h"
-#include "lacos_rbtree.h"
+
+#ifndef LLONG_MAX
+#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
+#endif
+#ifndef LLONG_MIN
+#define LLONG_MIN (-LLONG_MAX - 1LL)
+#endif
+#ifndef ULLONG_MAX
+#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
+#endif
+
+
+namespace {
+
+long long in_size = 0;
+long long out_size = 0;
+
+void *(*mallocf)(size_t size);
+void (*freef)(void *ptr);
+
+
+void * trace_malloc(size_t size)
+{
+ int save_errno = 0;
+
+ void * ret = malloc(size);
+ if( ret == 0 ) save_errno = errno;
+ fprintf(stderr, "malloc(%lu) == %p\n", (long unsigned)size, ret);
+ if( ret == 0 ) errno = save_errno;
+ return ret;
+}
+
+
+void trace_free(void *ptr)
+{
+ fprintf(stderr, "free(%p)\n", ptr);
+ free(ptr);
+}
+
+
+void * xalloc(size_t size)
+{
+ void *ret = (*mallocf)(size);
+ if( 0 == ret ) fail("(*mallocf)()", errno);
+ return ret;
+}
struct S2w_blk /* Splitter to workers. */
{
- uint64_t id; /* Block serial number as read from infd. */
+ unsigned long long id; /* Block serial number as read from infd. */
S2w_blk *next; /* Next in queue. */
size_t loaded; /* # of bytes in plain, may be 0 for 1st. */
- char unsigned plain[1]; /* Data read from infd, allocated: sizeof_plain. */
+ unsigned char plain[1]; /* Data read from infd, allocated: sizeof_plain. */
};
@@ -51,7 +98,7 @@ struct S2w_q
};
-static void
+void
s2w_q_init(S2w_q *s2w_q)
{
xinit(&s2w_q->av_or_eof);
@@ -61,7 +108,7 @@ s2w_q_init(S2w_q *s2w_q)
}
-static void
+void
s2w_q_uninit(S2w_q *s2w_q)
{
assert(0 != s2w_q->eof);
@@ -71,251 +118,169 @@ s2w_q_uninit(S2w_q *s2w_q)
}
-struct W2m_blk /* Workers to muxer. */
+struct W2m_blk /* Workers to muxer data block. */
{
- uint64_t id; /* Block index as read from infd. */
+ unsigned long long id; /* Block index as read from infd. */
W2m_blk *next; /* Next block in list (unordered). */
- size_t produced; /* Number of bytes in compr. */
- char unsigned compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */
+ int produced; /* Number of bytes in compr. */
+ unsigned char compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */
};
-static int
-w2m_blk_cmp(const void *v_a, const void *v_b)
-{
- uint64_t a,
- b;
-
- a = ((const W2m_blk *)v_a)->id;
- b = ((const W2m_blk *)v_b)->id;
-
- return
- a < b ? -1
- : a > b ? 1
- : 0;
-}
-
-
struct W2m_q
{
Cond av_or_exit; /* New block available or all workers exited. */
- uint64_t needed; /* Block needed for resuming writing. */
+ unsigned long long needed; /* Block needed for resuming writing. */
W2m_blk *head; /* Block list (unordered). */
unsigned working; /* Number of workers still running. */
};
-static void
-w2m_q_init(W2m_q *w2m_q, unsigned num_worker)
+void
+w2m_q_init(W2m_q *w2m_q, int num_workers)
{
- assert(0u < num_worker);
+ assert(0 < num_workers);
xinit(&w2m_q->av_or_exit);
- w2m_q->needed = 0u;
+ w2m_q->needed = 0;
w2m_q->head = 0;
- w2m_q->working = num_worker;
+ w2m_q->working = num_workers;
}
-static void
+void
w2m_q_uninit(W2m_q *w2m_q)
{
- assert(0u == w2m_q->working);
+ assert(0 == w2m_q->working);
assert(0 == w2m_q->head);
xdestroy(&w2m_q->av_or_exit);
}
-struct M2s_q /* Muxer to splitter. */
-{
- Cond av; /* Free slot available. */
- unsigned num_free; /* Number of free slots. */
-};
-
-
-static void
-m2s_q_init(M2s_q *m2s_q, unsigned num_free)
-{
- assert(0u < num_free);
- xinit(&m2s_q->av);
- m2s_q->num_free = num_free;
-}
+struct M2s_q // Muxer to splitter queue
+ {
+ Cond av; // Free slot available
+ int num_free; // Number of free slots
+ M2s_q( const int slots )
+ {
+ xinit(&av);
+ num_free = slots;
+ }
-static void
-m2s_q_uninit(M2s_q *m2s_q, unsigned num_free)
-{
- assert(m2s_q->num_free == num_free);
- xdestroy(&m2s_q->av);
-}
+ ~M2s_q() { xdestroy(&av); }
+ };
-static void
-split(M2s_q *m2s_q, S2w_q *s2w_q, int infd,
- const size_t sizeof_plain, const size_t sizeof_s2w_blk)
-{
- uint64_t id;
- ssize_t rd;
+struct Splitter_arg
+ {
+ M2s_q *m2s_q;
+ S2w_q *s2w_q;
+ int infd;
+ int sizeof_plain;
+ size_t sizeof_S2w_blk;
+ };
- id = 0u;
- do {
- S2w_blk *s2w_blk;
- size_t vacant;
+void * splitter( void * arg )
+ {
+ const Splitter_arg & tmp = *(Splitter_arg *)arg;
+ M2s_q *m2s_q = tmp.m2s_q;
+ S2w_q *s2w_q = tmp.s2w_q;
+ const int infd = tmp.infd;
+ const int sizeof_plain = tmp.sizeof_plain;
+ const size_t sizeof_s2w_blk = tmp.sizeof_S2w_blk;
+
+ for( unsigned long long id = 0; ; ++id )
+ {
/* Grab a free slot. */
xlock_pred(&m2s_q->av);
- while (0u == m2s_q->num_free) {
- xwait(&m2s_q->av);
- }
+ while( m2s_q->num_free == 0 ) xwait(&m2s_q->av);
--m2s_q->num_free;
xunlock(&m2s_q->av);
- s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);
+ S2w_blk * s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);
/* Fill block. */
- vacant = sizeof_plain;
- do {
- rd = read(infd, s2w_blk->plain + (sizeof_plain - vacant),
- vacant > (size_t)SSIZE_MAX ? (size_t)SSIZE_MAX : vacant);
- } while (0 < rd && 0u < (vacant -= (size_t)rd));
+ const int rd = readblock( infd, (char *)s2w_blk->plain, sizeof_plain );
+ if( rd != sizeof_plain && errno ) fail("read()", errno);
- /* Read error. */
- if (-1 == rd) {
- fail("read()", errno);
- }
-
- if (sizeof_plain == vacant && 0u < id) {
+ if( rd == 0 && id != 0 )
+ {
/* EOF on first read, but not for first input block. */
(*freef)(s2w_blk);
xlock(&m2s_q->av);
++m2s_q->num_free;
xunlock(&m2s_q->av);
- }
- else {
+ }
+ else
+ {
s2w_blk->id = id;
s2w_blk->next = 0;
- s2w_blk->loaded = sizeof_plain - vacant;
- }
-
- /* We either push a block, or set EOF, or both. */
- assert(sizeof_plain > vacant || 0 == rd);
+ s2w_blk->loaded = rd;
+ in_size += rd;
+ }
xlock(&s2w_q->av_or_eof);
- if (0 == s2w_q->head) {
- xbroadcast(&s2w_q->av_or_eof);
- }
+ if( s2w_q->head == 0 ) xbroadcast(&s2w_q->av_or_eof);
- if (sizeof_plain > vacant || 0u == id) {
- if (0 == s2w_q->tail) {
- s2w_q->head = s2w_blk;
- }
- else {
- s2w_q->tail->next = s2w_blk;
- }
+ if( rd > 0 || id == 0 )
+ {
+ if( s2w_q->tail == 0 ) s2w_q->head = s2w_blk;
+ else s2w_q->tail->next = s2w_blk;
s2w_q->tail = s2w_blk;
- }
- s2w_q->eof = (0 == rd);
+ }
+ s2w_q->eof = ( rd == 0 );
xunlock(&s2w_q->av_or_eof);
- /*
- If we didn't push a block, then this is bogus, but then we did set EOF,
- so it doesn't matter, because we'll leave immediately.
- */
- ++id;
- } while (0 < rd);
-}
-
-
-struct Split_arg
-{
- M2s_q *m2s_q;
- S2w_q *s2w_q;
- int infd;
- size_t sizeof_plain,
- sizeof_S2w_blk;
-};
-
-
-static void *
-split_wrap(void *v_split_arg)
-{
- Split_arg *split_arg = (Split_arg *)v_split_arg;
-
- split(
- split_arg->m2s_q,
- split_arg->s2w_q,
- split_arg->infd,
- split_arg->sizeof_plain,
- split_arg->sizeof_S2w_blk
- );
+ if( rd <= 0 ) break;
+ }
return 0;
-}
+ }
-static void
-work_lz_rd(W2m_blk *w2m_blk, const size_t sizeof_compr, void *lz)
+void work_lz_rd(W2m_blk *w2m_blk, const int sizeof_compr, LZ_Encoder * lz)
{
int rd;
assert(w2m_blk->produced < sizeof_compr);
rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced,
sizeof_compr - w2m_blk->produced);
- if (-1 == rd) {
+ if( -1 == rd ) {
show_error( "LZ_compress_read() failed." );
fatal();
}
- w2m_blk->produced += (size_t)rd;
+ w2m_blk->produced += rd;
}
-struct Compr_lev
-{
- unsigned dict_size,
- mx_match;
-};
-
-
-static const Compr_lev compr_lev[] = {
- { 1u * 1024u * 1024u, 10u },
- { 1u * 1024u * 1024u, 12u },
- { 1u * 1024u * 1024u, 17u },
- { 2u * 1024u * 1024u, 26u },
- { 4u * 1024u * 1024u, 44u },
- { 8u * 1024u * 1024u, 80u },
- { 16u * 1024u * 1024u, 108u },
- { 16u * 1024u * 1024u, 163u },
- { 32u * 1024u * 1024u, 273u }
-};
-
-
-static void
-work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx,
- const size_t sizeof_compr, const size_t sizeof_w2m_blk)
+void work_compr( const int dictionary_size, const int match_len_limit,
+ S2w_blk *s2w_blk, W2m_q *w2m_q,
+ const int sizeof_compr, const size_t sizeof_w2m_blk )
{
W2m_blk *w2m_blk;
- assert(0u < s2w_blk->loaded || 0u == s2w_blk->id);
+ assert(0 < s2w_blk->loaded || 0 == s2w_blk->id);
w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk);
/* Single member compression. Settings like with lzip -6. */
{
- void *lz;
+ LZ_Encoder * lz;
size_t written;
- lz = LZ_compress_open(compr_lev[clidx].dict_size,
- compr_lev[clidx].mx_match, (uint64_t)-1 / 2u);
- if (LZ_ok != LZ_compress_errno(lz)) {
+ lz = LZ_compress_open( dictionary_size, match_len_limit, LLONG_MAX );
+ if( LZ_ok != LZ_compress_errno(lz) ) {
show_error( "LZ_compress_open() failed." );
fatal();
}
- written = 0u;
- w2m_blk->produced = 0u;
- while (written < s2w_blk->loaded) {
+ written = 0;
+ w2m_blk->produced = 0;
+ while( written < s2w_blk->loaded ) {
int wr;
wr = LZ_compress_write(lz, s2w_blk->plain + written,
s2w_blk->loaded - written);
- if (-1 == wr) {
+ if( -1 == wr ) {
show_error( "LZ_compress_write() failed." );
fatal();
}
@@ -324,16 +289,16 @@ work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx,
work_lz_rd(w2m_blk, sizeof_compr, lz);
}
- if (-1 == LZ_compress_finish(lz)) {
+ if( -1 == LZ_compress_finish(lz) ) {
show_error( "LZ_compress_finish() failed." );
fatal();
}
- while (!LZ_compress_finished(lz)) {
+ while( !LZ_compress_finished(lz) ) {
work_lz_rd(w2m_blk, sizeof_compr, lz);
}
- if (-1 == LZ_compress_close(lz)) {
+ if( -1 == LZ_compress_close(lz) ) {
show_error( "LZ_compress_close() failed." );
fatal();
}
@@ -345,290 +310,201 @@ work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx,
xlock(&w2m_q->av_or_exit);
w2m_blk->next = w2m_q->head;
w2m_q->head = w2m_blk;
- if (w2m_blk->id == w2m_q->needed) {
+ if( w2m_blk->id == w2m_q->needed ) {
xsignal(&w2m_q->av_or_exit);
}
xunlock(&w2m_q->av_or_exit);
}
-static void
-work(S2w_q *s2w_q, W2m_q *w2m_q, unsigned clidx,
- const size_t sizeof_compr, const size_t sizeof_w2m_blk)
-{
- for (;;) {
+struct Worker_arg
+ {
+ int dictionary_size;
+ int match_len_limit;
+ S2w_q *s2w_q;
+ W2m_q *w2m_q;
+ int sizeof_compr;
+ size_t sizeof_W2m_blk;
+ };
+
+
+void * worker( void * arg )
+ {
+ const Worker_arg & tmp = *(Worker_arg *)arg;
+ const int dictionary_size = tmp.dictionary_size;
+ const int match_len_limit = tmp.match_len_limit;
+ S2w_q *s2w_q = tmp.s2w_q;
+ W2m_q *w2m_q = tmp.w2m_q;
+ const int sizeof_compr = tmp.sizeof_compr;
+ const size_t sizeof_w2m_blk = tmp.sizeof_W2m_blk;
+
+ while( true )
+ {
S2w_blk *s2w_blk;
/* Grab a block to work on. */
xlock_pred(&s2w_q->av_or_eof);
- while (0 == s2w_q->head && !s2w_q->eof) {
+ while( 0 == s2w_q->head && !s2w_q->eof ) {
xwait(&s2w_q->av_or_eof);
}
- if (0 == s2w_q->head) {
+ if( 0 == s2w_q->head ) {
/* No blocks available and splitter exited. */
xunlock(&s2w_q->av_or_eof);
break;
}
s2w_blk = s2w_q->head;
s2w_q->head = s2w_blk->next;
- if (0 == s2w_q->head) {
+ if( 0 == s2w_q->head ) {
s2w_q->tail = 0;
}
xunlock(&s2w_q->av_or_eof);
- work_compr(s2w_blk, w2m_q, clidx, sizeof_compr, sizeof_w2m_blk);
+ work_compr( dictionary_size, match_len_limit, s2w_blk, w2m_q,
+ sizeof_compr, sizeof_w2m_blk );
(*freef)(s2w_blk);
}
/* Notify muxer when last worker exits. */
xlock(&w2m_q->av_or_exit);
- if (0u == --w2m_q->working && 0 == w2m_q->head) {
+ if( 0 == --w2m_q->working && 0 == w2m_q->head ) {
xsignal(&w2m_q->av_or_exit);
}
xunlock(&w2m_q->av_or_exit);
-}
-
-
-struct Work_arg
-{
- S2w_q *s2w_q;
- W2m_q *w2m_q;
- unsigned clidx;
- size_t sizeof_compr,
- sizeof_W2m_blk;
-};
-
-
-static void *
-work_wrap(void *v_work_arg)
-{
- Work_arg *work_arg;
-
- work_arg = (Work_arg *)v_work_arg;
- work(
- work_arg->s2w_q,
- work_arg->w2m_q,
- work_arg->clidx,
- work_arg->sizeof_compr,
- work_arg->sizeof_W2m_blk
- );
return 0;
-}
-
-
-static void *
-reord_alloc(size_t size, void *)
-{
- return (*mallocf)(size);
-}
-
-
-static void
-reord_dealloc(void *ptr, void *)
-{
- (*freef)(ptr);
-}
-
-
-static void
-mux_write(M2s_q *m2s_q, lacos_rbtree_node **reord,
- uint64_t *reord_needed, int outfd)
-{
- assert(0 != *reord);
-
- /*
- Go on until the tree becomes empty or the next block is found to be
- missing.
- */
- do {
- lacos_rbtree_node *reord_head;
- W2m_blk *reord_w2m_blk;
-
- reord_head = lacos_rbtree_min(*reord);
- assert(0 != reord_head);
-
- reord_w2m_blk = (W2m_blk *)(*(void **)reord_head);
- if (reord_w2m_blk->id != *reord_needed) {
- break;
- }
-
- /* Write out "reord_w2m_blk". */
- if (-1 != outfd) {
- char unsigned *cp;
-
- cp = reord_w2m_blk->compr;
- while (reord_w2m_blk->produced > 0u) {
- ssize_t written;
-
- written = write(outfd, cp, reord_w2m_blk->produced > (size_t)SSIZE_MAX
- ? (size_t)SSIZE_MAX : reord_w2m_blk->produced);
- if (-1 == written) {
- fail("write()", errno);
- }
-
- reord_w2m_blk->produced -= (size_t)written;
- cp += written;
- }
- }
-
- ++*reord_needed;
-
- xlock(&m2s_q->av);
- if (0u == m2s_q->num_free++) {
- xsignal(&m2s_q->av);
- }
- xunlock(&m2s_q->av);
-
- lacos_rbtree_delete(
- reord, /* new_root */
- reord_head, /* old_node */
- 0, /* old_data */
- reord_dealloc, /* dealloc() */
- 0 /* alloc_ctl */
- );
-
- /* Release "reord_w2m_blk". */
- (*freef)(reord_w2m_blk);
- } while (0 != *reord);
-}
-
+ }
-static void
-mux(W2m_q *w2m_q, M2s_q *m2s_q, int outfd)
-{
- lacos_rbtree_node *reord;
- uint64_t reord_needed;
- reord = 0;
- reord_needed = 0u;
+void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outfd )
+ {
+ unsigned long long needed_id = 0;
+ std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 );
xlock_pred(&w2m_q->av_or_exit);
- for (;;) {
- W2m_blk *w2m_blk;
-
+ while( true )
+ {
/* Grab all available compressed blocks in one step. */
- while (0 == w2m_q->head && 0u < w2m_q->working) {
+ while( w2m_q->head == 0 && w2m_q->working > 0 )
xwait(&w2m_q->av_or_exit);
- }
- if (0 == w2m_q->head) {
- /* w2m_q is empty and all workers exited */
- break;
- }
+ if( w2m_q->head == 0 ) break; // queue is empty. all workers exited
- w2m_blk = w2m_q->head;
+ W2m_blk * w2m_blk = w2m_q->head;
w2m_q->head = 0;
xunlock(&w2m_q->av_or_exit);
- /* Merge blocks fetched this time into tree. */
+ // Merge blocks fetched this time into circular buffer
do {
- lacos_rbtree_node *new_node;
- W2m_blk *next;
-
- if (-1 == lacos_rbtree_insert(
- &reord, /* new_root */
- &new_node, /* new_node */
- w2m_blk, /* new_data */
- w2m_blk_cmp, /* cmp() */
- reord_alloc, /* alloc() */
- 0 /* alloc_ctl */
- )) {
- /* id collision shouldn't happen */
- assert(0 == new_node);
- show_error( "lacos_rbtree_insert(): out of memory." );
- fatal();
- }
-
- next = w2m_blk->next;
+ // id collision shouldn't happen
+ assert( circular_buffer[w2m_blk->id%num_slots] == 0 );
+ circular_buffer[w2m_blk->id%num_slots] = w2m_blk;
+ W2m_blk * next = w2m_blk->next;
w2m_blk->next = 0;
w2m_blk = next;
- } while (0 != w2m_blk);
+ } while( w2m_blk != 0 );
+
+ // Write out initial continuous sequence of reordered blocks
+ while( true )
+ {
+ W2m_blk * needed_w2m_blk = circular_buffer[needed_id%num_slots];
+ if( needed_w2m_blk == 0 ) break;
+
+ out_size += needed_w2m_blk->produced;
+
+ if( outfd >= 0 )
+ {
+ const int wr = writeblock( outfd, (char *)needed_w2m_blk->compr, needed_w2m_blk->produced );
+ if( wr != needed_w2m_blk->produced ) fail("write()", errno);
+ }
+ circular_buffer[needed_id%num_slots] = 0;
+ ++needed_id;
+
+ xlock(&m2s_q->av);
+ if( 0 == m2s_q->num_free++ ) xsignal(&m2s_q->av);
+ xunlock(&m2s_q->av);
- /* Write out initial continuous sequence of reordered blocks. */
- mux_write(m2s_q, &reord, &reord_needed, outfd);
+ (*freef)(needed_w2m_blk);
+ }
xlock_pred(&w2m_q->av_or_exit);
- w2m_q->needed = reord_needed;
- }
+ w2m_q->needed = needed_id;
+ }
xunlock(&w2m_q->av_or_exit);
- assert(0 == reord);
-}
+ for( int i = 0; i < num_slots; ++i )
+ if( circular_buffer[i] != 0 )
+ { show_error( "circular buffer not empty" ); fatal(); }
+ }
+} // end namespace
-static void
-plzip(unsigned num_worker, unsigned num_slot, unsigned clidx, int print_cctrs,
- int infd, int outfd)
-{
+
+void * muxer( void * arg )
+ {
+ const Muxer_arg & tmp = *(Muxer_arg *)arg;
+ const int dictionary_size = tmp.dictionary_size;
+ const int match_len_limit = tmp.match_len_limit;
+ const int num_workers = tmp.num_workers;
+ const int num_slots = tmp.num_slots;
+ const int debug_level = tmp.debug_level;
+ const int infd = tmp.infd;
+ const int outfd = tmp.outfd;
S2w_q s2w_q;
W2m_q w2m_q;
- M2s_q m2s_q;
- Split_arg split_arg;
- pthread_t splitter;
- Work_arg work_arg;
- pthread_t *worker;
- unsigned i;
- assert(clidx < sizeof compr_lev / sizeof compr_lev[0]);
+ if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; }
+ else { mallocf = malloc; freef = free; }
s2w_q_init(&s2w_q);
- w2m_q_init(&w2m_q, num_worker);
- m2s_q_init(&m2s_q, num_slot);
-
-#define SIZES(struc, arr, arsz_unsigned, arg) \
- do { \
- unsigned tmp; \
-\
- tmp = arsz_unsigned; \
- if ((size_t)-1 < tmp) { \
- show_error( "size_t overflow in sizeof_" #arr "." ); \
- fatal(); \
- } \
- arg ## _arg . sizeof_ ## arr = tmp; \
-\
- if ((size_t)-1 - sizeof(struc) \
- < arg ## _arg . sizeof_ ## arr - (size_t)1) { \
- show_error( "size_t overflow in sizeof_" #struc "." ); \
- fatal(); \
- } \
- arg ## _arg . sizeof_ ## struc = sizeof(struc) \
- + (arg ## _arg . sizeof_ ## arr - (size_t)1); \
- } while (0)
-
- split_arg.m2s_q = &m2s_q;
- split_arg.s2w_q = &s2w_q;
- split_arg.infd = infd;
- SIZES(S2w_blk, plain, 2u * compr_lev[clidx].dict_size, split);
- xcreate(&splitter, split_wrap, &split_arg);
-
- work_arg.s2w_q = &s2w_q;
- work_arg.w2m_q = &w2m_q;
- work_arg.clidx = clidx;
- SIZES(W2m_blk, compr, (4u + 1u + 1u)
- + ((unsigned)split_arg.sizeof_plain * 9u + 7u) / 8u + (4u + 8u + 8u),
- work);
-
-#undef SIZES
-
- assert(0u < num_worker);
- assert((size_t)-1 / sizeof *worker >= num_worker);
- worker = (pthread_t *)xalloc(num_worker * sizeof *worker);
- for (i = 0u; i < num_worker; ++i) {
- xcreate(&worker[i], work_wrap, &work_arg);
- }
-
- mux(&w2m_q, &m2s_q, outfd);
-
- i = num_worker;
- do {
- xjoin(worker[--i]);
- } while (0u < i);
- (*freef)(worker);
-
- xjoin(splitter);
+ w2m_q_init(&w2m_q, num_workers);
+ M2s_q m2s_q( num_slots );
+
+
+ Splitter_arg splitter_arg;
+ splitter_arg.m2s_q = &m2s_q;
+ splitter_arg.s2w_q = &s2w_q;
+ splitter_arg.infd = infd;
+ splitter_arg.sizeof_plain = 2 * std::max( 65536, dictionary_size );
+ splitter_arg.sizeof_S2w_blk = sizeof(S2w_blk) + splitter_arg.sizeof_plain - 1;
+
+ pthread_t splitter_thread;
+ xcreate(&splitter_thread, splitter, &splitter_arg);
+
+ Worker_arg worker_arg;
+ worker_arg.dictionary_size = dictionary_size;
+ worker_arg.match_len_limit = match_len_limit;
+ worker_arg.s2w_q = &s2w_q;
+ worker_arg.w2m_q = &w2m_q;
+ worker_arg.sizeof_compr = 6 + 20 + ( ( splitter_arg.sizeof_plain / 8 ) * 9 );
+ worker_arg.sizeof_W2m_blk = sizeof(W2m_blk) + worker_arg.sizeof_compr - 1;
+
+ pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
+ if( worker_threads == 0 ) fail("not enough memory.", errno);
+ for( int i = 0; i < num_workers; ++i )
+ xcreate(&worker_threads[i], worker, &worker_arg);
+
+ muxer_loop( &w2m_q, &m2s_q, num_slots, outfd );
+
+ for( int i = num_workers - 1; i >= 0; --i )
+ xjoin(worker_threads[i]);
+ delete[] worker_threads; worker_threads = 0;
+
+ xjoin(splitter_thread);
+
+ if( verbosity >= 1 )
+ {
+ if( in_size <= 0 || out_size <= 0 )
+ std::fprintf( stderr, "no data compressed.\n" );
+ else
+ std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
+ "%5.2f%% saved, %lld in, %lld out.\n",
+ (double)in_size / out_size,
+ ( 8.0 * out_size ) / in_size,
+ 100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
+ in_size, out_size );
+ }
const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1;
- if (print_cctrs && 0 > fprintf(stderr,
+ if( ( debug_level & 1 ) && 0 > fprintf(stderr,
"any worker tried to consume from splitter: %*lu\n"
"any worker stalled : %*lu\n"
"muxer tried to consume from workers : %*lu\n"
@@ -645,25 +521,9 @@ plzip(unsigned num_worker, unsigned num_slot, unsigned clidx, int print_cctrs,
fatal();
}
- m2s_q_uninit(&m2s_q, num_slot);
+ assert( m2s_q.num_free == num_slots );
w2m_q_uninit(&w2m_q);
s2w_q_uninit(&s2w_q);
-}
-
-
-void *
-plzip_wrap(void *v_arg)
-{
- Plzip_arg *arg = (Plzip_arg *)v_arg;
- plzip(
- arg->num_worker,
- arg->num_slot,
- arg->clidx,
- arg->print_cctrs,
- arg->infd,
- arg->outfd
- );
-
xraise(SIGUSR2);
return 0;
-}
+ }