/* Plzip - A parallel version of the lzip data compressor Copyright (C) 2009 Laszlo Ersek. Copyright (C) 2009 Antonio Diaz Diaz. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #define _FILE_OFFSET_BITS 64 #include #include #include #include #include #include #include #include #include #include "main.h" #include "plzip.h" #include "lacos_rbtree.h" struct S2w_blk /* Splitter to workers. */ { uint64_t id; /* Block serial number as read from infd. */ S2w_blk *next; /* Next in queue. */ size_t loaded; /* # of bytes in plain, may be 0 for 1st. */ char unsigned plain[1]; /* Data read from infd, allocated: sizeof_plain. */ }; struct S2w_q { Cond av_or_eof; /* New block available or splitter done. */ S2w_blk *tail, /* Splitter will append here. */ *head; /* Next ready worker shall compress this. */ int eof; /* Splitter done. */ }; static void s2w_q_init(S2w_q *s2w_q) { xinit(&s2w_q->av_or_eof); s2w_q->tail = 0; s2w_q->head = 0; s2w_q->eof = 0; } static void s2w_q_uninit(S2w_q *s2w_q) { assert(0 != s2w_q->eof); assert(0 == s2w_q->head); assert(0 == s2w_q->tail); xdestroy(&s2w_q->av_or_eof); } struct W2m_blk /* Workers to muxer. */ { uint64_t id; /* Block index as read from infd. */ W2m_blk *next; /* Next block in list (unordered). */ size_t produced; /* Number of bytes in compr. */ char unsigned compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */ }; static int w2m_blk_cmp(const void *v_a, const void *v_b) { uint64_t a, b; a = ((const W2m_blk *)v_a)->id; b = ((const W2m_blk *)v_b)->id; return a < b ? -1 : a > b ? 1 : 0; } struct W2m_q { Cond av_or_exit; /* New block available or all workers exited. */ uint64_t needed; /* Block needed for resuming writing. */ W2m_blk *head; /* Block list (unordered). */ unsigned working; /* Number of workers still running. */ }; static void w2m_q_init(W2m_q *w2m_q, unsigned num_worker) { assert(0u < num_worker); xinit(&w2m_q->av_or_exit); w2m_q->needed = 0u; w2m_q->head = 0; w2m_q->working = num_worker; } static void w2m_q_uninit(W2m_q *w2m_q) { assert(0u == w2m_q->working); assert(0 == w2m_q->head); xdestroy(&w2m_q->av_or_exit); } struct M2s_q /* Muxer to splitter. */ { Cond av; /* Free slot available. */ unsigned num_free; /* Number of free slots. */ }; static void m2s_q_init(M2s_q *m2s_q, unsigned num_free) { assert(0u < num_free); xinit(&m2s_q->av); m2s_q->num_free = num_free; } static void m2s_q_uninit(M2s_q *m2s_q, unsigned num_free) { assert(m2s_q->num_free == num_free); xdestroy(&m2s_q->av); } static void split(M2s_q *m2s_q, S2w_q *s2w_q, int infd, const size_t sizeof_plain, const size_t sizeof_s2w_blk) { uint64_t id; ssize_t rd; id = 0u; do { S2w_blk *s2w_blk; size_t vacant; /* Grab a free slot. */ xlock_pred(&m2s_q->av); while (0u == m2s_q->num_free) { xwait(&m2s_q->av); } --m2s_q->num_free; xunlock(&m2s_q->av); s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk); /* Fill block. */ vacant = sizeof_plain; do { rd = read(infd, s2w_blk->plain + (sizeof_plain - vacant), vacant > (size_t)SSIZE_MAX ? (size_t)SSIZE_MAX : vacant); } while (0 < rd && 0u < (vacant -= (size_t)rd)); /* Read error. */ if (-1 == rd) { fail("read()", errno); } if (sizeof_plain == vacant && 0u < id) { /* EOF on first read, but not for first input block. */ (*freef)(s2w_blk); xlock(&m2s_q->av); ++m2s_q->num_free; xunlock(&m2s_q->av); } else { s2w_blk->id = id; s2w_blk->next = 0; s2w_blk->loaded = sizeof_plain - vacant; } /* We either push a block, or set EOF, or both. */ assert(sizeof_plain > vacant || 0 == rd); xlock(&s2w_q->av_or_eof); if (0 == s2w_q->head) { xbroadcast(&s2w_q->av_or_eof); } if (sizeof_plain > vacant || 0u == id) { if (0 == s2w_q->tail) { s2w_q->head = s2w_blk; } else { s2w_q->tail->next = s2w_blk; } s2w_q->tail = s2w_blk; } s2w_q->eof = (0 == rd); xunlock(&s2w_q->av_or_eof); /* If we didn't push a block, then this is bogus, but then we did set EOF, so it doesn't matter, because we'll leave immediately. */ ++id; } while (0 < rd); } struct Split_arg { M2s_q *m2s_q; S2w_q *s2w_q; int infd; size_t sizeof_plain, sizeof_S2w_blk; }; static void * split_wrap(void *v_split_arg) { Split_arg *split_arg = (Split_arg *)v_split_arg; split( split_arg->m2s_q, split_arg->s2w_q, split_arg->infd, split_arg->sizeof_plain, split_arg->sizeof_S2w_blk ); return 0; } static void work_lz_rd(W2m_blk *w2m_blk, const size_t sizeof_compr, void *lz) { int rd; assert(w2m_blk->produced < sizeof_compr); rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced, sizeof_compr - w2m_blk->produced); if (-1 == rd) { show_error( "LZ_compress_read() failed." ); fatal(); } w2m_blk->produced += (size_t)rd; } struct Compr_lev { unsigned dict_size, mx_match; }; static const Compr_lev compr_lev[] = { { 1u * 1024u * 1024u, 10u }, { 1u * 1024u * 1024u, 12u }, { 1u * 1024u * 1024u, 17u }, { 2u * 1024u * 1024u, 26u }, { 4u * 1024u * 1024u, 44u }, { 8u * 1024u * 1024u, 80u }, { 16u * 1024u * 1024u, 108u }, { 16u * 1024u * 1024u, 163u }, { 32u * 1024u * 1024u, 273u } }; static void work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx, const size_t sizeof_compr, const size_t sizeof_w2m_blk) { W2m_blk *w2m_blk; assert(0u < s2w_blk->loaded || 0u == s2w_blk->id); w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk); /* Single member compression. Settings like with lzip -6. */ { void *lz; size_t written; lz = LZ_compress_open(compr_lev[clidx].dict_size, compr_lev[clidx].mx_match, (uint64_t)-1 / 2u); if (LZ_ok != LZ_compress_errno(lz)) { show_error( "LZ_compress_open() failed." ); fatal(); } written = 0u; w2m_blk->produced = 0u; while (written < s2w_blk->loaded) { int wr; wr = LZ_compress_write(lz, s2w_blk->plain + written, s2w_blk->loaded - written); if (-1 == wr) { show_error( "LZ_compress_write() failed." ); fatal(); } written += (size_t)wr; work_lz_rd(w2m_blk, sizeof_compr, lz); } if (-1 == LZ_compress_finish(lz)) { show_error( "LZ_compress_finish() failed." ); fatal(); } while (!LZ_compress_finished(lz)) { work_lz_rd(w2m_blk, sizeof_compr, lz); } if (-1 == LZ_compress_close(lz)) { show_error( "LZ_compress_close() failed." ); fatal(); } } w2m_blk->id = s2w_blk->id; /* Push block to muxer. */ xlock(&w2m_q->av_or_exit); w2m_blk->next = w2m_q->head; w2m_q->head = w2m_blk; if (w2m_blk->id == w2m_q->needed) { xsignal(&w2m_q->av_or_exit); } xunlock(&w2m_q->av_or_exit); } static void work(S2w_q *s2w_q, W2m_q *w2m_q, unsigned clidx, const size_t sizeof_compr, const size_t sizeof_w2m_blk) { for (;;) { S2w_blk *s2w_blk; /* Grab a block to work on. */ xlock_pred(&s2w_q->av_or_eof); while (0 == s2w_q->head && !s2w_q->eof) { xwait(&s2w_q->av_or_eof); } if (0 == s2w_q->head) { /* No blocks available and splitter exited. */ xunlock(&s2w_q->av_or_eof); break; } s2w_blk = s2w_q->head; s2w_q->head = s2w_blk->next; if (0 == s2w_q->head) { s2w_q->tail = 0; } xunlock(&s2w_q->av_or_eof); work_compr(s2w_blk, w2m_q, clidx, sizeof_compr, sizeof_w2m_blk); (*freef)(s2w_blk); } /* Notify muxer when last worker exits. */ xlock(&w2m_q->av_or_exit); if (0u == --w2m_q->working && 0 == w2m_q->head) { xsignal(&w2m_q->av_or_exit); } xunlock(&w2m_q->av_or_exit); } struct Work_arg { S2w_q *s2w_q; W2m_q *w2m_q; unsigned clidx; size_t sizeof_compr, sizeof_W2m_blk; }; static void * work_wrap(void *v_work_arg) { Work_arg *work_arg; work_arg = (Work_arg *)v_work_arg; work( work_arg->s2w_q, work_arg->w2m_q, work_arg->clidx, work_arg->sizeof_compr, work_arg->sizeof_W2m_blk ); return 0; } static void * reord_alloc(size_t size, void *) { return (*mallocf)(size); } static void reord_dealloc(void *ptr, void *) { (*freef)(ptr); } static void mux_write(M2s_q *m2s_q, lacos_rbtree_node **reord, uint64_t *reord_needed, int outfd) { assert(0 != *reord); /* Go on until the tree becomes empty or the next block is found to be missing. */ do { lacos_rbtree_node *reord_head; W2m_blk *reord_w2m_blk; reord_head = lacos_rbtree_min(*reord); assert(0 != reord_head); reord_w2m_blk = (W2m_blk *)(*(void **)reord_head); if (reord_w2m_blk->id != *reord_needed) { break; } /* Write out "reord_w2m_blk". */ if (-1 != outfd) { char unsigned *cp; cp = reord_w2m_blk->compr; while (reord_w2m_blk->produced > 0u) { ssize_t written; written = write(outfd, cp, reord_w2m_blk->produced > (size_t)SSIZE_MAX ? (size_t)SSIZE_MAX : reord_w2m_blk->produced); if (-1 == written) { fail("write()", errno); } reord_w2m_blk->produced -= (size_t)written; cp += written; } } ++*reord_needed; xlock(&m2s_q->av); if (0u == m2s_q->num_free++) { xsignal(&m2s_q->av); } xunlock(&m2s_q->av); lacos_rbtree_delete( reord, /* new_root */ reord_head, /* old_node */ 0, /* old_data */ reord_dealloc, /* dealloc() */ 0 /* alloc_ctl */ ); /* Release "reord_w2m_blk". */ (*freef)(reord_w2m_blk); } while (0 != *reord); } static void mux(W2m_q *w2m_q, M2s_q *m2s_q, int outfd) { lacos_rbtree_node *reord; uint64_t reord_needed; reord = 0; reord_needed = 0u; xlock_pred(&w2m_q->av_or_exit); for (;;) { W2m_blk *w2m_blk; /* Grab all available compressed blocks in one step. */ while (0 == w2m_q->head && 0u < w2m_q->working) { xwait(&w2m_q->av_or_exit); } if (0 == w2m_q->head) { /* w2m_q is empty and all workers exited */ break; } w2m_blk = w2m_q->head; w2m_q->head = 0; xunlock(&w2m_q->av_or_exit); /* Merge blocks fetched this time into tree. */ do { lacos_rbtree_node *new_node; W2m_blk *next; if (-1 == lacos_rbtree_insert( &reord, /* new_root */ &new_node, /* new_node */ w2m_blk, /* new_data */ w2m_blk_cmp, /* cmp() */ reord_alloc, /* alloc() */ 0 /* alloc_ctl */ )) { /* id collision shouldn't happen */ assert(0 == new_node); show_error( "lacos_rbtree_insert(): out of memory." ); fatal(); } next = w2m_blk->next; w2m_blk->next = 0; w2m_blk = next; } while (0 != w2m_blk); /* Write out initial continuous sequence of reordered blocks. */ mux_write(m2s_q, &reord, &reord_needed, outfd); xlock_pred(&w2m_q->av_or_exit); w2m_q->needed = reord_needed; } xunlock(&w2m_q->av_or_exit); assert(0 == reord); } static void plzip(unsigned num_worker, unsigned num_slot, unsigned clidx, int print_cctrs, int infd, int outfd) { S2w_q s2w_q; W2m_q w2m_q; M2s_q m2s_q; Split_arg split_arg; pthread_t splitter; Work_arg work_arg; pthread_t *worker; unsigned i; assert(clidx < sizeof compr_lev / sizeof compr_lev[0]); s2w_q_init(&s2w_q); w2m_q_init(&w2m_q, num_worker); m2s_q_init(&m2s_q, num_slot); #define SIZES(struc, arr, arsz_unsigned, arg) \ do { \ unsigned tmp; \ \ tmp = arsz_unsigned; \ if ((size_t)-1 < tmp) { \ show_error( "size_t overflow in sizeof_" #arr "." ); \ fatal(); \ } \ arg ## _arg . sizeof_ ## arr = tmp; \ \ if ((size_t)-1 - sizeof(struc) \ < arg ## _arg . sizeof_ ## arr - (size_t)1) { \ show_error( "size_t overflow in sizeof_" #struc "." ); \ fatal(); \ } \ arg ## _arg . sizeof_ ## struc = sizeof(struc) \ + (arg ## _arg . sizeof_ ## arr - (size_t)1); \ } while (0) split_arg.m2s_q = &m2s_q; split_arg.s2w_q = &s2w_q; split_arg.infd = infd; SIZES(S2w_blk, plain, 2u * compr_lev[clidx].dict_size, split); xcreate(&splitter, split_wrap, &split_arg); work_arg.s2w_q = &s2w_q; work_arg.w2m_q = &w2m_q; work_arg.clidx = clidx; SIZES(W2m_blk, compr, (4u + 1u + 1u) + ((unsigned)split_arg.sizeof_plain * 9u + 7u) / 8u + (4u + 8u + 8u), work); #undef SIZES assert(0u < num_worker); assert((size_t)-1 / sizeof *worker >= num_worker); worker = (pthread_t *)xalloc(num_worker * sizeof *worker); for (i = 0u; i < num_worker; ++i) { xcreate(&worker[i], work_wrap, &work_arg); } mux(&w2m_q, &m2s_q, outfd); i = num_worker; do { xjoin(worker[--i]); } while (0u < i); (*freef)(worker); xjoin(splitter); const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1; if (print_cctrs && 0 > fprintf(stderr, "any worker tried to consume from splitter: %*lu\n" "any worker stalled : %*lu\n" "muxer tried to consume from workers : %*lu\n" "muxer stalled : %*lu\n" "splitter tried to consume from muxer : %*lu\n" "splitter stalled : %*lu\n", FW, s2w_q.av_or_eof.ccount, FW, s2w_q.av_or_eof.wcount, FW, w2m_q.av_or_exit.ccount, FW, w2m_q.av_or_exit.wcount, FW, m2s_q.av.ccount, FW, m2s_q.av.wcount) ) { fatal(); } m2s_q_uninit(&m2s_q, num_slot); w2m_q_uninit(&w2m_q); s2w_q_uninit(&s2w_q); } void * plzip_wrap(void *v_arg) { Plzip_arg *arg = (Plzip_arg *)v_arg; plzip( arg->num_worker, arg->num_slot, arg->clidx, arg->print_cctrs, arg->infd, arg->outfd ); xraise(SIGUSR2); return 0; }