summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Daniel Baumann <mail@daniel-baumann.ch> 2015-11-07 15:11:56 +0000
committer Daniel Baumann <mail@daniel-baumann.ch> 2015-11-07 15:11:56 +0000
commit f6e0a844e295b58ae820dde5f12c557dc42c3624 (patch)
tree 65ebb18debbbca91c7fb99853065c08f34cf0850
parent Adding debian version 0.3-1. (diff)
download plzip-f6e0a844e295b58ae820dde5f12c557dc42c3624.tar.xz
plzip-f6e0a844e295b58ae820dde5f12c557dc42c3624.zip
Merging upstream version 0.4.
Signed-off-by: Daniel Baumann <mail@daniel-baumann.ch>
-rw-r--r-- ChangeLog 7
-rw-r--r-- Makefile.in 27
-rw-r--r-- NEWS 13
-rw-r--r-- README 6
-rw-r--r-- compress.cc 460
-rwxr-xr-x configure 10
-rw-r--r-- decompress.cc 123
-rw-r--r-- doc/plzip.1 5
-rw-r--r-- doc/plzip.info 28
-rw-r--r-- doc/plzip.texinfo 18
-rw-r--r-- main.cc 128
-rw-r--r-- plzip.cc 548
-rw-r--r-- plzip.h 102
-rwxr-xr-x testsuite/check.sh 2
14 files changed, 750 insertions, 727 deletions
diff --git a/ChangeLog b/ChangeLog
index b0ef7eb..194ad70 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,10 @@
+2010-01-31 Antonio Diaz Diaz <ant_diaz@teleline.es>
+
+ * Version 0.4 released.
+ * main.cc (show_version): Show the version of lzlib being used.
+ * Code reorganization. Class Packet_courier now coordinates data
+ movement and synchronization among threads.
+
2010-01-24 Antonio Diaz Diaz <ant_diaz@teleline.es>
* Version 0.3 released.
diff --git a/Makefile.in b/Makefile.in
index 6eea4bf..ca6b899 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -7,7 +7,7 @@ INSTALL_DIR = $(INSTALL) -d -m 755
LIBS = -llz -lpthread
SHELL = /bin/sh
-objs = arg_parser.o plzip.o main.o
+objs = arg_parser.o compress.o decompress.o main.o
.PHONY : all install install-info install-man install-strip \
@@ -30,8 +30,9 @@ main.o : main.cc
$(objs) : Makefile
arg_parser.o : arg_parser.h
+compress.o : plzip.h
+decompress.o : plzip.h
main.o : arg_parser.h plzip.h
-plzip.o : plzip.h
doc : info man
@@ -53,30 +54,30 @@ check : all
@$(VPATH)/testsuite/check.sh $(VPATH)/testsuite
install : all install-info install-man
- if [ ! -d $(DESTDIR)$(bindir) ] ; then $(INSTALL_DIR) $(DESTDIR)$(bindir) ; fi
- $(INSTALL_PROGRAM) ./$(progname) $(DESTDIR)$(bindir)/$(progname)
+ if [ ! -d "$(DESTDIR)$(bindir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(bindir)" ; fi
+ $(INSTALL_PROGRAM) ./$(progname) "$(DESTDIR)$(bindir)/$(progname)"
install-info :
- if [ ! -d $(DESTDIR)$(infodir) ] ; then $(INSTALL_DIR) $(DESTDIR)$(infodir) ; fi
- $(INSTALL_DATA) $(VPATH)/doc/$(pkgname).info $(DESTDIR)$(infodir)/$(pkgname).info
- -install-info --info-dir=$(DESTDIR)$(infodir) $(DESTDIR)$(infodir)/$(pkgname).info
+ if [ ! -d "$(DESTDIR)$(infodir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(infodir)" ; fi
+ $(INSTALL_DATA) $(VPATH)/doc/$(pkgname).info "$(DESTDIR)$(infodir)/$(pkgname).info"
+ -install-info --info-dir="$(DESTDIR)$(infodir)" "$(DESTDIR)$(infodir)/$(pkgname).info"
install-man :
- if [ ! -d $(DESTDIR)$(mandir)/man1 ] ; then $(INSTALL_DIR) $(DESTDIR)$(mandir)/man1 ; fi
- $(INSTALL_DATA) $(VPATH)/doc/$(progname).1 $(DESTDIR)$(mandir)/man1/$(progname).1
+ if [ ! -d "$(DESTDIR)$(mandir)/man1" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(mandir)/man1" ; fi
+ $(INSTALL_DATA) $(VPATH)/doc/$(progname).1 "$(DESTDIR)$(mandir)/man1/$(progname).1"
install-strip : all
$(MAKE) INSTALL_PROGRAM='$(INSTALL_PROGRAM) -s' install
uninstall : uninstall-info uninstall-man
- -rm -f $(DESTDIR)$(bindir)/$(progname)
+ -rm -f "$(DESTDIR)$(bindir)/$(progname)"
uninstall-info :
- -install-info --info-dir=$(DESTDIR)$(infodir) --remove $(DESTDIR)$(infodir)/$(pkgname).info
- -rm -f $(DESTDIR)$(infodir)/$(pkgname).info
+ -install-info --info-dir="$(DESTDIR)$(infodir)" --remove "$(DESTDIR)$(infodir)/$(pkgname).info"
+ -rm -f "$(DESTDIR)$(infodir)/$(pkgname).info"
uninstall-man :
- -rm -f $(DESTDIR)$(mandir)/man1/$(progname).1
+ -rm -f "$(DESTDIR)$(mandir)/man1/$(progname).1"
dist : doc
ln -sf $(VPATH) $(DISTNAME)
diff --git a/NEWS b/NEWS
index 08b0074..4e4b50c 100644
--- a/NEWS
+++ b/NEWS
@@ -1,11 +1,6 @@
-Changes in version 0.3:
+Changes in version 0.4:
-New option "--data-size" has been added.
+The option "--version" now shows the version of lzlib being used.
-Output file is now removed if plzip is interrupted.
-
-This version automatically chooses the smallest possible dictionary size
-for each member during compression, saving memory during decompression.
-
-Regular files are now open in binary mode in non-POSIX platforms
-defining the O_BINARY macro.
+A code reorganization has been made. The new class "Packet_courier" now
+coordinates data movement and synchronization among threads.
diff --git a/README b/README
index c9b0c03..1f5b745 100644
--- a/README
+++ b/README
@@ -1,6 +1,10 @@
Description
-Plzip is a parallel version of the lzip data compressor. Currently only
+Plzip is a massively parallel (multithreaded) data compressor compatible
+with the lzip file format. The files produced by plzip are fully
+compatible with lzip-1.4 or newer. Plzip is intended for faster
+compression/decompression of big files on multiprocessor machines. On
+files big enough, plzip can use hundreds of processors. Currently only
compression is performed in parallel. Parallel decompression is planned
to be implemented later.
diff --git a/compress.cc b/compress.cc
new file mode 100644
index 0000000..e66b537
--- /dev/null
+++ b/compress.cc
@@ -0,0 +1,460 @@
+/* Plzip - A parallel compressor compatible with lzip
+ Copyright (C) 2009 Laszlo Ersek.
+ Copyright (C) 2009, 2010 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#define _FILE_OFFSET_BITS 64
+
+#include <algorithm>
+#include <cassert>
+#include <cerrno>
+#include <climits>
+#include <csignal>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <queue>
+#include <string>
+#include <vector>
+#include <pthread.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <lzlib.h>
+
+#include "plzip.h"
+
+#ifndef LLONG_MAX
+#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
+#endif
+#ifndef LLONG_MIN
+#define LLONG_MIN (-LLONG_MAX - 1LL)
+#endif
+#ifndef ULLONG_MAX
+#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
+#endif
+
+
+ // Initialize 'mutex' and then 'cond', both with default attributes
+ // (attribute pointer 0). If either pthread call returns nonzero, the
+ // code is reported through show_error() and fatal() is called.
+void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
+ {
+ int ret = pthread_mutex_init( mutex, 0 );
+ if( ret != 0 ) { show_error( "pthread_mutex_init", ret ); fatal(); }
+
+ ret = pthread_cond_init( cond, 0 );
+ if( ret != 0 ) { show_error( "pthread_cond_init", ret ); fatal(); }
+ }
+
+
+ // Destroy 'cond' and then 'mutex' (the reverse of the order used by
+ // xinit). A nonzero return from either pthread call is reported
+ // through show_error() and followed by fatal().
+void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
+ {
+ int ret = pthread_cond_destroy( cond );
+ if( ret != 0 ) { show_error( "pthread_cond_destroy", ret ); fatal(); }
+
+ ret = pthread_mutex_destroy( mutex );
+ if( ret != 0 ) { show_error( "pthread_mutex_destroy", ret ); fatal(); }
+ }
+
+
+ // Lock 'mutex'; on a nonzero return report it with show_error() and
+ // call fatal().
+void xlock( pthread_mutex_t * mutex )
+ {
+ int ret = pthread_mutex_lock( mutex );
+ if( ret != 0 ) { show_error( "pthread_mutex_lock", ret ); fatal(); }
+ }
+
+
+ // Unlock 'mutex'; on a nonzero return report it with show_error() and
+ // call fatal().
+void xunlock( pthread_mutex_t * mutex )
+ {
+ int ret = pthread_mutex_unlock( mutex );
+ if( ret != 0 ) { show_error( "pthread_mutex_unlock", ret ); fatal(); }
+ }
+
+
+ // Wait on 'cond' using 'mutex' (a thin wrapper over pthread_cond_wait).
+ // Callers in this file hold 'mutex' and re-test their predicate in a
+ // loop after each return. A nonzero return is reported with
+ // show_error() and followed by fatal().
+void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
+ {
+ int ret = pthread_cond_wait( cond, mutex );
+ if( ret != 0 ) { show_error( "pthread_cond_wait", ret ); fatal(); }
+ }
+
+
+ // Signal 'cond' (pthread_cond_signal); on a nonzero return report it
+ // with show_error() and call fatal().
+void xsignal( pthread_cond_t * cond )
+ {
+ int ret = pthread_cond_signal( cond );
+ if( ret != 0 ) { show_error( "pthread_cond_signal", ret ); fatal(); }
+ }
+
+
+ // Broadcast on 'cond' (pthread_cond_broadcast); on a nonzero return
+ // report it with show_error() and call fatal().
+void xbroadcast( pthread_cond_t * cond )
+ {
+ int ret = pthread_cond_broadcast( cond );
+ if( ret != 0 ) { show_error( "pthread_cond_broadcast", ret ); fatal(); }
+ }
+
+
+ // Start a thread running 'routine( arg )' with default attributes and
+ // store its id in '*thread'. A nonzero return from pthread_create is
+ // reported with show_error() and followed by fatal().
+void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg )
+ {
+ int ret = pthread_create( thread, 0, routine, arg );
+ if( ret != 0 ) { show_error( "pthread_create", ret ); fatal(); }
+ }
+
+
+ // Join 'thread', discarding its return value (result pointer is 0).
+ // A nonzero return from pthread_join is reported with show_error()
+ // and followed by fatal().
+void xjoin( pthread_t thread )
+ {
+ int ret = pthread_join( thread, 0 );
+ if( ret != 0 ) { show_error( "pthread_join", ret ); fatal(); }
+ }
+
+
+namespace {
+
+ // Byte totals for the current compress() call, which resets both to 0.
+ // in_size is increased by the splitter for every block it reads;
+ // out_size is increased by the muxer for every packet it delivers.
+long long in_size = 0;
+long long out_size = 0;
+
+
+ // Unit of work passed between splitter, workers and muxer. 'data' is a
+ // buffer allocated with new[]; a worker replaces it (and 'size') with
+ // the compressed result, and the muxer finally delete[]s it.
+struct Packet // data block with a serial number
+ {
+ unsigned long long id; // serial number assigned as received
+ int size; // # of bytes in data
+ uint8_t * data;
+ };
+
+
+ // Hands packets from the splitter to the workers and from the workers
+ // to the muxer. The input side (packet_queue, eof) is accessed under
+ // imutex; the output side (circular_buffer, deliver_id, num_working)
+ // is accessed under omutex. slot_tally limits how many packets are in
+ // circulation at once (a slot is taken in receive_packet and returned
+ // in deliver_packet).
+class Packet_courier // moves packets around
+ {
+public:
+ // Statistics reported by compress() when debug_level bit 0 is set.
+ // icheck/iwait: calls to distribute_packet / times it had to wait.
+ // ocheck/owait: calls to deliver_packet / times it had to wait.
+ unsigned long icheck_counter;
+ unsigned long iwait_counter;
+ unsigned long ocheck_counter;
+ unsigned long owait_counter;
+private:
+ unsigned long long receive_id; // id assigned to next packet received
+ unsigned long long deliver_id; // id of next packet to be delivered
+ Slot_tally slot_tally;
+ std::queue< Packet * > packet_queue;
+ std::vector< Packet * > circular_buffer;
+ int num_working; // Number of workers still running
+ const int num_slots; // max packets in circulation
+ pthread_mutex_t imutex;
+ pthread_cond_t iav_or_eof; // input packet available or splitter done
+ pthread_mutex_t omutex;
+ pthread_cond_t oav_or_exit; // output packet available or all workers exited
+ bool eof; // splitter done
+
+public:
+ // 'num_workers' is the number of worker threads that will call
+ // distribute_packet; 'slots' sizes both slot_tally and circular_buffer.
+ Packet_courier( const int num_workers, const int slots )
+ : icheck_counter( 0 ), iwait_counter( 0 ),
+ ocheck_counter( 0 ), owait_counter( 0 ),
+ receive_id( 0 ), deliver_id( 0 ),
+ slot_tally( slots ), circular_buffer( slots, (Packet *) 0 ),
+ num_working( num_workers ), num_slots( slots ), eof( false )
+ { xinit( &iav_or_eof, &imutex ); xinit( &oav_or_exit, &omutex ); }
+
+ ~Packet_courier()
+ { xdestroy( &iav_or_eof, &imutex ); xdestroy( &oav_or_exit, &omutex ); }
+
+ // make a packet with data received from splitter
+ // The packet takes the next receive_id and keeps the 'data' pointer
+ // as given (no copy). receive_id is updated outside any lock; in this
+ // file only the splitter thread calls this function.
+ void receive_packet( uint8_t * const data, const int size )
+ {
+ Packet * ipacket = new Packet;
+ ipacket->id = receive_id++;
+ ipacket->size = size;
+ ipacket->data = data;
+ slot_tally.get_slot(); // wait for a free slot
+ xlock( &imutex );
+ packet_queue.push( ipacket );
+ xsignal( &iav_or_eof );
+ xunlock( &imutex );
+ }
+
+ // distribute a packet to a worker
+ // Waits until the queue is non-empty or eof is set. Returns the front
+ // packet, or 0 when the queue is empty and eof is set. A 0 return also
+ // decrements num_working, so each worker must stop calling this after
+ // receiving 0.
+ Packet * distribute_packet()
+ {
+ Packet * ipacket = 0;
+ xlock( &imutex );
+ ++icheck_counter;
+ while( packet_queue.empty() && !eof )
+ {
+ ++iwait_counter;
+ xwait( &iav_or_eof, &imutex );
+ }
+ if( !packet_queue.empty() )
+ {
+ ipacket = packet_queue.front();
+ packet_queue.pop();
+ }
+ xunlock( &imutex );
+ if( ipacket == 0 )
+ {
+ // Notify muxer when last worker exits
+ xlock( &omutex );
+ if( --num_working == 0 )
+ xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ }
+ return ipacket;
+ }
+
+ // collect a packet from a worker
+ // The packet is stored at index id % num_slots. The muxer is only
+ // signaled when this packet is the one it is waiting for (deliver_id).
+ void collect_packet( Packet * const opacket )
+ {
+ xlock( &omutex );
+ // id collision shouldn't happen
+ assert( circular_buffer[opacket->id%num_slots] == 0 );
+ // Merge packet into circular buffer
+ circular_buffer[opacket->id%num_slots] = opacket;
+ if( opacket->id == deliver_id ) xsignal( &oav_or_exit );
+ xunlock( &omutex );
+ }
+
+ // deliver a packet to muxer
+ // Waits until the packet with id deliver_id is present or no worker is
+ // running. Returns that packet, or 0 if the slot is still empty once
+ // all workers have exited. deliver_id is advanced in both cases; a
+ // slot is returned to slot_tally only when a packet is delivered.
+ Packet * deliver_packet()
+ {
+ xlock( &omutex );
+ ++ocheck_counter;
+ while( circular_buffer[deliver_id%num_slots] == 0 && num_working > 0 )
+ {
+ ++owait_counter;
+ xwait( &oav_or_exit, &omutex );
+ }
+ Packet * opacket = circular_buffer[deliver_id%num_slots];
+ circular_buffer[deliver_id%num_slots] = 0;
+ ++deliver_id;
+ xunlock( &omutex );
+ if( opacket != 0 )
+ slot_tally.leave_slot(); // return a slot to the tally
+ return opacket;
+ }
+
+ // Sets eof and wakes every worker waiting in distribute_packet.
+ void finish() // splitter has no more packets to send
+ {
+ xlock( &imutex );
+ eof = true;
+ xbroadcast( &iav_or_eof );
+ xunlock( &imutex );
+ }
+
+ // Reads the shared state without taking imutex or omutex.
+ // NOTE(review): in this file it is only called from compress() after
+ // all threads have been joined; confirm before calling it elsewhere.
+ bool finished() // all packets delivered to muxer
+ {
+ if( !slot_tally.all_free() || !eof || !packet_queue.empty() ||
+ num_working != 0 ) return false;
+ for( int i = 0; i < num_slots; ++i )
+ if( circular_buffer[i] != 0 ) return false;
+ return true;
+ }
+
+ // read-only access to the tally (compress() reads its counters)
+ const Slot_tally & tally() const { return slot_tally; }
+ };
+
+
+ // Arguments handed to splitter() through its void * parameter.
+struct Splitter_arg
+ {
+ Packet_courier * courier; // receives the blocks read
+ int infd; // descriptor the input is read from
+ int data_size; // bytes requested per block
+ };
+
+
+ // split data from input file into chunks and pass them to
+ // courier for packaging and distribution to workers.
+ // 'arg' must point to a Splitter_arg. Every iteration allocates a new
+ // buffer of data_size bytes; a buffer handed to receive_packet is not
+ // freed here (the packet keeps the pointer). When a read yields no
+ // data (and it is not the first read) the unused buffer is deleted and
+ // courier.finish() is called. Always returns 0.
+void * splitter( void * arg )
+ {
+ const Splitter_arg & tmp = *(Splitter_arg *)arg;
+ Packet_courier & courier = *tmp.courier;
+ const int infd = tmp.infd;
+ const int data_size = tmp.data_size;
+
+ for( bool first_post = true; ; first_post = false )
+ {
+ uint8_t * data = new( std::nothrow ) uint8_t[data_size];
+ if( data == 0 ) { show_error( "not enough memory" ); fatal(); }
+ // NOTE(review): readblock is defined elsewhere; this check assumes a
+ // short read with errno != 0 means a read error - confirm there.
+ const int size = readblock( infd, data, data_size );
+ if( size != data_size && errno ) { show_error( "read", errno ); fatal(); }
+
+ // an empty input still produces exactly one (empty) packet
+ if( size > 0 || first_post ) // first packet can be empty
+ {
+ in_size += size;
+ courier.receive_packet( data, size );
+ }
+ else
+ {
+ delete[] data;
+ courier.finish(); // no more packets to send
+ break;
+ }
+ }
+ return 0;
+ }
+
+
+ // Arguments handed to worker() through its void * parameter. compress()
+ // passes the same Worker_arg object to every worker thread.
+struct Worker_arg
+ {
+ int dictionary_size; // upper limit for the per-packet dictionary size
+ int match_len_limit; // passed unchanged to LZ_compress_open
+ Packet_courier * courier; // source and destination of packets
+ };
+
+
+ // get packets from courier, replace their contents, and return
+ // them to courier.
+ // 'arg' must point to a Worker_arg. Each packet is compressed with its
+ // own encoder, opened and closed inside the loop. The loop ends when
+ // distribute_packet returns 0. Always returns 0; any allocation or
+ // lzlib failure is reported with show_error() followed by fatal().
+void * worker( void * arg )
+ {
+ const Worker_arg & tmp = *(Worker_arg *)arg;
+ const int dictionary_size = tmp.dictionary_size;
+ const int match_len_limit = tmp.match_len_limit;
+ Packet_courier & courier = *tmp.courier;
+
+ while( true )
+ {
+ Packet * packet = courier.distribute_packet();
+ if( packet == 0 ) break; // no more packets to process
+
+ // Output buffer size: input size plus one eighth (rounded up) plus 42.
+ // NOTE(review): the 42 presumably covers fixed member overhead -
+ // confirm against the lzip format; the assert below guards the bound.
+ const int compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 );
+ uint8_t * const new_data = new( std::nothrow ) uint8_t[compr_size];
+ if( new_data == 0 ) { show_error( "not enough memory" ); fatal(); }
+ // dictionary no larger than the packet, but never below the library minimum
+ const int dict_size = std::max( LZ_min_dictionary_size(),
+ std::min( dictionary_size, packet->size ) );
+ LZ_Encoder * const encoder =
+ LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
+ if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
+ { show_error( "LZ_compress_open failed." ); fatal(); }
+
+ int written = 0; // input bytes already given to the encoder
+ int new_size = 0; // compressed bytes already stored in new_data
+ // Alternate between feeding input and draining output until the
+ // encoder reports it has finished.
+ while( true )
+ {
+ if( LZ_compress_write_size( encoder ) > 0 )
+ {
+ if( written < packet->size )
+ {
+ const int wr = LZ_compress_write( encoder, packet->data + written,
+ packet->size - written );
+ if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); }
+ written += wr;
+ }
+ if( written >= packet->size ) LZ_compress_finish( encoder );
+ }
+ const int rd = LZ_compress_read( encoder, new_data + new_size,
+ compr_size - new_size );
+ if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); }
+ new_size += rd;
+ assert( new_size <= compr_size );
+ if( LZ_compress_finished( encoder ) == 1 ) break;
+ }
+
+ if( LZ_compress_close( encoder ) < 0 )
+ { show_error( "LZ_compress_close failed." ); fatal(); }
+
+ // swap the uncompressed buffer for the compressed one; id is unchanged
+ delete[] packet->data;
+ packet->size = new_size;
+ packet->data = new_data;
+ courier.collect_packet( packet );
+ }
+ return 0;
+ }
+
+
+ // get from courier the processed and sorted packets, and write
+ // their contents to the output file.
+ // Runs until deliver_packet returns 0. If 'outfd' is negative nothing
+ // is written, but out_size is still updated. Each delivered packet and
+ // its data buffer are freed here. A short write is reported with
+ // show_error() followed by fatal().
+void muxer( Packet_courier & courier, const int outfd )
+ {
+ while( true )
+ {
+ Packet * opacket = courier.deliver_packet();
+ if( opacket == 0 ) break; // queue is empty. all workers exited
+
+ out_size += opacket->size;
+
+ if( outfd >= 0 )
+ {
+ const int wr = writeblock( outfd, opacket->data, opacket->size );
+ if( wr != opacket->size )
+ { show_error( "write", errno ); fatal(); }
+ }
+ delete[] opacket->data;
+ delete opacket;
+ }
+ }
+
+} // end namespace
+
+
+ // init the courier, then start the splitter and the workers and
+ // call the muxer.
+ // The muxer runs in the calling thread. After it returns, the workers
+ // are joined (in reverse creation order) and then the splitter.
+ // data_size: bytes per input block; dictionary_size, match_len_limit:
+ // passed to every worker; num_workers: worker threads to start;
+ // num_slots: max packets in circulation; infd/outfd: input and output
+ // descriptors (a negative outfd disables writing); debug_level: bit 0
+ // prints the wait statistics to stderr.
+ // Always returns 0; failures inside end the program through fatal().
+int compress( const int data_size, const int dictionary_size,
+ const int match_len_limit, const int num_workers,
+ const int num_slots, const int infd, const int outfd,
+ const int debug_level )
+ {
+ in_size = 0;
+ out_size = 0;
+ Packet_courier courier( num_workers, num_slots );
+
+ Splitter_arg splitter_arg;
+ splitter_arg.courier = &courier;
+ splitter_arg.infd = infd;
+ splitter_arg.data_size = data_size;
+
+ pthread_t splitter_thread;
+ xcreate( &splitter_thread, splitter, &splitter_arg );
+
+ // one Worker_arg is shared by all workers; they only read from it
+ Worker_arg worker_arg;
+ worker_arg.dictionary_size = dictionary_size;
+ worker_arg.match_len_limit = match_len_limit;
+ worker_arg.courier = &courier;
+
+ pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
+ if( worker_threads == 0 )
+ { show_error( "not enough memory" ); fatal(); }
+ for( int i = 0; i < num_workers; ++i )
+ xcreate( &worker_threads[i], worker, &worker_arg );
+
+ muxer( courier, outfd );
+
+ for( int i = num_workers - 1; i >= 0; --i )
+ xjoin(worker_threads[i]);
+ delete[] worker_threads; worker_threads = 0;
+
+ xjoin( splitter_thread );
+
+ if( verbosity >= 1 )
+ {
+ // the guard also avoids dividing by zero in the ratios below
+ if( in_size <= 0 || out_size <= 0 )
+ std::fprintf( stderr, "no data compressed.\n" );
+ else
+ std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
+ "%5.2f%% saved, %lld in, %lld out.\n",
+ (double)in_size / out_size,
+ ( 8.0 * out_size ) / in_size,
+ 100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
+ in_size, out_size );
+ }
+
+ if( debug_level & 1 )
+ std::fprintf( stderr,
+ "splitter tried to send a packet %8lu times\n"
+ "splitter had to wait %8lu times\n"
+ "any worker tried to consume from splitter %8lu times\n"
+ "any worker had to wait %8lu times\n"
+ "muxer tried to consume from workers %8lu times\n"
+ "muxer had to wait %8lu times\n",
+ courier.tally().check_counter,
+ courier.tally().wait_counter,
+ courier.icheck_counter,
+ courier.iwait_counter,
+ courier.ocheck_counter,
+ courier.owait_counter );
+
+ // all threads are joined, so the courier must be empty and idle
+ assert( courier.finished() );
+ return 0;
+ }
diff --git a/configure b/configure
index 3cd1dc8..e9ccd08 100755
--- a/configure
+++ b/configure
@@ -1,16 +1,16 @@
#! /bin/sh
-# configure script for Plzip - A parallel version of the lzip data compressor
+# configure script for Plzip - A parallel compressor compatible with lzip
# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
#
# This configure script is free software: you have unlimited permission
# to copy, distribute and modify it.
#
-# Date of this version: 2010-01-24
+# Date of this version: 2010-01-31
args=
no_create=
pkgname=plzip
-pkgversion=0.3
+pkgversion=0.4
progname=plzip
srctrigger=plzip.h
@@ -135,7 +135,7 @@ if [ -z "${CXX}" ] ; then # Let the user override the test.
fi
echo
-if [ -z ${no_create} ] ; then
+if [ -z "${no_create}" ] ; then
echo "creating config.status"
rm -f config.status
cat > config.status << EOF
@@ -166,7 +166,7 @@ echo "CXXFLAGS = ${CXXFLAGS}"
echo "LDFLAGS = ${LDFLAGS}"
rm -f Makefile
cat > Makefile << EOF
-# Makefile for Plzip - A parallel version of the lzip data compressor
+# Makefile for Plzip - A parallel compressor compatible with lzip
# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
# This file was generated automatically by configure. Do not edit.
#
diff --git a/decompress.cc b/decompress.cc
new file mode 100644
index 0000000..0a125d0
--- /dev/null
+++ b/decompress.cc
@@ -0,0 +1,123 @@
+/* Plzip - A parallel compressor compatible with lzip
+ Copyright (C) 2009 Laszlo Ersek.
+ Copyright (C) 2009, 2010 Antonio Diaz Diaz.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#define _FILE_OFFSET_BITS 64
+
+#include <algorithm>
+#include <cerrno>
+#include <climits>
+#include <csignal>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <queue>
+#include <string>
+#include <vector>
+#include <pthread.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <lzlib.h>
+
+#include "plzip.h"
+
+
+namespace {
+
+ // Decompress from 'infd' through 'decoder', writing the result to
+ // 'outfd' unless 'outfd' is negative. 'pp' is invoked to print the
+ // message prefix/diagnostic before an error is reported; 'testing'
+ // only selects the final "ok"/"done" message.
+ // Returns 0 on success (including input that ends in trailing data
+ // after at least some output was produced), 2 if the input ends
+ // unexpectedly, and 1 for every other error handled here.
+int do_decompress( LZ_Decoder * const decoder, const int infd, const int outfd,
+ const Pretty_print & pp, const bool testing )
+ {
+ const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size;
+ uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size];
+
+ while( true )
+ {
+ // read no more than the decoder says it can accept right now
+ int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size );
+ if( in_size > 0 )
+ {
+ const int max_in_size = in_size;
+ in_size = readblock( infd, in_buffer, max_in_size );
+ if( in_size != max_in_size && errno )
+ { pp(); show_error( "read error", errno ); return 1; }
+ // nothing read: tell the decoder the input is complete
+ if( in_size == 0 ) LZ_decompress_finish( decoder );
+ else if( in_size != LZ_decompress_write( decoder, in_buffer, in_size ) )
+ internal_error( "library error (LZ_decompress_write)" );
+ }
+ int out_size = LZ_decompress_read( decoder, out_buffer, out_buffer_size );
+// std::fprintf( stderr, "%5d in_size, %6d out_size.\n", in_size, out_size );
+ if( out_size < 0 )
+ {
+ const LZ_Errno lz_errno = LZ_decompress_errno( decoder );
+ if( lz_errno == LZ_header_error )
+ {
+ // a bad header after some output was produced is not an error
+ if( LZ_decompress_total_out_size( decoder ) > 0 )
+ break; // trailing garbage
+ pp( "error reading member header" );
+ return 1;
+ }
+ if( lz_errno == LZ_mem_error )
+ {
+ pp( "not enough memory. Find a machine with more memory" );
+ return 1;
+ }
+ pp();
+ if( lz_errno == LZ_unexpected_eof )
+ {
+ if( verbosity >= 0 )
+ std::fprintf( stderr, "file ends unexpectedly at pos %lld\n",
+ LZ_decompress_total_in_size( decoder ) );
+ return 2;
+ }
+ if( verbosity >= 0 )
+ std::fprintf( stderr, "LZ_decompress_read error: %s.\n",
+ LZ_strerror( LZ_decompress_errno( decoder ) ) );
+ return 1;
+ }
+ else if( out_size > 0 && outfd >= 0 )
+ {
+ const int wr = writeblock( outfd, out_buffer, out_size );
+ if( wr != out_size )
+ { pp(); show_error( "write error", errno ); return 1; }
+ }
+ if( LZ_decompress_finished( decoder ) == 1 ) break;
+ // no input consumed and no output produced: the loop cannot progress
+ if( in_size == 0 && out_size == 0 )
+ internal_error( "library error (LZ_decompress_read)" );
+ }
+ if( verbosity >= 1 )
+ { if( testing ) std::fprintf( stderr, "ok\n" );
+ else std::fprintf( stderr, "done\n" ); }
+ return 0;
+ }
+
+} // end namespace
+
+
+ // Open an lzlib decoder, run do_decompress() with it and close it.
+ // Returns 1 (after printing "not enough memory" through 'pp') if the
+ // decoder cannot be opened; otherwise returns do_decompress()'s result.
+int decompress( const int infd, const int outfd, const Pretty_print & pp,
+ const bool testing )
+ {
+ LZ_Decoder * const decoder = LZ_decompress_open();
+ int retval;
+
+ if( !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
+ {
+ pp( "not enough memory" );
+ retval = 1;
+ }
+ else retval = do_decompress( decoder, infd, outfd, pp, testing );
+ // NOTE(review): also reached when 'decoder' is null; this relies on
+ // LZ_decompress_close accepting a null pointer - confirm in lzlib.
+ LZ_decompress_close( decoder );
+ return retval;
+ }
diff --git a/doc/plzip.1 b/doc/plzip.1
index feb0aa1..148c95b 100644
--- a/doc/plzip.1
+++ b/doc/plzip.1
@@ -1,12 +1,12 @@
.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36.
-.TH PLZIP "1" "January 2010" "Plzip 0.3" "User Commands"
+.TH PLZIP "1" "January 2010" "Plzip 0.4" "User Commands"
.SH NAME
Plzip \- data compressor based on the LZMA algorithm
.SH SYNOPSIS
.B plzip
[\fIoptions\fR] [\fIfiles\fR]
.SH DESCRIPTION
-Plzip \- A parallel version of the lzip data compressor.
+Plzip \- A parallel compressor compatible with lzip.
.SH OPTIONS
.TP
\fB\-h\fR, \fB\-\-help\fR
@@ -71,6 +71,7 @@ Plzip home page: http://www.nongnu.org/lzip/plzip.html
Copyright \(co 2009 Laszlo Ersek.
.br
Copyright \(co 2010 Antonio Diaz Diaz.
+Using Lzlib 0.9-rc1
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
.br
This is free software: you are free to change and redistribute it.
diff --git a/doc/plzip.info b/doc/plzip.info
index 9a48fcb..c4dc967 100644
--- a/doc/plzip.info
+++ b/doc/plzip.info
@@ -3,7 +3,7 @@ plzip.texinfo.
INFO-DIR-SECTION Data Compression
START-INFO-DIR-ENTRY
-* Plzip: (plzip). Parallel version of the lzip data compressor
+* Plzip: (plzip). Parallel compressor compatible with lzip
END-INFO-DIR-ENTRY

@@ -12,7 +12,7 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir)
Plzip Manual
************
-This manual is for Plzip (version 0.3, 24 January 2010).
+This manual is for Plzip (version 0.4, 31 January 2010).
* Menu:
@@ -34,11 +34,13 @@ File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: T
1 Introduction
**************
-Plzip is a parallel version of the lzip data compressor. The files
-produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is
-intended for faster compression/decompression of big files on
-multiprocessor machines. Currently only compression is performed in
-parallel. Parallel decompression is planned to be implemented later.
+Plzip is a massively parallel (multithreaded) data compressor compatible
+with the lzip file format. The files produced by plzip are fully
+compatible with lzip-1.4 or newer. Plzip is intended for faster
+compression/decompression of big files on multiprocessor machines. On
+files big enough, plzip can use hundreds of processors. Currently only
+compression is performed in parallel. Parallel decompression is planned
+to be implemented later.
Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of
@@ -303,11 +305,11 @@ Concept Index

Tag Table:
-Node: Top227
-Node: Introduction750
-Node: Invoking Plzip3571
-Node: File Format7260
-Node: Problems9216
-Node: Concept Index9745
+Node: Top223
+Node: Introduction746
+Node: Invoking Plzip3669
+Node: File Format7358
+Node: Problems9314
+Node: Concept Index9843

End Tag Table
diff --git a/doc/plzip.texinfo b/doc/plzip.texinfo
index 04bf822..12ac4c8 100644
--- a/doc/plzip.texinfo
+++ b/doc/plzip.texinfo
@@ -5,12 +5,12 @@
@finalout
@c %**end of header
-@set UPDATED 24 January 2010
-@set VERSION 0.3
+@set UPDATED 31 January 2010
+@set VERSION 0.4
@dircategory Data Compression
@direntry
-* Plzip: (plzip). Parallel version of the lzip data compressor
+* Plzip: (plzip). Parallel compressor compatible with lzip
@end direntry
@@ -50,11 +50,13 @@ to copy, distribute and modify it.
@chapter Introduction
@cindex introduction
-Plzip is a parallel version of the lzip data compressor. The files
-produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is
-intended for faster compression/decompression of big files on
-multiprocessor machines. Currently only compression is performed in
-parallel. Parallel decompression is planned to be implemented later.
+Plzip is a massively parallel (multithreaded) data compressor compatible
+with the lzip file format. The files produced by plzip are fully
+compatible with lzip-1.4 or newer. Plzip is intended for faster
+compression/decompression of big files on multiprocessor machines. On
+files big enough, plzip can use hundreds of processors. Currently only
+compression is performed in parallel. Parallel decompression is planned
+to be implemented later.
Lzip is a lossless data compressor based on the LZMA algorithm, with
very safe integrity checking and a user interface similar to the one of
diff --git a/main.cc b/main.cc
index f3f12e0..db70e00 100644
--- a/main.cc
+++ b/main.cc
@@ -1,4 +1,4 @@
-/* Plzip - A parallel version of the lzip data compressor
+/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
@@ -58,8 +58,6 @@
#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
#endif
-void internal_error( const char * msg );
-
namespace {
@@ -93,44 +91,10 @@ bool delete_output_on_interrupt = false;
pthread_t main_thread;
pid_t main_thread_pid;
-class Pretty_print
- {
- const char * const stdin_name;
- const unsigned int stdin_name_len;
- unsigned int longest_name;
- std::string name_;
- mutable bool first_post;
-
-public:
- Pretty_print( const std::vector< std::string > & filenames )
- : stdin_name( "(stdin)" ), stdin_name_len( std::strlen( stdin_name ) ),
- longest_name( 0 ), first_post( false )
- {
- for( unsigned int i = 0; i < filenames.size(); ++i )
- {
- const std::string & s = filenames[i];
- const unsigned int len = ( ( s == "-" ) ? stdin_name_len : s.size() );
- if( len > longest_name ) longest_name = len;
- }
- if( longest_name == 0 ) longest_name = stdin_name_len;
- }
-
- void set_name( const std::string & filename )
- {
- if( filename.size() && filename != "-" ) name_ = filename;
- else name_ = stdin_name;
- first_post = true;
- }
-
- void reset() const throw() { if( name_.size() ) first_post = true; }
- const char * name() const throw() { return name_.c_str(); }
- void operator()( const char * const msg = 0 ) const throw();
- };
-
void show_help() throw()
{
- std::printf( "%s - A parallel version of the lzip data compressor.\n", Program_name );
+ std::printf( "%s - A parallel compressor compatible with lzip.\n", Program_name );
std::printf( "\nUsage: %s [options] [files]\n", invocation_name );
std::printf( "\nOptions:\n" );
std::printf( " -h, --help display this help and exit\n" );
@@ -154,7 +118,7 @@ void show_help() throw()
std::printf( " --best alias for -9\n" );
if( verbosity > 0 )
{
- std::printf( " -D, --debug=<level> (0-3) print debug statistics to stderr\n" );
+ std::printf( " -D, --debug=<level> (0-1) print debug statistics to stderr\n" );
}
std::printf( "If no file names are given, %s compresses or decompresses\n", program_name );
std::printf( "from standard input to standard output.\n" );
@@ -170,6 +134,7 @@ void show_version() throw()
std::printf( "%s %s\n", Program_name, PROGVERSION );
std::printf( "Copyright (C) 2009 Laszlo Ersek.\n" );
std::printf( "Copyright (C) %s Antonio Diaz Diaz.\n", program_year );
+ std::printf( "Using Lzlib %s\n", LZ_version() );
std::printf( "License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>\n" );
std::printf( "This is free software: you are free to change and redistribute it.\n" );
std::printf( "There is NO WARRANTY, to the extent permitted by law.\n" );
@@ -413,89 +378,6 @@ void close_and_set_permissions( const struct stat * const in_statsp )
}
-int do_decompress( LZ_Decoder * const decoder, const int inhandle,
- const Pretty_print & pp, const bool testing )
- {
- const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size;
- uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size];
-
- while( true )
- {
- int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size );
- if( in_size > 0 )
- {
- const int max_in_size = in_size;
- in_size = readblock( inhandle, in_buffer, max_in_size );
- if( in_size != max_in_size && errno )
- { pp(); show_error( "read error", errno ); return 1; }
- if( in_size == 0 ) LZ_decompress_finish( decoder );
- else if( in_size != LZ_decompress_write( decoder, in_buffer, in_size ) )
- internal_error( "library error (LZ_decompress_write)" );
- }
- int out_size = LZ_decompress_read( decoder, out_buffer, out_buffer_size );
-// std::fprintf( stderr, "%5d in_size, %6d out_size.\n", in_size, out_size );
- if( out_size < 0 )
- {
- const LZ_Errno lz_errno = LZ_decompress_errno( decoder );
- if( lz_errno == LZ_header_error )
- {
- if( LZ_decompress_total_out_size( decoder ) > 0 )
- break; // trailing garbage
- pp( "error reading member header" );
- return 1;
- }
- if( lz_errno == LZ_mem_error )
- {
- pp( "not enough memory. Find a machine with more memory" );
- return 1;
- }
- pp();
- if( lz_errno == LZ_unexpected_eof )
- {
- if( verbosity >= 0 )
- std::fprintf( stderr, "file ends unexpectedly at pos %lld\n",
- LZ_decompress_total_in_size( decoder ) );
- return 2;
- }
- if( verbosity >= 0 )
- std::fprintf( stderr, "LZ_decompress_read error: %s.\n",
- LZ_strerror( LZ_decompress_errno( decoder ) ) );
- return 1;
- }
- else if( out_size > 0 && outhandle >= 0 )
- {
- const int wr = writeblock( outhandle, out_buffer, out_size );
- if( wr != out_size )
- { pp(); show_error( "write error", errno ); return 1; }
- }
- if( LZ_decompress_finished( decoder ) == 1 ) break;
- if( in_size == 0 && out_size == 0 )
- internal_error( "library error (LZ_decompress_read)" );
- }
- if( verbosity >= 1 )
- { if( testing ) std::fprintf( stderr, "ok\n" );
- else std::fprintf( stderr, "done\n" ); }
- return 0;
- }
-
-
-int decompress( const int inhandle, const Pretty_print & pp,
- const bool testing )
- {
- LZ_Decoder * const decoder = LZ_decompress_open();
- int retval;
-
- if( !decoder || LZ_decompress_errno( decoder ) != LZ_ok )
- {
- pp( "not enough memory" );
- retval = 1;
- }
- else retval = do_decompress( decoder, inhandle, pp, testing );
- LZ_decompress_close( decoder );
- return retval;
- }
-
-
extern "C" void signal_handler( int sig ) throw()
{
if( !pthread_equal( pthread_self(), main_thread ) )
@@ -810,7 +692,7 @@ int main( const int argc, const char * argv[] )
encoder_options.match_len_limit, num_workers,
num_slots, inhandle, outhandle, debug_level );
else
- tmp = decompress( inhandle, pp, program_mode == m_test );
+ tmp = decompress( inhandle, outhandle, pp, program_mode == m_test );
if( tmp > retval ) retval = tmp;
if( tmp && program_mode != m_test ) cleanup_and_fail( retval );
diff --git a/plzip.cc b/plzip.cc
deleted file mode 100644
index dcae860..0000000
--- a/plzip.cc
+++ /dev/null
@@ -1,548 +0,0 @@
-/* Plzip - A parallel version of the lzip data compressor
- Copyright (C) 2009 Laszlo Ersek.
- Copyright (C) 2009, 2010 Antonio Diaz Diaz.
-
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#define _FILE_OFFSET_BITS 64
-
-#include <algorithm>
-#include <cassert>
-#include <cerrno>
-#include <climits>
-#include <csignal>
-#include <cstdio>
-#include <cstdlib>
-#include <vector>
-#include <pthread.h>
-#include <stdint.h>
-#include <unistd.h>
-#include <lzlib.h>
-
-#include "plzip.h"
-
-#ifndef LLONG_MAX
-#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL
-#endif
-#ifndef LLONG_MIN
-#define LLONG_MIN (-LLONG_MAX - 1LL)
-#endif
-#ifndef ULLONG_MAX
-#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL
-#endif
-
-
-namespace {
-
-long long in_size = 0;
-long long out_size = 0;
-
-void *(*mallocf)( size_t size );
-void (*freef)( void *ptr );
-
-
-void * trace_malloc( size_t size )
- {
- int save_errno = 0;
-
- void * ret = malloc( size );
- if( ret == 0 ) save_errno = errno;
- std::fprintf( stderr, "malloc(%lu) == %p\n", (unsigned long)size, ret );
- if( ret == 0 ) errno = save_errno;
- return ret;
- }
-
-
-void trace_free( void *ptr )
- {
- std::fprintf( stderr, "free(%p)\n", ptr );
- free( ptr );
- }
-
-
-void * xalloc( size_t size )
- {
- void *ret = (*mallocf)( size );
- if( ret == 0 ) { show_error( "not enough memory", errno ); fatal(); }
- return ret;
- }
-
-
-void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex )
- {
- int ret = pthread_mutex_init( mutex, 0 );
- if( ret != 0 ) { show_error( "pthread_mutex_init", ret ); fatal(); }
-
- ret = pthread_cond_init( cond, 0 );
- if( ret != 0 ) { show_error( "pthread_cond_init", ret ); fatal(); }
- }
-
-
-void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex )
- {
- int ret = pthread_cond_destroy( cond );
- if( ret != 0 ) { show_error( "pthread_cond_destroy", ret ); fatal(); }
-
- ret = pthread_mutex_destroy( mutex );
- if( ret != 0 ) { show_error( "pthread_mutex_destroy", ret ); fatal(); }
- }
-
-
-void xlock( pthread_mutex_t * mutex )
- {
- int ret = pthread_mutex_lock( mutex );
- if( ret != 0 ) { show_error( "pthread_mutex_lock", ret ); fatal(); }
- }
-
-
-void xunlock( pthread_mutex_t * mutex )
- {
- int ret = pthread_mutex_unlock( mutex );
- if( ret != 0 ) { show_error( "pthread_mutex_unlock", ret ); fatal(); }
- }
-
-
-void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex )
- {
- int ret = pthread_cond_wait( cond, mutex );
- if( ret != 0 ) { show_error( "pthread_cond_wait", ret ); fatal(); }
- }
-
-
-void xsignal( pthread_cond_t * cond )
- {
- int ret = pthread_cond_signal( cond );
- if( ret != 0 ) { show_error( "pthread_cond_signal", ret ); fatal(); }
- }
-
-
-void xbroadcast( pthread_cond_t * cond )
- {
- int ret = pthread_cond_broadcast( cond );
- if( ret != 0 ) { show_error( "pthread_cond_broadcast", ret ); fatal(); }
- }
-
-
-void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg )
- {
- int ret = pthread_create( thread, 0, routine, arg );
- if( ret != 0 ) { show_error( "pthread_create", ret ); fatal(); }
- }
-
-
-void xjoin( pthread_t thread )
- {
- int ret = pthread_join( thread, 0 );
- if( ret != 0 ) { show_error( "pthread_join", ret ); fatal(); }
- }
-
-
-struct Slot_tally // Synchronizes splitter to muxer
- {
- unsigned long check_counter;
- unsigned long wait_counter;
- int num_free; // Number of free slots
- pthread_mutex_t mutex;
- pthread_cond_t slot_av; // Free slot available
-
- Slot_tally( const int slots )
- : check_counter( 0 ), wait_counter( 0 ), num_free( slots )
- { xinit( &slot_av, &mutex ); }
-
- ~Slot_tally() { xdestroy( &slot_av, &mutex ); }
- };
-
-
-struct S2w_blk // Splitter to worker data block
- {
- unsigned long long id; // Block serial number as read from infd
- S2w_blk *next; // Next in queue
- int loaded; // # of bytes in plain, may be 0 for 1st
- uint8_t plain[1]; // Data read from infd, allocated: data_size
- };
-
-
-struct S2w_queue
- {
- S2w_blk * head; // Next ready worker shall compress this
- S2w_blk * tail; // Splitter will append here
- unsigned long check_counter;
- unsigned long wait_counter;
- pthread_mutex_t mutex;
- pthread_cond_t av_or_eof; // New block available or splitter done
- bool eof; // Splitter done
-
- S2w_queue()
- : head( 0 ), tail( 0 ), check_counter( 0 ), wait_counter( 0 ), eof( false )
- { xinit( &av_or_eof, &mutex ); }
-
- ~S2w_queue() { xdestroy( &av_or_eof, &mutex ); }
- };
-
-
-struct W2m_blk // Worker to muxer data block
- {
- unsigned long long id; // Block index as read from infd
- W2m_blk *next; // Next block in list (unordered)
- int produced; // Number of bytes in compr
- uint8_t compr[1]; // Data to write to outfd, alloc.: compr_size
- };
-
-
-struct W2m_queue
- {
- unsigned long long needed_id; // Block needed for resuming writing
- W2m_blk *head; // Block list (unordered)
- unsigned long check_counter;
- unsigned long wait_counter;
- int num_working; // Number of workers still running
- pthread_mutex_t mutex;
- pthread_cond_t av_or_exit; // New block available or all workers exited
-
- W2m_queue( const int num_workers )
- : needed_id( 0 ), head( 0 ), check_counter( 0 ), wait_counter( 0 ),
- num_working( num_workers )
- { xinit( &av_or_exit, &mutex ); }
-
- ~W2m_queue() { xdestroy( &av_or_exit, &mutex ); }
- };
-
-
-struct Splitter_arg
- {
- Slot_tally * slot_tally;
- S2w_queue * s2w_queue;
- int infd;
- int data_size;
- int s2w_blk_size;
- };
-
-
-void * splitter( void * arg )
- {
- const Splitter_arg & tmp = *(Splitter_arg *)arg;
- Slot_tally & slot_tally = *tmp.slot_tally;
- S2w_queue & s2w_queue = *tmp.s2w_queue;
- const int infd = tmp.infd;
- const int data_size = tmp.data_size;
- const int s2w_blk_size = tmp.s2w_blk_size;
-
- for( unsigned long long id = 0; ; ++id )
- {
- S2w_blk * s2w_blk = (S2w_blk *)xalloc( s2w_blk_size );
-
- // Fill block
- const int rd = readblock( infd, s2w_blk->plain, data_size );
- if( rd != data_size && errno ) { show_error( "read", errno ); fatal(); }
-
- if( rd > 0 || id == 0 ) // first block can be empty
- {
- s2w_blk->id = id;
- s2w_blk->next = 0;
- s2w_blk->loaded = rd;
- in_size += rd;
- xlock( &slot_tally.mutex ); // Grab a free slot
- ++slot_tally.check_counter;
- while( slot_tally.num_free == 0 )
- {
- ++slot_tally.wait_counter;
- xwait( &slot_tally.slot_av, &slot_tally.mutex );
- }
- --slot_tally.num_free;
- xunlock( &slot_tally.mutex );
- }
- else
- { (*freef)( s2w_blk ); s2w_blk = 0; }
-
- xlock( &s2w_queue.mutex );
- if( s2w_blk != 0 )
- {
- if( s2w_queue.tail == 0 ) s2w_queue.head = s2w_blk;
- else s2w_queue.tail->next = s2w_blk;
- s2w_queue.tail = s2w_blk;
- xsignal( &s2w_queue.av_or_eof );
- }
- else
- {
- s2w_queue.eof = true;
- xbroadcast( &s2w_queue.av_or_eof );
- }
- xunlock( &s2w_queue.mutex );
-
- if( s2w_blk == 0 ) break;
- }
- return 0;
- }
-
-
-void work_compr( const int dictionary_size, const int match_len_limit,
- const S2w_blk & s2w_blk, W2m_queue & w2m_queue,
- const int compr_size, const int w2m_blk_size )
- {
- assert( s2w_blk.loaded > 0 || s2w_blk.id == 0 );
-
- W2m_blk * w2m_blk = (W2m_blk *)xalloc( w2m_blk_size );
-
- const int dict_size = std::max( LZ_min_dictionary_size(),
- std::min( dictionary_size, s2w_blk.loaded ) );
- LZ_Encoder * const encoder =
- LZ_compress_open( dict_size, match_len_limit, LLONG_MAX );
- if( !encoder || LZ_compress_errno( encoder ) != LZ_ok )
- { show_error( "LZ_compress_open failed." ); fatal(); }
-
- int written = 0;
- w2m_blk->produced = 0;
- while( true )
- {
- if( LZ_compress_write_size( encoder ) > 0 )
- {
- if( written < s2w_blk.loaded )
- {
- const int wr = LZ_compress_write( encoder, s2w_blk.plain + written,
- s2w_blk.loaded - written );
- if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); }
- written += wr;
- }
- if( written >= s2w_blk.loaded ) LZ_compress_finish( encoder );
- }
- assert( w2m_blk->produced < compr_size );
- const int rd = LZ_compress_read( encoder, w2m_blk->compr + w2m_blk->produced,
- compr_size - w2m_blk->produced );
- if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); }
- w2m_blk->produced += rd;
- if( LZ_compress_finished( encoder ) == 1 ) break;
- }
-
- if( LZ_compress_close( encoder ) < 0 )
- { show_error( "LZ_compress_close failed." ); fatal(); }
-
- w2m_blk->id = s2w_blk.id;
-
- // Push block to muxer queue
- xlock( &w2m_queue.mutex );
- w2m_blk->next = w2m_queue.head;
- w2m_queue.head = w2m_blk;
- if( w2m_blk->id == w2m_queue.needed_id ) xsignal( &w2m_queue.av_or_exit );
- xunlock( &w2m_queue.mutex );
- }
-
-
-struct Worker_arg
- {
- int dictionary_size;
- int match_len_limit;
- S2w_queue * s2w_queue;
- W2m_queue * w2m_queue;
- int compr_size;
- int w2m_blk_size;
- };
-
-
-void * worker( void * arg )
- {
- const Worker_arg & tmp = *(Worker_arg *)arg;
- const int dictionary_size = tmp.dictionary_size;
- const int match_len_limit = tmp.match_len_limit;
- S2w_queue & s2w_queue = *tmp.s2w_queue;
- W2m_queue & w2m_queue = *tmp.w2m_queue;
- const int compr_size = tmp.compr_size;
- const int w2m_blk_size = tmp.w2m_blk_size;
-
- while( true )
- {
- S2w_blk *s2w_blk;
-
- // Grab a block to work on
- xlock( &s2w_queue.mutex );
- ++s2w_queue.check_counter;
- while( s2w_queue.head == 0 && !s2w_queue.eof )
- {
- ++s2w_queue.wait_counter;
- xwait( &s2w_queue.av_or_eof, &s2w_queue.mutex );
- }
- if( s2w_queue.head == 0 ) // No blocks available and splitter exited
- {
- xunlock( &s2w_queue.mutex );
- break;
- }
- s2w_blk = s2w_queue.head;
- s2w_queue.head = s2w_blk->next;
- if( s2w_queue.head == 0 ) s2w_queue.tail = 0;
- xunlock( &s2w_queue.mutex );
-
- work_compr( dictionary_size, match_len_limit, *s2w_blk, w2m_queue,
- compr_size, w2m_blk_size );
- (*freef)( s2w_blk );
- }
-
- // Notify muxer when last worker exits
- xlock( &w2m_queue.mutex );
- if( --w2m_queue.num_working == 0 && w2m_queue.head == 0 )
- xsignal( &w2m_queue.av_or_exit );
- xunlock( &w2m_queue.mutex );
- return 0;
- }
-
-
-void muxer( Slot_tally & slot_tally, W2m_queue & w2m_queue,
- const int num_slots, const int outfd )
- {
- unsigned long long needed_id = 0;
- std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 );
-
- xlock( &w2m_queue.mutex );
- while( true )
- {
- // Grab all available compressed blocks in one step
- ++w2m_queue.check_counter;
- while( w2m_queue.head == 0 && w2m_queue.num_working > 0 )
- {
- ++w2m_queue.wait_counter;
- xwait( &w2m_queue.av_or_exit, &w2m_queue.mutex );
- }
- if( w2m_queue.head == 0 ) break; // queue is empty. all workers exited
-
- W2m_blk * w2m_blk = w2m_queue.head;
- w2m_queue.head = 0;
- xunlock( &w2m_queue.mutex );
-
- // Merge blocks fetched this time into circular buffer
- do {
- // id collision shouldn't happen
- assert( circular_buffer[w2m_blk->id%num_slots] == 0 );
- circular_buffer[w2m_blk->id%num_slots] = w2m_blk;
- W2m_blk * next = w2m_blk->next;
- w2m_blk->next = 0;
- w2m_blk = next;
- } while( w2m_blk != 0 );
-
- // Write out initial continuous sequence of reordered blocks
- while( true )
- {
- w2m_blk = circular_buffer[needed_id%num_slots];
- if( w2m_blk == 0 ) break;
-
- out_size += w2m_blk->produced;
-
- if( outfd >= 0 )
- {
- const int wr = writeblock( outfd, w2m_blk->compr, w2m_blk->produced );
- if( wr != w2m_blk->produced )
- { show_error( "write", errno ); fatal(); }
- }
- circular_buffer[needed_id%num_slots] = 0;
- ++needed_id;
-
- xlock( &slot_tally.mutex );
- if( slot_tally.num_free++ == 0 ) xsignal( &slot_tally.slot_av );
- xunlock( &slot_tally.mutex );
-
- (*freef)( w2m_blk );
- }
-
- xlock( &w2m_queue.mutex );
- w2m_queue.needed_id = needed_id;
- }
- xunlock( &w2m_queue.mutex );
-
- for( int i = 0; i < num_slots; ++i )
- if( circular_buffer[i] != 0 )
- { show_error( "circular buffer not empty" ); fatal(); }
- }
-
-} // end namespace
-
-
-int compress( const int data_size, const int dictionary_size,
- const int match_len_limit, const int num_workers,
- const int num_slots, const int infd, const int outfd,
- const int debug_level )
- {
- if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; }
- else { mallocf = malloc; freef = free; }
-
- Slot_tally slot_tally( num_slots );
- S2w_queue s2w_queue;
- W2m_queue w2m_queue( num_workers );
-
- Splitter_arg splitter_arg;
- splitter_arg.slot_tally = &slot_tally;
- splitter_arg.s2w_queue = &s2w_queue;
- splitter_arg.infd = infd;
- splitter_arg.data_size = data_size;
- splitter_arg.s2w_blk_size = sizeof (S2w_blk) + data_size - 1;
-
- pthread_t splitter_thread;
- xcreate( &splitter_thread, splitter, &splitter_arg );
-
- Worker_arg worker_arg;
- worker_arg.dictionary_size = dictionary_size;
- worker_arg.match_len_limit = match_len_limit;
- worker_arg.s2w_queue = &s2w_queue;
- worker_arg.w2m_queue = &w2m_queue;
- worker_arg.compr_size = 6 + 20 + ( ( data_size / 8 ) * 9 );
- worker_arg.w2m_blk_size = sizeof (W2m_blk) + worker_arg.compr_size - 1;
-
- pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers];
- if( worker_threads == 0 )
- { show_error( "not enough memory.", errno ); fatal(); }
- for( int i = 0; i < num_workers; ++i )
- xcreate( &worker_threads[i], worker, &worker_arg );
-
- muxer( slot_tally, w2m_queue, num_slots, outfd );
-
- for( int i = num_workers - 1; i >= 0; --i )
- xjoin(worker_threads[i]);
- delete[] worker_threads; worker_threads = 0;
-
- xjoin( splitter_thread );
-
- if( verbosity >= 1 )
- {
- if( in_size <= 0 || out_size <= 0 )
- std::fprintf( stderr, "no data compressed.\n" );
- else
- std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, "
- "%5.2f%% saved, %lld in, %lld out.\n",
- (double)in_size / out_size,
- ( 8.0 * out_size ) / in_size,
- 100.0 * ( 1.0 - ( (double)out_size / in_size ) ),
- in_size, out_size );
- }
-
- const int FW = ( sizeof (unsigned long) * 8 ) / 3 + 1;
- if( debug_level & 1 )
- std::fprintf( stderr,
- "any worker tried to consume from splitter: %*lu\n"
- "any worker stalled : %*lu\n"
- "muxer tried to consume from workers : %*lu\n"
- "muxer stalled : %*lu\n"
- "splitter tried to fill a block : %*lu\n"
- "splitter stalled : %*lu\n",
- FW, s2w_queue.check_counter,
- FW, s2w_queue.wait_counter,
- FW, w2m_queue.check_counter,
- FW, w2m_queue.wait_counter,
- FW, slot_tally.check_counter,
- FW, slot_tally.wait_counter );
-
- assert( slot_tally.num_free == num_slots );
- assert( s2w_queue.eof );
- assert( s2w_queue.head == 0 );
- assert( s2w_queue.tail == 0 );
- assert( w2m_queue.num_working == 0 );
- assert( w2m_queue.head == 0 );
- return 0;
- }
diff --git a/plzip.h b/plzip.h
index d7bb760..6615fc1 100644
--- a/plzip.h
+++ b/plzip.h
@@ -1,4 +1,4 @@
-/* Plzip - A parallel version of the lzip data compressor
+/* Plzip - A parallel compressor compatible with lzip
Copyright (C) 2009 Laszlo Ersek.
Copyright (C) 2009, 2010 Antonio Diaz Diaz.
@@ -16,19 +16,113 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+class Pretty_print
+ {
+ const char * const stdin_name;
+ const unsigned int stdin_name_len;
+ unsigned int longest_name;
+ std::string name_;
+ mutable bool first_post;
+
+public:
+ Pretty_print( const std::vector< std::string > & filenames )
+ : stdin_name( "(stdin)" ), stdin_name_len( std::strlen( stdin_name ) ),
+ longest_name( 0 ), first_post( false )
+ {
+ for( unsigned int i = 0; i < filenames.size(); ++i )
+ {
+ const std::string & s = filenames[i];
+ const unsigned int len = ( ( s == "-" ) ? stdin_name_len : s.size() );
+ if( len > longest_name ) longest_name = len;
+ }
+ if( longest_name == 0 ) longest_name = stdin_name_len;
+ }
+
+ void set_name( const std::string & filename )
+ {
+ if( filename.size() && filename != "-" ) name_ = filename;
+ else name_ = stdin_name;
+ first_post = true;
+ }
+
+ void reset() const throw() { if( name_.size() ) first_post = true; }
+ const char * name() const throw() { return name_.c_str(); }
+ void operator()( const char * const msg = 0 ) const throw();
+ };
+
+
+/*--------------------- Defined in compress.cc ---------------------*/
+
+void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex );
+void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex );
+void xlock( pthread_mutex_t * mutex );
+void xunlock( pthread_mutex_t * mutex );
+void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex );
+void xsignal( pthread_cond_t * cond );
+void xbroadcast( pthread_cond_t * cond );
+void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg );
+void xjoin( pthread_t thread );
+
+
+class Slot_tally
+ {
+public:
+ unsigned long check_counter;
+ unsigned long wait_counter;
+private:
+ const int num_slots; // total slots
+ int num_free; // remaining free slots
+ pthread_mutex_t mutex;
+ pthread_cond_t slot_av; // free slot available
+
+public:
+ Slot_tally( const int slots )
+ : check_counter( 0 ), wait_counter( 0 ),
+ num_slots( slots ), num_free( slots )
+ { xinit( &slot_av, &mutex ); }
+
+ ~Slot_tally() { xdestroy( &slot_av, &mutex ); }
+
+ bool all_free() { return ( num_free == num_slots ); }
+
+ void get_slot() // wait for a free slot
+ {
+ xlock( &mutex );
+ ++check_counter;
+ while( num_free == 0 )
+ { ++wait_counter; xwait( &slot_av, &mutex ); }
+ --num_free;
+ xunlock( &mutex );
+ }
+
+ void leave_slot() // return a slot to the tally
+ {
+ xlock( &mutex );
+ if( num_free++ == 0 ) xsignal( &slot_av );
+ xunlock( &mutex );
+ }
+ };
+
+
int compress( const int data_size, const int dictionary_size,
const int match_len_limit, const int num_workers,
const int num_slots, const int infd, const int outfd,
const int debug_level );
+/*-------------------- Defined in decompress.cc --------------------*/
+
+int decompress( const int infd, const int outfd, const Pretty_print & pp,
+ const bool testing );
+
+
/*----------------------- Defined in main.cc -----------------------*/
+extern int verbosity;
+
void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw();
+void internal_error( const char * msg );
int readblock( const int fd, uint8_t * buf, const int size ) throw();
int writeblock( const int fd, const uint8_t * buf, const int size ) throw();
-
void fatal(); // Terminate the process
-
-extern int verbosity;
diff --git a/testsuite/check.sh b/testsuite/check.sh
index 409e563..f71faca 100755
--- a/testsuite/check.sh
+++ b/testsuite/check.sh
@@ -1,5 +1,5 @@
#! /bin/sh
-# check script for Plzip - A parallel version of the lzip data compressor
+# check script for Plzip - A parallel compressor compatible with lzip
# Copyright (C) 2009, 2010 Antonio Diaz Diaz.
#
# This script is free software: you have unlimited permission