diff options
Diffstat (limited to 'plzip.cc')
-rw-r--r-- | plzip.cc | 684 |
1 files changed, 272 insertions, 412 deletions
@@ -1,6 +1,6 @@ /* Plzip - A parallel version of the lzip data compressor Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009 Antonio Diaz Diaz. + Copyright (C) 2009, 2010 Antonio Diaz Diaz. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -18,27 +18,74 @@ #define _FILE_OFFSET_BITS 64 +#include <algorithm> #include <cassert> #include <cerrno> #include <climits> #include <csignal> #include <cstdio> #include <cstdlib> +#include <vector> #include <stdint.h> #include <unistd.h> #include <lzlib.h> #include "main.h" #include "plzip.h" -#include "lacos_rbtree.h" + +#ifndef LLONG_MAX +#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL +#endif +#ifndef LLONG_MIN +#define LLONG_MIN (-LLONG_MAX - 1LL) +#endif +#ifndef ULLONG_MAX +#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL +#endif + + +namespace { + +long long in_size = 0; +long long out_size = 0; + +void *(*mallocf)(size_t size); +void (*freef)(void *ptr); + + +void * trace_malloc(size_t size) +{ + int save_errno = 0; + + void * ret = malloc(size); + if( ret == 0 ) save_errno = errno; + fprintf(stderr, "malloc(%lu) == %p\n", (long unsigned)size, ret); + if( ret == 0 ) errno = save_errno; + return ret; +} + + +void trace_free(void *ptr) +{ + fprintf(stderr, "free(%p)\n", ptr); + free(ptr); +} + + +void * xalloc(size_t size) +{ + void *ret = (*mallocf)(size); + if( 0 == ret ) fail("(*mallocf)()", errno); + return ret; +} struct S2w_blk /* Splitter to workers. */ { - uint64_t id; /* Block serial number as read from infd. */ + unsigned long long id; /* Block serial number as read from infd. */ S2w_blk *next; /* Next in queue. */ size_t loaded; /* # of bytes in plain, may be 0 for 1st. */ - char unsigned plain[1]; /* Data read from infd, allocated: sizeof_plain. */ + unsigned char plain[1]; /* Data read from infd, allocated: sizeof_plain. */ }; @@ -51,7 +98,7 @@ struct S2w_q }; -static void +void s2w_q_init(S2w_q *s2w_q) { xinit(&s2w_q->av_or_eof); @@ -61,7 +108,7 @@ s2w_q_init(S2w_q *s2w_q) } -static void +void s2w_q_uninit(S2w_q *s2w_q) { assert(0 != s2w_q->eof); @@ -71,251 +118,169 @@ s2w_q_uninit(S2w_q *s2w_q) } -struct W2m_blk /* Workers to muxer. */ +struct W2m_blk /* Workers to muxer data block. */ { - uint64_t id; /* Block index as read from infd. */ + unsigned long long id; /* Block index as read from infd. */ W2m_blk *next; /* Next block in list (unordered). */ - size_t produced; /* Number of bytes in compr. */ - char unsigned compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */ + int produced; /* Number of bytes in compr. */ + unsigned char compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */ }; -static int -w2m_blk_cmp(const void *v_a, const void *v_b) -{ - uint64_t a, - b; - - a = ((const W2m_blk *)v_a)->id; - b = ((const W2m_blk *)v_b)->id; - - return - a < b ? -1 - : a > b ? 1 - : 0; -} - - struct W2m_q { Cond av_or_exit; /* New block available or all workers exited. */ - uint64_t needed; /* Block needed for resuming writing. */ + unsigned long long needed; /* Block needed for resuming writing. */ W2m_blk *head; /* Block list (unordered). */ unsigned working; /* Number of workers still running. */ }; -static void -w2m_q_init(W2m_q *w2m_q, unsigned num_worker) +void +w2m_q_init(W2m_q *w2m_q, int num_workers) { - assert(0u < num_worker); + assert(0 < num_workers); xinit(&w2m_q->av_or_exit); - w2m_q->needed = 0u; + w2m_q->needed = 0; w2m_q->head = 0; - w2m_q->working = num_worker; + w2m_q->working = num_workers; } -static void +void w2m_q_uninit(W2m_q *w2m_q) { - assert(0u == w2m_q->working); + assert(0 == w2m_q->working); assert(0 == w2m_q->head); xdestroy(&w2m_q->av_or_exit); } -struct M2s_q /* Muxer to splitter. */ -{ - Cond av; /* Free slot available. */ - unsigned num_free; /* Number of free slots. */ -}; - - -static void -m2s_q_init(M2s_q *m2s_q, unsigned num_free) -{ - assert(0u < num_free); - xinit(&m2s_q->av); - m2s_q->num_free = num_free; -} +struct M2s_q // Muxer to splitter queue + { + Cond av; // Free slot available + int num_free; // Number of free slots + M2s_q( const int slots ) + { + xinit(&av); + num_free = slots; + } -static void -m2s_q_uninit(M2s_q *m2s_q, unsigned num_free) -{ - assert(m2s_q->num_free == num_free); - xdestroy(&m2s_q->av); -} + ~M2s_q() { xdestroy(&av); } + }; -static void -split(M2s_q *m2s_q, S2w_q *s2w_q, int infd, - const size_t sizeof_plain, const size_t sizeof_s2w_blk) -{ - uint64_t id; - ssize_t rd; +struct Splitter_arg + { + M2s_q *m2s_q; + S2w_q *s2w_q; + int infd; + int sizeof_plain; + size_t sizeof_S2w_blk; + }; - id = 0u; - do { - S2w_blk *s2w_blk; - size_t vacant; +void * splitter( void * arg ) + { + const Splitter_arg & tmp = *(Splitter_arg *)arg; + M2s_q *m2s_q = tmp.m2s_q; + S2w_q *s2w_q = tmp.s2w_q; + const int infd = tmp.infd; + const int sizeof_plain = tmp.sizeof_plain; + const size_t sizeof_s2w_blk = tmp.sizeof_S2w_blk; + + for( unsigned long long id = 0; ; ++id ) + { /* Grab a free slot. */ xlock_pred(&m2s_q->av); - while (0u == m2s_q->num_free) { - xwait(&m2s_q->av); - } + while( m2s_q->num_free == 0 ) xwait(&m2s_q->av); --m2s_q->num_free; xunlock(&m2s_q->av); - s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk); + S2w_blk * s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk); /* Fill block. */ - vacant = sizeof_plain; - do { - rd = read(infd, s2w_blk->plain + (sizeof_plain - vacant), - vacant > (size_t)SSIZE_MAX ? (size_t)SSIZE_MAX : vacant); - } while (0 < rd && 0u < (vacant -= (size_t)rd)); + const int rd = readblock( infd, (char *)s2w_blk->plain, sizeof_plain ); + if( rd != sizeof_plain && errno ) fail("read()", errno); - /* Read error. */ - if (-1 == rd) { - fail("read()", errno); - } - - if (sizeof_plain == vacant && 0u < id) { + if( rd == 0 && id != 0 ) + { /* EOF on first read, but not for first input block. */ (*freef)(s2w_blk); xlock(&m2s_q->av); ++m2s_q->num_free; xunlock(&m2s_q->av); - } - else { + } + else + { s2w_blk->id = id; s2w_blk->next = 0; - s2w_blk->loaded = sizeof_plain - vacant; - } - - /* We either push a block, or set EOF, or both. */ - assert(sizeof_plain > vacant || 0 == rd); + s2w_blk->loaded = rd; + in_size += rd; + } xlock(&s2w_q->av_or_eof); - if (0 == s2w_q->head) { - xbroadcast(&s2w_q->av_or_eof); - } + if( s2w_q->head == 0 ) xbroadcast(&s2w_q->av_or_eof); - if (sizeof_plain > vacant || 0u == id) { - if (0 == s2w_q->tail) { - s2w_q->head = s2w_blk; - } - else { - s2w_q->tail->next = s2w_blk; - } + if( rd > 0 || id == 0 ) + { + if( s2w_q->tail == 0 ) s2w_q->head = s2w_blk; + else s2w_q->tail->next = s2w_blk; s2w_q->tail = s2w_blk; - } - s2w_q->eof = (0 == rd); + } + s2w_q->eof = ( rd == 0 ); xunlock(&s2w_q->av_or_eof); - /* - If we didn't push a block, then this is bogus, but then we did set EOF, - so it doesn't matter, because we'll leave immediately. - */ - ++id; - } while (0 < rd); -} - - -struct Split_arg -{ - M2s_q *m2s_q; - S2w_q *s2w_q; - int infd; - size_t sizeof_plain, - sizeof_S2w_blk; -}; - - -static void * -split_wrap(void *v_split_arg) -{ - Split_arg *split_arg = (Split_arg *)v_split_arg; - - split( - split_arg->m2s_q, - split_arg->s2w_q, - split_arg->infd, - split_arg->sizeof_plain, - split_arg->sizeof_S2w_blk - ); + if( rd <= 0 ) break; + } return 0; -} + } -static void -work_lz_rd(W2m_blk *w2m_blk, const size_t sizeof_compr, void *lz) +void work_lz_rd(W2m_blk *w2m_blk, const int sizeof_compr, LZ_Encoder * lz) { int rd; assert(w2m_blk->produced < sizeof_compr); rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced, sizeof_compr - w2m_blk->produced); - if (-1 == rd) { + if( -1 == rd ) { show_error( "LZ_compress_read() failed." ); fatal(); } - w2m_blk->produced += (size_t)rd; + w2m_blk->produced += rd; } -struct Compr_lev -{ - unsigned dict_size, - mx_match; -}; - - -static const Compr_lev compr_lev[] = { - { 1u * 1024u * 1024u, 10u }, - { 1u * 1024u * 1024u, 12u }, - { 1u * 1024u * 1024u, 17u }, - { 2u * 1024u * 1024u, 26u }, - { 4u * 1024u * 1024u, 44u }, - { 8u * 1024u * 1024u, 80u }, - { 16u * 1024u * 1024u, 108u }, - { 16u * 1024u * 1024u, 163u }, - { 32u * 1024u * 1024u, 273u } -}; - - -static void -work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx, - const size_t sizeof_compr, const size_t sizeof_w2m_blk) +void work_compr( const int dictionary_size, const int match_len_limit, + S2w_blk *s2w_blk, W2m_q *w2m_q, + const int sizeof_compr, const size_t sizeof_w2m_blk ) { W2m_blk *w2m_blk; - assert(0u < s2w_blk->loaded || 0u == s2w_blk->id); + assert(0 < s2w_blk->loaded || 0 == s2w_blk->id); w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk); /* Single member compression. Settings like with lzip -6. */ { - void *lz; + LZ_Encoder * lz; size_t written; - lz = LZ_compress_open(compr_lev[clidx].dict_size, - compr_lev[clidx].mx_match, (uint64_t)-1 / 2u); - if (LZ_ok != LZ_compress_errno(lz)) { + lz = LZ_compress_open( dictionary_size, match_len_limit, LLONG_MAX ); + if( LZ_ok != LZ_compress_errno(lz) ) { show_error( "LZ_compress_open() failed." ); fatal(); } - written = 0u; - w2m_blk->produced = 0u; - while (written < s2w_blk->loaded) { + written = 0; + w2m_blk->produced = 0; + while( written < s2w_blk->loaded ) { int wr; wr = LZ_compress_write(lz, s2w_blk->plain + written, s2w_blk->loaded - written); - if (-1 == wr) { + if( -1 == wr ) { show_error( "LZ_compress_write() failed." ); fatal(); } @@ -324,16 +289,16 @@ work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx, work_lz_rd(w2m_blk, sizeof_compr, lz); } - if (-1 == LZ_compress_finish(lz)) { + if( -1 == LZ_compress_finish(lz) ) { show_error( "LZ_compress_finish() failed." ); fatal(); } - while (!LZ_compress_finished(lz)) { + while( !LZ_compress_finished(lz) ) { work_lz_rd(w2m_blk, sizeof_compr, lz); } - if (-1 == LZ_compress_close(lz)) { + if( -1 == LZ_compress_close(lz) ) { show_error( "LZ_compress_close() failed." ); fatal(); } @@ -345,290 +310,201 @@ work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx, xlock(&w2m_q->av_or_exit); w2m_blk->next = w2m_q->head; w2m_q->head = w2m_blk; - if (w2m_blk->id == w2m_q->needed) { + if( w2m_blk->id == w2m_q->needed ) { xsignal(&w2m_q->av_or_exit); } xunlock(&w2m_q->av_or_exit); } -static void -work(S2w_q *s2w_q, W2m_q *w2m_q, unsigned clidx, - const size_t sizeof_compr, const size_t sizeof_w2m_blk) -{ - for (;;) { +struct Worker_arg + { + int dictionary_size; + int match_len_limit; + S2w_q *s2w_q; + W2m_q *w2m_q; + int sizeof_compr; + size_t sizeof_W2m_blk; + }; + + +void * worker( void * arg ) + { + const Worker_arg & tmp = *(Worker_arg *)arg; + const int dictionary_size = tmp.dictionary_size; + const int match_len_limit = tmp.match_len_limit; + S2w_q *s2w_q = tmp.s2w_q; + W2m_q *w2m_q = tmp.w2m_q; + const int sizeof_compr = tmp.sizeof_compr; + const size_t sizeof_w2m_blk = tmp.sizeof_W2m_blk; + + while( true ) + { S2w_blk *s2w_blk; /* Grab a block to work on. */ xlock_pred(&s2w_q->av_or_eof); - while (0 == s2w_q->head && !s2w_q->eof) { + while( 0 == s2w_q->head && !s2w_q->eof ) { xwait(&s2w_q->av_or_eof); } - if (0 == s2w_q->head) { + if( 0 == s2w_q->head ) { /* No blocks available and splitter exited. */ xunlock(&s2w_q->av_or_eof); break; } s2w_blk = s2w_q->head; s2w_q->head = s2w_blk->next; - if (0 == s2w_q->head) { + if( 0 == s2w_q->head ) { s2w_q->tail = 0; } xunlock(&s2w_q->av_or_eof); - work_compr(s2w_blk, w2m_q, clidx, sizeof_compr, sizeof_w2m_blk); + work_compr( dictionary_size, match_len_limit, s2w_blk, w2m_q, + sizeof_compr, sizeof_w2m_blk ); (*freef)(s2w_blk); } /* Notify muxer when last worker exits. */ xlock(&w2m_q->av_or_exit); - if (0u == --w2m_q->working && 0 == w2m_q->head) { + if( 0 == --w2m_q->working && 0 == w2m_q->head ) { xsignal(&w2m_q->av_or_exit); } xunlock(&w2m_q->av_or_exit); -} - - -struct Work_arg -{ - S2w_q *s2w_q; - W2m_q *w2m_q; - unsigned clidx; - size_t sizeof_compr, - sizeof_W2m_blk; -}; - - -static void * -work_wrap(void *v_work_arg) -{ - Work_arg *work_arg; - - work_arg = (Work_arg *)v_work_arg; - work( - work_arg->s2w_q, - work_arg->w2m_q, - work_arg->clidx, - work_arg->sizeof_compr, - work_arg->sizeof_W2m_blk - ); return 0; -} - - -static void * -reord_alloc(size_t size, void *) -{ - return (*mallocf)(size); -} - - -static void -reord_dealloc(void *ptr, void *) -{ - (*freef)(ptr); -} - - -static void -mux_write(M2s_q *m2s_q, lacos_rbtree_node **reord, - uint64_t *reord_needed, int outfd) -{ - assert(0 != *reord); - - /* - Go on until the tree becomes empty or the next block is found to be - missing. - */ - do { - lacos_rbtree_node *reord_head; - W2m_blk *reord_w2m_blk; - - reord_head = lacos_rbtree_min(*reord); - assert(0 != reord_head); - - reord_w2m_blk = (W2m_blk *)(*(void **)reord_head); - if (reord_w2m_blk->id != *reord_needed) { - break; - } - - /* Write out "reord_w2m_blk". */ - if (-1 != outfd) { - char unsigned *cp; - - cp = reord_w2m_blk->compr; - while (reord_w2m_blk->produced > 0u) { - ssize_t written; - - written = write(outfd, cp, reord_w2m_blk->produced > (size_t)SSIZE_MAX - ? (size_t)SSIZE_MAX : reord_w2m_blk->produced); - if (-1 == written) { - fail("write()", errno); - } - - reord_w2m_blk->produced -= (size_t)written; - cp += written; - } - } - - ++*reord_needed; - - xlock(&m2s_q->av); - if (0u == m2s_q->num_free++) { - xsignal(&m2s_q->av); - } - xunlock(&m2s_q->av); - - lacos_rbtree_delete( - reord, /* new_root */ - reord_head, /* old_node */ - 0, /* old_data */ - reord_dealloc, /* dealloc() */ - 0 /* alloc_ctl */ - ); - - /* Release "reord_w2m_blk". */ - (*freef)(reord_w2m_blk); - } while (0 != *reord); -} - + } -static void -mux(W2m_q *w2m_q, M2s_q *m2s_q, int outfd) -{ - lacos_rbtree_node *reord; - uint64_t reord_needed; - reord = 0; - reord_needed = 0u; +void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outfd ) + { + unsigned long long needed_id = 0; + std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 ); xlock_pred(&w2m_q->av_or_exit); - for (;;) { - W2m_blk *w2m_blk; - + while( true ) + { /* Grab all available compressed blocks in one step. */ - while (0 == w2m_q->head && 0u < w2m_q->working) { + while( w2m_q->head == 0 && w2m_q->working > 0 ) xwait(&w2m_q->av_or_exit); - } - if (0 == w2m_q->head) { - /* w2m_q is empty and all workers exited */ - break; - } + if( w2m_q->head == 0 ) break; // queue is empty. all workers exited - w2m_blk = w2m_q->head; + W2m_blk * w2m_blk = w2m_q->head; w2m_q->head = 0; xunlock(&w2m_q->av_or_exit); - /* Merge blocks fetched this time into tree. */ + // Merge blocks fetched this time into circular buffer do { - lacos_rbtree_node *new_node; - W2m_blk *next; - - if (-1 == lacos_rbtree_insert( - &reord, /* new_root */ - &new_node, /* new_node */ - w2m_blk, /* new_data */ - w2m_blk_cmp, /* cmp() */ - reord_alloc, /* alloc() */ - 0 /* alloc_ctl */ - )) { - /* id collision shouldn't happen */ - assert(0 == new_node); - show_error( "lacos_rbtree_insert(): out of memory." ); - fatal(); - } - - next = w2m_blk->next; + // id collision shouldn't happen + assert( circular_buffer[w2m_blk->id%num_slots] == 0 ); + circular_buffer[w2m_blk->id%num_slots] = w2m_blk; + W2m_blk * next = w2m_blk->next; w2m_blk->next = 0; w2m_blk = next; - } while (0 != w2m_blk); + } while( w2m_blk != 0 ); + + // Write out initial continuous sequence of reordered blocks + while( true ) + { + W2m_blk * needed_w2m_blk = circular_buffer[needed_id%num_slots]; + if( needed_w2m_blk == 0 ) break; + + out_size += needed_w2m_blk->produced; + + if( outfd >= 0 ) + { + const int wr = writeblock( outfd, (char *)needed_w2m_blk->compr, needed_w2m_blk->produced ); + if( wr != needed_w2m_blk->produced ) fail("write()", errno); + } + circular_buffer[needed_id%num_slots] = 0; + ++needed_id; + + xlock(&m2s_q->av); + if( 0 == m2s_q->num_free++ ) xsignal(&m2s_q->av); + xunlock(&m2s_q->av); - /* Write out initial continuous sequence of reordered blocks. */ - mux_write(m2s_q, &reord, &reord_needed, outfd); + (*freef)(needed_w2m_blk); + } xlock_pred(&w2m_q->av_or_exit); - w2m_q->needed = reord_needed; - } + w2m_q->needed = needed_id; + } xunlock(&w2m_q->av_or_exit); - assert(0 == reord); -} + for( int i = 0; i < num_slots; ++i ) + if( circular_buffer[i] != 0 ) + { show_error( "circular buffer not empty" ); fatal(); } + } +} // end namespace -static void -plzip(unsigned num_worker, unsigned num_slot, unsigned clidx, int print_cctrs, - int infd, int outfd) -{ + +void * muxer( void * arg ) + { + const Muxer_arg & tmp = *(Muxer_arg *)arg; + const int dictionary_size = tmp.dictionary_size; + const int match_len_limit = tmp.match_len_limit; + const int num_workers = tmp.num_workers; + const int num_slots = tmp.num_slots; + const int debug_level = tmp.debug_level; + const int infd = tmp.infd; + const int outfd = tmp.outfd; S2w_q s2w_q; W2m_q w2m_q; - M2s_q m2s_q; - Split_arg split_arg; - pthread_t splitter; - Work_arg work_arg; - pthread_t *worker; - unsigned i; - assert(clidx < sizeof compr_lev / sizeof compr_lev[0]); + if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; } + else { mallocf = malloc; freef = free; } s2w_q_init(&s2w_q); - w2m_q_init(&w2m_q, num_worker); - m2s_q_init(&m2s_q, num_slot); - -#define SIZES(struc, arr, arsz_unsigned, arg) \ - do { \ - unsigned tmp; \ -\ - tmp = arsz_unsigned; \ - if ((size_t)-1 < tmp) { \ - show_error( "size_t overflow in sizeof_" #arr "." ); \ - fatal(); \ - } \ - arg ## _arg . sizeof_ ## arr = tmp; \ -\ - if ((size_t)-1 - sizeof(struc) \ - < arg ## _arg . sizeof_ ## arr - (size_t)1) { \ - show_error( "size_t overflow in sizeof_" #struc "." ); \ - fatal(); \ - } \ - arg ## _arg . sizeof_ ## struc = sizeof(struc) \ - + (arg ## _arg . sizeof_ ## arr - (size_t)1); \ - } while (0) - - split_arg.m2s_q = &m2s_q; - split_arg.s2w_q = &s2w_q; - split_arg.infd = infd; - SIZES(S2w_blk, plain, 2u * compr_lev[clidx].dict_size, split); - xcreate(&splitter, split_wrap, &split_arg); - - work_arg.s2w_q = &s2w_q; - work_arg.w2m_q = &w2m_q; - work_arg.clidx = clidx; - SIZES(W2m_blk, compr, (4u + 1u + 1u) - + ((unsigned)split_arg.sizeof_plain * 9u + 7u) / 8u + (4u + 8u + 8u), - work); - -#undef SIZES - - assert(0u < num_worker); - assert((size_t)-1 / sizeof *worker >= num_worker); - worker = (pthread_t *)xalloc(num_worker * sizeof *worker); - for (i = 0u; i < num_worker; ++i) { - xcreate(&worker[i], work_wrap, &work_arg); - } - - mux(&w2m_q, &m2s_q, outfd); - - i = num_worker; - do { - xjoin(worker[--i]); - } while (0u < i); - (*freef)(worker); - - xjoin(splitter); + w2m_q_init(&w2m_q, num_workers); + M2s_q m2s_q( num_slots ); + + + Splitter_arg splitter_arg; + splitter_arg.m2s_q = &m2s_q; + splitter_arg.s2w_q = &s2w_q; + splitter_arg.infd = infd; + splitter_arg.sizeof_plain = 2 * std::max( 65536, dictionary_size ); + splitter_arg.sizeof_S2w_blk = sizeof(S2w_blk) + splitter_arg.sizeof_plain - 1; + + pthread_t splitter_thread; + xcreate(&splitter_thread, splitter, &splitter_arg); + + Worker_arg worker_arg; + worker_arg.dictionary_size = dictionary_size; + worker_arg.match_len_limit = match_len_limit; + worker_arg.s2w_q = &s2w_q; + worker_arg.w2m_q = &w2m_q; + worker_arg.sizeof_compr = 6 + 20 + ( ( splitter_arg.sizeof_plain / 8 ) * 9 ); + worker_arg.sizeof_W2m_blk = sizeof(W2m_blk) + worker_arg.sizeof_compr - 1; + + pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; + if( worker_threads == 0 ) fail("not enough memory.", errno); + for( int i = 0; i < num_workers; ++i ) + xcreate(&worker_threads[i], worker, &worker_arg); + + muxer_loop( &w2m_q, &m2s_q, num_slots, outfd ); + + for( int i = num_workers - 1; i >= 0; --i ) + xjoin(worker_threads[i]); + delete[] worker_threads; worker_threads = 0; + + xjoin(splitter_thread); + + if( verbosity >= 1 ) + { + if( in_size <= 0 || out_size <= 0 ) + std::fprintf( stderr, "no data compressed.\n" ); + else + std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, " + "%5.2f%% saved, %lld in, %lld out.\n", + (double)in_size / out_size, + ( 8.0 * out_size ) / in_size, + 100.0 * ( 1.0 - ( (double)out_size / in_size ) ), + in_size, out_size ); + } const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1; - if (print_cctrs && 0 > fprintf(stderr, + if( ( debug_level & 1 ) && 0 > fprintf(stderr, "any worker tried to consume from splitter: %*lu\n" "any worker stalled : %*lu\n" "muxer tried to consume from workers : %*lu\n" @@ -645,25 +521,9 @@ plzip(unsigned num_worker, unsigned num_slot, unsigned clidx, int print_cctrs, fatal(); } - m2s_q_uninit(&m2s_q, num_slot); + assert( m2s_q.num_free == num_slots ); w2m_q_uninit(&w2m_q); s2w_q_uninit(&s2w_q); -} - - -void * -plzip_wrap(void *v_arg) -{ - Plzip_arg *arg = (Plzip_arg *)v_arg; - plzip( - arg->num_worker, - arg->num_slot, - arg->clidx, - arg->print_cctrs, - arg->infd, - arg->outfd - ); - xraise(SIGUSR2); return 0; -} + } |