summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- ChangeLog 10
-rw-r--r-- Makefile.in 4
-rw-r--r-- NEWS 13
-rwxr-xr-x configure 4
-rw-r--r-- doc/plzip.1 5
-rw-r--r-- doc/plzip.info 26
-rw-r--r-- doc/plzip.texinfo 20
-rw-r--r-- main.cc 445
-rw-r--r-- main.h 77
-rw-r--r-- plzip.cc 641
-rw-r--r-- plzip.h 26
11 files changed, 456 insertions, 815 deletions
diff --git a/ChangeLog b/ChangeLog
index 02c918f..b0ef7eb 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,13 @@
+2010-01-24 Antonio Diaz Diaz <ant_diaz@teleline.es>
+
+ * Version 0.3 released.
+ * Implemented option "--data-size".
+ * Output file is now removed if plzip is interrupted.
+ * This version automatically chooses the smallest possible
+ dictionary size for each member during compression, saving
+ memory during decompression.
+ * main.cc: New constant "o_binary".
+
2010-01-17 Antonio Diaz Diaz <ant_diaz@teleline.es>
* Version 0.2 released.
diff --git a/Makefile.in b/Makefile.in
index f1267e2..6eea4bf 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -30,8 +30,8 @@ main.o : main.cc
$(objs) : Makefile
arg_parser.o : arg_parser.h
-main.o : arg_parser.h main.h plzip.h
-plzip.o : main.h plzip.h
+main.o : arg_parser.h plzip.h
+plzip.o : plzip.h
doc : info man
diff --git a/NEWS b/NEWS
index 1587984..08b0074 100644
--- a/NEWS
+++ b/NEWS
@@ -1,4 +1,11 @@
-Changes in version 0.2:
+Changes in version 0.3:
-Lzip options "--dictionary-size" and "--match-length" have been
-implemented.
+New option "--data-size" has been added.
+
+Output file is now removed if plzip is interrupted.
+
+This version automatically chooses the smallest possible dictionary size
+for each member during compression, saving memory during decompression.
+
+Regular files are now opened in binary mode on non-POSIX platforms
+that define the O_BINARY macro.
diff --git a/configure b/configure
index 5addd3e..3cd1dc8 100755
--- a/configure
+++ b/configure
@@ -5,12 +5,12 @@
# This configure script is free software: you have unlimited permission
# to copy, distribute and modify it.
#
-# Date of this version: 2010-01-17
+# Date of this version: 2010-01-24
args=
no_create=
pkgname=plzip
-pkgversion=0.2
+pkgversion=0.3
progname=plzip
srctrigger=plzip.h
diff --git a/doc/plzip.1 b/doc/plzip.1
index 0fefe38..feb0aa1 100644
--- a/doc/plzip.1
+++ b/doc/plzip.1
@@ -1,5 +1,5 @@
.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36.
-.TH PLZIP "1" "January 2010" "Plzip 0.2" "User Commands"
+.TH PLZIP "1" "January 2010" "Plzip 0.3" "User Commands"
.SH NAME
Plzip \- data compressor based on the LZMA algorithm
.SH SYNOPSIS
@@ -15,6 +15,9 @@ display this help and exit
\fB\-V\fR, \fB\-\-version\fR
output version information and exit
.TP
+\fB\-B\fR, \fB\-\-data\-size=\fR<n>
+set input data block size in bytes
+.TP
\fB\-c\fR, \fB\-\-stdout\fR
send output to standard output
.TP
diff --git a/doc/plzip.info b/doc/plzip.info
index ca7bda4..9a48fcb 100644
--- a/doc/plzip.info
+++ b/doc/plzip.info
@@ -12,7 +12,7 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir)
Plzip Manual
************
-This manual is for Plzip (version 0.2, 17 January 2010).
+This manual is for Plzip (version 0.3, 24 January 2010).
* Menu:
@@ -34,9 +34,11 @@ File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: T
1 Introduction
**************
-Plzip is a parallel version of the lzip data compressor. Currently only
-compression is performed in parallel. Parallel decompression is planned
-to be implemented later.
+Plzip is a parallel version of the lzip data compressor. The files
+produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is
+intended for faster compression/decompression of big files on
+multiprocessor machines. Currently only compression is performed in
+parallel. Parallel decompression is planned to be implemented later.
Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of
@@ -104,6 +106,14 @@ The format for running plzip is:
`-V'
Print the version number of plzip on the standard output and exit.
+`--data-size=SIZE'
+`-B'
+ Set the input data block size in bytes. The input file will be
+ divided in chunks of this size before compression is performed.
+ Valid values range from 100kB to 1GiB. Default value is two times
+ the dictionary size. It is a waste of memory to choose a data size
+ smaller than the dictionary size.
+
`--stdout'
`-c'
Compress or decompress to standard output. Needed when reading
@@ -295,9 +305,9 @@ Concept Index
Tag Table:
Node: Top227
Node: Introduction750
-Node: Invoking Plzip3402
-Node: File Format6747
-Node: Problems8703
-Node: Concept Index9232
+Node: Invoking Plzip3571
+Node: File Format7260
+Node: Problems9216
+Node: Concept Index9745

End Tag Table
diff --git a/doc/plzip.texinfo b/doc/plzip.texinfo
index 7712600..04bf822 100644
--- a/doc/plzip.texinfo
+++ b/doc/plzip.texinfo
@@ -5,8 +5,8 @@
@finalout
@c %**end of header
-@set UPDATED 17 January 2010
-@set VERSION 0.2
+@set UPDATED 24 January 2010
+@set VERSION 0.3
@dircategory Data Compression
@direntry
@@ -50,9 +50,11 @@ to copy, distribute and modify it.
@chapter Introduction
@cindex introduction
-Plzip is a parallel version of the lzip data compressor. Currently only
-compression is performed in parallel. Parallel decompression is planned
-to be implemented later.
+Plzip is a parallel version of the lzip data compressor. The files
+produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is
+intended for faster compression/decompression of big files on
+multiprocessor machines. Currently only compression is performed in
+parallel. Parallel decompression is planned to be implemented later.
Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of
@@ -127,6 +129,14 @@ Print an informative help message describing the options and exit.
@itemx -V
Print the version number of plzip on the standard output and exit.
+@item --data-size=@var{size}
+@itemx -B
+Set the input data block size in bytes. The input file will be divided
+in chunks of this size before compression is performed. Valid values
+range from 100kB to 1GiB. Default value is two times the dictionary
+size. It is a waste of memory to choose a data size smaller than the
+dictionary size.
+
@item --stdout
@itemx -c
Compress or decompress to standard output. Needed when reading from a
diff --git a/main.cc b/main.cc
index 18ed642..f3f12e0 100644
--- a/main.cc
+++ b/main.cc
@@ -25,20 +25,17 @@
#define _FILE_OFFSET_BITS 64
#include <algorithm>
-#include <cassert>
#include <cerrno>
#include <climits>
#include <csignal>
-#include <cstdarg>
-#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
#include <fcntl.h>
-#include <stdint.h>
#include <pthread.h>
+#include <stdint.h>
#include <unistd.h>
#include <utime.h>
#include <sys/stat.h>
@@ -49,7 +46,6 @@
#endif
#include "arg_parser.h"
-#include "main.h"
#include "plzip.h"
#ifndef LLONG_MAX
@@ -72,6 +68,12 @@ const char * const Program_name = "Plzip";
const char * const program_name = "plzip";
const char * const program_year = "2010";
+#ifdef O_BINARY
+const int o_binary = O_BINARY;
+#else
+const int o_binary = 0;
+#endif
+
struct { const char * from; const char * to; } const known_extensions[] = {
{ ".lz", "" },
{ ".tlz", ".tar" },
@@ -88,6 +90,8 @@ enum Mode { m_compress = 0, m_decompress, m_test };
std::string output_filename;
int outhandle = -1;
bool delete_output_on_interrupt = false;
+pthread_t main_thread;
+pid_t main_thread_pid;
class Pretty_print
{
@@ -132,6 +136,7 @@ void show_help() throw()
std::printf( " -h, --help display this help and exit\n" );
std::printf( " -V, --version output version information and exit\n" );
// std::printf( " -b, --member-size=<n> set member size limit in bytes\n" );
+ std::printf( " -B, --data-size=<n> set input data block size in bytes\n" );
std::printf( " -c, --stdout send output to standard output\n" );
std::printf( " -d, --decompress decompress\n" );
std::printf( " -f, --force overwrite existing output files\n" );
@@ -267,7 +272,7 @@ int open_instream( const std::string & name, struct stat * in_statsp,
}
else
{
- inhandle = open( name.c_str(), O_RDONLY );
+ inhandle = open( name.c_str(), O_RDONLY | o_binary );
if( inhandle < 0 )
{
if( verbosity >= 0 )
@@ -324,9 +329,11 @@ void set_d_outname( const std::string & name, const int i ) throw()
bool open_outstream( const bool force ) throw()
{
if( force )
- outhandle = open( output_filename.c_str(), O_CREAT | O_TRUNC | O_WRONLY,
+ outhandle = open( output_filename.c_str(),
+ O_CREAT | O_TRUNC | O_WRONLY | o_binary,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH );
- else outhandle = open( output_filename.c_str(), O_CREAT | O_EXCL | O_WRONLY,
+ else outhandle = open( output_filename.c_str(),
+ O_CREAT | O_EXCL | O_WRONLY | o_binary,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH );
if( outhandle < 0 )
{
@@ -412,14 +419,13 @@ int do_decompress( LZ_Decoder * const decoder, const int inhandle,
const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size;
uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size];
- if( verbosity >= 1 ) pp();
while( true )
{
int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size );
if( in_size > 0 )
{
const int max_in_size = in_size;
- in_size = readblock( inhandle, (char *)in_buffer, max_in_size );
+ in_size = readblock( inhandle, in_buffer, max_in_size );
if( in_size != max_in_size && errno )
{ pp(); show_error( "read error", errno ); return 1; }
if( in_size == 0 ) LZ_decompress_finish( decoder );
@@ -458,7 +464,7 @@ int do_decompress( LZ_Decoder * const decoder, const int inhandle,
}
else if( out_size > 0 && outhandle >= 0 )
{
- const int wr = writeblock( outhandle, (char *)out_buffer, out_size );
+ const int wr = writeblock( outhandle, out_buffer, out_size );
if( wr != out_size )
{ pp(); show_error( "write error", errno ); return 1; }
}
@@ -490,9 +496,12 @@ int decompress( const int inhandle, const Pretty_print & pp,
}
-extern "C" void signal_handler( int ) throw()
+extern "C" void signal_handler( int sig ) throw()
{
- show_error( "Control-C or similar caught, quitting." );
+ if( !pthread_equal( pthread_self(), main_thread ) )
+ kill( main_thread_pid, sig );
+ if( sig != SIGUSR1 )
+ show_error( "Control-C or similar caught, quitting." );
cleanup_and_fail( 1 );
}
@@ -502,6 +511,7 @@ void set_signals() throw()
signal( SIGHUP, signal_handler );
signal( SIGINT, signal_handler );
signal( SIGTERM, signal_handler );
+ signal( SIGUSR1, signal_handler );
}
} // end namespace
@@ -551,372 +561,16 @@ void internal_error( const char * msg )
}
-/* Private stuff needed by fatal(). */
-static pthread_t main_thread;
-
-static pid_t pid;
-
-
-/* Public utility variables and functions. */
-
-/*
- This can be called from any thread, main thread or sub-threads alike, since
- they all call common helper functions that call fatal() in case of an error.
-*/
-void fatal()
-{
- if( pthread_equal(pthread_self(), main_thread) )
- cleanup_and_fail( 1 );
- else
- {
- if( 0 == kill(pid, SIGUSR1) )
- pthread_exit(0);
- }
- _exit( 1 );
-}
-
-
-void
-fail(const char *fmt, int err, ...)
-{
- va_list args;
-
- /* Locking stderr should also protect strerror(). */
- flockfile(stderr);
- (void)fprintf(stderr, "%s: ", program_name);
-
- va_start(args, err);
- (void)vfprintf(stderr, fmt, args);
- va_end(args);
-
- (void)fprintf(stderr, ": %s\n", strerror(err));
- funlockfile(stderr);
- /* Stream stderr is never fully buffered originally. */
- fatal();
-}
-
-
-void
-xinit(Cond *cond)
-{
- pthread_mutexattr_t attr;
-
- int ret = pthread_mutexattr_init(&attr);
- if( ret != 0 ) {
- fail("pthread_mutexattr_init()", ret);
- }
-
- ret = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
- if( ret != 0 ) {
- fail("pthread_mutexattr_settype()", ret);
- }
-
- ret = pthread_mutex_init(&cond->lock, &attr);
- if( ret != 0 ) {
- fail("pthread_mutex_init()", ret);
- }
-
- ret = pthread_mutexattr_destroy(&attr);
- if( ret != 0 ) {
- fail("pthread_mutexattr_destroy()", ret);
- }
-
- ret = pthread_cond_init(&cond->cond, 0);
- if( ret != 0 ) {
- fail("pthread_cond_init()", ret);
- }
-
- cond->ccount = 0;
- cond->wcount = 0;
-}
-
-
-void
-xdestroy(Cond *cond)
-{
- int ret = pthread_cond_destroy(&cond->cond);
- if( ret != 0 ) {
- fail("pthread_cond_destroy()", ret);
- }
-
- ret = pthread_mutex_destroy(&cond->lock);
- if( ret != 0 ) {
- fail("pthread_mutex_destroy()", ret);
- }
-}
-
-
-void
-xlock(Cond *cond)
-{
- int ret = pthread_mutex_lock(&cond->lock);
- if( ret != 0 ) {
- fail("pthread_mutex_lock()", ret);
- }
-}
-
-
-void
-xlock_pred(Cond *cond)
-{
- xlock(cond);
- ++cond->ccount;
-}
-
-
-void
-xunlock(Cond *cond)
-{
- int ret = pthread_mutex_unlock(&cond->lock);
- if( ret != 0 ) {
- fail("pthread_mutex_unlock()", ret);
- }
-}
-
-
-void
-xwait(Cond *cond)
-{
- ++cond->wcount;
- int ret = pthread_cond_wait(&cond->cond, &cond->lock);
- if( ret != 0 ) {
- fail("pthread_cond_wait()", ret);
- }
- ++cond->ccount;
-}
-
-
-void
-xsignal(Cond *cond)
-{
- int ret = pthread_cond_signal(&cond->cond);
- if( ret != 0 ) {
- fail("pthread_cond_signal()", ret);
- }
-}
-
-
-void
-xbroadcast(Cond *cond)
-{
- int ret = pthread_cond_broadcast(&cond->cond);
- if( ret != 0 ) {
- fail("pthread_cond_broadcast()", ret);
- }
-}
-
-
-void
-xcreate(pthread_t *thread, void *(*routine)(void *), void *arg)
-{
- int ret = pthread_create(thread, 0, routine, arg);
- if( ret != 0 ) {
- fail("pthread_create()", ret);
- }
-}
-
-
-void
-xjoin(pthread_t thread)
-{
- int ret = pthread_join(thread, 0);
- if( ret != 0 ) {
- fail("pthread_join()", ret);
- }
-}
-
-
-void
-xraise(int sig)
-{
- if( -1 == kill(pid, sig) ) {
- fail("kill()", errno);
- }
-}
-
-
-/* Private stuff part 2. */
-
-
-static void
-xsigemptyset(sigset_t *set)
-{
- if( -1 == sigemptyset(set) ) {
- fail("sigemptyset()", errno);
- }
-}
-
-
-static void
-xsigaddset(sigset_t *set, int signo)
-{
- if( -1 == sigaddset(set, signo) ) {
- fail("sigaddset()", errno);
- }
-}
-
-
-static void
-xsigmask(int how, const sigset_t *set, sigset_t *oset)
-{
- int ret = pthread_sigmask(how, set, oset);
- if( ret != 0 ) {
- fail("pthread_sigmask()", ret);
- }
-}
-
-
-static void
-xsigaction(int sig, void (*handler)(int))
-{
- struct sigaction act;
-
- act.sa_handler = handler;
- xsigemptyset(&act.sa_mask);
- act.sa_flags = 0;
-
- if( -1 == sigaction(sig, &act, 0) ) {
- fail("sigaction()", errno);
- }
-}
-
-
-enum Caught_sig { CS_INT = 1, CS_TERM, CS_USR1, CS_USR2 };
-
-static volatile sig_atomic_t caught_sig;
-
-
-extern "C" void sighandler( int sig )
- {
- /* sig_atomic_t is nowhere required to be able to hold signal values. */
- switch( sig )
- {
- case SIGINT : caught_sig = CS_INT; break;
- case SIGTERM: caught_sig = CS_TERM; break;
- case SIGUSR1: caught_sig = CS_USR1; break;
- case SIGUSR2: caught_sig = CS_USR2; break;
- default: internal_error( "caught signal not in set" );
- }
- }
-
-
-static void compress( const lzma_options & encoder_options, const int num_workers,
- int debug_level, int num_slots, int infd, int outfd,
- const Pretty_print & pp, const sigset_t *unblocked )
- {
- /*
- We could wait for signals with either sigwait() or sigsuspend(). SUSv2
- states about sigwait() that its effect on signal actions is unspecified.
- SUSv3 still claims the same.
-
- The SUSv2 description of sigsuspend() talks about both the thread and the
- whole process being suspended until a signal arrives, although thread
- suspension seems much more likely from the wording. They note that they
- filed a clarification request for this. SUSv3 cleans this up and chooses
- thread suspension which was more logical anyway.
-
- I favor sigsuspend() because I need to re-raise SIGTERM and SIGINT, and
- unspecified action behavior with sigwait() seems messy.
-
- 13-OCT-2009 lacos
- */
-
- if( verbosity >= 1 ) pp();
-
- Muxer_arg muxer_arg;
- muxer_arg.dictionary_size = encoder_options.dictionary_size;
- muxer_arg.match_len_limit = encoder_options.match_len_limit;
- muxer_arg.num_workers = num_workers;
- muxer_arg.num_slots = num_slots;
- muxer_arg.debug_level = debug_level;
- muxer_arg.infd = infd;
- muxer_arg.outfd = outfd;
-
- pthread_t muxer_thread;
- xcreate(&muxer_thread, muxer, &muxer_arg);
-
- /* Unblock signals, wait for them, then block them again. */
- {
- int ret = sigsuspend(unblocked);
- assert(-1 == ret && EINTR == errno);
- }
-
- switch( caught_sig ) {
- case CS_INT:
- case CS_TERM: // FIXME remove output file
- {
- int sig;
- sigset_t mask;
-
- sig = (CS_INT == caught_sig) ? SIGINT : SIGTERM;
- /*
- We might have inherited a SIG_IGN from the parent, but that would
- make no sense here. 24-OCT-2009 lacos
- */
- xsigaction(sig, SIG_DFL);
- xraise(sig);
-
- xsigemptyset(&mask);
- xsigaddset(&mask, sig);
- xsigmask(SIG_UNBLOCK, &mask, 0);
- }
- /*
- We shouldn't reach this point, but if we do for some reason, fall
- through.
- */
-
- case CS_USR1:
- /* Error from a non-main thread via fatal(). */
- fatal();
-
- case CS_USR2:
- /* Muxer thread joined other sub-threads and finished successfully. */
- break;
-
- default:
- assert(0);
- }
-
- xjoin(muxer_thread);
- }
-
-
-static void
-sigs_mod(int block_n_catch, sigset_t *oset)
- {
- void (*handler)(int);
-
- if( block_n_catch ) {
- sigset_t mask;
-
- xsigemptyset(&mask);
- xsigaddset(&mask, SIGINT);
- xsigaddset(&mask, SIGTERM);
- xsigaddset(&mask, SIGUSR1);
- xsigaddset(&mask, SIGUSR2);
- xsigmask(SIG_BLOCK, &mask, oset);
-
- handler = sighandler;
- }
- else {
- handler = SIG_DFL;
- }
-
- xsigaction(SIGINT, handler);
- xsigaction(SIGTERM, handler);
- xsigaction(SIGUSR1, handler);
- xsigaction(SIGUSR2, handler);
-
- if( !block_n_catch ) {
- xsigmask(SIG_SETMASK, oset, 0);
- }
- }
+// This can be called from any thread, main thread or sub-threads alike, since
+// they all call common helper functions that call fatal() in case of an error.
+//
+void fatal() { signal_handler( SIGUSR1 ); }
// Returns the number of bytes really read.
// If (returned value < size) and (errno == 0), means EOF was reached.
//
-int readblock( const int fd, char * buf, const int size ) throw()
+int readblock( const int fd, uint8_t * buf, const int size ) throw()
{
int rest = size;
errno = 0;
@@ -935,7 +589,7 @@ int readblock( const int fd, char * buf, const int size ) throw()
// Returns the number of bytes really written.
// If (returned value < size), it is always an error.
//
-int writeblock( const int fd, const char * buf, const int size ) throw()
+int writeblock( const int fd, const uint8_t * buf, const int size ) throw()
{
int rest = size;
errno = 0;
@@ -966,6 +620,7 @@ int main( const int argc, const char * argv[] )
{ 1 << 24, 163 }, // -8
{ 1 << 25, 273 } }; // -9
lzma_options encoder_options = option_mapping[5]; // default = "-6"
+ int data_size = 0;
int debug_level = 0;
int inhandle = -1;
int num_workers = 0; // Start this many worker threads
@@ -977,22 +632,18 @@ int main( const int argc, const char * argv[] )
std::string default_output_filename;
std::vector< std::string > filenames;
invocation_name = argv[0];
+ main_thread = pthread_self();
+ main_thread_pid = getpid();
if( LZ_version()[0] != LZ_version_string[0] )
internal_error( "bad library version" );
- main_thread = pthread_self();
- pid = getpid();
-
- xsigaction(SIGPIPE, SIG_IGN);
- xsigaction(SIGXFSZ, SIG_IGN);
-
const int slots_per_worker = 2;
long max_workers = sysconf( _SC_THREAD_THREADS_MAX );
if( max_workers < 1 || max_workers > INT_MAX / slots_per_worker )
max_workers = INT_MAX / slots_per_worker;
- if( max_workers > INT_MAX / (int)sizeof( pthread_t ) )
- max_workers = INT_MAX / sizeof( pthread_t );
+ if( max_workers > INT_MAX / (int)sizeof (pthread_t) )
+ max_workers = INT_MAX / sizeof (pthread_t);
const Arg_parser::Option options[] =
{
@@ -1006,6 +657,7 @@ int main( const int argc, const char * argv[] )
{ '8', 0, Arg_parser::no },
{ '9', "best", Arg_parser::no },
{ 'b', "member-size", Arg_parser::yes },
+ { 'B', "data-size", Arg_parser::yes },
{ 'c', "stdout", Arg_parser::no },
{ 'd', "decompress", Arg_parser::no },
{ 'D', "debug", Arg_parser::yes },
@@ -1040,6 +692,8 @@ int main( const int argc, const char * argv[] )
case '7': case '8': case '9':
encoder_options = option_mapping[code-'1']; break;
case 'b': break;
+ case 'B': data_size = getnum( arg, 0, 100000,
+ 2 * LZ_max_dictionary_size() ); break;
case 'c': to_stdout = true; break;
case 'd': program_mode = m_decompress; break;
case 'D': debug_level = getnum( arg, 0, 0, 3 );
@@ -1048,8 +702,8 @@ int main( const int argc, const char * argv[] )
case 'h': show_help(); return 0;
case 'k': keep_input_files = true; break;
case 'm': encoder_options.match_len_limit =
- getnum( arg, 0, LZ_min_match_len_limit(),
- LZ_max_match_len_limit() ); break;
+ getnum( arg, 0, LZ_min_match_len_limit(),
+ LZ_max_match_len_limit() ); break;
case 'o': default_output_filename = arg; break;
case 'n': num_workers = getnum( arg, 0, 1, max_workers ); break;
case 'q': verbosity = -1; break;
@@ -1063,13 +717,16 @@ int main( const int argc, const char * argv[] )
}
}
+ if( data_size <= 0 )
+ data_size = 2 * std::max( 65536, encoder_options.dictionary_size );
+
if( num_workers <= 0 )
{
long num_online = sysconf( _SC_NPROCESSORS_ONLN );
- if( num_online <= 0 ) num_online = 2;
+ if( num_online <= 0 ) num_online = 1;
num_workers = std::min( num_online, max_workers );
}
- const int num_slots = num_workers * slots_per_worker;
+ const int num_slots = std::max( 1, ( num_workers * slots_per_worker ) - 1 );
bool filenames_given = false;
for( ; argind < parser.arguments(); ++argind )
@@ -1079,7 +736,9 @@ int main( const int argc, const char * argv[] )
}
if( filenames.empty() ) filenames.push_back("-");
- if( filenames_given && program_mode != m_compress ) set_signals();
+ if( !to_stdout && program_mode != m_test &&
+ ( filenames_given || default_output_filename.size() ) )
+ set_signals();
Pretty_print pp( filenames );
if( program_mode == m_test )
@@ -1144,14 +803,12 @@ int main( const int argc, const char * argv[] )
delete_output_on_interrupt = true;
const struct stat * const in_statsp = input_filename.size() ? &in_stats : 0;
pp.set_name( input_filename );
+ if( verbosity >= 1 ) pp();
int tmp = 0;
if( program_mode == m_compress )
- {
- sigset_t unblocked;
- sigs_mod(1, &unblocked);
- compress( encoder_options, num_workers, debug_level, num_slots, inhandle, outhandle, pp, &unblocked );
- sigs_mod(0, &unblocked);
- }
+ tmp = compress( data_size, encoder_options.dictionary_size,
+ encoder_options.match_len_limit, num_workers,
+ num_slots, inhandle, outhandle, debug_level );
else
tmp = decompress( inhandle, pp, program_mode == m_test );
if( tmp > retval ) retval = tmp;
diff --git a/main.h b/main.h
deleted file mode 100644
index 6c7ffb6..0000000
--- a/main.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/* Plzip - A parallel version of the lzip data compressor
- Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009, 2010 Antonio Diaz Diaz.
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw();
-int readblock( const int fd, char * buf, const int size ) throw();
-int writeblock( const int fd, const char * buf, const int size ) throw();
-
-
-struct Cond
-{
- pthread_mutex_t lock; /* Lock this to protect shared resource. */
- pthread_cond_t cond; /* Trigger this if predicate becomes true. */
- long unsigned ccount, /* Increment this when checking predicate. */
- wcount; /* Increment this when waiting is necessary. */
-};
-
-
-/* Terminate the process. */
-void fatal();
-
-/* Format operation and append resolved error, then call fatal(). */
-void
-fail(const char *fmt, int err, ...)
-#ifdef __GNUC__
-__attribute__((format(printf, 1, 3)))
-#endif
-;
-
-/* If these primitives fail, they call fail(), which in turn calls fatal(). */
-
-void
-xinit(Cond *cond);
-
-void
-xdestroy(Cond *cond);
-
-void
-xlock(Cond *cond);
-
-void
-xlock_pred(Cond *cond);
-
-void
-xunlock(Cond *cond);
-
-void
-xwait(Cond *cond);
-
-void
-xsignal(Cond *cond);
-
-void
-xbroadcast(Cond *cond);
-
-void
-xcreate(pthread_t *thread, void *(*routine)(void *), void *arg);
-
-void
-xjoin(pthread_t thread);
-
-void
-xraise(int sig);
diff --git a/plzip.cc b/plzip.cc
index 7ac8144..dcae860 100644
--- a/plzip.cc
+++ b/plzip.cc
@@ -26,11 +26,11 @@
#include <cstdio>
#include <cstdlib>
#include <vector>
+#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>
-#include "main.h"
#include "plzip.h"
#ifndef LLONG_MAX
@@ -49,282 +49,304 @@ namespace {
long long in_size = 0;
long long out_size = 0;
-void *(*mallocf)(size_t size);
-void (*freef)(void *ptr);
+void *(*mallocf)( size_t size );
+void (*freef)( void *ptr );
-void * trace_malloc(size_t size)
-{
+void * trace_malloc( size_t size )
+ {
int save_errno = 0;
- void * ret = malloc(size);
+ void * ret = malloc( size );
if( ret == 0 ) save_errno = errno;
- fprintf(stderr, "malloc(%lu) == %p\n", (long unsigned)size, ret);
+ std::fprintf( stderr, "malloc(%lu) == %p\n", (unsigned long)size, ret );
if( ret == 0 ) errno = save_errno;
return ret;
-}
+ }
-void trace_free(void *ptr)
-{
- fprintf(stderr, "free(%p)\n", ptr);
- free(ptr);
-}
+void trace_free( void *ptr )
+ {
+ std::fprintf( stderr, "free(%p)\n", ptr );
+ free( ptr );
+ }
-void * xalloc(size_t size)
-{
- void *ret = (*mallocf)(size);
- if( 0 == ret ) fail("(*mallocf)()", errno);
+void * xalloc( size_t size )
+ {
+ void *ret = (*mallocf)( size );
+ if( ret == 0 ) { show_error( "not enough memory", errno ); fatal(); }
return ret;
-}
-
-
-struct S2w_blk /* Splitter to workers. */
-{
- unsigned long long id; /* Block serial number as read from infd. */
- S2w_blk *next; /* Next in queue. */
- size_t loaded; /* # of bytes in plain, may be 0 for 1st. */
- unsigned char plain[1]; /* Data read from infd, allocated: sizeof_plain. */
-};
-
-
-struct S2w_q
-{
- Cond av_or_eof; /* New block available or splitter done. */
- S2w_blk *tail, /* Splitter will append here. */
- *head; /* Next ready worker shall compress this. */
- int eof; /* Splitter done. */
-};
-
-
-void
-s2w_q_init(S2w_q *s2w_q)
-{
- xinit(&s2w_q->av_or_eof);
- s2w_q->tail = 0;
- s2w_q->head = 0;
- s2w_q->eof = 0;
-}
-
-
-void
-s2w_q_uninit(S2w_q *s2w_q)
-{
- assert(0 != s2w_q->eof);
- assert(0 == s2w_q->head);
- assert(0 == s2w_q->tail);
- xdestroy(&s2w_q->av_or_eof);
-}
-
-
-struct W2m_blk /* Workers to muxer data block. */
-{
- unsigned long long id; /* Block index as read from infd. */
- W2m_blk *next; /* Next block in list (unordered). */
- int produced; /* Number of bytes in compr. */
- unsigned char compr[1]; /* Data to write to outfd, alloc.: sizeof_compr. */
-};
-
-
-struct W2m_q
-{
- Cond av_or_exit; /* New block available or all workers exited. */
- unsigned long long needed; /* Block needed for resuming writing. */
- W2m_blk *head; /* Block list (unordered). */
- unsigned working; /* Number of workers still running. */
-};
-
-
-void
-w2m_q_init(W2m_q *w2m_q, int num_workers)
-{
- assert(0 < num_workers);
- xinit(&w2m_q->av_or_exit);
- w2m_q->needed = 0;
- w2m_q->head = 0;
- w2m_q->working = num_workers;
-}
-
-
-void
-w2m_q_uninit(W2m_q *w2m_q)
-{
- assert(0 == w2m_q->working);
- assert(0 == w2m_q->head);
- xdestroy(&w2m_q->av_or_exit);
-}
-
-
-struct M2s_q // Muxer to splitter queue
+ }
+
+
+void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
{
- Cond av; // Free slot available
- int num_free; // Number of free slots
+ int ret = pthread_mutex_init( mutex, 0 );
+ if( ret != 0 ) { show_error( "pthread_mutex_init", ret ); fatal(); }
- M2s_q( const int slots )
- {
- xinit(&av);
- num_free = slots;
- }
+ ret = pthread_cond_init( cond, 0 );
+ if( ret != 0 ) { show_error( "pthread_cond_init", ret ); fatal(); }
+ }
+
+
+void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
+ {
+ int ret = pthread_cond_destroy( cond );
+ if( ret != 0 ) { show_error( "pthread_cond_destroy", ret ); fatal(); }
+
+ ret = pthread_mutex_destroy( mutex );
+ if( ret != 0 ) { show_error( "pthread_mutex_destroy", ret ); fatal(); }
+ }
+
+
+void xlock( pthread_mutex_t * mutex )
+ {
+ int ret = pthread_mutex_lock( mutex );
+ if( ret != 0 ) { show_error( "pthread_mutex_lock", ret ); fatal(); }
+ }
+
+
+void xunlock( pthread_mutex_t * mutex )
+ {
+ int ret = pthread_mutex_unlock( mutex );
+ if( ret != 0 ) { show_error( "pthread_mutex_unlock", ret ); fatal(); }
+ }
+
+
+void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
+ {
+ int ret = pthread_cond_wait( cond, mutex );
+ if( ret != 0 ) { show_error( "pthread_cond_wait", ret ); fatal(); }
+ }
+
+
+void xsignal( pthread_cond_t * cond )
+ {
+ int ret = pthread_cond_signal( cond );
+ if( ret != 0 ) { show_error( "pthread_cond_signal", ret ); fatal(); }
+ }
+
+
+void xbroadcast( pthread_cond_t * cond )
+ {
+ int ret = pthread_cond_broadcast( cond );
+ if( ret != 0 ) { show_error( "pthread_cond_broadcast", ret ); fatal(); }
+ }
+
+
+void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg )
+ {
+ int ret = pthread_create( thread, 0, routine, arg );
+ if( ret != 0 ) { show_error( "pthread_create", ret ); fatal(); }
+ }
+
+
+void xjoin( pthread_t thread )
+ {
+ int ret = pthread_join( thread, 0 );
+ if( ret != 0 ) { show_error( "pthread_join", ret ); fatal(); }
+ }
+
+
+struct Slot_tally // Synchronizes splitter to muxer
+ {
+ unsigned long check_counter;
+ unsigned long wait_counter;
+ int num_free; // Number of free slots
+ pthread_mutex_t mutex;
+ pthread_cond_t slot_av; // Free slot available
+
+ Slot_tally( const int slots )
+ : check_counter( 0 ), wait_counter( 0 ), num_free( slots )
+ { xinit( &slot_av, &mutex ); }
+
+ ~Slot_tally() { xdestroy( &slot_av, &mutex ); }
+ };
+
+
+struct S2w_blk // Splitter to worker data block
+ {
+ unsigned long long id; // Block serial number as read from infd
+ S2w_blk *next; // Next in queue
+ int loaded; // # of bytes in plain, may be 0 for 1st
+ uint8_t plain[1]; // Data read from infd, allocated: data_size
+ };
+
+
+struct S2w_queue
+ {
+ S2w_blk * head; // Next ready worker shall compress this
+ S2w_blk * tail; // Splitter will append here
+ unsigned long check_counter;
+ unsigned long wait_counter;
+ pthread_mutex_t mutex;
+ pthread_cond_t av_or_eof; // New block available or splitter done
+ bool eof; // Splitter done
+
+ S2w_queue()
+ : head( 0 ), tail( 0 ), check_counter( 0 ), wait_counter( 0 ), eof( false )
+ { xinit( &av_or_eof, &mutex ); }
+
+ ~S2w_queue() { xdestroy( &av_or_eof, &mutex ); }
+ };
- ~M2s_q() { xdestroy(&av); }
+
+struct W2m_blk // Worker to muxer data block
+ {
+ unsigned long long id; // Block index as read from infd
+ W2m_blk *next; // Next block in list (unordered)
+ int produced; // Number of bytes in compr
+ uint8_t compr[1]; // Data to write to outfd, alloc.: compr_size
+ };
+
+
+struct W2m_queue
+ {
+ unsigned long long needed_id; // Block needed for resuming writing
+ W2m_blk *head; // Block list (unordered)
+ unsigned long check_counter;
+ unsigned long wait_counter;
+ int num_working; // Number of workers still running
+ pthread_mutex_t mutex;
+ pthread_cond_t av_or_exit; // New block available or all workers exited
+
+ W2m_queue( const int num_workers )
+ : needed_id( 0 ), head( 0 ), check_counter( 0 ), wait_counter( 0 ),
+ num_working( num_workers )
+ { xinit( &av_or_exit, &mutex ); }
+
+ ~W2m_queue() { xdestroy( &av_or_exit, &mutex ); }
};
struct Splitter_arg
{
- M2s_q *m2s_q;
- S2w_q *s2w_q;
+ Slot_tally * slot_tally; // Free-slot counter; the splitter takes one slot for each block it queues
+ S2w_queue * s2w_queue; // Queue the splitter appends filled blocks to
int infd;
- int sizeof_plain;
- size_t sizeof_S2w_blk;
+ int data_size; // Number of bytes requested from infd for each block
+ int s2w_blk_size; // Allocation size of each S2w_blk, in bytes
};
void * splitter( void * arg )
{
const Splitter_arg & tmp = *(Splitter_arg *)arg;
- M2s_q *m2s_q = tmp.m2s_q;
- S2w_q *s2w_q = tmp.s2w_q;
+ Slot_tally & slot_tally = *tmp.slot_tally; // Free-slot counter shared with the muxer
+ S2w_queue & s2w_queue = *tmp.s2w_queue; // Queue shared with the workers
const int infd = tmp.infd;
- const int sizeof_plain = tmp.sizeof_plain;
- const size_t sizeof_s2w_blk = tmp.sizeof_S2w_blk;
+ const int data_size = tmp.data_size; // Bytes requested from infd per block
+ const int s2w_blk_size = tmp.s2w_blk_size; // Bytes allocated per block
for( unsigned long long id = 0; ; ++id )
{
- /* Grab a free slot. */
- xlock_pred(&m2s_q->av);
- while( m2s_q->num_free == 0 ) xwait(&m2s_q->av);
- --m2s_q->num_free;
- xunlock(&m2s_q->av);
- S2w_blk * s2w_blk = (S2w_blk *)xalloc(sizeof_s2w_blk);
-
- /* Fill block. */
- const int rd = readblock( infd, (char *)s2w_blk->plain, sizeof_plain );
- if( rd != sizeof_plain && errno ) fail("read()", errno);
-
- if( rd == 0 && id != 0 )
- {
- /* EOF on first read, but not for first input block. */
- (*freef)(s2w_blk);
- xlock(&m2s_q->av);
- ++m2s_q->num_free;
- xunlock(&m2s_q->av);
- }
- else
+ S2w_blk * s2w_blk = (S2w_blk *)xalloc( s2w_blk_size ); // Freed by the worker that dequeues it, or below if unused
+
+ // Fill block
+ const int rd = readblock( infd, s2w_blk->plain, data_size );
+ if( rd != data_size && errno ) { show_error( "read", errno ); fatal(); } // Short read with errno set is a fatal read error
+
+ if( rd > 0 || id == 0 ) // first block can be empty
{
s2w_blk->id = id;
s2w_blk->next = 0;
s2w_blk->loaded = rd;
in_size += rd;
+ xlock( &slot_tally.mutex ); // Grab a free slot (only after the block has been read)
+ ++slot_tally.check_counter; // Statistics: attempts to grab a slot
+ while( slot_tally.num_free == 0 ) // No slot free: wait until the muxer releases one
+ {
+ ++slot_tally.wait_counter; // Statistics: times the splitter stalled
+ xwait( &slot_tally.slot_av, &slot_tally.mutex );
+ }
+ --slot_tally.num_free;
+ xunlock( &slot_tally.mutex );
}
+ else
+ { (*freef)( s2w_blk ); s2w_blk = 0; } // Nothing read: drop the unused block; null marks end of input below
- xlock(&s2w_q->av_or_eof);
- if( s2w_q->head == 0 ) xbroadcast(&s2w_q->av_or_eof);
-
- if( rd > 0 || id == 0 )
+ xlock( &s2w_queue.mutex );
+ if( s2w_blk != 0 ) // Got a block: append it at the tail of the queue
{
- if( s2w_q->tail == 0 ) s2w_q->head = s2w_blk;
- else s2w_q->tail->next = s2w_blk;
- s2w_q->tail = s2w_blk;
+ if( s2w_queue.tail == 0 ) s2w_queue.head = s2w_blk; // Queue was empty
+ else s2w_queue.tail->next = s2w_blk;
+ s2w_queue.tail = s2w_blk;
+ xsignal( &s2w_queue.av_or_eof ); // A new block is available
}
- s2w_q->eof = ( rd == 0 );
- xunlock(&s2w_q->av_or_eof);
+ else // End of input: no more blocks will be queued
+ {
+ s2w_queue.eof = true;
+ xbroadcast( &s2w_queue.av_or_eof ); // Broadcast so that every waiting worker re-checks eof
+ }
+ xunlock( &s2w_queue.mutex );
- if( rd <= 0 ) break;
+ if( s2w_blk == 0 ) break; // End of input: the splitter is done
}
return 0;
}
-void work_lz_rd(W2m_blk *w2m_blk, const int sizeof_compr, LZ_Encoder * lz)
-{
- int rd;
-
- assert(w2m_blk->produced < sizeof_compr);
- rd = LZ_compress_read(lz, w2m_blk->compr + w2m_blk->produced,
- sizeof_compr - w2m_blk->produced);
- if( -1 == rd ) {
- show_error( "LZ_compress_read() failed." );
- fatal();
- }
- w2m_blk->produced += rd;
-}
-
-
void work_compr( const int dictionary_size, const int match_len_limit,
- S2w_blk *s2w_blk, W2m_q *w2m_q,
- const int sizeof_compr, const size_t sizeof_w2m_blk )
-{
- W2m_blk *w2m_blk;
-
- assert(0 < s2w_blk->loaded || 0 == s2w_blk->id);
-
- w2m_blk = (W2m_blk *)xalloc(sizeof_w2m_blk);
-
- /* Single member compression. Settings like with lzip -6. */
+ const S2w_blk & s2w_blk, W2m_queue & w2m_queue, // s2w_blk: input block (read only); w2m_queue: receives the result
+ const int compr_size, const int w2m_blk_size ) // compr_size: capacity of compr[]; w2m_blk_size: bytes to allocate for the output block
{
- LZ_Encoder * lz;
- size_t written;
+ assert( s2w_blk.loaded > 0 || s2w_blk.id == 0 ); // Only block 0 may be empty
- lz = LZ_compress_open( dictionary_size, match_len_limit, LLONG_MAX );
- if( LZ_ok != LZ_compress_errno(lz) ) {
- show_error( "LZ_compress_open() failed." );
- fatal();
- }
+ W2m_blk * w2m_blk = (W2m_blk *)xalloc( w2m_blk_size ); // Freed by the muxer after it has been written
- written = 0;
- w2m_blk->produced = 0;
- while( written < s2w_blk->loaded ) {
- int wr;
+ const int dict_size = std::max( LZ_min_dictionary_size(), // Never below the minimum reported by lzlib
+ std::min( dictionary_size, s2w_blk.loaded ) ); // Never larger than the data in this block
+ LZ_Encoder * const encoder = // A fresh encoder is opened for every block
+ LZ_compress_open( dict_size, match_len_limit, LLONG_MAX ); // NOTE(review): 3rd argument is presumably the member size limit - confirm in lzlib manual
+ if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
+ { show_error( "LZ_compress_open failed." ); fatal(); }
- wr = LZ_compress_write(lz, s2w_blk->plain + written,
- s2w_blk->loaded - written);
- if( -1 == wr ) {
- show_error( "LZ_compress_write() failed." );
- fatal();
+ int written = 0; // Bytes of plain already handed to the encoder
+ w2m_blk->produced = 0; // Bytes of compressed data collected so far
+ while( true ) // Alternate feeding input and draining output until the encoder reports it is finished
+ {
+ if( LZ_compress_write_size( encoder ) > 0 ) // Encoder reports room for more input
+ {
+ if( written < s2w_blk.loaded ) // Input still pending
+ {
+ const int wr = LZ_compress_write( encoder, s2w_blk.plain + written,
+ s2w_blk.loaded - written );
+ if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); }
+ written += wr; // wr may be less than requested; the rest is retried on a later pass
+ }
+ if( written >= s2w_blk.loaded ) LZ_compress_finish( encoder ); // All input handed over
}
- written += (size_t)wr;
-
- work_lz_rd(w2m_blk, sizeof_compr, lz);
+ assert( w2m_blk->produced < compr_size ); // There must still be room in compr[]
+ const int rd = LZ_compress_read( encoder, w2m_blk->compr + w2m_blk->produced,
+ compr_size - w2m_blk->produced ); // Append to the data already collected
+ if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); }
+ w2m_blk->produced += rd;
+ if( LZ_compress_finished( encoder ) == 1 ) break; // Encoder reports completion
}
- if( -1 == LZ_compress_finish(lz) ) {
- show_error( "LZ_compress_finish() failed." );
- fatal();
- }
+ if( LZ_compress_close( encoder ) < 0 )
+ { show_error( "LZ_compress_close failed." ); fatal(); }
- while( !LZ_compress_finished(lz) ) {
- work_lz_rd(w2m_blk, sizeof_compr, lz);
- }
+ w2m_blk->id = s2w_blk.id; // The output block keeps the serial number of its input block
- if( -1 == LZ_compress_close(lz) ) {
- show_error( "LZ_compress_close() failed." );
- fatal();
- }
+ // Push block to muxer queue
+ xlock( &w2m_queue.mutex );
+ w2m_blk->next = w2m_queue.head; // Prepend; the muxer puts blocks back in order by id
+ w2m_queue.head = w2m_blk;
+ if( w2m_blk->id == w2m_queue.needed_id ) xsignal( &w2m_queue.av_or_exit ); // Signal only for the block the muxer needs next
+ xunlock( &w2m_queue.mutex );
}
- w2m_blk->id = s2w_blk->id;
-
- /* Push block to muxer. */
- xlock(&w2m_q->av_or_exit);
- w2m_blk->next = w2m_q->head;
- w2m_q->head = w2m_blk;
- if( w2m_blk->id == w2m_q->needed ) {
- xsignal(&w2m_q->av_or_exit);
- }
- xunlock(&w2m_q->av_or_exit);
-}
-
struct Worker_arg
{
int dictionary_size;
int match_len_limit;
- S2w_q *s2w_q;
- W2m_q *w2m_q;
- int sizeof_compr;
- size_t sizeof_W2m_blk;
+ S2w_queue * s2w_queue; // Queue the workers take input blocks from
+ W2m_queue * w2m_queue; // Queue the workers push compressed blocks to
+ int compr_size; // Capacity in bytes of compr[] in each W2m_blk
+ int w2m_blk_size; // Allocation size of each W2m_blk, in bytes
};
@@ -333,64 +355,68 @@ void * worker( void * arg )
const Worker_arg & tmp = *(Worker_arg *)arg;
const int dictionary_size = tmp.dictionary_size;
const int match_len_limit = tmp.match_len_limit;
- S2w_q *s2w_q = tmp.s2w_q;
- W2m_q *w2m_q = tmp.w2m_q;
- const int sizeof_compr = tmp.sizeof_compr;
- const size_t sizeof_w2m_blk = tmp.sizeof_W2m_blk;
+ S2w_queue & s2w_queue = *tmp.s2w_queue; // Input queue, shared with the splitter and the other workers
+ W2m_queue & w2m_queue = *tmp.w2m_queue; // Output queue, shared with the muxer and the other workers
+ const int compr_size = tmp.compr_size; // Capacity of each output buffer
+ const int w2m_blk_size = tmp.w2m_blk_size; // Allocation size of each output block
while( true )
{
S2w_blk *s2w_blk;
- /* Grab a block to work on. */
- xlock_pred(&s2w_q->av_or_eof);
- while( 0 == s2w_q->head && !s2w_q->eof ) {
- xwait(&s2w_q->av_or_eof);
- }
- if( 0 == s2w_q->head ) {
- /* No blocks available and splitter exited. */
- xunlock(&s2w_q->av_or_eof);
+ // Grab a block to work on
+ xlock( &s2w_queue.mutex );
+ ++s2w_queue.check_counter; // Statistics: attempts to take a block
+ while( s2w_queue.head == 0 && !s2w_queue.eof ) // Queue empty and splitter still running: wait
+ {
+ ++s2w_queue.wait_counter; // Statistics: times a worker stalled
+ xwait( &s2w_queue.av_or_eof, &s2w_queue.mutex );
+ }
+ if( s2w_queue.head == 0 ) // No blocks available and splitter exited
+ {
+ xunlock( &s2w_queue.mutex );
break;
+ }
+ s2w_blk = s2w_queue.head; // Detach the oldest block from the queue
+ s2w_queue.head = s2w_blk->next;
+ if( s2w_queue.head == 0 ) s2w_queue.tail = 0; // Queue became empty
+ xunlock( &s2w_queue.mutex );
+
+ work_compr( dictionary_size, match_len_limit, *s2w_blk, w2m_queue, // Compress outside the lock so workers run in parallel
+ compr_size, w2m_blk_size );
+ (*freef)( s2w_blk ); // The input block is no longer needed once compressed
}
- s2w_blk = s2w_q->head;
- s2w_q->head = s2w_blk->next;
- if( 0 == s2w_q->head ) {
- s2w_q->tail = 0;
- }
- xunlock(&s2w_q->av_or_eof);
- work_compr( dictionary_size, match_len_limit, s2w_blk, w2m_q,
- sizeof_compr, sizeof_w2m_blk );
- (*freef)(s2w_blk);
- }
-
- /* Notify muxer when last worker exits. */
- xlock(&w2m_q->av_or_exit);
- if( 0 == --w2m_q->working && 0 == w2m_q->head ) {
- xsignal(&w2m_q->av_or_exit);
- }
- xunlock(&w2m_q->av_or_exit);
+ // Notify muxer when last worker exits
+ xlock( &w2m_queue.mutex );
+ if( --w2m_queue.num_working == 0 && w2m_queue.head == 0 ) // Last worker and nothing queued: the muxer may be waiting
+ xsignal( &w2m_queue.av_or_exit );
+ xunlock( &w2m_queue.mutex );
return 0;
}
-void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outfd )
+void muxer( Slot_tally & slot_tally, W2m_queue & w2m_queue, // Writes compressed blocks to outfd in id order; returns when all workers have exited and the queue is empty
+ const int num_slots, const int outfd ) // num_slots: size of the reordering buffer; outfd < 0: nothing is written
{
unsigned long long needed_id = 0;
std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 );
- xlock_pred(&w2m_q->av_or_exit);
+ xlock( &w2m_queue.mutex ); // The mutex is held at the top of every iteration of the loop below
while( true )
{
- /* Grab all available compressed blocks in one step. */
- while( w2m_q->head == 0 && w2m_q->working > 0 )
- xwait(&w2m_q->av_or_exit);
-
- if( w2m_q->head == 0 ) break; // queue is empty. all workers exited
+ // Grab all available compressed blocks in one step
+ ++w2m_queue.check_counter; // Statistics: attempts to fetch blocks
+ while( w2m_queue.head == 0 && w2m_queue.num_working > 0 ) // Nothing queued yet and workers still running
+ {
+ ++w2m_queue.wait_counter; // Statistics: times the muxer stalled
+ xwait( &w2m_queue.av_or_exit, &w2m_queue.mutex );
+ }
+ if( w2m_queue.head == 0 ) break; // queue is empty. all workers exited
- W2m_blk * w2m_blk = w2m_q->head;
- w2m_q->head = 0;
- xunlock(&w2m_q->av_or_exit);
+ W2m_blk * w2m_blk = w2m_queue.head; // Take the whole list
+ w2m_queue.head = 0; // Leave the queue empty for the workers
+ xunlock( &w2m_queue.mutex ); // Merge and write without holding the lock
// Merge blocks fetched this time into circular buffer
do {
@@ -405,30 +431,31 @@ void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outf
// Write out initial continuous sequence of reordered blocks
while( true )
{
- W2m_blk * needed_w2m_blk = circular_buffer[needed_id%num_slots];
- if( needed_w2m_blk == 0 ) break;
+ w2m_blk = circular_buffer[needed_id%num_slots]; // Block with the next id to write, if it has arrived
+ if( w2m_blk == 0 ) break; // Not yet available: go back and wait for more blocks
- out_size += needed_w2m_blk->produced;
+ out_size += w2m_blk->produced; // Counted even when outfd < 0
if( outfd >= 0 )
{
- const int wr = writeblock( outfd, (char *)needed_w2m_blk->compr, needed_w2m_blk->produced );
- if( wr != needed_w2m_blk->produced ) fail("write()", errno);
+ const int wr = writeblock( outfd, w2m_blk->compr, w2m_blk->produced );
+ if( wr != w2m_blk->produced ) // Short write is a fatal error
+ { show_error( "write", errno ); fatal(); }
}
circular_buffer[needed_id%num_slots] = 0;
++needed_id;
- xlock(&m2s_q->av);
- if( 0 == m2s_q->num_free++ ) xsignal(&m2s_q->av);
- xunlock(&m2s_q->av);
+ xlock( &slot_tally.mutex ); // Return the slot of the block just written
+ if( slot_tally.num_free++ == 0 ) xsignal( &slot_tally.slot_av ); // Signal only on the 0 -> 1 transition, when the splitter may be waiting
+ xunlock( &slot_tally.mutex );
- (*freef)(needed_w2m_blk);
+ (*freef)( w2m_blk ); // The compressed block has been written (or discarded)
}
- xlock_pred(&w2m_q->av_or_exit);
- w2m_q->needed = needed_id;
+ xlock( &w2m_queue.mutex ); // Re-acquire before looping
+ w2m_queue.needed_id = needed_id; // Tell the workers which block will resume writing
}
- xunlock(&w2m_q->av_or_exit);
+ xunlock( &w2m_queue.mutex ); // Release the lock still held after the break
for( int i = 0; i < num_slots; ++i )
if( circular_buffer[i] != 0 )
@@ -438,57 +465,49 @@ void muxer_loop( W2m_q *w2m_q, M2s_q *m2s_q, const int num_slots, const int outf
} // end namespace
-void * muxer( void * arg )
+int compress( const int data_size, const int dictionary_size, // Compresses infd to outfd using a splitter thread, num_workers worker threads and the calling thread as muxer
+ const int match_len_limit, const int num_workers, // data_size: input bytes per block; each block gets its own encoder
+ const int num_slots, const int infd, const int outfd, // num_slots: initial count of free slots; outfd < 0: nothing is written
+ const int debug_level ) // Bit 0: print queue statistics; bit 1: trace allocations. Returns 0
{
- const Muxer_arg & tmp = *(Muxer_arg *)arg;
- const int dictionary_size = tmp.dictionary_size;
- const int match_len_limit = tmp.match_len_limit;
- const int num_workers = tmp.num_workers;
- const int num_slots = tmp.num_slots;
- const int debug_level = tmp.debug_level;
- const int infd = tmp.infd;
- const int outfd = tmp.outfd;
- S2w_q s2w_q;
- W2m_q w2m_q;
-
if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; }
else { mallocf = malloc; freef = free; }
- s2w_q_init(&s2w_q);
- w2m_q_init(&w2m_q, num_workers);
- M2s_q m2s_q( num_slots );
-
+ Slot_tally slot_tally( num_slots ); // Starts with num_slots free slots
+ S2w_queue s2w_queue; // Splitter -> workers
+ W2m_queue w2m_queue( num_workers ); // Workers -> muxer
Splitter_arg splitter_arg;
- splitter_arg.m2s_q = &m2s_q;
- splitter_arg.s2w_q = &s2w_q;
+ splitter_arg.slot_tally = &slot_tally;
+ splitter_arg.s2w_queue = &s2w_queue;
splitter_arg.infd = infd;
- splitter_arg.sizeof_plain = 2 * std::max( 65536, dictionary_size );
- splitter_arg.sizeof_S2w_blk = sizeof(S2w_blk) + splitter_arg.sizeof_plain - 1;
+ splitter_arg.data_size = data_size;
+ splitter_arg.s2w_blk_size = sizeof (S2w_blk) + data_size - 1; // -1: plain[] already contributes one byte to sizeof
pthread_t splitter_thread;
- xcreate(&splitter_thread, splitter, &splitter_arg);
+ xcreate( &splitter_thread, splitter, &splitter_arg ); // Launch the splitter thread
Worker_arg worker_arg;
worker_arg.dictionary_size = dictionary_size;
worker_arg.match_len_limit = match_len_limit;
- worker_arg.s2w_q = &s2w_q;
- worker_arg.w2m_q = &w2m_q;
- worker_arg.sizeof_compr = 6 + 20 + ( ( splitter_arg.sizeof_plain / 8 ) * 9 );
- worker_arg.sizeof_W2m_blk = sizeof(W2m_blk) + worker_arg.sizeof_compr - 1;
+ worker_arg.s2w_queue = &s2w_queue;
+ worker_arg.w2m_queue = &w2m_queue;
+ worker_arg.compr_size = 6 + 20 + ( ( data_size / 8 ) * 9 ); // NOTE(review): presumably header (6) + trailer (20) + 9/8 worst-case expansion - confirm
+ worker_arg.w2m_blk_size = sizeof (W2m_blk) + worker_arg.compr_size - 1; // -1: compr[] already contributes one byte to sizeof
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
- if( worker_threads == 0 ) fail("not enough memory.", errno);
+ if( worker_threads == 0 ) // nothrow new returns null on failure
+ { show_error( "not enough memory.", errno ); fatal(); }
for( int i = 0; i < num_workers; ++i )
- xcreate(&worker_threads[i], worker, &worker_arg);
+ xcreate( &worker_threads[i], worker, &worker_arg ); // All workers share the same argument block
- muxer_loop( &w2m_q, &m2s_q, num_slots, outfd );
+ muxer( slot_tally, w2m_queue, num_slots, outfd ); // Runs in the calling thread until all workers have exited
for( int i = num_workers - 1; i >= 0; --i )
xjoin(worker_threads[i]);
delete[] worker_threads; worker_threads = 0;
- xjoin(splitter_thread);
+ xjoin( splitter_thread ); // Wait for the splitter thread as well
if( verbosity >= 1 )
{
@@ -503,27 +522,27 @@ void * muxer( void * arg )
in_size, out_size );
}
- const int FW = ( sizeof(long unsigned) * 8 ) / 3 + 1;
- if( ( debug_level & 1 ) && 0 > fprintf(stderr,
+ const int FW = ( sizeof (unsigned long) * 8 ) / 3 + 1; // Field width: bits/3 + 1 is enough decimal digits for any unsigned long
+ if( debug_level & 1 ) // Queue statistics requested
+ std::fprintf( stderr,
"any worker tried to consume from splitter: %*lu\n"
"any worker stalled : %*lu\n"
"muxer tried to consume from workers : %*lu\n"
"muxer stalled : %*lu\n"
- "splitter tried to consume from muxer : %*lu\n"
+ "splitter tried to fill a block : %*lu\n"
"splitter stalled : %*lu\n",
- FW, s2w_q.av_or_eof.ccount,
- FW, s2w_q.av_or_eof.wcount,
- FW, w2m_q.av_or_exit.ccount,
- FW, w2m_q.av_or_exit.wcount,
- FW, m2s_q.av.ccount,
- FW, m2s_q.av.wcount) )
- {
- fatal();
- }
-
- assert( m2s_q.num_free == num_slots );
- w2m_q_uninit(&w2m_q);
- s2w_q_uninit(&s2w_q);
- xraise(SIGUSR2);
+ FW, s2w_queue.check_counter,
+ FW, s2w_queue.wait_counter,
+ FW, w2m_queue.check_counter,
+ FW, w2m_queue.wait_counter,
+ FW, slot_tally.check_counter,
+ FW, slot_tally.wait_counter );
+
+ assert( slot_tally.num_free == num_slots ); // Every slot has been returned
+ assert( s2w_queue.eof ); // The splitter reached end of input
+ assert( s2w_queue.head == 0 ); // Input queue drained
+ assert( s2w_queue.tail == 0 );
+ assert( w2m_queue.num_working == 0 ); // Every worker has exited
+ assert( w2m_queue.head == 0 ); // Output queue drained
return 0;
}
diff --git a/plzip.h b/plzip.h
index f23152b..d7bb760 100644
--- a/plzip.h
+++ b/plzip.h
@@ -16,17 +16,19 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-struct Muxer_arg
- {
- int dictionary_size;
- int match_len_limit;
- int num_workers;
- int num_slots;
- int debug_level;
- int infd;
- int outfd;
- };
-
-void * muxer( void * arg );
+int compress( const int data_size, const int dictionary_size, // Multi-threaded compression of infd to outfd
+ const int match_len_limit, const int num_workers,
+ const int num_slots, const int infd, const int outfd,
+ const int debug_level );
+
+
+/*----------------------- Defined in main.cc -----------------------*/
+
+void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw(); // Callers in this patch pass errno as errcode after a failed read or write
+int readblock( const int fd, uint8_t * buf, const int size ) throw(); // The caller treats a result != size with errno set as a read error
+int writeblock( const int fd, const uint8_t * buf, const int size ) throw(); // The caller treats a result != size as a write error
+
+
+void fatal(); // Terminate the process
extern int verbosity;