Diffstat (limited to 'plzip.cc')
-rw-r--r--  plzip.cc  669
1 file changed, 669 insertions, 0 deletions
diff --git a/plzip.cc b/plzip.cc
new file mode 100644
index 0000000..f110437
--- /dev/null
+++ b/plzip.cc
@@ -0,0 +1,669 @@
+/* Plzip - A parallel version of the lzip data compressor
+   Copyright (C) 2009 Laszlo Ersek.
+   Copyright (C) 2009 Antonio Diaz Diaz.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#define _FILE_OFFSET_BITS 64
+
+#include <cassert>
+#include <cerrno>
+#include <climits>
+#include <csignal>
+#include <cstdio>
+#include <cstdlib>
+#include <stdint.h>
+#include <unistd.h>
+#include <lzlib.h>
+
+#include "main.h"
+#include "plzip.h"
+#include "lacos_rbtree.h"
+
+
+struct S2w_blk            /* Splitter to workers. */
+{
+  uint64_t id;            /* Block serial number as read from infd. */
+  S2w_blk *next;          /* Next in queue. */
+  size_t loaded;          /* # of bytes in plain, may be 0 for 1st. */
+  char unsigned plain[1]; /* Data read from infd, allocated: sizeof_plain. */
+};
+
+
+struct S2w_q
+{
+  Cond av_or_eof; /* New block available or splitter done. */
+  S2w_blk *tail,  /* Splitter will append here. */
+      *head;      /* Next ready worker shall compress this. */
+  int eof;        /* Splitter done. */
+};
+
+
+static void
+s2w_q_init(S2w_q *s2w_q)
+{
+  xinit(&s2w_q->av_or_eof);
+  s2w_q->tail = 0;
+  s2w_q->head = 0;
+  s2w_q->eof = 0;
+}
+
+
+static void
+s2w_q_uninit(S2w_q *s2w_q)
+{
+  assert(0 != s2w_q->eof);
+  assert(0 == s2w_q->head);
+  assert(0 == s2w_q->tail);
+  xdestroy(&s2w_q->av_or_eof);
+}
+
+
+struct W2m_blk            /* Workers to muxer. */
+{
+  uint64_t id;            /* Block index as read from infd. */
+  W2m_blk *next;          /* Next block in list (unordered). */
+  size_t produced;        /* Number of bytes in compr. */
+  char unsigned compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */
+};
+
+
+static int
+w2m_blk_cmp(const void *v_a, const void *v_b)
+{
+  uint64_t a,
+      b;
+
+  a = ((const W2m_blk *)v_a)->id;
+  b = ((const W2m_blk *)v_b)->id;
+
+  return
+      a < b ? -1
+      : a > b ? 1
+      : 0;
+}
+
+
+struct W2m_q
+{
+  Cond av_or_exit;  /* New block available or all workers exited. */
+  uint64_t needed;  /* Block needed for resuming writing. */
+  W2m_blk *head;    /* Block list (unordered). */
+  unsigned working; /* Number of workers still running. */
+};
+
+
+static void
+w2m_q_init(W2m_q *w2m_q, unsigned num_worker)
+{
+  assert(0u < num_worker);
+  xinit(&w2m_q->av_or_exit);
+  w2m_q->needed = 0u;
+  w2m_q->head = 0;
+  w2m_q->working = num_worker;
+}
+
+
+static void
+w2m_q_uninit(W2m_q *w2m_q)
+{
+  assert(0u == w2m_q->working);
+  assert(0 == w2m_q->head);
+  xdestroy(&w2m_q->av_or_exit);
+}
+
+
+struct M2s_q         /* Muxer to splitter. */
+{
+  Cond av;           /* Free slot available. */
+  unsigned num_free; /* Number of free slots. */
+};
+
+
+static void
+m2s_q_init(M2s_q *m2s_q, unsigned num_free)
+{
+  assert(0u < num_free);
+  xinit(&m2s_q->av);
+  m2s_q->num_free = num_free;
+}
+
+
+static void
+m2s_q_uninit(M2s_q *m2s_q, unsigned num_free)
+{
+  assert(m2s_q->num_free == num_free);
+  xdestroy(&m2s_q->av);
+}
+
+
+static void
+split(M2s_q *m2s_q, S2w_q *s2w_q, int infd,
+    const size_t sizeof_plain, const size_t sizeof_s2w_blk)
+{
+  uint64_t id;
+  ssize_t rd;
+
+  id = 0u;
+  do {
+    S2w_blk *s2w_blk;
+    size_t vacant;
+
+    /* Grab a free slot. */
+    xlock_pred(&m2s_q->av);
+    while (0u == m2s_q->num_free) {
+      xwait(&m2s_q->av);
+    }
+    --m2s_q->num_free;
+    xunlock(&m2s_q->av);
+    s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);
+
+    /* Fill block. */
+    vacant = sizeof_plain;
+    do {
+      rd = read(infd, s2w_blk->plain + (sizeof_plain - vacant),
+          vacant > (size_t)SSIZE_MAX ? (size_t)SSIZE_MAX : vacant);
+    } while (0 < rd && 0u < (vacant -= (size_t)rd));
+
+    /* Read error. */
+    if (-1 == rd) {
+      fail("read()", errno);
+    }
+
+    if (sizeof_plain == vacant && 0u < id) {
+      /* EOF on first read, but not for first input block. */
+      (*freef)(s2w_blk);
+      xlock(&m2s_q->av);
+      ++m2s_q->num_free;
+      xunlock(&m2s_q->av);
+    }
+    else {
+      s2w_blk->id = id;
+      s2w_blk->next = 0;
+      s2w_blk->loaded = sizeof_plain - vacant;
+    }
+
+    /* We either push a block, or set EOF, or both. */
+    assert(sizeof_plain > vacant || 0 == rd);
+
+    xlock(&s2w_q->av_or_eof);
+    if (0 == s2w_q->head) {
+      xbroadcast(&s2w_q->av_or_eof);
+    }
+
+    if (sizeof_plain > vacant || 0u == id) {
+      if (0 == s2w_q->tail) {
+        s2w_q->head = s2w_blk;
+      }
+      else {
+        s2w_q->tail->next = s2w_blk;
+      }
+      s2w_q->tail = s2w_blk;
+    }
+    s2w_q->eof = (0 == rd);
+    xunlock(&s2w_q->av_or_eof);
+
+    /*
+      If we didn't push a block, then this is bogus, but then we did set EOF,
+      so it doesn't matter, because we'll leave immediately.
+    */
+    ++id;
+  } while (0 < rd);
+}
+
+
+struct Split_arg
+{
+  M2s_q *m2s_q;
+  S2w_q *s2w_q;
+  int infd;
+  size_t sizeof_plain,
+      sizeof_S2w_blk;
+};
+
+
+static void *
+split_wrap(void *v_split_arg)
+{
+  Split_arg *split_arg = (Split_arg *)v_split_arg;
+
+  split(
+      split_arg->m2s_q,
+      split_arg->s2w_q,
+      split_arg->infd,
+      split_arg->sizeof_plain,
+      split_arg->sizeof_S2w_blk
+  );
+  return 0;
+}
+
+
+static void
+work_lz_rd(W2m_blk *w2m_blk, const size_t sizeof_compr, void *lz)
+{
+  int rd;
+
+  assert(w2m_blk->produced < sizeof_compr);
+  rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced,
+      sizeof_compr - w2m_blk->produced);
+  if (-1 == rd) {
+    show_error( "LZ_compress_read() failed." );
+    fatal();
+  }
+  w2m_blk->produced += (size_t)rd;
+}
+
+
+struct Compr_lev
+{
+  unsigned dict_size,
+      mx_match;
+};
+
+
+static const Compr_lev compr_lev[] = {
+  {  1u * 1024u * 1024u,  10u },
+  {  1u * 1024u * 1024u,  12u },
+  {  1u * 1024u * 1024u,  17u },
+  {  2u * 1024u * 1024u,  26u },
+  {  4u * 1024u * 1024u,  44u },
+  {  8u * 1024u * 1024u,  80u },
+  { 16u * 1024u * 1024u, 108u },
+  { 16u * 1024u * 1024u, 163u },
+  { 32u * 1024u * 1024u, 273u }
+};
+
+
+static void
+work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx,
+    const size_t sizeof_compr, const size_t sizeof_w2m_blk)
+{
+  W2m_blk *w2m_blk;
+
+  assert(0u < s2w_blk->loaded || 0u == s2w_blk->id);
+
+  w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk);
+
+  /* Single member compression. Settings like with lzip -6. */
+  {
+    void *lz;
+    size_t written;
+
+    lz = LZ_compress_open(compr_lev[clidx].dict_size,
+        compr_lev[clidx].mx_match, (uint64_t)-1 / 2u);
+    if (LZ_ok != LZ_compress_errno(lz)) {
+      show_error( "LZ_compress_open() failed." );
+      fatal();
+    }
+
+    written = 0u;
+    w2m_blk->produced = 0u;
+    while (written < s2w_blk->loaded) {
+      int wr;
+
+      wr = LZ_compress_write(lz, s2w_blk->plain + written,
+          s2w_blk->loaded - written);
+      if (-1 == wr) {
+        show_error( "LZ_compress_write() failed." );
+        fatal();
+      }
+      written += (size_t)wr;
+
+      work_lz_rd(w2m_blk, sizeof_compr, lz);
+    }
+
+    if (-1 == LZ_compress_finish(lz)) {
+      show_error( "LZ_compress_finish() failed." );
+      fatal();
+    }
+
+    while (!LZ_compress_finished(lz)) {
+      work_lz_rd(w2m_blk, sizeof_compr, lz);
+    }
+
+    if (-1 == LZ_compress_close(lz)) {
+      show_error( "LZ_compress_close() failed." );
+      fatal();
+    }
+  }
+
+  w2m_blk->id = s2w_blk->id;
+
+  /* Push block to muxer. */
+  xlock(&w2m_q->av_or_exit);
+  w2m_blk->next = w2m_q->head;
+  w2m_q->head = w2m_blk;
+  if (w2m_blk->id == w2m_q->needed) {
+    xsignal(&w2m_q->av_or_exit);
+  }
+  xunlock(&w2m_q->av_or_exit);
+}
+
+
+static void
+work(S2w_q *s2w_q, W2m_q *w2m_q, unsigned clidx,
+    const size_t sizeof_compr, const size_t sizeof_w2m_blk)
+{
+  for (;;) {
+    S2w_blk *s2w_blk;
+
+    /* Grab a block to work on. */
+    xlock_pred(&s2w_q->av_or_eof);
+    while (0 == s2w_q->head && !s2w_q->eof) {
+      xwait(&s2w_q->av_or_eof);
+    }
+    if (0 == s2w_q->head) {
+      /* No blocks available and splitter exited. */
+      xunlock(&s2w_q->av_or_eof);
+      break;
+    }
+    s2w_blk = s2w_q->head;
+    s2w_q->head = s2w_blk->next;
+    if (0 == s2w_q->head) {
+      s2w_q->tail = 0;
+    }
+    xunlock(&s2w_q->av_or_eof);
+
+    work_compr(s2w_blk, w2m_q, clidx, sizeof_compr, sizeof_w2m_blk);
+    (*freef)(s2w_blk);
+  }
+
+  /* Notify muxer when last worker exits. */
+  xlock(&w2m_q->av_or_exit);
+  if (0u == --w2m_q->working && 0 == w2m_q->head) {
+    xsignal(&w2m_q->av_or_exit);
+  }
+  xunlock(&w2m_q->av_or_exit);
+}
+
+
+struct Work_arg
+{
+  S2w_q *s2w_q;
+  W2m_q *w2m_q;
+  unsigned clidx;
+  size_t sizeof_compr,
+      sizeof_W2m_blk;
+};
+
+
+static void *
+work_wrap(void *v_work_arg)
+{
+  Work_arg *work_arg;
+
+  work_arg = (Work_arg *)v_work_arg;
+  work(
+      work_arg->s2w_q,
+      work_arg->w2m_q,
+      work_arg->clidx,
+      work_arg->sizeof_compr,
+      work_arg->sizeof_W2m_blk
+  );
+  return 0;
+}
+
+
+static void *
+reord_alloc(size_t size, void *)
+{
+  return (*mallocf)(size);
+}
+
+
+static void
+reord_dealloc(void *ptr, void *)
+{
+  (*freef)(ptr);
+}
+
+
+static void
+mux_write(M2s_q *m2s_q, lacos_rbtree_node **reord,
+    uint64_t *reord_needed, int outfd)
+{
+  assert(0 != *reord);
+
+  /*
+    Go on until the tree becomes empty or the next block is found to be
+    missing.
+  */
+  do {
+    lacos_rbtree_node *reord_head;
+    W2m_blk *reord_w2m_blk;
+
+    reord_head = lacos_rbtree_min(*reord);
+    assert(0 != reord_head);
+
+    reord_w2m_blk = (W2m_blk *)(*(void **)reord_head);
+    if (reord_w2m_blk->id != *reord_needed) {
+      break;
+    }
+
+    /* Write out "reord_w2m_blk". */
+    if (-1 != outfd) {
+      char unsigned *cp;
+
+      cp = reord_w2m_blk->compr;
+      while (reord_w2m_blk->produced > 0u) {
+        ssize_t written;
+
+        written = write(outfd, cp, reord_w2m_blk->produced > (size_t)SSIZE_MAX
+            ? (size_t)SSIZE_MAX : reord_w2m_blk->produced);
+        if (-1 == written) {
+          fail("write()", errno);
+        }
+
+        reord_w2m_blk->produced -= (size_t)written;
+        cp += written;
+      }
+    }
+
+    ++*reord_needed;
+
+    xlock(&m2s_q->av);
+    if (0u == m2s_q->num_free++) {
+      xsignal(&m2s_q->av);
+    }
+    xunlock(&m2s_q->av);
+
+    lacos_rbtree_delete(
+        reord,         /* new_root */
+        reord_head,    /* old_node */
+        0,             /* old_data */
+        reord_dealloc, /* dealloc() */
+        0              /* alloc_ctl */
+    );
+
+    /* Release "reord_w2m_blk". */
+    (*freef)(reord_w2m_blk);
+  } while (0 != *reord);
+}
+
+
+static void
+mux(W2m_q *w2m_q, M2s_q *m2s_q, int outfd)
+{
+  lacos_rbtree_node *reord;
+  uint64_t reord_needed;
+
+  reord = 0;
+  reord_needed = 0u;
+
+  xlock_pred(&w2m_q->av_or_exit);
+  for (;;) {
+    W2m_blk *w2m_blk;
+
+    /* Grab all available compressed blocks in one step. */
+    while (0 == w2m_q->head && 0u < w2m_q->working) {
+      xwait(&w2m_q->av_or_exit);
+    }
+
+    if (0 == w2m_q->head) {
+      /* w2m_q is empty and all workers exited */
+      break;
+    }
+
+    w2m_blk = w2m_q->head;
+    w2m_q->head = 0;
+    xunlock(&w2m_q->av_or_exit);
+
+    /* Merge blocks fetched this time into tree. */
+    do {
+      lacos_rbtree_node *new_node;
+      W2m_blk *next;
+
+      if (-1 == lacos_rbtree_insert(
+          &reord,       /* new_root */
+          &new_node,    /* new_node */
+          w2m_blk,      /* new_data */
+          w2m_blk_cmp,  /* cmp() */
+          reord_alloc,  /* alloc() */
+          0             /* alloc_ctl */
+      )) {
+        /* id collision shouldn't happen */
+        assert(0 == new_node);
+        show_error( "lacos_rbtree_insert(): out of memory." );
+        fatal();
+      }
+
+      next = w2m_blk->next;
+      w2m_blk->next = 0;
+      w2m_blk = next;
+    } while (0 != w2m_blk);
+
+    /* Write out initial continuous sequence of reordered blocks. */
+    mux_write(m2s_q, &reord, &reord_needed, outfd);
+
+    xlock_pred(&w2m_q->av_or_exit);
+    w2m_q->needed = reord_needed;
+  }
+  xunlock(&w2m_q->av_or_exit);
+
+  assert(0 == reord);
+}
+
+
+static void
+plzip(unsigned num_worker, unsigned num_slot, unsigned clidx, int print_cctrs,
+    int infd, int outfd)
+{
+  S2w_q s2w_q;
+  W2m_q w2m_q;
+  M2s_q m2s_q;
+  Split_arg split_arg;
+  pthread_t splitter;
+  Work_arg work_arg;
+  pthread_t *worker;
+  unsigned i;
+
+  assert(clidx < sizeof compr_lev / sizeof compr_lev[0]);
+
+  s2w_q_init(&s2w_q);
+  w2m_q_init(&w2m_q, num_worker);
+  m2s_q_init(&m2s_q, num_slot);
+
+#define SIZES(struc, arr, arsz_unsigned, arg) \
+  do { \
+    unsigned tmp; \
+\
+    tmp = arsz_unsigned; \
+    if ((size_t)-1 < tmp) { \
+      show_error( "size_t overflow in sizeof_" #arr "." ); \
+      fatal(); \
+    } \
+    arg ## _arg . sizeof_ ## arr = tmp; \
+\
+    if ((size_t)-1 - sizeof(struc) \
+        < arg ## _arg . sizeof_ ## arr - (size_t)1) { \
+      show_error( "size_t overflow in sizeof_" #struc "." ); \
+      fatal(); \
+    } \
+    arg ## _arg . sizeof_ ## struc = sizeof(struc) \
+        + (arg ## _arg . sizeof_ ## arr - (size_t)1); \
+  } while (0)
+
+  split_arg.m2s_q = &m2s_q;
+  split_arg.s2w_q = &s2w_q;
+  split_arg.infd = infd;
+  SIZES(S2w_blk, plain, 2u * compr_lev[clidx].dict_size, split);
+  xcreate(&splitter, split_wrap, &split_arg);
+
+  work_arg.s2w_q = &s2w_q;
+  work_arg.w2m_q = &w2m_q;
+  work_arg.clidx = clidx;
+  SIZES(W2m_blk, compr, (4u + 1u + 1u)
+      + ((unsigned)split_arg.sizeof_plain * 9u + 7u) / 8u + (4u + 8u + 8u),
+      work);
+
+#undef SIZES
+
+  assert(0u < num_worker);
+  assert((size_t)-1 / sizeof *worker >= num_worker);
+  worker = (pthread_t *)xalloc(num_worker * sizeof *worker);
+  for (i = 0u; i < num_worker; ++i) {
+    xcreate(&worker[i], work_wrap, &work_arg);
+  }
+
+  mux(&w2m_q, &m2s_q, outfd);
+
+  i = num_worker;
+  do {
+    xjoin(worker[--i]);
+  } while (0u < i);
+  (*freef)(worker);
+
+  xjoin(splitter);
+
+  const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1;
+  if (print_cctrs && 0 > fprintf(stderr,
+      "any worker tried to consume from splitter: %*lu\n"
+      "any worker stalled                        : %*lu\n"
+      "muxer tried to consume from workers       : %*lu\n"
+      "muxer stalled                             : %*lu\n"
+      "splitter tried to consume from muxer      : %*lu\n"
+      "splitter stalled                          : %*lu\n",
+      FW, s2w_q.av_or_eof.ccount,
+      FW, s2w_q.av_or_eof.wcount,
+      FW, w2m_q.av_or_exit.ccount,
+      FW, w2m_q.av_or_exit.wcount,
+      FW, m2s_q.av.ccount,
+      FW, m2s_q.av.wcount) )
+  {
+    fatal();
+  }
+
+  m2s_q_uninit(&m2s_q, num_slot);
+  w2m_q_uninit(&w2m_q);
+  s2w_q_uninit(&s2w_q);
+}
+
+
+void *
+plzip_wrap(void *v_arg)
+{
+  Plzip_arg *arg = (Plzip_arg *)v_arg;
+  plzip(
+      arg->num_worker,
+      arg->num_slot,
+      arg->clidx,
+      arg->print_cctrs,
+      arg->infd,
+      arg->outfd
+  );
+
+  xraise(SIGUSR2);
+  return 0;
+}
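
Note on wiring this entry point up: plzip_wrap() is designed to run on its own thread and reports completion by raising SIGUSR2. The driver below is a minimal illustrative sketch, not part of this commit; it assumes the Plzip_arg fields used by plzip_wrap() above and the xcreate()/xjoin() pthread wrappers declared in main.h. The repository's real main.cc is the authoritative caller and may differ.

/* Hypothetical driver sketch for plzip_wrap() -- illustration only. */
#include <csignal>
#include <unistd.h>
#include <pthread.h>

#include "main.h"
#include "plzip.h"

int main()
{
  sigset_t set;
  int sig;
  Plzip_arg arg;
  pthread_t tid;

  /* plzip_wrap() signals completion with xraise(SIGUSR2); block the
     signal first so it can be collected with sigwait() instead of
     terminating the process (SIGUSR2's default action). */
  sigemptyset(&set);
  sigaddset(&set, SIGUSR2);
  pthread_sigmask(SIG_BLOCK, &set, 0);

  arg.num_worker = 4u;     /* e.g. one worker thread per core */
  arg.num_slot = 2u * 4u;  /* input blocks in flight; bounds memory use */
  arg.clidx = 5u;          /* compr_lev[] index, "like lzip -6" above */
  arg.print_cctrs = 0;     /* skip the condition-variable counters */
  arg.infd = STDIN_FILENO;
  arg.outfd = STDOUT_FILENO;

  xcreate(&tid, plzip_wrap, &arg);
  sigwait(&set, &sig);     /* wait for SIGUSR2 from plzip_wrap() */
  xjoin(tid);
  return 0;
}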