summaryrefslogtreecommitdiffstats
path: root/plzip.cc
diff options
context:
space:
mode:
Diffstat (limited to 'plzip.cc')
-rw-r--r--plzip.cc669
1 files changed, 669 insertions, 0 deletions
diff --git a/plzip.cc b/plzip.cc
new file mode 100644
index 0000000..f110437
--- /dev/null
+++ b/plzip.cc
@@ -0,0 +1,669 @@
+/* Plzip - A parallel version of the lzip data compressor
+ Copyright (C) 2009 Laszlo Ersek.
+ Copyright (C) 2009 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#define _FILE_OFFSET_BITS 64
+
+#include <cassert>
+#include <cerrno>
+#include <climits>
+#include <csignal>
+#include <cstdio>
+#include <cstdlib>
+#include <stdint.h>
+#include <unistd.h>
+#include <lzlib.h>
+
+#include "main.h"
+#include "plzip.h"
+#include "lacos_rbtree.h"
+
+
+struct S2w_blk /* Splitter to workers. */
+{
+ uint64_t id; /* Block serial number as read from infd. */
+ S2w_blk *next; /* Next in queue. */
+ size_t loaded; /* # of bytes in plain, may be 0 for 1st. */
+ char unsigned plain[1]; /* Data read from infd, allocated: sizeof_plain. */
+};
+
+
+struct S2w_q
+{
+ Cond av_or_eof; /* New block available or splitter done. */
+ S2w_blk *tail, /* Splitter will append here. */
+ *head; /* Next ready worker shall compress this. */
+ int eof; /* Splitter done. */
+};
+
+
+static void
+s2w_q_init(S2w_q *s2w_q)
+{
+ xinit(&s2w_q->av_or_eof);
+ s2w_q->tail = 0;
+ s2w_q->head = 0;
+ s2w_q->eof = 0;
+}
+
+
+static void
+s2w_q_uninit(S2w_q *s2w_q)
+{
+ assert(0 != s2w_q->eof);
+ assert(0 == s2w_q->head);
+ assert(0 == s2w_q->tail);
+ xdestroy(&s2w_q->av_or_eof);
+}
+
+
+struct W2m_blk /* Workers to muxer. */
+{
+ uint64_t id; /* Block index as read from infd. */
+ W2m_blk *next; /* Next block in list (unordered). */
+ size_t produced; /* Number of bytes in compr. */
+ char unsigned compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */
+};
+
+
+static int
+w2m_blk_cmp(const void *v_a, const void *v_b)
+{
+ uint64_t a,
+ b;
+
+ a = ((const W2m_blk *)v_a)->id;
+ b = ((const W2m_blk *)v_b)->id;
+
+ return
+ a < b ? -1
+ : a > b ? 1
+ : 0;
+}
+
+
+struct W2m_q
+{
+ Cond av_or_exit; /* New block available or all workers exited. */
+ uint64_t needed; /* Block needed for resuming writing. */
+ W2m_blk *head; /* Block list (unordered). */
+ unsigned working; /* Number of workers still running. */
+};
+
+
+static void
+w2m_q_init(W2m_q *w2m_q, unsigned num_worker)
+{
+ assert(0u < num_worker);
+ xinit(&w2m_q->av_or_exit);
+ w2m_q->needed = 0u;
+ w2m_q->head = 0;
+ w2m_q->working = num_worker;
+}
+
+
+static void
+w2m_q_uninit(W2m_q *w2m_q)
+{
+ assert(0u == w2m_q->working);
+ assert(0 == w2m_q->head);
+ xdestroy(&w2m_q->av_or_exit);
+}
+
+
+struct M2s_q /* Muxer to splitter. */
+{
+ Cond av; /* Free slot available. */
+ unsigned num_free; /* Number of free slots. */
+};
+
+
+static void
+m2s_q_init(M2s_q *m2s_q, unsigned num_free)
+{
+ assert(0u < num_free);
+ xinit(&m2s_q->av);
+ m2s_q->num_free = num_free;
+}
+
+
+static void
+m2s_q_uninit(M2s_q *m2s_q, unsigned num_free)
+{
+ assert(m2s_q->num_free == num_free);
+ xdestroy(&m2s_q->av);
+}
+
+
+static void
+split(M2s_q *m2s_q, S2w_q *s2w_q, int infd,
+ const size_t sizeof_plain, const size_t sizeof_s2w_blk)
+{
+ uint64_t id;
+ ssize_t rd;
+
+ id = 0u;
+ do {
+ S2w_blk *s2w_blk;
+ size_t vacant;
+
+ /* Grab a free slot. */
+ xlock_pred(&m2s_q->av);
+ while (0u == m2s_q->num_free) {
+ xwait(&m2s_q->av);
+ }
+ --m2s_q->num_free;
+ xunlock(&m2s_q->av);
+ s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);
+
+ /* Fill block. */
+ vacant = sizeof_plain;
+ do {
+ rd = read(infd, s2w_blk->plain + (sizeof_plain - vacant),
+ vacant > (size_t)SSIZE_MAX ? (size_t)SSIZE_MAX : vacant);
+ } while (0 < rd && 0u < (vacant -= (size_t)rd));
+
+ /* Read error. */
+ if (-1 == rd) {
+ fail("read()", errno);
+ }
+
+ if (sizeof_plain == vacant && 0u < id) {
+ /* EOF on first read, but not for first input block. */
+ (*freef)(s2w_blk);
+ xlock(&m2s_q->av);
+ ++m2s_q->num_free;
+ xunlock(&m2s_q->av);
+ }
+ else {
+ s2w_blk->id = id;
+ s2w_blk->next = 0;
+ s2w_blk->loaded = sizeof_plain - vacant;
+ }
+
+ /* We either push a block, or set EOF, or both. */
+ assert(sizeof_plain > vacant || 0 == rd);
+
+ xlock(&s2w_q->av_or_eof);
+ if (0 == s2w_q->head) {
+ xbroadcast(&s2w_q->av_or_eof);
+ }
+
+ if (sizeof_plain > vacant || 0u == id) {
+ if (0 == s2w_q->tail) {
+ s2w_q->head = s2w_blk;
+ }
+ else {
+ s2w_q->tail->next = s2w_blk;
+ }
+ s2w_q->tail = s2w_blk;
+ }
+ s2w_q->eof = (0 == rd);
+ xunlock(&s2w_q->av_or_eof);
+
+ /*
+ If we didn't push a block, then this is bogus, but then we did set EOF,
+ so it doesn't matter, because we'll leave immediately.
+ */
+ ++id;
+ } while (0 < rd);
+}
+
+
+struct Split_arg
+{
+ M2s_q *m2s_q;
+ S2w_q *s2w_q;
+ int infd;
+ size_t sizeof_plain,
+ sizeof_S2w_blk;
+};
+
+
+static void *
+split_wrap(void *v_split_arg)
+{
+ Split_arg *split_arg = (Split_arg *)v_split_arg;
+
+ split(
+ split_arg->m2s_q,
+ split_arg->s2w_q,
+ split_arg->infd,
+ split_arg->sizeof_plain,
+ split_arg->sizeof_S2w_blk
+ );
+ return 0;
+}
+
+
+static void
+work_lz_rd(W2m_blk *w2m_blk, const size_t sizeof_compr, void *lz)
+{
+ int rd;
+
+ assert(w2m_blk->produced < sizeof_compr);
+ rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced,
+ sizeof_compr - w2m_blk->produced);
+ if (-1 == rd) {
+ show_error( "LZ_compress_read() failed." );
+ fatal();
+ }
+ w2m_blk->produced += (size_t)rd;
+}
+
+
+struct Compr_lev
+{
+ unsigned dict_size,
+ mx_match;
+};
+
+
+static const Compr_lev compr_lev[] = {
+ { 1u * 1024u * 1024u, 10u },
+ { 1u * 1024u * 1024u, 12u },
+ { 1u * 1024u * 1024u, 17u },
+ { 2u * 1024u * 1024u, 26u },
+ { 4u * 1024u * 1024u, 44u },
+ { 8u * 1024u * 1024u, 80u },
+ { 16u * 1024u * 1024u, 108u },
+ { 16u * 1024u * 1024u, 163u },
+ { 32u * 1024u * 1024u, 273u }
+};
+
+
+static void
+work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx,
+ const size_t sizeof_compr, const size_t sizeof_w2m_blk)
+{
+ W2m_blk *w2m_blk;
+
+ assert(0u < s2w_blk->loaded || 0u == s2w_blk->id);
+
+ w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk);
+
+ /* Single member compression. Settings like with lzip -6. */
+ {
+ void *lz;
+ size_t written;
+
+ lz = LZ_compress_open(compr_lev[clidx].dict_size,
+ compr_lev[clidx].mx_match, (uint64_t)-1 / 2u);
+ if (LZ_ok != LZ_compress_errno(lz)) {
+ show_error( "LZ_compress_open() failed." );
+ fatal();
+ }
+
+ written = 0u;
+ w2m_blk->produced = 0u;
+ while (written < s2w_blk->loaded) {
+ int wr;
+
+ wr = LZ_compress_write(lz, s2w_blk->plain + written,
+ s2w_blk->loaded - written);
+ if (-1 == wr) {
+ show_error( "LZ_compress_write() failed." );
+ fatal();
+ }
+ written += (size_t)wr;
+
+ work_lz_rd(w2m_blk, sizeof_compr, lz);
+ }
+
+ if (-1 == LZ_compress_finish(lz)) {
+ show_error( "LZ_compress_finish() failed." );
+ fatal();
+ }
+
+ while (!LZ_compress_finished(lz)) {
+ work_lz_rd(w2m_blk, sizeof_compr, lz);
+ }
+
+ if (-1 == LZ_compress_close(lz)) {
+ show_error( "LZ_compress_close() failed." );
+ fatal();
+ }
+ }
+
+ w2m_blk->id = s2w_blk->id;
+
+ /* Push block to muxer. */
+ xlock(&w2m_q->av_or_exit);
+ w2m_blk->next = w2m_q->head;
+ w2m_q->head = w2m_blk;
+ if (w2m_blk->id == w2m_q->needed) {
+ xsignal(&w2m_q->av_or_exit);
+ }
+ xunlock(&w2m_q->av_or_exit);
+}
+
+
+static void
+work(S2w_q *s2w_q, W2m_q *w2m_q, unsigned clidx,
+ const size_t sizeof_compr, const size_t sizeof_w2m_blk)
+{
+ for (;;) {
+ S2w_blk *s2w_blk;
+
+ /* Grab a block to work on. */
+ xlock_pred(&s2w_q->av_or_eof);
+ while (0 == s2w_q->head && !s2w_q->eof) {
+ xwait(&s2w_q->av_or_eof);
+ }
+ if (0 == s2w_q->head) {
+ /* No blocks available and splitter exited. */
+ xunlock(&s2w_q->av_or_eof);
+ break;
+ }
+ s2w_blk = s2w_q->head;
+ s2w_q->head = s2w_blk->next;
+ if (0 == s2w_q->head) {
+ s2w_q->tail = 0;
+ }
+ xunlock(&s2w_q->av_or_eof);
+
+ work_compr(s2w_blk, w2m_q, clidx, sizeof_compr, sizeof_w2m_blk);
+ (*freef)(s2w_blk);
+ }
+
+ /* Notify muxer when last worker exits. */
+ xlock(&w2m_q->av_or_exit);
+ if (0u == --w2m_q->working && 0 == w2m_q->head) {
+ xsignal(&w2m_q->av_or_exit);
+ }
+ xunlock(&w2m_q->av_or_exit);
+}
+
+
+struct Work_arg
+{
+ S2w_q *s2w_q;
+ W2m_q *w2m_q;
+ unsigned clidx;
+ size_t sizeof_compr,
+ sizeof_W2m_blk;
+};
+
+
+static void *
+work_wrap(void *v_work_arg)
+{
+ Work_arg *work_arg;
+
+ work_arg = (Work_arg *)v_work_arg;
+ work(
+ work_arg->s2w_q,
+ work_arg->w2m_q,
+ work_arg->clidx,
+ work_arg->sizeof_compr,
+ work_arg->sizeof_W2m_blk
+ );
+ return 0;
+}
+
+
+static void *
+reord_alloc(size_t size, void *)
+{
+ return (*mallocf)(size);
+}
+
+
+static void
+reord_dealloc(void *ptr, void *)
+{
+ (*freef)(ptr);
+}
+
+
+static void
+mux_write(M2s_q *m2s_q, lacos_rbtree_node **reord,
+ uint64_t *reord_needed, int outfd)
+{
+ assert(0 != *reord);
+
+ /*
+ Go on until the tree becomes empty or the next block is found to be
+ missing.
+ */
+ do {
+ lacos_rbtree_node *reord_head;
+ W2m_blk *reord_w2m_blk;
+
+ reord_head = lacos_rbtree_min(*reord);
+ assert(0 != reord_head);
+
+ reord_w2m_blk = (W2m_blk *)(*(void **)reord_head);
+ if (reord_w2m_blk->id != *reord_needed) {
+ break;
+ }
+
+ /* Write out "reord_w2m_blk". */
+ if (-1 != outfd) {
+ char unsigned *cp;
+
+ cp = reord_w2m_blk->compr;
+ while (reord_w2m_blk->produced > 0u) {
+ ssize_t written;
+
+ written = write(outfd, cp, reord_w2m_blk->produced > (size_t)SSIZE_MAX
+ ? (size_t)SSIZE_MAX : reord_w2m_blk->produced);
+ if (-1 == written) {
+ fail("write()", errno);
+ }
+
+ reord_w2m_blk->produced -= (size_t)written;
+ cp += written;
+ }
+ }
+
+ ++*reord_needed;
+
+ xlock(&m2s_q->av);
+ if (0u == m2s_q->num_free++) {
+ xsignal(&m2s_q->av);
+ }
+ xunlock(&m2s_q->av);
+
+ lacos_rbtree_delete(
+ reord, /* new_root */
+ reord_head, /* old_node */
+ 0, /* old_data */
+ reord_dealloc, /* dealloc() */
+ 0 /* alloc_ctl */
+ );
+
+ /* Release "reord_w2m_blk". */
+ (*freef)(reord_w2m_blk);
+ } while (0 != *reord);
+}
+
+
+static void
+mux(W2m_q *w2m_q, M2s_q *m2s_q, int outfd)
+{
+ lacos_rbtree_node *reord;
+ uint64_t reord_needed;
+
+ reord = 0;
+ reord_needed = 0u;
+
+ xlock_pred(&w2m_q->av_or_exit);
+ for (;;) {
+ W2m_blk *w2m_blk;
+
+ /* Grab all available compressed blocks in one step. */
+ while (0 == w2m_q->head && 0u < w2m_q->working) {
+ xwait(&w2m_q->av_or_exit);
+ }
+
+ if (0 == w2m_q->head) {
+ /* w2m_q is empty and all workers exited */
+ break;
+ }
+
+ w2m_blk = w2m_q->head;
+ w2m_q->head = 0;
+ xunlock(&w2m_q->av_or_exit);
+
+ /* Merge blocks fetched this time into tree. */
+ do {
+ lacos_rbtree_node *new_node;
+ W2m_blk *next;
+
+ if (-1 == lacos_rbtree_insert(
+ &reord, /* new_root */
+ &new_node, /* new_node */
+ w2m_blk, /* new_data */
+ w2m_blk_cmp, /* cmp() */
+ reord_alloc, /* alloc() */
+ 0 /* alloc_ctl */
+ )) {
+ /* id collision shouldn't happen */
+ assert(0 == new_node);
+ show_error( "lacos_rbtree_insert(): out of memory." );
+ fatal();
+ }
+
+ next = w2m_blk->next;
+ w2m_blk->next = 0;
+ w2m_blk = next;
+ } while (0 != w2m_blk);
+
+ /* Write out initial continuous sequence of reordered blocks. */
+ mux_write(m2s_q, &reord, &reord_needed, outfd);
+
+ xlock_pred(&w2m_q->av_or_exit);
+ w2m_q->needed = reord_needed;
+ }
+ xunlock(&w2m_q->av_or_exit);
+
+ assert(0 == reord);
+}
+
+
+static void
+plzip(unsigned num_worker, unsigned num_slot, unsigned clidx, int print_cctrs,
+ int infd, int outfd)
+{
+ S2w_q s2w_q;
+ W2m_q w2m_q;
+ M2s_q m2s_q;
+ Split_arg split_arg;
+ pthread_t splitter;
+ Work_arg work_arg;
+ pthread_t *worker;
+ unsigned i;
+
+ assert(clidx < sizeof compr_lev / sizeof compr_lev[0]);
+
+ s2w_q_init(&s2w_q);
+ w2m_q_init(&w2m_q, num_worker);
+ m2s_q_init(&m2s_q, num_slot);
+
+#define SIZES(struc, arr, arsz_unsigned, arg) \
+ do { \
+ unsigned tmp; \
+\
+ tmp = arsz_unsigned; \
+ if ((size_t)-1 < tmp) { \
+ show_error( "size_t overflow in sizeof_" #arr "." ); \
+ fatal(); \
+ } \
+ arg ## _arg . sizeof_ ## arr = tmp; \
+\
+ if ((size_t)-1 - sizeof(struc) \
+ < arg ## _arg . sizeof_ ## arr - (size_t)1) { \
+ show_error( "size_t overflow in sizeof_" #struc "." ); \
+ fatal(); \
+ } \
+ arg ## _arg . sizeof_ ## struc = sizeof(struc) \
+ + (arg ## _arg . sizeof_ ## arr - (size_t)1); \
+ } while (0)
+
+ split_arg.m2s_q = &m2s_q;
+ split_arg.s2w_q = &s2w_q;
+ split_arg.infd = infd;
+ SIZES(S2w_blk, plain, 2u * compr_lev[clidx].dict_size, split);
+ xcreate(&splitter, split_wrap, &split_arg);
+
+ work_arg.s2w_q = &s2w_q;
+ work_arg.w2m_q = &w2m_q;
+ work_arg.clidx = clidx;
+ SIZES(W2m_blk, compr, (4u + 1u + 1u)
+ + ((unsigned)split_arg.sizeof_plain * 9u + 7u) / 8u + (4u + 8u + 8u),
+ work);
+
+#undef SIZES
+
+ assert(0u < num_worker);
+ assert((size_t)-1 / sizeof *worker >= num_worker);
+ worker = (pthread_t *)xalloc(num_worker * sizeof *worker);
+ for (i = 0u; i < num_worker; ++i) {
+ xcreate(&worker[i], work_wrap, &work_arg);
+ }
+
+ mux(&w2m_q, &m2s_q, outfd);
+
+ i = num_worker;
+ do {
+ xjoin(worker[--i]);
+ } while (0u < i);
+ (*freef)(worker);
+
+ xjoin(splitter);
+
+ const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1;
+ if (print_cctrs && 0 > fprintf(stderr,
+ "any worker tried to consume from splitter: %*lu\n"
+ "any worker stalled : %*lu\n"
+ "muxer tried to consume from workers : %*lu\n"
+ "muxer stalled : %*lu\n"
+ "splitter tried to consume from muxer : %*lu\n"
+ "splitter stalled : %*lu\n",
+ FW, s2w_q.av_or_eof.ccount,
+ FW, s2w_q.av_or_eof.wcount,
+ FW, w2m_q.av_or_exit.ccount,
+ FW, w2m_q.av_or_exit.wcount,
+ FW, m2s_q.av.ccount,
+ FW, m2s_q.av.wcount) )
+ {
+ fatal();
+ }
+
+ m2s_q_uninit(&m2s_q, num_slot);
+ w2m_q_uninit(&w2m_q);
+ s2w_q_uninit(&s2w_q);
+}
+
+
+void *
+plzip_wrap(void *v_arg)
+{
+ Plzip_arg *arg = (Plzip_arg *)v_arg;
+ plzip(
+ arg->num_worker,
+ arg->num_slot,
+ arg->clidx,
+ arg->print_cctrs,
+ arg->infd,
+ arg->outfd
+ );
+
+ xraise(SIGUSR2);
+ return 0;
+}