/* Plzip - A parallel version of the lzip data compressor
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _FILE_OFFSET_BITS 64
#include <cassert>
#include <cerrno>
#include <csignal>
#include <cstdio>
#include <stdint.h>
#include <limits.h>
#include <unistd.h>
#include <pthread.h>
#include <lzlib.h>
#include "main.h"
#include "plzip.h"
#include "lacos_rbtree.h"
/*
  Splitter -> workers message: one block of raw input read from infd.
  Allocated with sizeof_s2w_blk = sizeof(S2w_blk) + (sizeof_plain - 1),
  so "plain" extends past its declared [1] size (pre-C99 trailing-array
  idiom; see the SIZES macro in plzip()).
*/
struct S2w_blk            /* Splitter to workers. */
{
  uint64_t id;            /* Block serial number as read from infd. */
  S2w_blk *next;          /* Next in queue. */
  size_t loaded;          /* # of bytes in plain, may be 0 for 1st. */
  char unsigned plain[1]; /* Data read from infd, allocated: sizeof_plain. */
};
/* FIFO queue of input blocks from the splitter to the workers. */
struct S2w_q
{
  Cond av_or_eof; /* New block available or splitter done. */
  S2w_blk *tail,  /* Splitter will append here. */
      *head;      /* Next ready worker shall compress this. */
  int eof;        /* Splitter done. */
};
static void
s2w_q_init(S2w_q *s2w_q)
{
xinit(&s2w_q->av_or_eof);
s2w_q->tail = 0;
s2w_q->head = 0;
s2w_q->eof = 0;
}
static void
s2w_q_uninit(S2w_q *s2w_q)
{
assert(0 != s2w_q->eof);
assert(0 == s2w_q->head);
assert(0 == s2w_q->tail);
xdestroy(&s2w_q->av_or_eof);
}
/*
  Worker -> muxer message: one compressed block.
  Allocated with sizeof_w2m_blk = sizeof(W2m_blk) + (sizeof_compr - 1),
  so "compr" extends past its declared [1] size (pre-C99 trailing-array
  idiom; see the SIZES macro in plzip()).
*/
struct W2m_blk            /* Workers to muxer. */
{
  uint64_t id;            /* Block index as read from infd. */
  W2m_blk *next;          /* Next block in list (unordered). */
  size_t produced;        /* Number of bytes in compr. */
  char unsigned compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */
};
/*
  Order two W2m_blk objects by ascending block id.
  Comparator for the reordering tree used by the muxer.
*/
static int
w2m_blk_cmp(const void *v_a, const void *v_b)
{
  const uint64_t a = ((const W2m_blk *)v_a)->id;
  const uint64_t b = ((const W2m_blk *)v_b)->id;

  if (a < b) {
    return -1;
  }
  if (b < a) {
    return 1;
  }
  return 0;
}
/* Results channel from the workers to the muxer (unordered list). */
struct W2m_q
{
  Cond av_or_exit;  /* New block available or all workers exited. */
  uint64_t needed;  /* Block needed for resuming writing. */
  W2m_blk *head;    /* Block list (unordered). */
  unsigned working; /* Number of workers still running. */
};
static void
w2m_q_init(W2m_q *w2m_q, unsigned num_worker)
{
assert(0u < num_worker);
xinit(&w2m_q->av_or_exit);
w2m_q->needed = 0u;
w2m_q->head = 0;
w2m_q->working = num_worker;
}
static void
w2m_q_uninit(W2m_q *w2m_q)
{
assert(0u == w2m_q->working);
assert(0 == w2m_q->head);
xdestroy(&w2m_q->av_or_exit);
}
/*
  Muxer -> splitter feedback: a counting semaphore of free block slots,
  bounding how many input blocks may be in flight at once.
*/
struct M2s_q         /* Muxer to splitter. */
{
  Cond av;           /* Free slot available. */
  unsigned num_free; /* Number of free slots. */
};
static void
m2s_q_init(M2s_q *m2s_q, unsigned num_free)
{
assert(0u < num_free);
xinit(&m2s_q->av);
m2s_q->num_free = num_free;
}
static void
m2s_q_uninit(M2s_q *m2s_q, unsigned num_free)
{
assert(m2s_q->num_free == num_free);
xdestroy(&m2s_q->av);
}
/*
  Splitter thread body: read fixed-size blocks from "infd" and queue them
  for the worker threads until EOF or read error (which aborts the program).

  m2s_q          -- free-slot semaphore fed back by the muxer (flow control)
  s2w_q          -- output queue towards the workers
  infd           -- input file descriptor
  sizeof_plain   -- payload capacity of each block
  sizeof_s2w_blk -- full allocation size of an S2w_blk (header + payload)
*/
static void
split(M2s_q *m2s_q, S2w_q *s2w_q, int infd,
    const size_t sizeof_plain, const size_t sizeof_s2w_blk)
{
  uint64_t id;
  ssize_t rd;

  id = 0u;
  do {
    S2w_blk *s2w_blk;
    size_t vacant;

    /* Grab a free slot. */
    xlock_pred(&m2s_q->av);
    while (0u == m2s_q->num_free) {
      xwait(&m2s_q->av);
    }
    --m2s_q->num_free;
    xunlock(&m2s_q->av);
    s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);

    /* Fill block; loop because read() may return short counts. */
    vacant = sizeof_plain;
    do {
      rd = read(infd, s2w_blk->plain + (sizeof_plain - vacant),
          vacant > (size_t)SSIZE_MAX ? (size_t)SSIZE_MAX : vacant);
    } while (0 < rd && 0u < (vacant -= (size_t)rd));

    /* Read error. */
    if (-1 == rd) {
      fail("read()", errno);
    }

    if (sizeof_plain == vacant && 0u < id) {
      /*
        EOF on first read, but not for first input block: there is nothing
        to push, so release the block and return its slot to the muxer.
        (The first block IS pushed even when empty, so the workers/muxer
        see at least one block for empty input.)
      */
      (*freef)(s2w_blk);
      xlock(&m2s_q->av);
      ++m2s_q->num_free;
      xunlock(&m2s_q->av);
    }
    else {
      s2w_blk->id = id;
      s2w_blk->next = 0;
      s2w_blk->loaded = sizeof_plain - vacant;
    }

    /* We either push a block, or set EOF, or both. */
    assert(sizeof_plain > vacant || 0 == rd);
    xlock(&s2w_q->av_or_eof);
    if (0 == s2w_q->head) {
      /* Queue was empty: workers may be blocked in xwait(), wake them. */
      xbroadcast(&s2w_q->av_or_eof);
    }
    /* Push if any data was read, or if this is the (possibly empty) first
       block. */
    if (sizeof_plain > vacant || 0u == id) {
      if (0 == s2w_q->tail) {
        s2w_q->head = s2w_blk;
      }
      else {
        s2w_q->tail->next = s2w_blk;
      }
      s2w_q->tail = s2w_blk;
    }
    s2w_q->eof = (0 == rd);
    xunlock(&s2w_q->av_or_eof);

    /*
      If we didn't push a block, then this is bogus, but then we did set EOF,
      so it doesn't matter, because we'll leave immediately.
    */
    ++id;
  } while (0 < rd);
}
/* Bundled arguments for split(), passed through pthread_create(). */
struct Split_arg
{
  M2s_q *m2s_q;        /* Free-slot semaphore from the muxer. */
  S2w_q *s2w_q;        /* Queue towards the workers. */
  int infd;            /* Input file descriptor. */
  size_t sizeof_plain, /* Payload capacity of each input block. */
      sizeof_S2w_blk;  /* Full allocation size of an S2w_blk. */
};
static void *
split_wrap(void *v_split_arg)
{
Split_arg *split_arg = (Split_arg *)v_split_arg;
split(
split_arg->m2s_q,
split_arg->s2w_q,
split_arg->infd,
split_arg->sizeof_plain,
split_arg->sizeof_S2w_blk
);
return 0;
}
static void
work_lz_rd(W2m_blk *w2m_blk, const size_t sizeof_compr, void *lz)
{
int rd;
assert(w2m_blk->produced < sizeof_compr);
rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced,
sizeof_compr - w2m_blk->produced);
if (-1 == rd) {
show_error( "LZ_compress_read() failed." );
fatal();
}
w2m_blk->produced += (size_t)rd;
}
/* Per-level encoder parameters passed to LZ_compress_open(). */
struct Compr_lev
{
  unsigned dict_size, /* Dictionary size in bytes. */
      mx_match;       /* Match length limit in bytes. */
};
/*
  Encoder parameter table, indexed by "clidx".
  NOTE(review): presumably maps to the command-line compression levels
  (cf. lzip's level table) -- confirm against the option parser in main.
*/
static const Compr_lev compr_lev[] = {
  { 1u * 1024u * 1024u, 10u },
  { 1u * 1024u * 1024u, 12u },
  { 1u * 1024u * 1024u, 17u },
  { 2u * 1024u * 1024u, 26u },
  { 4u * 1024u * 1024u, 44u },
  { 8u * 1024u * 1024u, 80u },
  { 16u * 1024u * 1024u, 108u },
  { 16u * 1024u * 1024u, 163u },
  { 32u * 1024u * 1024u, 273u }
};
/*
  Compress one input block into a single lzip member and push the result to
  the muxer.  Encoder settings come from compr_lev[clidx].  Any encoder
  failure aborts the program.  The caller keeps ownership of "s2w_blk".

  s2w_blk        -- input block from the splitter
  w2m_q          -- output channel towards the muxer
  clidx          -- index into compr_lev[]
  sizeof_compr   -- payload capacity of the output block
  sizeof_w2m_blk -- full allocation size of a W2m_blk (header + payload)
*/
static void
work_compr(S2w_blk *s2w_blk, W2m_q *w2m_q, unsigned clidx,
    const size_t sizeof_compr, const size_t sizeof_w2m_blk)
{
  W2m_blk *w2m_blk;

  /* Only the first block (id 0) may be empty -- see split(). */
  assert(0u < s2w_blk->loaded || 0u == s2w_blk->id);
  w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk);

  /* Single member compression.  Settings taken from compr_lev[clidx]. */
  {
    void *lz;
    size_t written;

    /* (uint64_t)-1 / 2u: member size limit, effectively unlimited. */
    lz = LZ_compress_open(compr_lev[clidx].dict_size,
        compr_lev[clidx].mx_match, (uint64_t)-1 / 2u);
    if (LZ_ok != LZ_compress_errno(lz)) {
      show_error( "LZ_compress_open() failed." );
      fatal();
    }
    written = 0u;
    w2m_blk->produced = 0u;

    /* Feed the encoder, draining its output after each partial write so it
       never stalls for lack of output space. */
    while (written < s2w_blk->loaded) {
      int wr;

      wr = LZ_compress_write(lz, s2w_blk->plain + written,
          s2w_blk->loaded - written);
      if (-1 == wr) {
        show_error( "LZ_compress_write() failed." );
        fatal();
      }
      written += (size_t)wr;
      work_lz_rd(w2m_blk, sizeof_compr, lz);
    }

    /* Finish the member and collect the remaining compressed bytes. */
    if (-1 == LZ_compress_finish(lz)) {
      show_error( "LZ_compress_finish() failed." );
      fatal();
    }
    while (!LZ_compress_finished(lz)) {
      work_lz_rd(w2m_blk, sizeof_compr, lz);
    }
    if (-1 == LZ_compress_close(lz)) {
      show_error( "LZ_compress_close() failed." );
      fatal();
    }
  }
  w2m_blk->id = s2w_blk->id;

  /* Push block to muxer. */
  xlock(&w2m_q->av_or_exit);
  w2m_blk->next = w2m_q->head;
  w2m_q->head = w2m_blk;
  if (w2m_blk->id == w2m_q->needed) {
    /* Signal only when this is the block the muxer is waiting to write. */
    xsignal(&w2m_q->av_or_exit);
  }
  xunlock(&w2m_q->av_or_exit);
}
/*
  Worker thread body: repeatedly pop one block from the splitter's queue,
  compress it, and pass the result to the muxer.  Returns when the queue is
  empty and the splitter has signaled EOF; the last worker to exit wakes
  the muxer.
*/
static void
work(S2w_q *s2w_q, W2m_q *w2m_q, unsigned clidx,
    const size_t sizeof_compr, const size_t sizeof_w2m_blk)
{
  for (;;) {
    S2w_blk *s2w_blk;

    /* Grab a block to work on. */
    xlock_pred(&s2w_q->av_or_eof);
    while (0 == s2w_q->head && !s2w_q->eof) {
      xwait(&s2w_q->av_or_eof);
    }
    if (0 == s2w_q->head) {
      /* No blocks available and splitter exited. */
      xunlock(&s2w_q->av_or_eof);
      break;
    }
    s2w_blk = s2w_q->head;
    s2w_q->head = s2w_blk->next;
    if (0 == s2w_q->head) {
      s2w_q->tail = 0;
    }
    xunlock(&s2w_q->av_or_eof);

    work_compr(s2w_blk, w2m_q, clidx, sizeof_compr, sizeof_w2m_blk);
    /* The worker, not the splitter, releases the input block. */
    (*freef)(s2w_blk);
  }

  /* Notify muxer when last worker exits. */
  xlock(&w2m_q->av_or_exit);
  if (0u == --w2m_q->working && 0 == w2m_q->head) {
    xsignal(&w2m_q->av_or_exit);
  }
  xunlock(&w2m_q->av_or_exit);
}
/* Bundled arguments for work(), passed through pthread_create(). */
struct Work_arg
{
  S2w_q *s2w_q;        /* Queue from the splitter. */
  W2m_q *w2m_q;        /* Output channel towards the muxer. */
  unsigned clidx;      /* Compression level index into compr_lev[]. */
  size_t sizeof_compr, /* Payload capacity of each output block. */
      sizeof_W2m_blk;  /* Full allocation size of a W2m_blk. */
};
static void *
work_wrap(void *v_work_arg)
{
Work_arg *work_arg;
work_arg = (Work_arg *)v_work_arg;
work(
work_arg->s2w_q,
work_arg->w2m_q,
work_arg->clidx,
work_arg->sizeof_compr,
work_arg->sizeof_W2m_blk
);
return 0;
}
/* Node allocator hook for lacos_rbtree; the context argument is unused. */
static void *
reord_alloc(size_t size, void *)
{
  return mallocf(size);
}
/* Node deallocator hook for lacos_rbtree; the context argument is unused. */
static void
reord_dealloc(void *ptr, void *)
{
  freef(ptr);
}
/*
  Write out the longest initial run of consecutively numbered blocks held
  in the reordering tree, starting at *reord_needed.  Each written block is
  removed from the tree, freed, and its slot returned to the splitter.

  m2s_q        -- free-slot semaphore back to the splitter
  reord        -- in/out: root of the reordering tree (must be non-empty)
  reord_needed -- in/out: id of the next block to write
  outfd        -- output fd; NOTE(review): -1 appears to mean "discard
                  output" based on the check below -- confirm with caller
*/
static void
mux_write(M2s_q *m2s_q, lacos_rbtree_node **reord,
    uint64_t *reord_needed, int outfd)
{
  assert(0 != *reord);

  /*
    Go on until the tree becomes empty or the next block is found to be
    missing.
  */
  do {
    lacos_rbtree_node *reord_head;
    W2m_blk *reord_w2m_blk;

    reord_head = lacos_rbtree_min(*reord);
    assert(0 != reord_head);
    /* The node's data pointer is the W2m_blk itself. */
    reord_w2m_blk = (W2m_blk *)(*(void **)reord_head);
    if (reord_w2m_blk->id != *reord_needed) {
      break;
    }

    /* Write out "reord_w2m_blk". */
    if (-1 != outfd) {
      char unsigned *cp;

      cp = reord_w2m_blk->compr;
      /* Loop because write() may accept fewer bytes than requested. */
      while (reord_w2m_blk->produced > 0u) {
        ssize_t written;

        written = write(outfd, cp, reord_w2m_blk->produced > (size_t)SSIZE_MAX
            ? (size_t)SSIZE_MAX : reord_w2m_blk->produced);
        if (-1 == written) {
          fail("write()", errno);
        }
        reord_w2m_blk->produced -= (size_t)written;
        cp += written;
      }
    }
    ++*reord_needed;

    /* One block retired: return its slot to the splitter. */
    xlock(&m2s_q->av);
    if (0u == m2s_q->num_free++) {
      /* 0 -> 1 transition: the splitter may be blocked on this. */
      xsignal(&m2s_q->av);
    }
    xunlock(&m2s_q->av);

    lacos_rbtree_delete(
      reord,         /* new_root */
      reord_head,    /* old_node */
      0,             /* old_data */
      reord_dealloc, /* dealloc() */
      0              /* alloc_ctl */
    );
    /* Release "reord_w2m_blk". */
    (*freef)(reord_w2m_blk);
  } while (0 != *reord);
}
/*
  Muxer (runs on the calling thread): collect compressed blocks from the
  workers, reorder them by id in a red-black tree, and write them to outfd
  in order.  Returns once all workers have exited and every block has been
  written.
*/
static void
mux(W2m_q *w2m_q, M2s_q *m2s_q, int outfd)
{
  lacos_rbtree_node *reord;
  uint64_t reord_needed;

  reord = 0;
  reord_needed = 0u;

  xlock_pred(&w2m_q->av_or_exit);
  for (;;) {
    W2m_blk *w2m_blk;

    /* Grab all available compressed blocks in one step. */
    while (0 == w2m_q->head && 0u < w2m_q->working) {
      xwait(&w2m_q->av_or_exit);
    }
    if (0 == w2m_q->head) {
      /* w2m_q is empty and all workers exited */
      break;
    }
    w2m_blk = w2m_q->head;
    w2m_q->head = 0;
    /* Unlock while merging/writing so workers can keep pushing. */
    xunlock(&w2m_q->av_or_exit);

    /* Merge blocks fetched this time into tree. */
    do {
      lacos_rbtree_node *new_node;
      W2m_blk *next;

      if (-1 == lacos_rbtree_insert(
          &reord,      /* new_root */
          &new_node,   /* new_node */
          w2m_blk,     /* new_data */
          w2m_blk_cmp, /* cmp() */
          reord_alloc, /* alloc() */
          0            /* alloc_ctl */
      )) {
        /* id collision shouldn't happen */
        assert(0 == new_node);
        show_error( "lacos_rbtree_insert(): out of memory." );
        fatal();
      }
      next = w2m_blk->next;
      w2m_blk->next = 0;
      w2m_blk = next;
    } while (0 != w2m_blk);

    /* Write out initial continuous sequence of reordered blocks. */
    mux_write(m2s_q, &reord, &reord_needed, outfd);

    xlock_pred(&w2m_q->av_or_exit);
    /* Publish the id we need next, so workers signal only for it. */
    w2m_q->needed = reord_needed;
  }
  xunlock(&w2m_q->av_or_exit);
  /* Everything was written, so the reordering tree must be empty. */
  assert(0 == reord);
}
/*
  Set up the compression pipeline (one splitter thread, num_worker worker
  threads, this thread as muxer), run it to completion, then tear it down.

  num_worker  -- number of worker threads to create (> 0)
  num_slot    -- number of in-flight input blocks allowed (flow control)
  clidx       -- compression level index into compr_lev[]
  print_cctrs -- if nonzero, print condition-variable contention counters
  infd, outfd -- input / output file descriptors
*/
static void
plzip(unsigned num_worker, unsigned num_slot, unsigned clidx, int print_cctrs,
    int infd, int outfd)
{
  S2w_q s2w_q;
  W2m_q w2m_q;
  M2s_q m2s_q;
  Split_arg split_arg;
  pthread_t splitter;
  Work_arg work_arg;
  pthread_t *worker;
  unsigned i;

  assert(clidx < sizeof compr_lev / sizeof compr_lev[0]);
  s2w_q_init(&s2w_q);
  w2m_q_init(&w2m_q, num_worker);
  m2s_q_init(&m2s_q, num_slot);

/*
  Store payload size "arsz_unsigned" into <arg>_arg.sizeof_<arr>, and the
  full block allocation size into <arg>_arg.sizeof_<struc>, checking each
  step for size_t overflow.  The "- 1" compensates for the one-byte
  placeholder array already counted in sizeof(struc).
*/
#define SIZES(struc, arr, arsz_unsigned, arg) \
  do { \
    unsigned tmp; \
\
    tmp = arsz_unsigned; \
    /* Can only trigger where size_t is narrower than unsigned. */ \
    if ((size_t)-1 < tmp) { \
      show_error( "size_t overflow in sizeof_" #arr "." ); \
      fatal(); \
    } \
    arg ## _arg . sizeof_ ## arr = tmp; \
\
    if ((size_t)-1 - sizeof(struc) \
        < arg ## _arg . sizeof_ ## arr - (size_t)1) { \
      show_error( "size_t overflow in sizeof_" #struc "." ); \
      fatal(); \
    } \
    arg ## _arg . sizeof_ ## struc = sizeof(struc) \
        + (arg ## _arg . sizeof_ ## arr - (size_t)1); \
  } while (0)

  /* Start the splitter thread. */
  split_arg.m2s_q = &m2s_q;
  split_arg.s2w_q = &s2w_q;
  split_arg.infd = infd;
  /* Input block payload: twice the dictionary size of the chosen level. */
  SIZES(S2w_blk, plain, 2u * compr_lev[clidx].dict_size, split);
  xcreate(&splitter, split_wrap, &split_arg);

  /* Start the worker threads; they all share one Work_arg. */
  work_arg.s2w_q = &s2w_q;
  work_arg.w2m_q = &w2m_q;
  work_arg.clidx = clidx;
  /*
    Output block capacity: header bytes, payload inflated by at most 9/8,
    plus trailer bytes.  NOTE(review): presumably a worst-case bound from
    the lzip member format -- confirm against the format specification.
  */
  SIZES(W2m_blk, compr, (4u + 1u + 1u)
      + ((unsigned)split_arg.sizeof_plain * 9u + 7u) / 8u + (4u + 8u + 8u),
      work);
#undef SIZES

  assert(0u < num_worker);
  /* Guard the multiplication in the allocation below. */
  assert((size_t)-1 / sizeof *worker >= num_worker);
  worker = (pthread_t *)xalloc(num_worker * sizeof *worker);
  for (i = 0u; i < num_worker; ++i) {
    xcreate(&worker[i], work_wrap, &work_arg);
  }

  /* This thread becomes the muxer. */
  mux(&w2m_q, &m2s_q, outfd);

  /* Join workers in reverse creation order, then the splitter. */
  i = num_worker;
  do {
    xjoin(worker[--i]);
  } while (0u < i);
  (*freef)(worker);
  xjoin(splitter);

  /* Field width: enough decimal digits for an unsigned long counter. */
  const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1;
  if (print_cctrs && 0 > fprintf(stderr,
      "any worker tried to consume from splitter: %*lu\n"
      "any worker stalled : %*lu\n"
      "muxer tried to consume from workers : %*lu\n"
      "muxer stalled : %*lu\n"
      "splitter tried to consume from muxer : %*lu\n"
      "splitter stalled : %*lu\n",
      FW, s2w_q.av_or_eof.ccount,
      FW, s2w_q.av_or_eof.wcount,
      FW, w2m_q.av_or_exit.ccount,
      FW, w2m_q.av_or_exit.wcount,
      FW, m2s_q.av.ccount,
      FW, m2s_q.av.wcount) )
  {
    fatal();
  }

  m2s_q_uninit(&m2s_q, num_slot);
  w2m_q_uninit(&w2m_q);
  s2w_q_uninit(&s2w_q);
}
/*
  pthread entry point for the whole compressor: run the pipeline, then
  raise SIGUSR2 (presumably handled by the main thread -- confirm in main).
*/
void *
plzip_wrap(void *v_arg)
{
  Plzip_arg *arg = (Plzip_arg *)v_arg;

  plzip(arg->num_worker, arg->num_slot, arg->clidx, arg->print_cctrs,
      arg->infd, arg->outfd);
  xraise(SIGUSR2);
  return 0;
}