summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniel Baumann <mail@daniel-baumann.ch>2015-11-07 15:33:26 +0000
committerDaniel Baumann <mail@daniel-baumann.ch>2015-11-07 15:33:26 +0000
commit4766a0fb5fc0a2ce7c865450b1f63ddc6556aa3c (patch)
tree433a0c86a57b02c6ddf97910b122d8a3abe13ac8
parentAdding upstream version 0.9. (diff)
downloadplzip-4766a0fb5fc0a2ce7c865450b1f63ddc6556aa3c.tar.xz
plzip-4766a0fb5fc0a2ce7c865450b1f63ddc6556aa3c.zip
Adding upstream version 1.0~rc1.upstream/1.0_rc1
Signed-off-by: Daniel Baumann <mail@daniel-baumann.ch>
-rw-r--r--ChangeLog15
-rw-r--r--INSTALL11
-rw-r--r--Makefile.in30
-rw-r--r--NEWS15
-rw-r--r--README30
-rw-r--r--arg_parser.cc8
-rw-r--r--arg_parser.h2
-rw-r--r--compress.cc155
-rwxr-xr-xconfigure29
-rw-r--r--dec_stdout.cc331
-rw-r--r--dec_stream.cc520
-rw-r--r--decompress.cc551
-rw-r--r--doc/plzip.18
-rw-r--r--doc/plzip.info109
-rw-r--r--doc/plzip.texinfo83
-rw-r--r--file_index.cc143
-rw-r--r--file_index.h77
-rw-r--r--lzip.h246
-rw-r--r--main.cc92
-rw-r--r--plzip.h141
-rwxr-xr-xtestsuite/check.sh74
-rw-r--r--testsuite/test.txt.lzbin0 -> 11518 bytes
-rw-r--r--testsuite/test_v0.lzbin11540 -> 0 bytes
-rw-r--r--testsuite/test_v1.lzbin11548 -> 0 bytes
24 files changed, 1875 insertions, 795 deletions
diff --git a/ChangeLog b/ChangeLog
index bf95a75..1a3691e 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,16 @@
+2013-03-08 Antonio Diaz Diaz <ant_diaz@teleline.es>
+
+ * Version 1.0-rc1 released.
+ * compress.cc: 'deliver_packet' changed to 'deliver_packets'.
+ * Scalability of decompression from/to regular files has been
+ increased by removing splitter and muxer when not needed.
+ * The number of worker threads is now limited to the number of
+ members when decompressing from a regular file.
+ * Makefile.in: Added new target 'install-as-lzip'.
+ * Makefile.in: Added new target 'install-bin'.
+ * main.cc: Use 'setmode' instead of '_setmode' on Windows and OS/2.
+ * main.cc: Define 'strtoull' to 'std::strtoul' on Windows.
+
2012-03-01 Antonio Diaz Diaz <ant_diaz@teleline.es>
* Version 0.9 released.
@@ -82,7 +95,7 @@
until something better appears on the net.
-Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This file is a collection of facts, and thus it is not copyrightable,
but just in case, you have unlimited permission to copy, distribute and
diff --git a/INSTALL b/INSTALL
index a2998a5..3ef3e11 100644
--- a/INSTALL
+++ b/INSTALL
@@ -1,7 +1,7 @@
Requirements
------------
You will need a C++ compiler and the lzlib compression library installed.
-I use gcc 4.3.5 and 3.3.6, but the code should compile with any
+I use gcc 4.7.2 and 3.3.6, but the code should compile with any
standards compliant compiler.
Lzlib must be version 1.0 or newer.
Gcc is available at http://gcc.gnu.org.
@@ -34,6 +34,13 @@ the main archive.
5. Type 'make install' to install the program and any data files and
documentation.
+ You can install only the program, the info manual or the man page
+ typing 'make install-bin', 'make install-info' or 'make install-man'
+ respectively.
+
+5a. Type 'make install-as-lzip' to install the program and any data
+ files and documentation, and link the program to the name 'lzip'.
+
Another way
-----------
@@ -52,7 +59,7 @@ After running 'configure', you can run 'make' and 'make install' as
explained above.
-Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This file is free documentation: you have unlimited permission to copy,
distribute and modify it.
diff --git a/Makefile.in b/Makefile.in
index 588ede8..4c78781 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -7,11 +7,12 @@ INSTALL_DIR = $(INSTALL) -d -m 755
LIBS = -llz -lpthread
SHELL = /bin/sh
-objs = arg_parser.o compress.o decompress.o main.o
+objs = arg_parser.o file_index.o compress.o dec_stdout.o dec_stream.o \
+ decompress.o main.o
-.PHONY : all install install-info install-man install-strip \
- uninstall uninstall-info uninstall-man \
+.PHONY : all install install-bin install-info install-man install-strip \
+ install-as-lzip uninstall uninstall-bin uninstall-info uninstall-man \
doc info man check dist clean distclean
all : $(progname)
@@ -30,9 +31,12 @@ main.o : main.cc
$(objs) : Makefile
arg_parser.o : arg_parser.h
-compress.o : plzip.h
-decompress.o : plzip.h
-main.o : arg_parser.h plzip.h
+compress.o : lzip.h
+dec_stdout.o : lzip.h file_index.h
+dec_stream.o : lzip.h
+decompress.o : lzip.h file_index.h
+file_index.o : lzip.h file_index.h
+main.o : arg_parser.h lzip.h
doc : info man
@@ -54,7 +58,9 @@ Makefile : $(VPATH)/configure $(VPATH)/Makefile.in
check : all
@$(VPATH)/testsuite/check.sh $(VPATH)/testsuite $(pkgversion)
-install : all install-info install-man
+install : install-bin install-info install-man
+
+install-bin : all
if [ ! -d "$(DESTDIR)$(bindir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(bindir)" ; fi
$(INSTALL_PROGRAM) ./$(progname) "$(DESTDIR)$(bindir)/$(progname)"
@@ -70,7 +76,13 @@ install-man :
install-strip : all
$(MAKE) INSTALL_PROGRAM='$(INSTALL_PROGRAM) -s' install
-uninstall : uninstall-info uninstall-man
+install-as-lzip : install
+ -rm -f "$(DESTDIR)$(bindir)/lzip"
+ cd "$(DESTDIR)$(bindir)" && ln -s $(progname) lzip
+
+uninstall : uninstall-bin uninstall-info uninstall-man
+
+uninstall-bin :
-rm -f "$(DESTDIR)$(bindir)/$(progname)"
uninstall-info :
@@ -96,7 +108,7 @@ dist : doc
$(DISTNAME)/doc/$(pkgname).texinfo \
$(DISTNAME)/testsuite/check.sh \
$(DISTNAME)/testsuite/test.txt \
- $(DISTNAME)/testsuite/test_v[01].lz \
+ $(DISTNAME)/testsuite/test.txt.lz \
$(DISTNAME)/*.h \
$(DISTNAME)/*.cc
rm -f $(DISTNAME)
diff --git a/NEWS b/NEWS
index 472b614..124d02a 100644
--- a/NEWS
+++ b/NEWS
@@ -1,6 +1,13 @@
-Changes in version 0.9:
+Changes in version 1.0:
-Minor fixes and cleanups.
+Scalability of compression (max number of useful worker threads) has
+been increased.
-Configure option "--datadir" has been renamed to "--datarootdir" to
-follow GNU Standards.
+Scalability when decompressing from/to regular files has been increased.
+
+The number of worker threads is now limited to the number of members in
+the input file when decompressing from a regular file.
+
+The target "install-as-lzip" has been added to the Makefile.
+
+The target "install-bin" has been added to the Makefile.
diff --git a/README b/README
index 4db6172..3f1d72b 100644
--- a/README
+++ b/README
@@ -11,8 +11,36 @@ multiprocessor machines, which makes it specially well suited for
distribution of big software files and large scale data archiving. On
files big enough, plzip can use hundreds of processors.
+Plzip replaces every file given in the command line with a compressed
+version of itself, with the name "original_name.lz". Each compressed
+file has the same modification date, permissions, and, when possible,
+ownership as the corresponding original, so that these properties can be
+correctly restored at decompression time. Plzip is able to read from some
+types of non regular files if the "--stdout" option is specified.
-Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+If no file names are specified, plzip compresses (or decompresses) from
+standard input to standard output. In this case, plzip will decline to
+write compressed output to a terminal, as this would be entirely
+incomprehensible and therefore pointless.
+
+Plzip will correctly decompress a file which is the concatenation of two
+or more compressed files. The result is the concatenation of the
+corresponding uncompressed files. Integrity testing of concatenated
+compressed files is also supported.
+
+As a self-check for your protection, plzip stores in the member trailer
+the 32-bit CRC of the original data and the size of the original data,
+to make sure that the decompressed version of the data is identical to
+the original. This guards against corruption of the compressed data, and
+against undetected bugs in plzip (hopefully very unlikely). The chances
+of data corruption going undetected are microscopic, less than one
+chance in 4000 million for each member processed. Be aware, though, that
+the check occurs upon decompression, so it can only tell you that
+something is wrong. It can't help you recover the original uncompressed
+data.
+
+
+Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This file is free documentation: you have unlimited permission to copy,
distribute and modify it.
diff --git a/arg_parser.cc b/arg_parser.cc
index b3fd48d..a28d2ba 100644
--- a/arg_parser.cc
+++ b/arg_parser.cc
@@ -1,5 +1,5 @@
/* Arg_parser - POSIX/GNU command line argument parser. (C++ version)
- Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012
+ Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
Antonio Diaz Diaz.
This library is free software: you can redistribute it and/or modify
@@ -36,7 +36,7 @@
bool Arg_parser::parse_long_option( const char * const opt, const char * const arg,
const Option options[], int & argind )
{
- unsigned int len;
+ unsigned len;
int index = -1;
bool exact = false, ambig = false;
@@ -44,7 +44,7 @@ bool Arg_parser::parse_long_option( const char * const opt, const char * const a
// Test all long options for either exact match or abbreviated matches.
for( int i = 0; options[i].code != 0; ++i )
- if( options[i].name && !std::strncmp( options[i].name, &opt[2], len ) )
+ if( options[i].name && std::strncmp( options[i].name, &opt[2], len ) == 0 )
{
if( std::strlen( options[i].name ) == len ) // Exact match found
{ index = i; exact = true; break; }
@@ -178,7 +178,7 @@ Arg_parser::Arg_parser( const int argc, const char * const argv[],
if( error_.size() ) data.clear();
else
{
- for( unsigned int i = 0; i < non_options.size(); ++i )
+ for( unsigned i = 0; i < non_options.size(); ++i )
{ data.push_back( Record() ); data.back().argument.swap( non_options[i] ); }
while( argind < argc )
{ data.push_back( Record() ); data.back().argument = argv[argind++]; }
diff --git a/arg_parser.h b/arg_parser.h
index 4fbd1af..5248cb1 100644
--- a/arg_parser.h
+++ b/arg_parser.h
@@ -1,5 +1,5 @@
/* Arg_parser - POSIX/GNU command line argument parser. (C++ version)
- Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012
+ Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
Antonio Diaz Diaz.
This library is free software: you can redistribute it and/or modify
diff --git a/compress.cc b/compress.cc
index bb50358..c4428ea 100644
--- a/compress.cc
+++ b/compress.cc
@@ -1,6 +1,6 @@
/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -28,12 +28,16 @@
#include <queue>
#include <string>
#include <vector>
-#include <inttypes.h>
#include <pthread.h>
+#include <stdint.h>
#include <unistd.h>
#include <lzlib.h>
-#include "plzip.h"
+#include "lzip.h"
+
+#ifndef LLONG_MAX
+#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
+#endif
// Returns the number of bytes really read.
@@ -45,13 +49,13 @@ int readblock( const int fd, uint8_t * const buf, const int size )
errno = 0;
while( rest > 0 )
{
- errno = 0;
const int n = read( fd, buf + size - rest, rest );
if( n > 0 ) rest -= n;
- else if( n == 0 ) break;
+ else if( n == 0 ) break; // EOF
else if( errno != EINTR && errno != EAGAIN ) break;
+ errno = 0;
}
- return ( rest > 0 ) ? size - rest : size;
+ return size - rest;
}
@@ -64,12 +68,12 @@ int writeblock( const int fd, const uint8_t * const buf, const int size )
errno = 0;
while( rest > 0 )
{
- errno = 0;
const int n = write( fd, buf + size - rest, rest );
if( n > 0 ) rest -= n;
else if( n < 0 && errno != EINTR && errno != EAGAIN ) break;
+ errno = 0;
}
- return ( rest > 0 ) ? size - rest : size;
+ return size - rest;
}
@@ -136,13 +140,14 @@ void xbroadcast( pthread_cond_t * const cond )
namespace {
-long long in_size = 0;
-long long out_size = 0;
+unsigned long long in_size = 0;
+unsigned long long out_size = 0;
+const char * const mem_msg = "Not enough memory. Try a smaller dictionary size";
struct Packet // data block with a serial number
{
- unsigned long long id; // serial number assigned as received
+ unsigned id; // serial number assigned as received
uint8_t * data;
int size; // number of bytes in data (if any)
};
@@ -151,16 +156,16 @@ struct Packet // data block with a serial number
class Packet_courier // moves packets around
{
public:
- unsigned long icheck_counter;
- unsigned long iwait_counter;
- unsigned long ocheck_counter;
- unsigned long owait_counter;
+ unsigned icheck_counter;
+ unsigned iwait_counter;
+ unsigned ocheck_counter;
+ unsigned owait_counter;
private:
- unsigned long long receive_id; // id assigned to next packet received
- unsigned long long deliver_id; // id of next packet to be delivered
+ unsigned receive_id; // id assigned to next packet received
+ unsigned deliver_id; // id of next packet to be delivered
Slot_tally slot_tally; // limits the number of input packets
std::queue< Packet * > packet_queue;
- std::vector< Packet * > circular_buffer;
+ std::vector< const Packet * > circular_buffer;
int num_working; // number of workers still running
const int num_slots; // max packets in circulation
pthread_mutex_t imutex;
@@ -190,12 +195,10 @@ public:
xdestroy( &iav_or_eof ); xdestroy( &imutex );
}
- const Slot_tally & tally() const { return slot_tally; }
-
// make a packet with data received from splitter
void receive_packet( uint8_t * const data, const int size )
{
- Packet * ipacket = new Packet;
+ Packet * const ipacket = new Packet;
ipacket->id = receive_id++;
ipacket->data = data;
ipacket->size = size;
@@ -216,7 +219,6 @@ public:
{
++iwait_counter;
xwait( &iav_or_eof, &imutex );
- ++icheck_counter;
}
if( !packet_queue.empty() )
{
@@ -224,7 +226,7 @@ public:
packet_queue.pop();
}
xunlock( &imutex );
- if( ipacket == 0 )
+ if( !ipacket )
{
// notify muxer when last worker exits
xlock( &omutex );
@@ -235,36 +237,43 @@ public:
}
// collect a packet from a worker
- void collect_packet( Packet * const opacket )
+ void collect_packet( const Packet * const opacket )
{
+ const int i = opacket->id%num_slots;
xlock( &omutex );
// id collision shouldn't happen
- if( circular_buffer[opacket->id%num_slots] != 0 )
+ if( circular_buffer[i] != 0 )
internal_error( "id collision in collect_packet" );
// merge packet into circular buffer
- circular_buffer[opacket->id%num_slots] = opacket;
+ circular_buffer[i] = opacket;
if( opacket->id == deliver_id ) xsignal( &oav_or_exit );
xunlock( &omutex );
}
- // deliver a packet to muxer
- Packet * deliver_packet()
+ // deliver packets to muxer
+ void deliver_packets( std::vector< const Packet * > & packet_vector )
{
xlock( &omutex );
++ocheck_counter;
- while( circular_buffer[deliver_id%num_slots] == 0 && num_working > 0 )
+ int i = deliver_id % num_slots;
+ while( circular_buffer[i] == 0 && num_working > 0 )
{
++owait_counter;
xwait( &oav_or_exit, &omutex );
- ++ocheck_counter;
}
- Packet * opacket = circular_buffer[deliver_id%num_slots];
- circular_buffer[deliver_id%num_slots] = 0;
- ++deliver_id;
+ packet_vector.clear();
+ while( true )
+ {
+ const Packet * const opacket = circular_buffer[i];
+ if( !opacket ) break;
+ packet_vector.push_back( opacket );
+ circular_buffer[i] = 0;
+ ++deliver_id;
+ i = deliver_id % num_slots;
+ }
xunlock( &omutex );
- if( opacket != 0 )
- slot_tally.leave_slot(); // return a slot to the tally
- return opacket;
+ if( packet_vector.size() ) // return slots to the tally
+ slot_tally.leave_slots( packet_vector.size() );
}
void finish() // splitter has no more packets to send
@@ -308,12 +317,12 @@ extern "C" void * csplitter( void * arg )
for( bool first_post = true; ; first_post = false )
{
uint8_t * const data = new( std::nothrow ) uint8_t[data_size];
- if( data == 0 ) { pp( "Not enough memory" ); fatal(); }
+ if( !data ) { pp( mem_msg ); fatal(); }
const int size = readblock( infd, data, data_size );
if( size != data_size && errno )
{ pp(); show_error( "Read error", errno ); fatal(); }
- if( size > 0 || first_post ) // first packet can be empty
+ if( size > 0 || first_post ) // first packet may be empty
{
in_size += size;
courier.receive_packet( data, size );
@@ -352,11 +361,11 @@ extern "C" void * cworker( void * arg )
while( true )
{
Packet * const packet = courier.distribute_packet();
- if( packet == 0 ) break; // no more packets to process
+ if( !packet ) break; // no more packets to process
const int max_compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 );
uint8_t * const new_data = new( std::nothrow ) uint8_t[max_compr_size];
- if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); }
+ if( !new_data ) { pp( mem_msg ); fatal(); }
const int dict_size = std::max( LZ_min_dictionary_size(),
std::min( dictionary_size, packet->size ) );
LZ_Encoder * const encoder =
@@ -364,14 +373,14 @@ extern "C" void * cworker( void * arg )
if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
{
if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error )
- pp( "Not enough memory. Try a smaller dictionary size" );
+ pp( mem_msg );
else
internal_error( "invalid argument to encoder" );
fatal();
}
int written = 0;
- int new_size = 0;
+ int new_pos = 0;
while( true )
{
if( LZ_compress_write_size( encoder ) > 0 )
@@ -386,8 +395,8 @@ extern "C" void * cworker( void * arg )
if( written >= packet->size )
{ delete[] packet->data; LZ_compress_finish( encoder ); }
}
- const int rd = LZ_compress_read( encoder, new_data + new_size,
- max_compr_size - new_size );
+ const int rd = LZ_compress_read( encoder, new_data + new_pos,
+ max_compr_size - new_pos );
if( rd < 0 )
{
pp();
@@ -396,8 +405,8 @@ extern "C" void * cworker( void * arg )
LZ_strerror( LZ_compress_errno( encoder ) ) );
fatal();
}
- new_size += rd;
- if( new_size > max_compr_size )
+ new_pos += rd;
+ if( new_pos > max_compr_size )
internal_error( "packet size exceeded in worker" );
if( LZ_compress_finished( encoder ) == 1 ) break;
}
@@ -406,7 +415,7 @@ extern "C" void * cworker( void * arg )
{ pp( "LZ_compress_close failed" ); fatal(); }
packet->data = new_data;
- packet->size = new_size;
+ packet->size = new_pos;
courier.collect_packet( packet );
}
return 0;
@@ -417,21 +426,26 @@ extern "C" void * cworker( void * arg )
// their contents to the output file.
void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
{
+ std::vector< const Packet * > packet_vector;
while( true )
{
- Packet * opacket = courier.deliver_packet();
- if( opacket == 0 ) break; // queue is empty. all workers exited
-
- out_size += opacket->size;
+ courier.deliver_packets( packet_vector );
+ if( packet_vector.size() == 0 ) break; // all workers exited
- if( outfd >= 0 )
+ for( unsigned i = 0; i < packet_vector.size(); ++i )
{
- const int wr = writeblock( outfd, opacket->data, opacket->size );
- if( wr != opacket->size )
- { pp(); show_error( "Write error", errno ); fatal(); }
+ const Packet * const opacket = packet_vector[i];
+ out_size += opacket->size;
+
+ if( outfd >= 0 )
+ {
+ const int wr = writeblock( outfd, opacket->data, opacket->size );
+ if( wr != opacket->size )
+ { pp(); show_error( "Write error", errno ); fatal(); }
+ }
+ delete[] opacket->data;
+ delete opacket;
}
- delete[] opacket->data;
- delete opacket;
}
}
@@ -446,11 +460,11 @@ int compress( const int data_size, const int dictionary_size,
const Pretty_print & pp, const int debug_level )
{
const int slots_per_worker = 2;
- const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ?
- num_workers * slots_per_worker : INT_MAX );
+ const int num_slots =
+ ( ( num_workers > 1 ) ? num_workers * slots_per_worker : 1 );
in_size = 0;
out_size = 0;
- Packet_courier courier( num_workers, num_slots - 1 );
+ Packet_courier courier( num_workers, num_slots );
Splitter_arg splitter_arg;
splitter_arg.courier = &courier;
@@ -470,8 +484,7 @@ int compress( const int data_size, const int dictionary_size,
worker_arg.match_len_limit = match_len_limit;
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
- if( worker_threads == 0 )
- { pp( "Not enough memory" ); fatal(); }
+ if( !worker_threads ) { pp( mem_msg ); fatal(); }
for( int i = 0; i < num_workers; ++i )
{
errcode = pthread_create( worker_threads + i, 0, cworker, &worker_arg );
@@ -487,7 +500,7 @@ int compress( const int data_size, const int dictionary_size,
if( errcode )
{ show_error( "Can't join worker threads", errcode ); fatal(); }
}
- delete[] worker_threads; worker_threads = 0;
+ delete[] worker_threads;
errcode = pthread_join( splitter_thread, 0 );
if( errcode )
@@ -495,11 +508,11 @@ int compress( const int data_size, const int dictionary_size,
if( verbosity >= 1 )
{
- if( in_size <= 0 || out_size <= 0 )
+ if( in_size == 0 || out_size == 0 )
std::fprintf( stderr, " no data compressed.\n" );
else
std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
- "%5.2f%% saved, %lld in, %lld out.\n",
+ "%5.2f%% saved, %llu in, %llu out.\n",
(double)in_size / out_size,
( 8.0 * out_size ) / in_size,
100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
@@ -508,14 +521,10 @@ int compress( const int data_size, const int dictionary_size,
if( debug_level & 1 )
std::fprintf( stderr,
- "splitter tried to send a packet %8lu times\n"
- "splitter had to wait %8lu times\n"
- "any worker tried to consume from splitter %8lu times\n"
- "any worker had to wait %8lu times\n"
- "muxer tried to consume from workers %8lu times\n"
- "muxer had to wait %8lu times\n",
- courier.tally().check_counter,
- courier.tally().wait_counter,
+ "any worker tried to consume from splitter %8u times\n"
+ "any worker had to wait %8u times\n"
+ "muxer tried to consume from workers %8u times\n"
+ "muxer had to wait %8u times\n",
courier.icheck_counter,
courier.iwait_counter,
courier.ocheck_counter,
diff --git a/configure b/configure
index 416a21e..c4d3f0a 100755
--- a/configure
+++ b/configure
@@ -1,6 +1,6 @@
#! /bin/sh
# configure script for Plzip - A parallel compressor compatible with lzip
-# Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+# Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
#
# This configure script is free software: you have unlimited permission
# to copy, distribute and modify it.
@@ -8,9 +8,9 @@
args=
no_create=
pkgname=plzip
-pkgversion=0.9
+pkgversion=1.0-rc1
progname=plzip
-srctrigger=plzip.h
+srctrigger=doc/plzip.texinfo
# clear some things potentially inherited from environment.
LC_ALL=C
@@ -22,11 +22,19 @@ bindir='$(exec_prefix)/bin'
datarootdir='$(prefix)/share'
infodir='$(datarootdir)/info'
mandir='$(datarootdir)/man'
-CXX=
+CXX=g++
CPPFLAGS=
CXXFLAGS='-Wall -W -O2'
LDFLAGS=
+# checking whether we are using GNU C++.
+if [ ! -x /bin/g++ ] &&
+ [ ! -x /usr/bin/g++ ] &&
+ [ ! -x /usr/local/bin/g++ ] ; then
+ CXX=c++
+ CXXFLAGS='-W -O2'
+fi
+
# Loop over all args
while [ -n "$1" ] ; do
@@ -109,17 +117,6 @@ fi
# Set srcdir to . if that's what it is.
if [ "`pwd`" = "`cd "${srcdir}" ; pwd`" ] ; then srcdir=. ; fi
-# checking whether we are using GNU C++.
-if [ -z "${CXX}" ] ; then # Let the user override the test.
- if [ -x /bin/g++ ] ||
- [ -x /usr/bin/g++ ] ||
- [ -x /usr/local/bin/g++ ] ; then
- CXX="g++"
- else
- CXX="c++"
- fi
-fi
-
echo
if [ -z "${no_create}" ] ; then
echo "creating config.status"
@@ -152,7 +149,7 @@ echo "LDFLAGS = ${LDFLAGS}"
rm -f Makefile
cat > Makefile << EOF
# Makefile for Plzip - A parallel compressor compatible with lzip
-# Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+# Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
# This file was generated automatically by configure. Do not edit.
#
# This Makefile is free software: you have unlimited permission
diff --git a/dec_stdout.cc b/dec_stdout.cc
new file mode 100644
index 0000000..36be19b
--- /dev/null
+++ b/dec_stdout.cc
@@ -0,0 +1,331 @@
+/* Plzip - A parallel compressor compatible with lzip
+ Copyright (C) 2009 Laszlo Ersek.
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#define _FILE_OFFSET_BITS 64
+
+#include <algorithm>
+#include <cerrno>
+#include <climits>
+#include <csignal>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <queue>
+#include <string>
+#include <vector>
+#include <pthread.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <lzlib.h>
+
+#include "lzip.h"
+#include "file_index.h"
+
+
+namespace {
+
+enum { max_packet_size = 1 << 20 };
+
+
+struct Packet // data block
+ {
+ uint8_t * data; // data == 0 means end of member
+ int size; // number of bytes in data (if any)
+ };
+
+
+class Packet_courier // moves packets around
+ {
+public:
+ unsigned ocheck_counter;
+ unsigned owait_counter;
+private:
+ int deliver_worker_id; // worker queue currently delivering packets
+ std::vector< std::queue< Packet * > > opacket_queues;
+ int num_working; // number of workers still running
+ const int num_workers; // number of workers
+ const int num_slots; // max output packets in circulation
+ int num_free; // remaining free output slots
+ pthread_mutex_t omutex;
+ pthread_cond_t oav_or_exit; // output packet available or all workers exited
+ pthread_cond_t slot_av; // free output slot available
+
+ Packet_courier( const Packet_courier & ); // declared as private
+ void operator=( const Packet_courier & ); // declared as private
+
+public:
+ Packet_courier( const int workers, const int slots )
+ : ocheck_counter( 0 ), owait_counter( 0 ),
+ deliver_worker_id( 0 ),
+ opacket_queues( workers ), num_working( workers ),
+ num_workers( workers ), num_slots( 8 * slots ), num_free( num_slots )
+ { xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av ); }
+
+ ~Packet_courier()
+ { xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex ); }
+
+ void worker_finished()
+ {
+ // notify muxer when last worker exits
+ xlock( &omutex );
+ if( --num_working == 0 ) xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ }
+
+ // collect a packet from a worker
+ void collect_packet( Packet * const opacket, const int worker_id )
+ {
+ xlock( &omutex );
+ if( opacket->data )
+ {
+ while( worker_id != deliver_worker_id && num_free <= 0 )
+ xwait( &slot_av, &omutex );
+ --num_free;
+ }
+ opacket_queues[worker_id].push( opacket );
+ if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ }
+
+ // deliver a packet to muxer
+ // if packet data == 0, move to next queue and wait again
+ Packet * deliver_packet()
+ {
+ Packet * opacket = 0;
+ xlock( &omutex );
+ ++ocheck_counter;
+ while( true )
+ {
+ while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
+ {
+ ++owait_counter;
+ xwait( &oav_or_exit, &omutex );
+ }
+ if( opacket_queues[deliver_worker_id].empty() ) break;
+ opacket = opacket_queues[deliver_worker_id].front();
+ opacket_queues[deliver_worker_id].pop();
+ if( opacket->data )
+ {
+ if( ++num_free == 1 ) xsignal( &slot_av );
+ break;
+ }
+ if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0;
+ xbroadcast( &slot_av ); // restart deliver_worker_id thread
+ delete opacket; opacket = 0;
+ }
+ xunlock( &omutex );
+ return opacket;
+ }
+
+ bool finished() // all packets delivered to muxer
+ {
+ if( num_free != num_slots || num_working != 0 ) return false;
+ for( int i = 0; i < num_workers; ++i )
+ if( !opacket_queues[i].empty() ) return false;
+ return true;
+ }
+ };
+
+
+struct Worker_arg
+ {
+ const File_index * file_index;
+ Packet_courier * courier;
+ const Pretty_print * pp;
+ int worker_id;
+ int num_workers;
+ int infd;
+ };
+
+
+ // read members from file, decompress their contents, and
+ // give the produced packets to courier.
+extern "C" void * dworker_o( void * arg )
+ {
+ const Worker_arg & tmp = *(Worker_arg *)arg;
+ const File_index & file_index = *tmp.file_index;
+ Packet_courier & courier = *tmp.courier;
+ const Pretty_print & pp = *tmp.pp;
+ const int worker_id = tmp.worker_id;
+ const int num_workers = tmp.num_workers;
+ const int infd = tmp.infd;
+ const int buffer_size = 65536;
+
+ uint8_t * new_data = new( std::nothrow ) uint8_t[max_packet_size];
+ uint8_t * const ibuffer = new( std::nothrow ) uint8_t[buffer_size];
+ LZ_Decoder * const decoder = LZ_decompress_open();
+ if( !new_data || !ibuffer || !decoder ||
+ LZ_decompress_errno( decoder ) != LZ_ok )
+ { pp( "Not enough memory" ); fatal(); }
+ int new_pos = 0;
+
+ for( int i = worker_id; i < file_index.members(); i += num_workers )
+ {
+ long long member_pos = file_index.mblock( i ).pos();
+ long long member_rest = file_index.mblock( i ).size();
+
+ while( member_rest > 0 )
+ {
+ while( LZ_decompress_write_size( decoder ) > 0 )
+ {
+ const int size = std::min( LZ_decompress_write_size( decoder ),
+ (int)std::min( (long long)buffer_size, member_rest ) );
+ if( size > 0 )
+ {
+ if( preadblock( infd, ibuffer, size, member_pos ) != size )
+ { pp(); show_error( "Read error", errno ); fatal(); }
+ member_pos += size;
+ member_rest -= size;
+ if( LZ_decompress_write( decoder, ibuffer, size ) != size )
+ internal_error( "library error (LZ_decompress_write)" );
+ }
+ if( member_rest <= 0 ) { LZ_decompress_finish( decoder ); break; }
+ }
+ while( true ) // read and pack decompressed data
+ {
+ const int rd = LZ_decompress_read( decoder, new_data + new_pos,
+ max_packet_size - new_pos );
+ if( rd < 0 )
+ fatal( decompress_read_error( decoder, pp, worker_id ) );
+ new_pos += rd;
+ if( new_pos > max_packet_size )
+ internal_error( "opacket size exceeded in worker" );
+ if( new_pos == max_packet_size ||
+ LZ_decompress_finished( decoder ) == 1 )
+ {
+ if( new_pos > 0 ) // make data packet
+ {
+ Packet * opacket = new Packet;
+ opacket->data = new_data;
+ opacket->size = new_pos;
+ courier.collect_packet( opacket, worker_id );
+ new_pos = 0;
+ new_data = new( std::nothrow ) uint8_t[max_packet_size];
+ if( !new_data ) { pp( "Not enough memory" ); fatal(); }
+ }
+ if( LZ_decompress_finished( decoder ) == 1 )
+ {
+ LZ_decompress_reset( decoder ); // prepare for new member
+ Packet * opacket = new Packet; // end of member token
+ opacket->data = 0;
+ opacket->size = 0;
+ courier.collect_packet( opacket, worker_id );
+ break;
+ }
+ }
+ if( rd == 0 ) break;
+ }
+ }
+ }
+
+ delete[] ibuffer; delete[] new_data;
+ if( LZ_decompress_member_position( decoder ) != 0 )
+ { pp( "Error, some data remains in decoder" ); fatal(); }
+ if( LZ_decompress_close( decoder ) < 0 )
+ { pp( "LZ_decompress_close failed" ); fatal(); }
+ courier.worker_finished();
+ return 0;
+ }
+
+
+ // get from courier the processed and sorted packets, and write
+ // their contents to the output file.
+void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
+ {
+ while( true )
+ {
+ Packet * opacket = courier.deliver_packet();
+ if( !opacket ) break; // queue is empty. all workers exited
+
+ if( outfd >= 0 )
+ {
+ const int wr = writeblock( outfd, opacket->data, opacket->size );
+ if( wr != opacket->size )
+ { pp(); show_error( "Write error", errno ); fatal(); }
+ }
+ delete[] opacket->data;
+ delete opacket;
+ }
+ }
+
+} // end namespace
+
+
+ // init the courier, then start the workers and call the muxer.
+int dec_stdout( const int num_workers, const int infd, const int outfd,
+ const Pretty_print & pp, const int debug_level,
+ const File_index & file_index )
+ {
+ const int slots_per_worker = 2;
+ const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ?
+ num_workers * slots_per_worker : INT_MAX );
+
+ Packet_courier courier( num_workers, num_slots );
+
+ Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
+ pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
+ if( !worker_args || !worker_threads )
+ { pp( "Not enough memory" ); fatal(); }
+ for( int i = 0; i < num_workers; ++i )
+ {
+ worker_args[i].file_index = &file_index;
+ worker_args[i].courier = &courier;
+ worker_args[i].pp = &pp;
+ worker_args[i].worker_id = i;
+ worker_args[i].num_workers = num_workers;
+ worker_args[i].infd = infd;
+ const int errcode =
+ pthread_create( &worker_threads[i], 0, dworker_o, &worker_args[i] );
+ if( errcode )
+ { show_error( "Can't create worker threads", errcode ); fatal(); }
+ }
+
+ muxer( courier, pp, outfd );
+
+ for( int i = num_workers - 1; i >= 0; --i )
+ {
+ const int errcode = pthread_join( worker_threads[i], 0 );
+ if( errcode )
+ { show_error( "Can't join worker threads", errcode ); fatal(); }
+ }
+ delete[] worker_threads;
+ delete[] worker_args;
+
+ const unsigned long long in_size = file_index.file_end();
+ const unsigned long long out_size = file_index.data_end();
+ if( verbosity >= 2 && out_size > 0 && in_size > 0 )
+ std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, %5.2f%% saved. ",
+ (double)out_size / in_size,
+ ( 8.0 * in_size ) / out_size,
+ 100.0 * ( 1.0 - ( (double)in_size / out_size ) ) );
+ if( verbosity >= 3 )
+ std::fprintf( stderr, "decompressed size %9llu, size %9llu. ",
+ out_size, in_size );
+
+ if( verbosity >= 1 ) std::fprintf( stderr, "done\n" );
+
+ if( debug_level & 1 )
+ std::fprintf( stderr,
+ "muxer tried to consume from workers %8u times\n"
+ "muxer had to wait %8u times\n",
+ courier.ocheck_counter,
+ courier.owait_counter );
+
+ if( !courier.finished() ) internal_error( "courier not finished" );
+ return 0;
+ }
diff --git a/dec_stream.cc b/dec_stream.cc
new file mode 100644
index 0000000..91659da
--- /dev/null
+++ b/dec_stream.cc
@@ -0,0 +1,520 @@
+/* Plzip - A parallel compressor compatible with lzip
+ Copyright (C) 2009 Laszlo Ersek.
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#define _FILE_OFFSET_BITS 64
+
+#include <algorithm>
+#include <cerrno>
+#include <climits>
+#include <csignal>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <queue>
+#include <string>
+#include <vector>
+#include <pthread.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <lzlib.h>
+
+#include "lzip.h"
+
+
+namespace {
+
+enum { max_packet_size = 1 << 20 };
+unsigned long long in_size = 0;
+unsigned long long out_size = 0;
+
+
+struct Packet // data block
+ {
+ uint8_t * data; // data == 0 means end of member
+ int size; // number of bytes in data (if any)
+ };
+
+
+class Packet_courier // moves packets around
+ {
+public:
+ unsigned icheck_counter;
+ unsigned iwait_counter;
+ unsigned ocheck_counter;
+ unsigned owait_counter;
+private:
+ int receive_worker_id; // worker queue currently receiving packets
+ int deliver_worker_id; // worker queue currently delivering packets
+ Slot_tally slot_tally; // limits the number of input packets
+ std::vector< std::queue< Packet * > > ipacket_queues;
+ std::vector< std::queue< Packet * > > opacket_queues;
+ int num_working; // number of workers still running
+ const int num_workers; // number of workers
+ const int num_slots; // max output packets in circulation
+ int num_free; // remaining free output slots
+ pthread_mutex_t imutex;
+ pthread_cond_t iav_or_eof; // input packet available or splitter done
+ pthread_mutex_t omutex;
+ pthread_cond_t oav_or_exit; // output packet available or all workers exited
+ pthread_cond_t slot_av; // free output slot available
+ bool eof; // splitter done
+
+ Packet_courier( const Packet_courier & ); // declared as private
+ void operator=( const Packet_courier & ); // declared as private
+
+public:
+ Packet_courier( const int workers, const int slots )
+ : icheck_counter( 0 ), iwait_counter( 0 ),
+ ocheck_counter( 0 ), owait_counter( 0 ),
+ receive_worker_id( 0 ), deliver_worker_id( 0 ),
+ slot_tally( slots ), ipacket_queues( workers ),
+ opacket_queues( workers ), num_working( workers ),
+ num_workers( workers ), num_slots( 8 * slots ), num_free( num_slots ),
+ eof( false )
+ {
+ xinit( &imutex ); xinit( &iav_or_eof );
+ xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av );
+ }
+
+ ~Packet_courier()
+ {
+ xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex );
+ xdestroy( &iav_or_eof ); xdestroy( &imutex );
+ }
+
+ // make a packet with data received from splitter
+ // if data == 0, move to next queue
+ void receive_packet( uint8_t * const data, const int size )
+ {
+ Packet * ipacket = new Packet;
+ ipacket->data = data;
+ ipacket->size = size;
+ if( data )
+ { in_size += size; slot_tally.get_slot(); } // wait for a free slot
+ xlock( &imutex );
+ ipacket_queues[receive_worker_id].push( ipacket );
+ xbroadcast( &iav_or_eof );
+ xunlock( &imutex );
+ if( !data && ++receive_worker_id >= num_workers )
+ receive_worker_id = 0;
+ }
+
+ // distribute a packet to a worker
+ Packet * distribute_packet( const int worker_id )
+ {
+ Packet * ipacket = 0;
+ xlock( &imutex );
+ ++icheck_counter;
+ while( ipacket_queues[worker_id].empty() && !eof )
+ {
+ ++iwait_counter;
+ xwait( &iav_or_eof, &imutex );
+ }
+ if( !ipacket_queues[worker_id].empty() )
+ {
+ ipacket = ipacket_queues[worker_id].front();
+ ipacket_queues[worker_id].pop();
+ }
+ xunlock( &imutex );
+ if( ipacket )
+ { if( ipacket->data ) slot_tally.leave_slot(); }
+ else
+ {
+ // notify muxer when last worker exits
+ xlock( &omutex );
+ if( --num_working == 0 ) xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ }
+ return ipacket;
+ }
+
+ // collect a packet from a worker
+ void collect_packet( Packet * const opacket, const int worker_id )
+ {
+ xlock( &omutex );
+ if( opacket->data )
+ {
+ while( worker_id != deliver_worker_id && num_free <= 0 )
+ xwait( &slot_av, &omutex );
+ --num_free;
+ }
+ opacket_queues[worker_id].push( opacket );
+ if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ }
+
+ // deliver a packet to muxer
+ // if packet data == 0, move to next queue and wait again
+ Packet * deliver_packet()
+ {
+ Packet * opacket = 0;
+ xlock( &omutex );
+ ++ocheck_counter;
+ while( true )
+ {
+ while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
+ {
+ ++owait_counter;
+ xwait( &oav_or_exit, &omutex );
+ }
+ if( opacket_queues[deliver_worker_id].empty() ) break;
+ opacket = opacket_queues[deliver_worker_id].front();
+ opacket_queues[deliver_worker_id].pop();
+ if( opacket->data )
+ {
+ if( ++num_free == 1 ) xsignal( &slot_av );
+ break;
+ }
+ if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0;
+ xbroadcast( &slot_av ); // restart deliver_worker_id thread
+ delete opacket; opacket = 0;
+ }
+ xunlock( &omutex );
+ return opacket;
+ }
+
+ void finish() // splitter has no more packets to send
+ {
+ xlock( &imutex );
+ eof = true;
+ xbroadcast( &iav_or_eof );
+ xunlock( &imutex );
+ }
+
+ bool finished() // all packets delivered to muxer
+ {
+ if( !slot_tally.all_free() ||
+ num_free != num_slots || !eof || num_working != 0 ) return false;
+ for( int i = 0; i < num_workers; ++i )
+ if( !ipacket_queues[i].empty() ) return false;
+ for( int i = 0; i < num_workers; ++i )
+ if( !opacket_queues[i].empty() ) return false;
+ return true;
+ }
+ };
+
+
+// Search forward from 'pos' for "LZIP" (Boyer-Moore algorithm)
+// Return pos of found string or 'pos+size' if not found.
+//
+int find_magic( const uint8_t * const buffer, const int pos, const int size )
+ {
+ const uint8_t table[256] = {
+ 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
+ 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
+ 4,4,4,4,4,4,4,4,4,1,4,4,3,4,4,4,4,4,4,4,4,4,4,4,4,4,2,4,4,4,4,4,
+ 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
+ 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
+ 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
+ 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
+ 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4 };
+
+ for( int i = pos; i <= pos + size - 4; i += table[buffer[i+3]] )
+ if( buffer[i] == 'L' && buffer[i+1] == 'Z' &&
+ buffer[i+2] == 'I' && buffer[i+3] == 'P' )
+ return i; // magic string found
+ return pos + size;
+ }
+
+
+struct Splitter_arg
+ {
+ Packet_courier * courier;
+ const Pretty_print * pp;
+ int infd;
+ };
+
+
+ // split data from input file into chunks and pass them to
+ // courier for packaging and distribution to workers.
+extern "C" void * dsplitter_s( void * arg )
+ {
+ const Splitter_arg & tmp = *(Splitter_arg *)arg;
+ Packet_courier & courier = *tmp.courier;
+ const Pretty_print & pp = *tmp.pp;
+ const int infd = tmp.infd;
+ const int hsize = 6; // header size
+ const int tsize = 20; // trailer size
+ const int buffer_size = max_packet_size;
+ const int base_buffer_size = tsize + buffer_size + hsize;
+ uint8_t * const base_buffer = new( std::nothrow ) uint8_t[base_buffer_size];
+ if( !base_buffer ) { pp( "Not enough memory" ); fatal(); }
+ uint8_t * const buffer = base_buffer + tsize;
+
+ int size = readblock( infd, buffer, buffer_size + hsize ) - hsize;
+ bool at_stream_end = ( size < buffer_size );
+ if( size != buffer_size && errno )
+ { pp(); show_error( "Read error", errno ); fatal(); }
+ if( size <= tsize )
+ { pp( "Error reading member header" ); fatal(); }
+ if( find_magic( buffer, 0, 4 ) != 0 )
+ { pp( "Bad magic number (file not in lzip format)" ); fatal(); }
+
+ unsigned long long partial_member_size = 0;
+ while( true )
+ {
+ int pos = 0;
+ for( int newpos = 1; newpos <= size; ++newpos )
+ {
+ newpos = find_magic( buffer, newpos, size + 4 - newpos );
+ if( newpos <= size )
+ {
+ unsigned long long member_size = 0;
+ for( int i = 1; i <= 8; ++i )
+ { member_size <<= 8; member_size += base_buffer[tsize+newpos-i]; }
+ if( partial_member_size + newpos - pos == member_size )
+ { // header found
+ uint8_t * const data = new( std::nothrow ) uint8_t[newpos - pos];
+ if( !data ) { pp( "Not enough memory" ); fatal(); }
+ std::memcpy( data, buffer + pos, newpos - pos );
+ courier.receive_packet( data, newpos - pos );
+ courier.receive_packet( 0, 0 ); // end of member token
+ partial_member_size = 0;
+ pos = newpos;
+ }
+ }
+ }
+
+ if( at_stream_end )
+ {
+ uint8_t * data = new( std::nothrow ) uint8_t[size + hsize - pos];
+ if( !data ) { pp( "Not enough memory" ); fatal(); }
+ std::memcpy( data, buffer + pos, size + hsize - pos );
+ courier.receive_packet( data, size + hsize - pos );
+ courier.receive_packet( 0, 0 ); // end of member token
+ break;
+ }
+ if( pos < buffer_size )
+ {
+ partial_member_size += buffer_size - pos;
+ uint8_t * data = new( std::nothrow ) uint8_t[buffer_size - pos];
+ if( !data ) { pp( "Not enough memory" ); fatal(); }
+ std::memcpy( data, buffer + pos, buffer_size - pos );
+ courier.receive_packet( data, buffer_size - pos );
+ }
+ std::memcpy( base_buffer, base_buffer + buffer_size, tsize + hsize );
+ size = readblock( infd, buffer + hsize, buffer_size );
+ at_stream_end = ( size < buffer_size );
+ if( size != buffer_size && errno )
+ { pp(); show_error( "Read error", errno ); fatal(); }
+ }
+ delete[] base_buffer;
+ courier.finish(); // no more packets to send
+ return 0;
+ }
+
+
+struct Worker_arg
+ {
+ Packet_courier * courier;
+ const Pretty_print * pp;
+ int worker_id;
+ };
+
+
+ // consume packets from courier, decompress their contents, and
+ // give the produced packets to courier.
+extern "C" void * dworker_s( void * arg )
+ {
+ const Worker_arg & tmp = *(Worker_arg *)arg;
+ Packet_courier & courier = *tmp.courier;
+ const Pretty_print & pp = *tmp.pp;
+ const int worker_id = tmp.worker_id;
+
+ uint8_t * new_data = new( std::nothrow ) uint8_t[max_packet_size];
+ LZ_Decoder * const decoder = LZ_decompress_open();
+ if( !new_data || !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
+ { pp( "Not enough memory" ); fatal(); }
+ int new_pos = 0;
+ bool trailing_garbage_found = false;
+
+ while( true )
+ {
+ const Packet * const ipacket = courier.distribute_packet( worker_id );
+ if( !ipacket ) break; // no more packets to process
+ if( !ipacket->data ) LZ_decompress_finish( decoder );
+
+ int written = 0;
+ while( !trailing_garbage_found )
+ {
+ if( LZ_decompress_write_size( decoder ) > 0 && written < ipacket->size )
+ {
+ const int wr = LZ_decompress_write( decoder, ipacket->data + written,
+ ipacket->size - written );
+ if( wr < 0 ) internal_error( "library error (LZ_decompress_write)" );
+ written += wr;
+ if( written > ipacket->size )
+ internal_error( "ipacket size exceeded in worker" );
+ }
+ while( !trailing_garbage_found ) // read and pack decompressed data
+ {
+ const int rd = LZ_decompress_read( decoder, new_data + new_pos,
+ max_packet_size - new_pos );
+ if( rd < 0 )
+ {
+ if( LZ_decompress_errno( decoder ) == LZ_header_error )
+ trailing_garbage_found = true;
+ else
+ fatal( decompress_read_error( decoder, pp, worker_id ) );
+ }
+ else new_pos += rd;
+ if( new_pos > max_packet_size )
+ internal_error( "opacket size exceeded in worker" );
+ if( new_pos == max_packet_size || trailing_garbage_found ||
+ LZ_decompress_finished( decoder ) == 1 )
+ {
+ if( new_pos > 0 ) // make data packet
+ {
+ Packet * opacket = new Packet;
+ opacket->data = new_data;
+ opacket->size = new_pos;
+ courier.collect_packet( opacket, worker_id );
+ new_pos = 0;
+ new_data = new( std::nothrow ) uint8_t[max_packet_size];
+ if( !new_data ) { pp( "Not enough memory" ); fatal(); }
+ }
+ if( trailing_garbage_found ||
+ LZ_decompress_finished( decoder ) == 1 )
+ {
+ LZ_decompress_reset( decoder ); // prepare for new ipacket
+ Packet * opacket = new Packet; // end of member token
+ opacket->data = 0;
+ opacket->size = 0;
+ courier.collect_packet( opacket, worker_id );
+ break;
+ }
+ }
+ if( rd == 0 ) break;
+ }
+ if( !ipacket->data || written == ipacket->size ) break;
+ }
+ if( ipacket->data ) delete[] ipacket->data;
+ delete ipacket;
+ }
+
+ delete[] new_data;
+ if( LZ_decompress_member_position( decoder ) != 0 )
+ { pp( "Error, some data remains in decoder" ); fatal(); }
+ if( LZ_decompress_close( decoder ) < 0 )
+ { pp( "LZ_decompress_close failed" ); fatal(); }
+ return 0;
+ }
+
+
+ // get from courier the processed and sorted packets, and write
+ // their contents to the output file.
+void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
+ {
+ while( true )
+ {
+ Packet * opacket = courier.deliver_packet();
+ if( !opacket ) break; // queue is empty. all workers exited
+
+ out_size += opacket->size;
+
+ if( outfd >= 0 )
+ {
+ const int wr = writeblock( outfd, opacket->data, opacket->size );
+ if( wr != opacket->size )
+ { pp(); show_error( "Write error", errno ); fatal(); }
+ }
+ delete[] opacket->data;
+ delete opacket;
+ }
+ }
+
+} // end namespace
+
+
+ // init the courier, then start the splitter and the workers and
+ // call the muxer.
+int dec_stream( const int num_workers, const int infd, const int outfd,
+ const Pretty_print & pp, const int debug_level,
+ const bool testing )
+ {
+ const int slots_per_worker = 2;
+ const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ?
+ num_workers * slots_per_worker : INT_MAX );
+ in_size = 0;
+ out_size = 0;
+ Packet_courier courier( num_workers, num_slots );
+
+ Splitter_arg splitter_arg;
+ splitter_arg.courier = &courier;
+ splitter_arg.pp = &pp;
+ splitter_arg.infd = infd;
+
+ pthread_t splitter_thread;
+ int errcode = pthread_create( &splitter_thread, 0, dsplitter_s, &splitter_arg );
+ if( errcode )
+ { show_error( "Can't create splitter thread", errcode ); fatal(); }
+
+ Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
+ pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
+ if( !worker_args || !worker_threads )
+ { pp( "Not enough memory" ); fatal(); }
+ for( int i = 0; i < num_workers; ++i )
+ {
+ worker_args[i].courier = &courier;
+ worker_args[i].pp = &pp;
+ worker_args[i].worker_id = i;
+ errcode = pthread_create( &worker_threads[i], 0, dworker_s, &worker_args[i] );
+ if( errcode )
+ { show_error( "Can't create worker threads", errcode ); fatal(); }
+ }
+
+ muxer( courier, pp, outfd );
+
+ for( int i = num_workers - 1; i >= 0; --i )
+ {
+ errcode = pthread_join( worker_threads[i], 0 );
+ if( errcode )
+ { show_error( "Can't join worker threads", errcode ); fatal(); }
+ }
+ delete[] worker_threads;
+ delete[] worker_args;
+
+ errcode = pthread_join( splitter_thread, 0 );
+ if( errcode )
+ { show_error( "Can't join splitter thread", errcode ); fatal(); }
+
+ if( verbosity >= 2 && out_size > 0 && in_size > 0 )
+ std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, %5.2f%% saved. ",
+ (double)out_size / in_size,
+ ( 8.0 * in_size ) / out_size,
+ 100.0 * ( 1.0 - ( (double)in_size / out_size ) ) );
+ if( verbosity >= 3 )
+ std::fprintf( stderr, "decompressed size %9llu, size %9llu. ",
+ out_size, in_size );
+
+ if( verbosity >= 1 ) std::fprintf( stderr, testing ? "ok\n" : "done\n" );
+
+ if( debug_level & 1 )
+ std::fprintf( stderr,
+ "any worker tried to consume from splitter %8u times\n"
+ "any worker had to wait %8u times\n"
+ "muxer tried to consume from workers %8u times\n"
+ "muxer had to wait %8u times\n",
+ courier.icheck_counter,
+ courier.iwait_counter,
+ courier.ocheck_counter,
+ courier.owait_counter );
+
+ if( !courier.finished() ) internal_error( "courier not finished" );
+ return 0;
+ }
diff --git a/decompress.cc b/decompress.cc
index b440ecd..c861b4d 100644
--- a/decompress.cc
+++ b/decompress.cc
@@ -1,6 +1,6 @@
/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -25,387 +25,164 @@
#include <cstdio>
#include <cstdlib>
#include <cstring>
-#include <queue>
#include <string>
#include <vector>
-#include <inttypes.h>
#include <pthread.h>
+#include <stdint.h>
#include <unistd.h>
+#include <sys/stat.h>
#include <lzlib.h>
-#include "plzip.h"
+#include "lzip.h"
+#include "file_index.h"
-namespace {
-
-enum { max_packet_size = 1 << 20 };
-long long in_size = 0;
-long long out_size = 0;
-
-
-struct Packet // data block
- {
- uint8_t * data; // data == 0 means end of member
- int size; // number of bytes in data (if any)
- };
-
-
-class Packet_courier // moves packets around
+// Returns the number of bytes really read.
+// If (returned value < size) and (errno == 0), means EOF was reached.
+//
+int preadblock( const int fd, uint8_t * const buf, const int size,
+ const long long pos )
{
-public:
- unsigned long icheck_counter;
- unsigned long iwait_counter;
- unsigned long ocheck_counter;
- unsigned long owait_counter;
-private:
- int receive_worker_id; // worker queue currently receiving packets
- int deliver_worker_id; // worker queue currently delivering packets
- Slot_tally slot_tally; // limits the number of input packets
- std::vector< std::queue< Packet * > > ipacket_queues;
- std::vector< std::queue< Packet * > > opacket_queues;
- int num_working; // number of workers still running
- const int num_workers; // number of workers
- int num_free; // remaining free output slots
- pthread_mutex_t imutex;
- pthread_cond_t iav_or_eof; // input packet available or splitter done
- pthread_mutex_t omutex;
- pthread_cond_t oav_or_exit; // output packet available or all workers exited
- pthread_cond_t slot_av; // free output slot available
- bool eof; // splitter done
-
- Packet_courier( const Packet_courier & ); // declared as private
- void operator=( const Packet_courier & ); // declared as private
-
-public:
- Packet_courier( const int workers, const int slots )
- : icheck_counter( 0 ), iwait_counter( 0 ),
- ocheck_counter( 0 ), owait_counter( 0 ),
- receive_worker_id( 0 ), deliver_worker_id( 0 ),
- slot_tally( slots ), ipacket_queues( workers ),
- opacket_queues( workers ), num_working( workers ),
- num_workers( workers ), num_free( 8 * slots ), eof( false )
- {
- xinit( &imutex ); xinit( &iav_or_eof );
- xinit( &omutex ); xinit( &oav_or_exit ); xinit( &slot_av );
- }
-
- ~Packet_courier()
- {
- xdestroy( &slot_av ); xdestroy( &oav_or_exit ); xdestroy( &omutex );
- xdestroy( &iav_or_eof ); xdestroy( &imutex );
- }
-
- const Slot_tally & tally() const { return slot_tally; }
-
- // make a packet with data received from splitter
- // if data == 0, move to next queue
- void receive_packet( uint8_t * const data, const int size )
+ int rest = size;
+ errno = 0;
+ while( rest > 0 )
{
- Packet * ipacket = new Packet;
- ipacket->data = data;
- ipacket->size = size;
- if( data != 0 )
- { in_size += size; slot_tally.get_slot(); } // wait for a free slot
- xlock( &imutex );
- ipacket_queues[receive_worker_id].push( ipacket );
- xbroadcast( &iav_or_eof );
- xunlock( &imutex );
- if( data == 0 && ++receive_worker_id >= num_workers )
- receive_worker_id = 0;
+ const int n = pread( fd, buf + size - rest, rest, pos + size - rest );
+ if( n > 0 ) rest -= n;
+ else if( n == 0 ) break; // EOF
+ else if( errno != EINTR && errno != EAGAIN ) break;
+ errno = 0;
}
-
- // distribute a packet to a worker
- Packet * distribute_packet( const int worker_id )
- {
- Packet * ipacket = 0;
- xlock( &imutex );
- ++icheck_counter;
- while( ipacket_queues[worker_id].empty() && !eof )
- {
- ++iwait_counter;
- xwait( &iav_or_eof, &imutex );
- ++icheck_counter;
- }
- if( !ipacket_queues[worker_id].empty() )
- {
- ipacket = ipacket_queues[worker_id].front();
- ipacket_queues[worker_id].pop();
- }
- xunlock( &imutex );
- if( ipacket != 0 )
- { if( ipacket->data != 0 ) slot_tally.leave_slot(); }
- else
- {
- // notify muxer when last worker exits
- xlock( &omutex );
- if( --num_working == 0 ) xsignal( &oav_or_exit );
- xunlock( &omutex );
- }
- return ipacket;
- }
-
- // collect a packet from a worker
- void collect_packet( Packet * const opacket, const int worker_id )
- {
- xlock( &omutex );
- if( opacket->data != 0 )
- {
- while( worker_id != deliver_worker_id && num_free <= 0 )
- xwait( &slot_av, &omutex );
- --num_free;
- }
- opacket_queues[worker_id].push( opacket );
- if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit );
- xunlock( &omutex );
- }
-
- // deliver a packet to muxer
- // if packet data == 0, move to next queue and wait again
- Packet * deliver_packet()
- {
- Packet * opacket = 0;
- xlock( &omutex );
- ++ocheck_counter;
- while( true )
- {
- while( opacket_queues[deliver_worker_id].empty() && num_working > 0 )
- {
- ++owait_counter;
- xwait( &oav_or_exit, &omutex );
- ++ocheck_counter;
- }
- if( opacket_queues[deliver_worker_id].empty() ) break;
- opacket = opacket_queues[deliver_worker_id].front();
- opacket_queues[deliver_worker_id].pop();
- if( opacket->data != 0 )
- {
- if( ++num_free == 1 ) xsignal( &slot_av );
- break;
- }
- if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0;
- xbroadcast( &slot_av ); // restart deliver_worker_id thread
- delete opacket; opacket = 0;
- }
- xunlock( &omutex );
- return opacket;
- }
-
- void finish() // splitter has no more packets to send
- {
- xlock( &imutex );
- eof = true;
- xbroadcast( &iav_or_eof );
- xunlock( &imutex );
- }
-
- bool finished() // all packets delivered to muxer
- {
- if( !slot_tally.all_free() || !eof || num_working != 0 ) return false;
- for( int i = 0; i < num_workers; ++i )
- if( !ipacket_queues[i].empty() ) return false;
- for( int i = 0; i < num_workers; ++i )
- if( !opacket_queues[i].empty() ) return false;
- return true;
- }
- };
+ return size - rest;
+ }
-// Search forward from 'pos' for "LZIP" (Boyer-Moore algorithm)
-// Return pos of found string or 'pos+size' if not found.
+// Returns the number of bytes really written.
+// If (returned value < size), it is always an error.
//
-int find_magic( const uint8_t * const buffer, const int pos, const int size )
+int pwriteblock( const int fd, const uint8_t * const buf, const int size,
+ const long long pos )
{
- const uint8_t table[256] = {
- 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
- 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
- 4,4,4,4,4,4,4,4,4,1,4,4,3,4,4,4,4,4,4,4,4,4,4,4,4,4,2,4,4,4,4,4,
- 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
- 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
- 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
- 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,
- 4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4 };
-
- for( int i = pos; i <= pos + size - 4; i += table[buffer[i+3]] )
- if( buffer[i] == 'L' && buffer[i+1] == 'Z' &&
- buffer[i+2] == 'I' && buffer[i+3] == 'P' )
- return i; // magic string found
- return pos + size;
+ int rest = size;
+ errno = 0;
+ while( rest > 0 )
+ {
+ const int n = pwrite( fd, buf + size - rest, rest, pos + size - rest );
+ if( n > 0 ) rest -= n;
+ else if( n < 0 && errno != EINTR && errno != EAGAIN ) break;
+ errno = 0;
+ }
+ return size - rest;
}
-struct Splitter_arg
- {
- Packet_courier * courier;
- const Pretty_print * pp;
- int infd;
- };
-
-
- // split data from input file into chunks and pass them to
- // courier for packaging and distribution to workers.
-extern "C" void * dsplitter( void * arg )
+int decompress_read_error( struct LZ_Decoder * const decoder,
+ const Pretty_print & pp, const int worker_id )
{
- const Splitter_arg & tmp = *(Splitter_arg *)arg;
- Packet_courier & courier = *tmp.courier;
- const Pretty_print & pp = *tmp.pp;
- const int infd = tmp.infd;
- const int hsize = 6; // header size
- const int tsize = 20; // trailer size
- const int buffer_size = max_packet_size;
- const int base_buffer_size = tsize + buffer_size + hsize;
- uint8_t * const base_buffer = new( std::nothrow ) uint8_t[base_buffer_size];
- if( base_buffer == 0 ) { pp( "Not enough memory" ); fatal(); }
- uint8_t * const buffer = base_buffer + tsize;
-
- int size = readblock( infd, buffer, buffer_size + hsize ) - hsize;
- bool at_stream_end = ( size < buffer_size );
- if( size != buffer_size && errno )
- { pp(); show_error( "Read error", errno ); fatal(); }
- if( size <= tsize || find_magic( buffer, 0, 4 ) != 0 )
- { pp( "Bad magic number (file not in lzip format)" ); fatal(); }
-
- long long partial_member_size = 0;
- while( true )
- {
- int pos = 0;
- for( int newpos = 1; newpos <= size; ++newpos )
- {
- newpos = find_magic( buffer, newpos, size + 4 - newpos );
- if( newpos <= size )
- {
- long long member_size = 0;
- for( int i = 1; i <= 8; ++i )
- { member_size <<= 8; member_size += base_buffer[tsize+newpos-i]; }
- if( partial_member_size + newpos - pos == member_size )
- { // header found
- uint8_t * const data = new( std::nothrow ) uint8_t[newpos - pos];
- if( data == 0 ) { pp( "Not enough memory" ); fatal(); }
- std::memcpy( data, buffer + pos, newpos - pos );
- courier.receive_packet( data, newpos - pos );
- courier.receive_packet( 0, 0 ); // end of member token
- partial_member_size = 0;
- pos = newpos;
- }
- }
- }
-
- if( at_stream_end )
- {
- uint8_t * data = new( std::nothrow ) uint8_t[size + hsize - pos];
- if( data == 0 ) { pp( "Not enough memory" ); fatal(); }
- std::memcpy( data, buffer + pos, size + hsize - pos );
- courier.receive_packet( data, size + hsize - pos );
- courier.receive_packet( 0, 0 ); // end of member token
- break;
- }
- if( pos < buffer_size )
- {
- partial_member_size += buffer_size - pos;
- uint8_t * data = new( std::nothrow ) uint8_t[buffer_size - pos];
- if( data == 0 ) { pp( "Not enough memory" ); fatal(); }
- std::memcpy( data, buffer + pos, buffer_size - pos );
- courier.receive_packet( data, buffer_size - pos );
- }
- std::memcpy( base_buffer, base_buffer + buffer_size, tsize + hsize );
- size = readblock( infd, buffer + hsize, buffer_size );
- at_stream_end = ( size < buffer_size );
- if( size != buffer_size && errno )
- { pp(); show_error( "Read error", errno ); fatal(); }
- }
- delete[] base_buffer;
- courier.finish(); // no more packets to send
- return 0;
+ const LZ_Errno errcode = LZ_decompress_errno( decoder );
+ pp();
+ if( verbosity >= 0 )
+ std::fprintf( stderr, "LZ_decompress_read error in worker %d: %s.\n",
+ worker_id, LZ_strerror( errcode ) );
+ if( errcode == LZ_header_error || errcode == LZ_unexpected_eof ||
+ errcode == LZ_data_error )
+ return 2;
+ return 1;
}
+namespace {
+
struct Worker_arg
{
- Packet_courier * courier;
+ const File_index * file_index;
const Pretty_print * pp;
int worker_id;
+ int num_workers;
+ int infd;
+ int outfd;
};
- // consume packets from courier, decompress their contents, and
- // give the produced packets to courier.
+ // read members from file, decompress their contents, and
+ // write the produced data to file.
extern "C" void * dworker( void * arg )
{
const Worker_arg & tmp = *(Worker_arg *)arg;
- Packet_courier & courier = *tmp.courier;
+ const File_index & file_index = *tmp.file_index;
const Pretty_print & pp = *tmp.pp;
const int worker_id = tmp.worker_id;
- const int new_data_size = max_packet_size;
+ const int num_workers = tmp.num_workers;
+ const int infd = tmp.infd;
+ const int outfd = tmp.outfd;
+ const int buffer_size = 65536;
- uint8_t * new_data = new( std::nothrow ) uint8_t[new_data_size];
+ uint8_t * const ibuffer = new( std::nothrow ) uint8_t[buffer_size];
+ uint8_t * const obuffer = new( std::nothrow ) uint8_t[buffer_size];
LZ_Decoder * const decoder = LZ_decompress_open();
- if( !new_data || !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
+ if( !ibuffer || !obuffer || !decoder ||
+ LZ_decompress_errno( decoder ) != LZ_ok )
{ pp( "Not enough memory" ); fatal(); }
- int new_pos = 0;
- while( true )
+ for( int i = worker_id; i < file_index.members(); i += num_workers )
{
- const Packet * const ipacket = courier.distribute_packet( worker_id );
- if( ipacket == 0 ) break; // no more packets to process
- if( ipacket->data == 0 ) LZ_decompress_finish( decoder );
+ long long data_pos = file_index.dblock( i ).pos();
+ long long data_rest = file_index.dblock( i ).size();
+ long long member_pos = file_index.mblock( i ).pos();
+ long long member_rest = file_index.mblock( i ).size();
- int written = 0;
- while( true )
+ while( member_rest > 0 )
{
- if( LZ_decompress_write_size( decoder ) > 0 && written < ipacket->size )
+ while( LZ_decompress_write_size( decoder ) > 0 )
{
- const int wr = LZ_decompress_write( decoder, ipacket->data + written,
- ipacket->size - written );
- if( wr < 0 ) internal_error( "library error (LZ_decompress_write)" );
- written += wr;
- if( written > ipacket->size )
- internal_error( "ipacket size exceeded in worker" );
+ const int size = std::min( LZ_decompress_write_size( decoder ),
+ (int)std::min( (long long)buffer_size, member_rest ) );
+ if( size > 0 )
+ {
+ if( preadblock( infd, ibuffer, size, member_pos ) != size )
+ { pp(); show_error( "Read error", errno ); fatal(); }
+ member_pos += size;
+ member_rest -= size;
+ if( LZ_decompress_write( decoder, ibuffer, size ) != size )
+ internal_error( "library error (LZ_decompress_write)" );
+ }
+ if( member_rest <= 0 ) { LZ_decompress_finish( decoder ); break; }
}
- while( true ) // read and pack decompressed data
+ while( true ) // write decompressed data to file
{
- const int rd = LZ_decompress_read( decoder, new_data + new_pos,
- new_data_size - new_pos );
+ const int rd = LZ_decompress_read( decoder, obuffer, buffer_size );
if( rd < 0 )
+ fatal( decompress_read_error( decoder, pp, worker_id ) );
+ if( rd > 0 && outfd >= 0 )
{
- pp();
- if( verbosity >= 0 )
- std::fprintf( stderr, "LZ_decompress_read error in worker %d: %s.\n",
- worker_id, LZ_strerror( LZ_decompress_errno( decoder ) ) );
- fatal();
- }
- new_pos += rd;
- if( new_pos > new_data_size )
- internal_error( "opacket size exceeded in worker" );
- if( new_pos == new_data_size || LZ_decompress_finished( decoder ) == 1 )
- {
- if( new_pos > 0 ) // make data packet
- {
- Packet * opacket = new Packet;
- opacket->data = new_data;
- opacket->size = new_pos;
- courier.collect_packet( opacket, worker_id );
- new_pos = 0;
- new_data = new( std::nothrow ) uint8_t[new_data_size];
- if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); }
- }
- if( LZ_decompress_finished( decoder ) == 1 )
+ const int wr = pwriteblock( outfd, obuffer, rd, data_pos );
+ if( wr != rd )
{
- LZ_decompress_reset( decoder ); // prepare for new ipacket
- Packet * opacket = new Packet; // end of member token
- opacket->data = 0;
- opacket->size = 0;
- courier.collect_packet( opacket, worker_id );
- break;
+ pp();
+ if( verbosity >= 0 )
+ std::fprintf( stderr, "Write error in worker %d: %s\n",
+ worker_id, std::strerror( errno ) );
+ fatal();
}
}
+ if( rd > 0 )
+ {
+ data_pos += rd;
+ data_rest -= rd;
+ }
+ if( LZ_decompress_finished( decoder ) == 1 )
+ {
+ if( data_rest != 0 )
+ internal_error( "final data_rest != 0" );
+ LZ_decompress_reset( decoder ); // prepare for new member
+ break;
+ }
if( rd == 0 ) break;
}
- if( ipacket->data == 0 ) { delete ipacket; break; }
- if( written == ipacket->size )
- { delete[] ipacket->data; delete ipacket; break; }
}
}
- delete[] new_data;
+ delete[] obuffer; delete[] ibuffer;
if( LZ_decompress_member_position( decoder ) != 0 )
{ pp( "Error, some data remains in decoder" ); fatal(); }
if( LZ_decompress_close( decoder ) < 0 )
@@ -413,112 +190,76 @@ extern "C" void * dworker( void * arg )
return 0;
}
+} // end namespace
- // get from courier the processed and sorted packets, and write
- // their contents to the output file.
-void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd )
- {
- while( true )
- {
- Packet * opacket = courier.deliver_packet();
- if( opacket == 0 ) break; // queue is empty. all workers exited
- out_size += opacket->size;
+ // start the workers and wait for them to finish.
+int decompress( int num_workers, const int infd, const int outfd,
+ const Pretty_print & pp, const int debug_level,
+ const bool testing, const bool infd_isreg )
+ {
+ if( !infd_isreg )
+ return dec_stream( num_workers, infd, outfd, pp, debug_level, testing );
- if( outfd >= 0 )
- {
- const int wr = writeblock( outfd, opacket->data, opacket->size );
- if( wr != opacket->size )
- { pp(); show_error( "Write error", errno ); fatal(); }
- }
- delete[] opacket->data;
- delete opacket;
+ const File_index file_index( infd );
+ if( file_index.retval() == 1 )
+ {
+ lseek( infd, 0, SEEK_SET );
+ return dec_stream( num_workers, infd, outfd, pp, debug_level, testing );
}
- }
-
-} // end namespace
+ if( file_index.retval() != 0 )
+ { show_error( file_index.error().c_str() ); return file_index.retval(); }
+ if( num_workers > file_index.members() )
+ num_workers = file_index.members();
- // init the courier, then start the splitter and the workers and
- // call the muxer.
-int decompress( const int num_workers, const int infd, const int outfd,
- const Pretty_print & pp, const int debug_level,
- const bool testing )
- {
- const int slots_per_worker = 2;
- const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ?
- num_workers * slots_per_worker : INT_MAX );
- in_size = 0;
- out_size = 0;
- Packet_courier courier( num_workers, num_slots );
-
- Splitter_arg splitter_arg;
- splitter_arg.courier = &courier;
- splitter_arg.pp = &pp;
- splitter_arg.infd = infd;
-
- pthread_t splitter_thread;
- int errcode = pthread_create( &splitter_thread, 0, dsplitter, &splitter_arg );
- if( errcode )
- { show_error( "Can't create splitter thread", errcode ); fatal(); }
+ if( outfd >= 0 )
+ {
+ struct stat st;
+ if( fstat( outfd, &st ) != 0 || !S_ISREG( st.st_mode ) ||
+ lseek( outfd, 0, SEEK_CUR ) < 0 )
+ return dec_stdout( num_workers, infd, outfd, pp, debug_level, file_index );
+ }
Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers];
pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
- if( worker_args == 0 || worker_threads == 0 )
+ if( !worker_args || !worker_threads )
{ pp( "Not enough memory" ); fatal(); }
for( int i = 0; i < num_workers; ++i )
{
- worker_args[i].courier = &courier;
+ worker_args[i].file_index = &file_index;
worker_args[i].pp = &pp;
worker_args[i].worker_id = i;
- errcode = pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] );
+ worker_args[i].num_workers = num_workers;
+ worker_args[i].infd = infd;
+ worker_args[i].outfd = outfd;
+ const int errcode =
+ pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] );
if( errcode )
{ show_error( "Can't create worker threads", errcode ); fatal(); }
}
- muxer( courier, pp, outfd );
-
for( int i = num_workers - 1; i >= 0; --i )
{
- errcode = pthread_join( worker_threads[i], 0 );
+ const int errcode = pthread_join( worker_threads[i], 0 );
if( errcode )
{ show_error( "Can't join worker threads", errcode ); fatal(); }
}
- delete[] worker_threads; worker_threads = 0;
- delete[] worker_args; worker_args = 0;
-
- errcode = pthread_join( splitter_thread, 0 );
- if( errcode )
- { show_error( "Can't join splitter thread", errcode ); fatal(); }
+ delete[] worker_threads;
+ delete[] worker_args;
- if( verbosity >= 3 && out_size > 0 && in_size > 0 )
+ const unsigned long long in_size = file_index.file_end();
+ const unsigned long long out_size = file_index.data_end();
+ if( verbosity >= 2 && out_size > 0 && in_size > 0 )
std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, %5.2f%% saved. ",
(double)out_size / in_size,
( 8.0 * in_size ) / out_size,
100.0 * ( 1.0 - ( (double)in_size / out_size ) ) );
- if( verbosity >= 2 )
- std::fprintf( stderr, "decompressed size %9lld, size %9lld. ",
+ if( verbosity >= 3 )
+ std::fprintf( stderr, "decompressed size %9llu, size %9llu. ",
out_size, in_size );
- if( verbosity >= 1 )
- { if( testing ) std::fprintf( stderr, "ok\n" );
- else std::fprintf( stderr, "done\n" ); }
-
- if( debug_level & 1 )
- std::fprintf( stderr,
- "splitter tried to send a packet %8lu times\n"
- "splitter had to wait %8lu times\n"
- "any worker tried to consume from splitter %8lu times\n"
- "any worker had to wait %8lu times\n"
- "muxer tried to consume from workers %8lu times\n"
- "muxer had to wait %8lu times\n",
- courier.tally().check_counter,
- courier.tally().wait_counter,
- courier.icheck_counter,
- courier.iwait_counter,
- courier.ocheck_counter,
- courier.owait_counter );
-
- if( !courier.finished() ) internal_error( "courier not finished" );
+ if( verbosity >= 1 ) std::fprintf( stderr, testing ? "ok\n" : "done\n" );
+
return 0;
}
diff --git a/doc/plzip.1 b/doc/plzip.1
index 4bdc86e..2b1261b 100644
--- a/doc/plzip.1
+++ b/doc/plzip.1
@@ -1,5 +1,5 @@
.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.37.1.
-.TH PLZIP "1" "March 2012" "Plzip 0.9" "User Commands"
+.TH PLZIP "1" "March 2013" "Plzip 1.0-rc1" "User Commands"
.SH NAME
Plzip \- reduces the size of files
.SH SYNOPSIS
@@ -37,7 +37,7 @@ keep (don't delete) input files
set match length limit in bytes [36]
.TP
\fB\-n\fR, \fB\-\-threads=\fR<n>
-set the number of (de)compression threads
+set number of (de)compression threads [1]
.TP
\fB\-o\fR, \fB\-\-output=\fR<file>
if reading stdin, place the output into <file>
@@ -78,8 +78,8 @@ Plzip home page: http://www.nongnu.org/lzip/plzip.html
.SH COPYRIGHT
Copyright \(co 2009 Laszlo Ersek.
.br
-Copyright \(co 2012 Antonio Diaz Diaz.
-Using Lzlib 1.3
+Copyright \(co 2013 Antonio Diaz Diaz.
+Using Lzlib 1.4\-rc2
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
.br
This is free software: you are free to change and redistribute it.
diff --git a/doc/plzip.info b/doc/plzip.info
index 94d96af..bf22e32 100644
--- a/doc/plzip.info
+++ b/doc/plzip.info
@@ -12,25 +12,25 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir)
Plzip Manual
************
-This manual is for Plzip (version 0.9, 1 March 2012).
+This manual is for Plzip (version 1.0-rc1, 8 March 2013).
* Menu:
-* Introduction:: Purpose and features of plzip
-* Invoking Plzip:: Command line interface
-* Program Design:: Internal structure of plzip
-* File Format:: Detailed format of the compressed file
-* Problems:: Reporting bugs
-* Concept Index:: Index of concepts
+* Introduction:: Purpose and features of plzip
+* Program Design:: Internal structure of plzip
+* Invoking Plzip:: Command line interface
+* File Format:: Detailed format of the compressed file
+* Problems:: Reporting bugs
+* Concept Index:: Index of concepts
- Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This manual is free documentation: you have unlimited permission to
copy, distribute and modify it.

-File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: Top
+File: plzip.info, Node: Introduction, Next: Program Design, Prev: Top, Up: Top
1 Introduction
**************
@@ -94,9 +94,32 @@ corrupt or invalid input file, 3 for an internal consistency error (eg,
bug) which caused plzip to panic.

-File: plzip.info, Node: Invoking Plzip, Next: Program Design, Prev: Introduction, Up: Top
+File: plzip.info, Node: Program Design, Next: Invoking Plzip, Prev: Introduction, Up: Top
-2 Invoking Plzip
+2 Program Design
+****************
+
+For each input file, a splitter thread and several worker threads are
+created, acting the main thread as muxer (multiplexer) thread. A "packet
+courier" takes care of data transfers among threads and limits the
+maximum number of data blocks (packets) being processed simultaneously.
+
+ The splitter reads data blocks from the input file, and distributes
+them to the workers. The workers (de)compress the blocks received from
+the splitter. The muxer collects processed packets from the workers, and
+writes them to the output file.
+
+ When decompressing from a regular file, the splitter is removed and
+the workers read directly from the input file. If the output file is
+also a regular file, the muxer is also removed, and the workers write
+directly to the output file. With these optimizations, decompression
+speed of large files with many members is only limited by the number of
+processors available and by I/O speed.
+
+
+File: plzip.info, Node: Invoking Plzip, Next: File Format, Prev: Program Design, Up: Top
+
+3 Invoking Plzip
****************
The format for running plzip is:
@@ -156,7 +179,8 @@ The format for running plzip is:
Set the number of worker threads. Valid values range from 1 to "as
many as your system can support". If this option is not used,
plzip tries to detect the number of processors in the system and
- use it as default value.
+ use it as default value. `plzip --help' shows the system's default
+ value.
`-o FILE'
`--output=FILE'
@@ -243,28 +267,17 @@ Z zettabyte (10^21) | Zi zebibyte (2^70)
Y yottabyte (10^24) | Yi yobibyte (2^80)

-File: plzip.info, Node: Program Design, Next: File Format, Prev: Invoking Plzip, Up: Top
-
-3 Program Design
-****************
-
-For each input file, a splitter thread and several worker threads are
-created, acting the main thread as muxer (multiplexer) thread. A "packet
-courier" takes care of data transfers among threads and limits the
-maximum number of data blocks (packets) being processed simultaneously.
-
- The splitter reads data blocks from the input file, and distributes
-them to the workers. The workers (de)compress the blocks received from
-the splitter. The muxer collects processed packets from the workers, and
-writes them to the output file.
-
-
-File: plzip.info, Node: File Format, Next: Problems, Prev: Program Design, Up: Top
+File: plzip.info, Node: File Format, Next: Problems, Prev: Invoking Plzip, Up: Top
4 File Format
*************
-In the diagram below, a box like this:
+Perfection is reached, not when there is no longer anything to add, but
+when there is no longer anything to take away.
+-- Antoine de Saint-Exupery
+
+
+ In the diagram below, a box like this:
+---+
| | <-- the vertical bars might be missing
+---+
@@ -293,15 +306,19 @@ additional information before, between, or after them.
"LZIP".
`VN (version number, 1 byte)'
- Just in case something needs to be modified in the future. Valid
- values are 0 and 1. Version 0 files are deprecated. They can
- contain only one member and lack the `Member size' field.
+ Just in case something needs to be modified in the future. 1 for
+ now.
`DS (coded dictionary size, 1 byte)'
- Bits 4-0 contain the base 2 logarithm of the base dictionary size.
- Bits 7-5 contain the number of "wedges" to substract from the base
- dictionary size to obtain the dictionary size. The size of a wedge
- is (base dictionary size / 16).
+ Lzip divides the distance between any two powers of 2 into 8
+ equally spaced intervals, named "wedges". The dictionary size is
+ calculated by taking a power of 2 (the base size) and substracting
+ from it a number of wedges between 0 and 7. The size of a wedge is
+ (base_size / 16).
+ Bits 4-0 contain the base 2 logarithm of the base size (12 to 29).
+ Bits 7-5 contain the number of wedges (0 to 7) to substract from
+ the base size to obtain the dictionary size.
+ Example: 0xD3 = (2^19 - 6 * 2^15) = (512KiB - 6 * 32KiB) = 320KiB
Valid values for dictionary size range from 4KiB to 512MiB.
`Lzma stream'
@@ -315,9 +332,9 @@ additional information before, between, or after them.
Size of the uncompressed original data.
`Member size (8 bytes)'
- Total size of the member, including header and trailer. This
- facilitates safe recovery of undamaged members from multi-member
- files.
+ Total size of the member, including header and trailer. This field
+ acts as a distributed index, and facilitates safe recovery of
+ undamaged members from multi-member files.

@@ -358,12 +375,12 @@ Concept Index

Tag Table:
Node: Top223
-Node: Introduction842
-Node: Invoking Plzip4008
-Node: Program Design8964
-Node: File Format9626
-Node: Problems11621
-Node: Concept Index12150
+Node: Introduction864
+Node: Program Design4030
+Node: Invoking Plzip5084
+Node: File Format10093
+Node: Problems12473
+Node: Concept Index13002

End Tag Table
diff --git a/doc/plzip.texinfo b/doc/plzip.texinfo
index f981207..5e62234 100644
--- a/doc/plzip.texinfo
+++ b/doc/plzip.texinfo
@@ -6,8 +6,8 @@
@finalout
@c %**end of header
-@set UPDATED 1 March 2012
-@set VERSION 0.9
+@set UPDATED 8 March 2013
+@set VERSION 1.0-rc1
@dircategory Data Compression
@direntry
@@ -35,16 +35,16 @@
This manual is for Plzip (version @value{VERSION}, @value{UPDATED}).
@menu
-* Introduction:: Purpose and features of plzip
-* Invoking Plzip:: Command line interface
-* Program Design:: Internal structure of plzip
-* File Format:: Detailed format of the compressed file
-* Problems:: Reporting bugs
-* Concept Index:: Index of concepts
+* Introduction:: Purpose and features of plzip
+* Program Design:: Internal structure of plzip
+* Invoking Plzip:: Command line interface
+* File Format:: Detailed format of the compressed file
+* Problems:: Reporting bugs
+* Concept Index:: Index of concepts
@end menu
@sp 1
-Copyright @copyright{} 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+Copyright @copyright{} 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This manual is free documentation: you have unlimited permission
to copy, distribute and modify it.
@@ -115,6 +115,28 @@ invalid input file, 3 for an internal consistency error (eg, bug) which
caused plzip to panic.
+@node Program Design
+@chapter Program Design
+@cindex program design
+
+For each input file, a splitter thread and several worker threads are
+created, acting the main thread as muxer (multiplexer) thread. A "packet
+courier" takes care of data transfers among threads and limits the
+maximum number of data blocks (packets) being processed simultaneously.
+
+The splitter reads data blocks from the input file, and distributes them
+to the workers. The workers (de)compress the blocks received from the
+splitter. The muxer collects processed packets from the workers, and
+writes them to the output file.
+
+When decompressing from a regular file, the splitter is removed and the
+workers read directly from the input file. If the output file is also a
+regular file, the muxer is also removed, and the workers write directly
+to the output file. With these optimizations, decompression speed of
+large files with many members is only limited by the number of
+processors available and by I/O speed.
+
+
@node Invoking Plzip
@chapter Invoking Plzip
@cindex invoking
@@ -180,7 +202,7 @@ usually give better compression ratios but longer compression times.
Set the number of worker threads. Valid values range from 1 to "as many
as your system can support". If this option is not used, plzip tries to
detect the number of processors in the system and use it as default
-value.
+value. @w{@samp{plzip --help}} shows the system's default value.
@item -o @var{file}
@itemx --output=@var{file}
@@ -268,25 +290,15 @@ Table of SI and binary prefixes (unit multipliers):
@end multitable
-@node Program Design
-@chapter Program Design
-@cindex program design
-
-For each input file, a splitter thread and several worker threads are
-created, acting the main thread as muxer (multiplexer) thread. A "packet
-courier" takes care of data transfers among threads and limits the
-maximum number of data blocks (packets) being processed simultaneously.
-
-The splitter reads data blocks from the input file, and distributes them
-to the workers. The workers (de)compress the blocks received from the
-splitter. The muxer collects processed packets from the workers, and
-writes them to the output file.
-
-
@node File Format
@chapter File Format
@cindex file format
+Perfection is reached, not when there is no longer anything to add, but
+when there is no longer anything to take away.@*
+--- Antoine de Saint-Exupery
+
+@sp 1
In the diagram below, a box like this:
@verbatim
+---+
@@ -322,15 +334,17 @@ All multibyte values are stored in little endian order.
A four byte string, identifying the lzip format, with the value "LZIP".
@item VN (version number, 1 byte)
-Just in case something needs to be modified in the future. Valid values
-are 0 and 1. Version 0 files are deprecated. They can contain only one
-member and lack the @samp{Member size} field.
+Just in case something needs to be modified in the future. 1 for now.
@item DS (coded dictionary size, 1 byte)
-Bits 4-0 contain the base 2 logarithm of the base dictionary size.@*
-Bits 7-5 contain the number of "wedges" to substract from the base
-dictionary size to obtain the dictionary size. The size of a wedge is
-(base dictionary size / 16).@*
+Lzip divides the distance between any two powers of 2 into 8 equally
+spaced intervals, named "wedges". The dictionary size is calculated by
+taking a power of 2 (the base size) and substracting from it a number of
+wedges between 0 and 7. The size of a wedge is (base_size / 16).@*
+Bits 4-0 contain the base 2 logarithm of the base size (12 to 29).@*
+Bits 7-5 contain the number of wedges (0 to 7) to substract from the
+base size to obtain the dictionary size.@*
+Example: 0xD3 = (2^19 - 6 * 2^15) = (512KiB - 6 * 32KiB) = 320KiB@*
Valid values for dictionary size range from 4KiB to 512MiB.
@item Lzma stream
@@ -344,8 +358,9 @@ CRC of the uncompressed original data.
Size of the uncompressed original data.
@item Member size (8 bytes)
-Total size of the member, including header and trailer. This facilitates
-safe recovery of undamaged members from multi-member files.
+Total size of the member, including header and trailer. This field acts
+as a distributed index, and facilitates safe recovery of undamaged
+members from multi-member files.
@end table
diff --git a/file_index.cc b/file_index.cc
new file mode 100644
index 0000000..5cdba46
--- /dev/null
+++ b/file_index.cc
@@ -0,0 +1,143 @@
+/* Plzip - A parallel compressor compatible with lzip
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <algorithm>
+#include <cerrno>
+#include <cstdio>
+#include <cstring>
+#include <string>
+#include <vector>
+#include <stdint.h>
+#include <unistd.h>
+
+#include "lzip.h"
+#include "file_index.h"
+
+
+int seek_read( const int fd, uint8_t * const buf, const int size,
+ const long long pos )
+ {
+ if( lseek( fd, pos, SEEK_SET ) == pos )
+ return readblock( fd, buf, size );
+ return 0;
+ }
+
+
+const char * format_num( unsigned long long num,
+ unsigned long long limit = -1ULL,
+ const int set_prefix = 0 )
+ {
+ const char * const si_prefix[8] =
+ { "k", "M", "G", "T", "P", "E", "Z", "Y" };
+ const char * const binary_prefix[8] =
+ { "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi" };
+ static bool si = true;
+ static char buf[32];
+
+ if( set_prefix ) si = ( set_prefix > 0 );
+ const unsigned factor = ( si ? 1000 : 1024 );
+ const char * const * prefix = ( si ? si_prefix : binary_prefix );
+ const char * p = "";
+ bool exact = ( num % factor == 0 );
+
+ for( int i = 0; i < 8 && ( num > limit || ( exact && num >= factor ) ); ++i )
+ { num /= factor; if( num % factor != 0 ) exact = false; p = prefix[i]; }
+ snprintf( buf, sizeof buf, "%llu %s", num, p );
+ return buf;
+ }
+
+
+File_index::File_index( const int infd ) : retval_( 0 )
+ {
+ const long long isize = lseek( infd, 0, SEEK_END );
+ if( isize < 0 )
+ { error_ = "Input file is not seekable :";
+ error_ += std::strerror( errno ); retval_ = 1; return; }
+ if( isize > INT64_MAX )
+ { error_ = "Input file is too long (2^63 bytes or more).";
+ retval_ = 2; return; }
+ long long pos = isize; // always points to a header or EOF
+ File_header header;
+ File_trailer trailer;
+
+ if( isize < min_member_size )
+ { error_ = "Input file is too short."; retval_ = 2; return; }
+ if( seek_read( infd, header.data, File_header::size, 0 ) != File_header::size )
+ { error_ = "Error reading member header :";
+ error_ += std::strerror( errno ); retval_ = 1; return; }
+ if( !header.verify_magic() )
+ { error_ = "Bad magic number (file not in lzip format).";
+ retval_ = 2; return; }
+ if( !header.verify_version() )
+ { error_ = "Version "; error_ += format_num( header.version() );
+ error_ += "member format not supported."; retval_ = 2; return; }
+
+ while( pos >= min_member_size )
+ {
+ if( seek_read( infd, trailer.data, File_trailer::size,
+ pos - File_trailer::size ) != File_trailer::size )
+ { error_ = "Error reading member trailer :";
+ error_ += std::strerror( errno ); retval_ = 1; break; }
+ const long long member_size = trailer.member_size();
+ if( member_size < min_member_size || member_size > pos )
+ {
+ if( member_vector.size() == 0 ) // maybe trailing garbage
+ { --pos; continue; }
+ error_ = "Member size in trailer is corrupt at pos ";
+ error_ += format_num( pos - 8 ); retval_ = 2; break;
+ }
+ if( seek_read( infd, header.data, File_header::size,
+ pos - member_size ) != File_header::size )
+ { error_ = "Error reading member header :";
+ error_ += std::strerror( errno ); retval_ = 1; break; }
+ if( !header.verify_magic() || !header.verify_version() )
+ {
+ if( member_vector.size() == 0 ) // maybe trailing garbage
+ { --pos; continue; }
+ error_ = "Bad header at pos ";
+ error_ += format_num( pos - member_size ); retval_ = 2; break;
+ }
+ if( member_vector.size() == 0 && isize - pos > File_header::size &&
+ seek_read( infd, header.data, File_header::size, pos ) == File_header::size &&
+ header.verify_magic() && header.verify_version() )
+ { // last trailer is corrupt
+ error_ = "Member size in trailer is corrupt at pos ";
+ error_ += format_num( isize - 8 ); retval_ = 2; break;
+ }
+ pos -= member_size;
+ member_vector.push_back( Member( 0, trailer.data_size(),
+ pos, member_size ) );
+ }
+ if( pos != 0 || member_vector.size() == 0 )
+ {
+ member_vector.clear();
+ if( retval_ == 0 ) { error_ = "Can't create file index."; retval_ = 2; }
+ return;
+ }
+ std::reverse( member_vector.begin(), member_vector.end() );
+ for( unsigned i = 0; i < member_vector.size() - 1; ++i )
+ {
+ const long long end = member_vector[i].dblock.end();
+ if( end < 0 || end > INT64_MAX )
+ {
+ member_vector.clear();
+ error_ = "Data in input file is too long (2^63 bytes or more).";
+ retval_ = 2; return;
+ }
+ member_vector[i+1].dblock.pos( end );
+ }
+ }
diff --git a/file_index.h b/file_index.h
new file mode 100644
index 0000000..1dfbcf4
--- /dev/null
+++ b/file_index.h
@@ -0,0 +1,77 @@
+/* Plzip - A parallel compressor compatible with lzip
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef INT64_MAX
+#define INT64_MAX 0x7FFFFFFFFFFFFFFFLL
+#endif
+
+
+class Block
+ {
+ long long pos_, size_; // pos + size <= INT64_MAX
+
+public:
+ Block( const long long p, const long long s ) : pos_( p ), size_( s ) {}
+
+ long long pos() const { return pos_; }
+ long long size() const { return size_; }
+ long long end() const { return pos_ + size_; }
+
+ void pos( const long long p ) { pos_ = p; }
+ void size( const long long s ) { size_ = s; }
+
+ bool overlaps( const Block & b ) const
+ { return ( pos_ < b.end() && b.pos_ < end() ); }
+ void shift( Block & b ) { ++size_; ++b.pos_; --b.size_; }
+ };
+
+
+class File_index
+ {
+ struct Member
+ {
+ Block dblock, mblock; // data block, member block
+
+ Member( const long long dp, const long long ds,
+ const long long mp, const long long ms )
+ : dblock( dp, ds ), mblock( mp, ms ) {}
+ };
+
+ std::vector< Member > member_vector;
+ std::string error_;
+ int retval_;
+
+public:
+ File_index( const int infd );
+
+ const std::string & error() const { return error_; }
+ int retval() const { return retval_; }
+
+ long long data_end() const
+ { if( member_vector.size() ) return member_vector.back().dblock.end();
+ else return 0; }
+
+ long long file_end() const
+ { if( member_vector.size() ) return member_vector.back().mblock.end();
+ else return 0; }
+
+ const Block & dblock( const int i ) const
+ { return member_vector[i].dblock; }
+ const Block & mblock( const int i ) const
+ { return member_vector[i].mblock; }
+ int members() const { return (int)member_vector.size(); }
+ };
diff --git a/lzip.h b/lzip.h
new file mode 100644
index 0000000..1097c98
--- /dev/null
+++ b/lzip.h
@@ -0,0 +1,246 @@
+/* Plzip - A parallel compressor compatible with lzip
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+enum {
+ min_dictionary_bits = 12,
+ min_dictionary_size = 1 << min_dictionary_bits,
+ max_dictionary_bits = 29,
+ max_dictionary_size = 1 << max_dictionary_bits,
+ min_member_size = 36 };
+
+
+class Pretty_print
+ {
+ std::string name_;
+ const char * const stdin_name;
+ unsigned longest_name;
+ mutable bool first_post;
+
+public:
+ explicit Pretty_print( const std::vector< std::string > & filenames )
+ : stdin_name( "(stdin)" ), longest_name( 0 ), first_post( false )
+ {
+ const unsigned stdin_name_len = std::strlen( stdin_name );
+ for( unsigned i = 0; i < filenames.size(); ++i )
+ {
+ const std::string & s = filenames[i];
+ const unsigned len = ( ( s == "-" ) ? stdin_name_len : s.size() );
+ if( len > longest_name ) longest_name = len;
+ }
+ if( longest_name == 0 ) longest_name = stdin_name_len;
+ }
+
+ void set_name( const std::string & filename )
+ {
+ if( filename.size() && filename != "-" ) name_ = filename;
+ else name_ = stdin_name;
+ first_post = true;
+ }
+
+ void reset() const { if( name_.size() ) first_post = true; }
+ const char * name() const { return name_.c_str(); }
+ void operator()( const char * const msg = 0 ) const;
+ };
+
+
+inline int real_bits( unsigned value )
+ {
+ int bits = 0;
+ while( value > 0 ) { value >>= 1; ++bits; }
+ return bits;
+ }
+
+
+const uint8_t magic_string[4] = { 0x4C, 0x5A, 0x49, 0x50 }; // "LZIP"
+
+struct File_header
+ {
+ uint8_t data[6]; // 0-3 magic bytes
+ // 4 version
+ // 5 coded_dict_size
+ enum { size = 6 };
+
+ void set_magic() { std::memcpy( data, magic_string, 4 ); data[4] = 1; }
+ bool verify_magic() const
+ { return ( std::memcmp( data, magic_string, 4 ) == 0 ); }
+
+ uint8_t version() const { return data[4]; }
+ bool verify_version() const { return ( data[4] == 1 ); }
+
+ unsigned dictionary_size() const
+ {
+ unsigned sz = ( 1 << ( data[5] & 0x1F ) );
+ if( sz > min_dictionary_size )
+ sz -= ( sz / 16 ) * ( ( data[5] >> 5 ) & 7 );
+ return sz;
+ }
+
+ bool dictionary_size( const int sz )
+ {
+ if( sz >= min_dictionary_size && sz <= max_dictionary_size )
+ {
+ data[5] = real_bits( sz - 1 );
+ if( sz > min_dictionary_size )
+ {
+ const int base_size = 1 << data[5];
+ const int wedge = base_size / 16;
+ for( int i = 7; i >= 1; --i )
+ if( base_size - ( i * wedge ) >= sz )
+ { data[5] |= ( i << 5 ); break; }
+ }
+ return true;
+ }
+ return false;
+ }
+ };
+
+
+struct File_trailer
+ {
+ uint8_t data[20]; // 0-3 CRC32 of the uncompressed data
+ // 4-11 size of the uncompressed data
+ // 12-19 member size including header and trailer
+
+ enum { size = 20 };
+
+ unsigned data_crc() const
+ {
+ unsigned tmp = 0;
+ for( int i = 3; i >= 0; --i ) { tmp <<= 8; tmp += data[i]; }
+ return tmp;
+ }
+
+ void data_crc( unsigned crc )
+ { for( int i = 0; i <= 3; ++i ) { data[i] = (uint8_t)crc; crc >>= 8; } }
+
+ unsigned long long data_size() const
+ {
+ unsigned long long tmp = 0;
+ for( int i = 11; i >= 4; --i ) { tmp <<= 8; tmp += data[i]; }
+ return tmp;
+ }
+
+ void data_size( unsigned long long sz )
+ {
+ for( int i = 4; i <= 11; ++i ) { data[i] = (uint8_t)sz; sz >>= 8; }
+ }
+
+ unsigned long long member_size() const
+ {
+ unsigned long long tmp = 0;
+ for( int i = 19; i >= 12; --i ) { tmp <<= 8; tmp += data[i]; }
+ return tmp;
+ }
+
+ void member_size( unsigned long long sz )
+ {
+ for( int i = 12; i <= 19; ++i ) { data[i] = (uint8_t)sz; sz >>= 8; }
+ }
+ };
+
+
+// defined in compress.cc
+int readblock( const int fd, uint8_t * const buf, const int size );
+int writeblock( const int fd, const uint8_t * const buf, const int size );
+void xinit( pthread_mutex_t * const mutex );
+void xinit( pthread_cond_t * const cond );
+void xdestroy( pthread_mutex_t * const mutex );
+void xdestroy( pthread_cond_t * const cond );
+void xlock( pthread_mutex_t * const mutex );
+void xunlock( pthread_mutex_t * const mutex );
+void xwait( pthread_cond_t * const cond, pthread_mutex_t * const mutex );
+void xsignal( pthread_cond_t * const cond );
+void xbroadcast( pthread_cond_t * const cond );
+int compress( const int data_size, const int dictionary_size,
+ const int match_len_limit, int num_workers,
+ const int infd, const int outfd,
+ const Pretty_print & pp, const int debug_level );
+
+// defined in file_index.cc
+class File_index;
+
+// defined in dec_stdout.cc
+int dec_stdout( const int num_workers, const int infd, const int outfd,
+ const Pretty_print & pp, const int debug_level,
+ const File_index & file_index );
+
+// defined in dec_stream.cc
+int dec_stream( const int num_workers, const int infd, const int outfd,
+ const Pretty_print & pp, const int debug_level,
+ const bool testing );
+
+// defined in decompress.cc
+int preadblock( const int fd, uint8_t * const buf, const int size,
+ const long long pos );
+int pwriteblock( const int fd, const uint8_t * const buf, const int size,
+ const long long pos );
+int decompress_read_error( struct LZ_Decoder * const decoder,
+ const Pretty_print & pp, const int worker_id );
+int decompress( int num_workers, const int infd, const int outfd,
+ const Pretty_print & pp, const int debug_level,
+ const bool testing, const bool infd_isreg );
+
+// defined in main.cc
+extern int verbosity;
+void fatal( const int retval = 1 ); // terminate the program
+void show_error( const char * const msg, const int errcode = 0,
+ const bool help = false );
+void internal_error( const char * const msg );
+
+
+class Slot_tally
+ {
+ const int num_slots; // total slots
+ int num_free; // remaining free slots
+ pthread_mutex_t mutex;
+ pthread_cond_t slot_av; // free slot available
+
+ Slot_tally( const Slot_tally & ); // declared as private
+ void operator=( const Slot_tally & ); // declared as private
+
+public:
+ explicit Slot_tally( const int slots )
+ : num_slots( slots ), num_free( slots )
+ { xinit( &mutex ); xinit( &slot_av ); }
+
+ ~Slot_tally() { xdestroy( &slot_av ); xdestroy( &mutex ); }
+
+ bool all_free() { return ( num_free == num_slots ); }
+
+ void get_slot() // wait for a free slot
+ {
+ xlock( &mutex );
+ while( num_free <= 0 ) xwait( &slot_av, &mutex );
+ --num_free;
+ xunlock( &mutex );
+ }
+
+ void leave_slot() // return a slot to the tally
+ {
+ xlock( &mutex );
+ if( ++num_free == 1 ) xsignal( &slot_av ); // num_free was 0
+ xunlock( &mutex );
+ }
+
+ void leave_slots( const int slots ) // return slots to the tally
+ {
+ xlock( &mutex );
+ num_free += slots;
+ if( num_free == slots ) xsignal( &slot_av ); // num_free was 0
+ xunlock( &mutex );
+ }
+ };
diff --git a/main.cc b/main.cc
index 97de931..974a268 100644
--- a/main.cc
+++ b/main.cc
@@ -1,6 +1,6 @@
/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
+ Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -34,15 +34,30 @@
#include <string>
#include <vector>
#include <fcntl.h>
-#include <inttypes.h>
#include <pthread.h>
+#include <stdint.h>
#include <unistd.h>
#include <utime.h>
#include <sys/stat.h>
#include <lzlib.h>
+#if defined(__MSVCRT__)
+#include <io.h>
+#define fchmod(x,y) 0
+#define fchown(x,y,z) 0
+#define strtoull std::strtoul
+#define SIGHUP SIGTERM
+#define S_ISSOCK(x) 0
+#define S_IRGRP 0
+#define S_IWGRP 0
+#define S_IROTH 0
+#define S_IWOTH 0
+#endif
+#if defined(__OS2__)
+#include <io.h>
+#endif
#include "arg_parser.h"
-#include "plzip.h"
+#include "lzip.h"
#if CHAR_BIT != 8
#error "Environments where CHAR_BIT != 8 are not supported."
@@ -53,7 +68,7 @@ namespace {
const char * const Program_name = "Plzip";
const char * const program_name = "plzip";
-const char * const program_year = "2012";
+const char * const program_year = "2013";
const char * invocation_name = 0;
#ifdef O_BINARY
@@ -85,7 +100,7 @@ pthread_t main_thread;
pid_t main_thread_pid;
-void show_help()
+void show_help( const long num_online )
{
std::printf( "%s - A parallel compressor compatible with lzip.\n", Program_name );
std::printf( "\nUsage: %s [options] [files]\n", invocation_name );
@@ -99,7 +114,7 @@ void show_help()
" -F, --recompress force recompression of compressed files\n"
" -k, --keep keep (don't delete) input files\n"
" -m, --match-length=<bytes> set match length limit in bytes [36]\n"
- " -n, --threads=<n> set the number of (de)compression threads\n"
+ " -n, --threads=<n> set number of (de)compression threads [%ld]\n"
" -o, --output=<file> if reading stdin, place the output into <file>\n"
" -q, --quiet suppress all messages\n"
" -s, --dictionary-size=<bytes> set dictionary size limit in bytes [8MiB]\n"
@@ -107,7 +122,7 @@ void show_help()
" -v, --verbose be verbose (a 2nd -v gives more)\n"
" -1 .. -9 set compression level [default 6]\n"
" --fast alias for -1\n"
- " --best alias for -9\n" );
+ " --best alias for -9\n", num_online );
if( verbosity > 0 )
{
std::printf( " -D, --debug=<level> (0-1) print debug statistics to stderr\n" );
@@ -137,13 +152,13 @@ void show_version()
}
-long long getnum( const char * const ptr,
- const long long llimit = LLONG_MIN + 1,
- const long long ulimit = LLONG_MAX )
+unsigned long long getnum( const char * const ptr,
+ const unsigned long long llimit,
+ const unsigned long long ulimit )
{
errno = 0;
- char *tail;
- long long result = strtoll( ptr, &tail, 0 );
+ char * tail;
+ unsigned long long result = strtoull( ptr, &tail, 0 );
if( tail == ptr )
{
show_error( "Bad or missing numerical argument.", 0, true );
@@ -178,7 +193,7 @@ long long getnum( const char * const ptr,
}
for( int i = 0; i < exponent; ++i )
{
- if( LLONG_MAX / factor >= llabs( result ) ) result *= factor;
+ if( ulimit / factor >= result ) result *= factor;
else { errno = ERANGE; break; }
}
}
@@ -194,7 +209,7 @@ long long getnum( const char * const ptr,
int get_dict_size( const char * const arg )
{
- char *tail;
+ char * tail;
int bits = std::strtol( arg, &tail, 0 );
if( bits >= LZ_min_dictionary_bits() &&
bits <= LZ_max_dictionary_bits() && *tail == 0 )
@@ -240,7 +255,7 @@ int open_instream( const std::string & name, struct stat * const in_statsp,
else
{
const int i = fstat( infd, in_statsp );
- const mode_t & mode = in_statsp->st_mode;
+ const mode_t mode = in_statsp->st_mode;
const bool can_read = ( i == 0 &&
( S_ISBLK( mode ) || S_ISCHR( mode ) ||
S_ISFIFO( mode ) || S_ISSOCK( mode ) ) );
@@ -368,9 +383,9 @@ extern "C" void signal_handler( int sig )
{
if( !pthread_equal( pthread_self(), main_thread ) )
kill( main_thread_pid, sig );
- if( sig != SIGUSR1 )
+ if( sig != SIGUSR1 && sig != SIGUSR2 )
show_error( "Control-C or similar caught, quitting." );
- cleanup_and_fail( 1 );
+ cleanup_and_fail( ( sig != SIGUSR2 ) ? 1 : 2 );
}
@@ -391,7 +406,8 @@ int verbosity = 0;
// since they all call common helper functions that call fatal() in case
// of an error.
//
-void fatal() { signal_handler( SIGUSR1 ); }
+void fatal( const int retval )
+ { signal_handler( ( retval != 2 ) ? SIGUSR1 : SIGUSR2 ); }
void Pretty_print::operator()( const char * const msg ) const
@@ -402,7 +418,7 @@ void Pretty_print::operator()( const char * const msg ) const
{
first_post = false;
std::fprintf( stderr, " %s: ", name_.c_str() );
- for( unsigned int i = 0; i < longest_name - name_.size(); ++i )
+ for( unsigned i = 0; i < longest_name - name_.size(); ++i )
std::fprintf( stderr, " " );
if( !msg ) std::fflush( stderr );
}
@@ -422,7 +438,7 @@ void show_error( const char * const msg, const int errcode, const bool help )
std::fprintf( stderr, ": %s", std::strerror( errcode ) );
std::fprintf( stderr, "\n" );
}
- if( help && invocation_name && invocation_name[0] )
+ if( help )
std::fprintf( stderr, "Try '%s --help' for more information.\n",
invocation_name );
}
@@ -454,6 +470,9 @@ int main( const int argc, const char * const argv[] )
{ 3 << 23, 132 }, // -8
{ 1 << 25, 273 } }; // -9
Lzma_options encoder_options = option_mapping[6]; // default = "-6"
+ std::string input_filename;
+ std::string default_output_filename;
+ std::vector< std::string > filenames;
int data_size = 0;
int debug_level = 0;
int infd = -1;
@@ -463,9 +482,6 @@ int main( const int argc, const char * const argv[] )
bool keep_input_files = false;
bool recompress = false;
bool to_stdout = false;
- std::string input_filename;
- std::string default_output_filename;
- std::vector< std::string > filenames;
invocation_name = argv[0];
main_thread = pthread_self();
main_thread_pid = getpid();
@@ -473,6 +489,7 @@ int main( const int argc, const char * const argv[] )
if( LZ_version()[0] != LZ_version_string[0] )
internal_error( "bad library version" );
+ const long num_online = std::max( 1L, sysconf( _SC_NPROCESSORS_ONLN ) );
long max_workers = sysconf( _SC_THREAD_THREADS_MAX );
if( max_workers < 1 || max_workers > INT_MAX / (int)sizeof (pthread_t) )
max_workers = INT_MAX / sizeof (pthread_t);
@@ -521,7 +538,8 @@ int main( const int argc, const char * const argv[] )
const char * const arg = parser.argument( argind ).c_str();
switch( code )
{
- case '0': case '1': case '2': case '3': case '4':
+ case '0':
+ case '1': case '2': case '3': case '4':
case '5': case '6': case '7': case '8': case '9':
encoder_options = option_mapping[code-'0']; break;
case 'b': break;
@@ -532,7 +550,7 @@ int main( const int argc, const char * const argv[] )
case 'D': debug_level = getnum( arg, 0, 3 ); break;
case 'f': force = true; break;
case 'F': recompress = true; break;
- case 'h': show_help(); return 0;
+ case 'h': show_help( num_online ); return 0;
case 'k': keep_input_files = true; break;
case 'm': encoder_options.match_len_limit =
getnum( arg, LZ_min_match_len_limit(),
@@ -550,9 +568,9 @@ int main( const int argc, const char * const argv[] )
}
} // end process options
-#if defined(__OS2__)
- _fsetmode( stdin, "b" );
- _fsetmode( stdout, "b" );
+#if defined(__MSVCRT__) || defined(__OS2__)
+ setmode( STDIN_FILENO, O_BINARY );
+ setmode( STDOUT_FILENO, O_BINARY );
#endif
if( program_mode == m_test )
@@ -564,17 +582,13 @@ int main( const int argc, const char * const argv[] )
encoder_options.dictionary_size = std::max( data_size, LZ_min_dictionary_size() );
if( num_workers <= 0 )
- {
- long num_online = sysconf( _SC_NPROCESSORS_ONLN );
- if( num_online <= 0 ) num_online = 1;
num_workers = std::min( num_online, max_workers );
- }
bool filenames_given = false;
for( ; argind < parser.arguments(); ++argind )
{
- if( parser.argument( argind ) != "-" ) filenames_given = true;
filenames.push_back( parser.argument( argind ) );
+ if( filenames.back() != "-" ) filenames_given = true;
}
if( filenames.empty() ) filenames.push_back("-");
@@ -582,11 +596,12 @@ int main( const int argc, const char * const argv[] )
( filenames_given || default_output_filename.size() ) )
set_signals();
std::signal( SIGUSR1, signal_handler );
+ std::signal( SIGUSR2, signal_handler );
Pretty_print pp( filenames );
int retval = 0;
- for( unsigned int i = 0; i < filenames.size(); ++i )
+ for( unsigned i = 0; i < filenames.size(); ++i )
{
struct stat in_stats;
output_filename.clear();
@@ -607,7 +622,7 @@ int main( const int argc, const char * const argv[] )
outfd_mode = all_rw;
if( !open_outstream( force ) )
{
- if( outfd == -1 && retval < 1 ) retval = 1;
+ if( retval < 1 ) retval = 1;
close( infd ); infd = -1;
continue;
}
@@ -632,7 +647,7 @@ int main( const int argc, const char * const argv[] )
outfd_mode = usr_rw;
if( !open_outstream( force ) )
{
- if( outfd == -1 && retval < 1 ) retval = 1;
+ if( retval < 1 ) retval = 1;
close( infd ); infd = -1;
continue;
}
@@ -645,16 +660,17 @@ int main( const int argc, const char * const argv[] )
if( output_filename.size() && !to_stdout && program_mode != m_test )
delete_output_on_interrupt = true;
const struct stat * const in_statsp = input_filename.size() ? &in_stats : 0;
+ const bool infd_isreg = in_statsp && S_ISREG( in_statsp->st_mode );
pp.set_name( input_filename );
if( verbosity >= 1 ) pp();
- int tmp = 0;
+ int tmp;
if( program_mode == m_compress )
tmp = compress( data_size, encoder_options.dictionary_size,
encoder_options.match_len_limit,
num_workers, infd, outfd, pp, debug_level );
else
tmp = decompress( num_workers, infd, outfd, pp, debug_level,
- program_mode == m_test );
+ program_mode == m_test, infd_isreg );
if( tmp > retval ) retval = tmp;
if( tmp && program_mode != m_test ) cleanup_and_fail( retval );
diff --git a/plzip.h b/plzip.h
deleted file mode 100644
index f9aed10..0000000
--- a/plzip.h
+++ /dev/null
@@ -1,141 +0,0 @@
-/* Plzip - A parallel compressor compatible with lzip
- Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz.
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef LLONG_MAX
-#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
-#endif
-#ifndef LLONG_MIN
-#define LLONG_MIN (-LLONG_MAX - 1LL)
-#endif
-#ifndef ULLONG_MAX
-#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
-#endif
-
-
-class Pretty_print
- {
- const char * const stdin_name;
- unsigned int longest_name;
- std::string name_;
- mutable bool first_post;
-
-public:
- explicit Pretty_print( const std::vector< std::string > & filenames )
- : stdin_name( "(stdin)" ), longest_name( 0 ), first_post( false )
- {
- const unsigned int stdin_name_len = std::strlen( stdin_name );
- for( unsigned int i = 0; i < filenames.size(); ++i )
- {
- const std::string & s = filenames[i];
- const unsigned int len = ( ( s == "-" ) ? stdin_name_len : s.size() );
- if( len > longest_name ) longest_name = len;
- }
- if( longest_name == 0 ) longest_name = stdin_name_len;
- }
-
- void set_name( const std::string & filename )
- {
- if( filename.size() && filename != "-" ) name_ = filename;
- else name_ = stdin_name;
- first_post = true;
- }
-
- void reset() const { if( name_.size() ) first_post = true; }
- const char * name() const { return name_.c_str(); }
- void operator()( const char * const msg = 0 ) const;
- };
-
-
-/*--------------------- Defined in compress.cc ---------------------*/
-
-int readblock( const int fd, uint8_t * const buf, const int size );
-int writeblock( const int fd, const uint8_t * const buf, const int size );
-void xinit( pthread_mutex_t * const mutex );
-void xinit( pthread_cond_t * const cond );
-void xdestroy( pthread_mutex_t * const mutex );
-void xdestroy( pthread_cond_t * const cond );
-void xlock( pthread_mutex_t * const mutex );
-void xunlock( pthread_mutex_t * const mutex );
-void xwait( pthread_cond_t * const cond, pthread_mutex_t * const mutex );
-void xsignal( pthread_cond_t * const cond );
-void xbroadcast( pthread_cond_t * const cond );
-
-int compress( const int data_size, const int dictionary_size,
- const int match_len_limit, const int num_workers,
- const int infd, const int outfd,
- const Pretty_print & pp, const int debug_level );
-
-
-/*-------------------- Defined in decompress.cc --------------------*/
-
-int decompress( const int num_workers, const int infd, const int outfd,
- const Pretty_print & pp, const int debug_level,
- const bool testing );
-
-
-/*----------------------- Defined in main.cc -----------------------*/
-
-extern int verbosity;
-
-void fatal(); // terminate the program
-
-void show_error( const char * const msg, const int errcode = 0, const bool help = false );
-void internal_error( const char * const msg );
-
-
-class Slot_tally
- {
-public:
- unsigned long check_counter;
- unsigned long wait_counter;
-private:
- const int num_slots; // total slots
- int num_free; // remaining free slots
- pthread_mutex_t mutex;
- pthread_cond_t slot_av; // free slot available
-
- Slot_tally( const Slot_tally & ); // declared as private
- void operator=( const Slot_tally & ); // declared as private
-
-public:
- explicit Slot_tally( const int slots )
- : check_counter( 0 ), wait_counter( 0 ),
- num_slots( slots ), num_free( slots )
- { xinit( &mutex ); xinit( &slot_av ); }
-
- ~Slot_tally() { xdestroy( &slot_av ); xdestroy( &mutex ); }
-
- bool all_free() { return ( num_free == num_slots ); }
-
- void get_slot() // wait for a free slot
- {
- xlock( &mutex );
- ++check_counter;
- while( num_free <= 0 )
- { ++wait_counter; xwait( &slot_av, &mutex ); ++check_counter; }
- --num_free;
- xunlock( &mutex );
- }
-
- void leave_slot() // return a slot to the tally
- {
- xlock( &mutex );
- if( ++num_free == 1 ) xsignal( &slot_av );
- xunlock( &mutex );
- }
- };
diff --git a/testsuite/check.sh b/testsuite/check.sh
index 8b6bf2d..a044738 100755
--- a/testsuite/check.sh
+++ b/testsuite/check.sh
@@ -1,6 +1,6 @@
#! /bin/sh
# check script for Plzip - A parallel compressor compatible with lzip
-# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
+# Copyright (C) 2009, 2010, 2011, 2012, 2013 Antonio Diaz Diaz.
#
# This script is free software: you have unlimited permission
# to copy, distribute and modify it.
@@ -22,32 +22,36 @@ mkdir tmp
cd "${objdir}"/tmp
cat "${testdir}"/test.txt > in || framework_failure
-cat in in in in > in4 || framework_failure
+in_lz="${testdir}"/test.txt.lz
fail=0
printf "testing plzip-%s..." "$2"
-"${LZIP}" -t "${testdir}"/test_v0.lz || fail=1
-"${LZIP}" -cd "${testdir}"/test_v0.lz > copy || fail=1
-cmp in copy || fail=1
-printf .
-
-"${LZIP}" -t "${testdir}"/test_v1.lz || fail=1
-"${LZIP}" -cd "${testdir}"/test_v1.lz > copy || fail=1
+"${LZIP}" -t "${in_lz}" || fail=1
+"${LZIP}" -cd "${in_lz}" > copy || fail=1
cmp in copy || fail=1
printf .
-"${LZIP}" -cfq "${testdir}"/test_v1.lz > out
+"${LZIP}" -cfq "${in_lz}" > out
if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi
-"${LZIP}" -cF "${testdir}"/test_v1.lz > out || fail=1
+"${LZIP}" -cF "${in_lz}" > out || fail=1
"${LZIP}" -cd out | "${LZIP}" -d > copy || fail=1
cmp in copy || fail=1
printf .
+"${LZIP}" -cqs-1 in > out
+if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi
+"${LZIP}" -cqs0 in > out
+if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi
+"${LZIP}" -cqs4095 in > out
+if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi
+"${LZIP}" -cqm274 in > out
+if [ $? != 1 ] ; then fail=1 ; printf - ; else printf . ; fi
+
for i in s4Ki 0 1 2 3 4 5 6 7 8 9 ; do
"${LZIP}" -k -$i in || fail=1
mv -f in.lz copy.lz || fail=1
-# printf "garbage" >> copy.lz || fail=1
+ printf "garbage" >> copy.lz || fail=1
"${LZIP}" -df copy.lz || fail=1
cmp in copy || fail=1
printf .
@@ -55,7 +59,7 @@ done
for i in s4Ki 0 1 2 3 4 5 6 7 8 9 ; do
"${LZIP}" -c -$i in > out || fail=1
-# printf "g" >> out || fail=1
+ printf "g" >> out || fail=1
"${LZIP}" -cd out > copy || fail=1
cmp in copy || fail=1
printf .
@@ -63,6 +67,7 @@ done
for i in s4Ki 0 1 2 3 4 5 6 7 8 9 ; do
"${LZIP}" -$i < in > out || fail=1
+ printf "garbage" >> out || fail=1
"${LZIP}" -d < out > copy || fail=1
cmp in copy || fail=1
printf .
@@ -70,22 +75,59 @@ done
for i in s4Ki 0 1 2 3 4 5 6 7 8 9 ; do
"${LZIP}" -f -$i -o out < in || fail=1
+ printf "g" >> out.lz || fail=1
"${LZIP}" -df -o copy < out.lz || fail=1
cmp in copy || fail=1
printf .
done
+"${LZIP}" < in > anyothername || fail=1
+"${LZIP}" -d anyothername || fail=1
+cmp in anyothername.out || fail=1
+printf .
+
+cat in in in in > in4 || framework_failure
+for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ; do
+ "${LZIP}" -c -s4Ki -B8Ki -n$i in4 > out4.lz || fail=1
+ printf "g" >> out4.lz || fail=1
+ "${LZIP}" -cd -n$i out4.lz > copy4 || fail=1
+ cmp in4 copy4 || fail=1
+ "${LZIP}" -d -n$i out4.lz || fail=1
+ cmp in4 out4 || fail=1
+ rm -f out4
+ printf .
+done
+
for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ; do
"${LZIP}" -s4Ki -B8Ki -n$i < in4 > out4 || fail=1
+ printf "g" >> out4 || fail=1
"${LZIP}" -d -n$i < out4 > copy4 || fail=1
cmp in4 copy4 || fail=1
printf .
done
-"${LZIP}" < in > anyothername || fail=1
-"${LZIP}" -d anyothername || fail=1
-cmp in anyothername.out || fail=1
+cat "${in_lz}" > ingin.lz || framework_failure
+printf "g" >> ingin.lz || framework_failure
+cat "${in_lz}" >> ingin.lz || framework_failure
+"${LZIP}" -tq ingin.lz
+if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi
+"${LZIP}" -cdq ingin.lz > out
+if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi
+"${LZIP}" -t < ingin.lz || fail=1
printf .
+"${LZIP}" -d < ingin.lz > copy || fail=1
+cmp in copy || fail=1
+printf .
+
+dd if="${in_lz}" bs=1024 count=10 > trunc.lz 2> /dev/null || framework_failure
+"${LZIP}" -tq trunc.lz
+if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi
+"${LZIP}" -cdq trunc.lz > out
+if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi
+"${LZIP}" -tq < trunc.lz
+if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi
+"${LZIP}" -dq < trunc.lz > out
+if [ $? != 2 ] ; then fail=1 ; printf - ; else printf . ; fi
echo
if [ ${fail} = 0 ] ; then
diff --git a/testsuite/test.txt.lz b/testsuite/test.txt.lz
new file mode 100644
index 0000000..4db881a
--- /dev/null
+++ b/testsuite/test.txt.lz
Binary files differ
diff --git a/testsuite/test_v0.lz b/testsuite/test_v0.lz
deleted file mode 100644
index a09b1e8..0000000
--- a/testsuite/test_v0.lz
+++ /dev/null
Binary files differ
diff --git a/testsuite/test_v1.lz b/testsuite/test_v1.lz
deleted file mode 100644
index f1c79eb..0000000
--- a/testsuite/test_v1.lz
+++ /dev/null
Binary files differ