From f6e0a844e295b58ae820dde5f12c557dc42c3624 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 7 Nov 2015 16:11:56 +0100 Subject: Merging upstream version 0.4. Signed-off-by: Daniel Baumann --- ChangeLog | 7 + Makefile.in | 27 +-- NEWS | 13 +- README | 6 +- compress.cc | 460 ++++++++++++++++++++++++++++++++++++++++++++ configure | 10 +- decompress.cc | 123 ++++++++++++ doc/plzip.1 | 5 +- doc/plzip.info | 28 +-- doc/plzip.texinfo | 18 +- main.cc | 128 +------------ plzip.cc | 548 ----------------------------------------------------- plzip.h | 102 +++++++++- testsuite/check.sh | 2 +- 14 files changed, 750 insertions(+), 727 deletions(-) create mode 100644 compress.cc create mode 100644 decompress.cc delete mode 100644 plzip.cc diff --git a/ChangeLog b/ChangeLog index b0ef7eb..194ad70 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +2010-01-31 Antonio Diaz Diaz + + * Version 0.4 released. + * main.cc (show_version): Show the version of lzlib being used. + * Code reorganization. Class Packet_courier now coordinates data + movement and synchronization among threads. + 2010-01-24 Antonio Diaz Diaz * Version 0.3 released. diff --git a/Makefile.in b/Makefile.in index 6eea4bf..ca6b899 100644 --- a/Makefile.in +++ b/Makefile.in @@ -7,7 +7,7 @@ INSTALL_DIR = $(INSTALL) -d -m 755 LIBS = -llz -lpthread SHELL = /bin/sh -objs = arg_parser.o plzip.o main.o +objs = arg_parser.o compress.o decompress.o main.o .PHONY : all install install-info install-man install-strip \ @@ -30,8 +30,9 @@ main.o : main.cc $(objs) : Makefile arg_parser.o : arg_parser.h +compress.o : plzip.h +decompress.o : plzip.h main.o : arg_parser.h plzip.h -plzip.o : plzip.h doc : info man @@ -53,30 +54,30 @@ check : all @$(VPATH)/testsuite/check.sh $(VPATH)/testsuite install : all install-info install-man - if [ ! -d $(DESTDIR)$(bindir) ] ; then $(INSTALL_DIR) $(DESTDIR)$(bindir) ; fi - $(INSTALL_PROGRAM) ./$(progname) $(DESTDIR)$(bindir)/$(progname) + if [ ! -d "$(DESTDIR)$(bindir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(bindir)" ; fi + $(INSTALL_PROGRAM) ./$(progname) "$(DESTDIR)$(bindir)/$(progname)" install-info : - if [ ! -d $(DESTDIR)$(infodir) ] ; then $(INSTALL_DIR) $(DESTDIR)$(infodir) ; fi - $(INSTALL_DATA) $(VPATH)/doc/$(pkgname).info $(DESTDIR)$(infodir)/$(pkgname).info - -install-info --info-dir=$(DESTDIR)$(infodir) $(DESTDIR)$(infodir)/$(pkgname).info + if [ ! -d "$(DESTDIR)$(infodir)" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(infodir)" ; fi + $(INSTALL_DATA) $(VPATH)/doc/$(pkgname).info "$(DESTDIR)$(infodir)/$(pkgname).info" + -install-info --info-dir="$(DESTDIR)$(infodir)" "$(DESTDIR)$(infodir)/$(pkgname).info" install-man : - if [ ! -d $(DESTDIR)$(mandir)/man1 ] ; then $(INSTALL_DIR) $(DESTDIR)$(mandir)/man1 ; fi - $(INSTALL_DATA) $(VPATH)/doc/$(progname).1 $(DESTDIR)$(mandir)/man1/$(progname).1 + if [ ! -d "$(DESTDIR)$(mandir)/man1" ] ; then $(INSTALL_DIR) "$(DESTDIR)$(mandir)/man1" ; fi + $(INSTALL_DATA) $(VPATH)/doc/$(progname).1 "$(DESTDIR)$(mandir)/man1/$(progname).1" install-strip : all $(MAKE) INSTALL_PROGRAM='$(INSTALL_PROGRAM) -s' install uninstall : uninstall-info uninstall-man - -rm -f $(DESTDIR)$(bindir)/$(progname) + -rm -f "$(DESTDIR)$(bindir)/$(progname)" uninstall-info : - -install-info --info-dir=$(DESTDIR)$(infodir) --remove $(DESTDIR)$(infodir)/$(pkgname).info - -rm -f $(DESTDIR)$(infodir)/$(pkgname).info + -install-info --info-dir="$(DESTDIR)$(infodir)" --remove "$(DESTDIR)$(infodir)/$(pkgname).info" + -rm -f "$(DESTDIR)$(infodir)/$(pkgname).info" uninstall-man : - -rm -f $(DESTDIR)$(mandir)/man1/$(progname).1 + -rm -f "$(DESTDIR)$(mandir)/man1/$(progname).1" dist : doc ln -sf $(VPATH) $(DISTNAME) diff --git a/NEWS b/NEWS index 08b0074..4e4b50c 100644 --- a/NEWS +++ b/NEWS @@ -1,11 +1,6 @@ -Changes in version 0.3: +Changes in version 0.4: -New option "--data-size" has been added. +The option "--version" now shows the version of lzlib being used. -Output file is now removed if plzip is interrupted. - -This version automatically chooses the smallest possible dictionary size -for each member during compression, saving memory during decompression. - -Regular files are now open in binary mode in non-POSIX platforms -defining the O_BINARY macro. +A code reorganization has been made. The new class "Packet_courier" now +coordinates data movement and synchronization among threads. diff --git a/README b/README index c9b0c03..1f5b745 100644 --- a/README +++ b/README @@ -1,6 +1,10 @@ Description -Plzip is a parallel version of the lzip data compressor. Currently only +Plzip is a massively parallel (multithreaded) data compressor compatible +with the lzip file format. The files produced by plzip are fully +compatible with lzip-1.4 or newer. Plzip is intended for faster +compression/decompression of big files on multiprocessor machines. On +files big enough, plzip can use hundreds of processors. Currently only compression is performed in parallel. Parallel decompression is planned to be implemented later. diff --git a/compress.cc b/compress.cc new file mode 100644 index 0000000..e66b537 --- /dev/null +++ b/compress.cc @@ -0,0 +1,460 @@ +/* Plzip - A parallel compressor compatible with lzip + Copyright (C) 2009 Laszlo Ersek. + Copyright (C) 2009, 2010 Antonio Diaz Diaz. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#define _FILE_OFFSET_BITS 64 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "plzip.h" + +#ifndef LLONG_MAX +#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL +#endif +#ifndef LLONG_MIN +#define LLONG_MIN (-LLONG_MAX - 1LL) +#endif +#ifndef ULLONG_MAX +#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL +#endif + + +void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex ) + { + int ret = pthread_mutex_init( mutex, 0 ); + if( ret != 0 ) { show_error( "pthread_mutex_init", ret ); fatal(); } + + ret = pthread_cond_init( cond, 0 ); + if( ret != 0 ) { show_error( "pthread_cond_init", ret ); fatal(); } + } + + +void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex ) + { + int ret = pthread_cond_destroy( cond ); + if( ret != 0 ) { show_error( "pthread_cond_destroy", ret ); fatal(); } + + ret = pthread_mutex_destroy( mutex ); + if( ret != 0 ) { show_error( "pthread_mutex_destroy", ret ); fatal(); } + } + + +void xlock( pthread_mutex_t * mutex ) + { + int ret = pthread_mutex_lock( mutex ); + if( ret != 0 ) { show_error( "pthread_mutex_lock", ret ); fatal(); } + } + + +void xunlock( pthread_mutex_t * mutex ) + { + int ret = pthread_mutex_unlock( mutex ); + if( ret != 0 ) { show_error( "pthread_mutex_unlock", ret ); fatal(); } + } + + +void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex ) + { + int ret = pthread_cond_wait( cond, mutex ); + if( ret != 0 ) { show_error( "pthread_cond_wait", ret ); fatal(); } + } + + +void xsignal( pthread_cond_t * cond ) + { + int ret = pthread_cond_signal( cond ); + if( ret != 0 ) { show_error( "pthread_cond_signal", ret ); fatal(); } + } + + +void xbroadcast( pthread_cond_t * cond ) + { + int ret = pthread_cond_broadcast( cond ); + if( ret != 0 ) { show_error( "pthread_cond_broadcast", ret ); fatal(); } + } + + +void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg ) + { + int ret = pthread_create( thread, 0, routine, arg ); + if( ret != 0 ) { show_error( "pthread_create", ret ); fatal(); } + } + + +void xjoin( pthread_t thread ) + { + int ret = pthread_join( thread, 0 ); + if( ret != 0 ) { show_error( "pthread_join", ret ); fatal(); } + } + + +namespace { + +long long in_size = 0; +long long out_size = 0; + + +struct Packet // data block with a serial number + { + unsigned long long id; // serial number assigned as received + int size; // # of bytes in data + uint8_t * data; + }; + + +class Packet_courier // moves packets around + { +public: + unsigned long icheck_counter; + unsigned long iwait_counter; + unsigned long ocheck_counter; + unsigned long owait_counter; +private: + unsigned long long receive_id; // id assigned to next packet received + unsigned long long deliver_id; // id of next packet to be delivered + Slot_tally slot_tally; + std::queue< Packet * > packet_queue; + std::vector< Packet * > circular_buffer; + int num_working; // Number of workers still running + const int num_slots; // max packets in circulation + pthread_mutex_t imutex; + pthread_cond_t iav_or_eof; // input packet available or splitter done + pthread_mutex_t omutex; + pthread_cond_t oav_or_exit; // output packet available or all workers exited + bool eof; // splitter done + +public: + Packet_courier( const int num_workers, const int slots ) + : icheck_counter( 0 ), iwait_counter( 0 ), + ocheck_counter( 0 ), owait_counter( 0 ), + receive_id( 0 ), deliver_id( 0 ), + slot_tally( slots ), circular_buffer( slots, (Packet *) 0 ), + num_working( num_workers ), num_slots( slots ), eof( false ) + { xinit( &iav_or_eof, &imutex ); xinit( &oav_or_exit, &omutex ); } + + ~Packet_courier() + { xdestroy( &iav_or_eof, &imutex ); xdestroy( &oav_or_exit, &omutex ); } + + // make a packet with data received from splitter + void receive_packet( uint8_t * const data, const int size ) + { + Packet * ipacket = new Packet; + ipacket->id = receive_id++; + ipacket->size = size; + ipacket->data = data; + slot_tally.get_slot(); // wait for a free slot + xlock( &imutex ); + packet_queue.push( ipacket ); + xsignal( &iav_or_eof ); + xunlock( &imutex ); + } + + // distribute a packet to a worker + Packet * distribute_packet() + { + Packet * ipacket = 0; + xlock( &imutex ); + ++icheck_counter; + while( packet_queue.empty() && !eof ) + { + ++iwait_counter; + xwait( &iav_or_eof, &imutex ); + } + if( !packet_queue.empty() ) + { + ipacket = packet_queue.front(); + packet_queue.pop(); + } + xunlock( &imutex ); + if( ipacket == 0 ) + { + // Notify muxer when last worker exits + xlock( &omutex ); + if( --num_working == 0 ) + xsignal( &oav_or_exit ); + xunlock( &omutex ); + } + return ipacket; + } + + // collect a packet from a worker + void collect_packet( Packet * const opacket ) + { + xlock( &omutex ); + // id collision shouldn't happen + assert( circular_buffer[opacket->id%num_slots] == 0 ); + // Merge packet into circular buffer + circular_buffer[opacket->id%num_slots] = opacket; + if( opacket->id == deliver_id ) xsignal( &oav_or_exit ); + xunlock( &omutex ); + } + + // deliver a packet to muxer + Packet * deliver_packet() + { + xlock( &omutex ); + ++ocheck_counter; + while( circular_buffer[deliver_id%num_slots] == 0 && num_working > 0 ) + { + ++owait_counter; + xwait( &oav_or_exit, &omutex ); + } + Packet * opacket = circular_buffer[deliver_id%num_slots]; + circular_buffer[deliver_id%num_slots] = 0; + ++deliver_id; + xunlock( &omutex ); + if( opacket != 0 ) + slot_tally.leave_slot(); // return a slot to the tally + return opacket; + } + + void finish() // splitter has no more packets to send + { + xlock( &imutex ); + eof = true; + xbroadcast( &iav_or_eof ); + xunlock( &imutex ); + } + + bool finished() // all packets delivered to muxer + { + if( !slot_tally.all_free() || !eof || !packet_queue.empty() || + num_working != 0 ) return false; + for( int i = 0; i < num_slots; ++i ) + if( circular_buffer[i] != 0 ) return false; + return true; + } + + const Slot_tally & tally() const { return slot_tally; } + }; + + +struct Splitter_arg + { + Packet_courier * courier; + int infd; + int data_size; + }; + + + // split data from input file into chunks and pass them to + // courier for packaging and distribution to workers. +void * splitter( void * arg ) + { + const Splitter_arg & tmp = *(Splitter_arg *)arg; + Packet_courier & courier = *tmp.courier; + const int infd = tmp.infd; + const int data_size = tmp.data_size; + + for( bool first_post = true; ; first_post = false ) + { + uint8_t * data = new( std::nothrow ) uint8_t[data_size]; + if( data == 0 ) { show_error( "not enough memory" ); fatal(); } + const int size = readblock( infd, data, data_size ); + if( size != data_size && errno ) { show_error( "read", errno ); fatal(); } + + if( size > 0 || first_post ) // first packet can be empty + { + in_size += size; + courier.receive_packet( data, size ); + } + else + { + delete[] data; + courier.finish(); // no more packets to send + break; + } + } + return 0; + } + + +struct Worker_arg + { + int dictionary_size; + int match_len_limit; + Packet_courier * courier; + }; + + + // get packets from courier, replace their contents, and return + // them to courier. +void * worker( void * arg ) + { + const Worker_arg & tmp = *(Worker_arg *)arg; + const int dictionary_size = tmp.dictionary_size; + const int match_len_limit = tmp.match_len_limit; + Packet_courier & courier = *tmp.courier; + + while( true ) + { + Packet * packet = courier.distribute_packet(); + if( packet == 0 ) break; // no more packets to process + + const int compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 ); + uint8_t * const new_data = new( std::nothrow ) uint8_t[compr_size]; + if( new_data == 0 ) { show_error( "not enough memory" ); fatal(); } + const int dict_size = std::max( LZ_min_dictionary_size(), + std::min( dictionary_size, packet->size ) ); + LZ_Encoder * const encoder = + LZ_compress_open( dict_size, match_len_limit, LLONG_MAX ); + if( !encoder || LZ_compress_errno( encoder ) != LZ_ok ) + { show_error( "LZ_compress_open failed." ); fatal(); } + + int written = 0; + int new_size = 0; + while( true ) + { + if( LZ_compress_write_size( encoder ) > 0 ) + { + if( written < packet->size ) + { + const int wr = LZ_compress_write( encoder, packet->data + written, + packet->size - written ); + if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); } + written += wr; + } + if( written >= packet->size ) LZ_compress_finish( encoder ); + } + const int rd = LZ_compress_read( encoder, new_data + new_size, + compr_size - new_size ); + if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); } + new_size += rd; + assert( new_size <= compr_size ); + if( LZ_compress_finished( encoder ) == 1 ) break; + } + + if( LZ_compress_close( encoder ) < 0 ) + { show_error( "LZ_compress_close failed." ); fatal(); } + + delete[] packet->data; + packet->size = new_size; + packet->data = new_data; + courier.collect_packet( packet ); + } + return 0; + } + + + // get from courier the processed and sorted packets, and write + // their contents to the output file. +void muxer( Packet_courier & courier, const int outfd ) + { + while( true ) + { + Packet * opacket = courier.deliver_packet(); + if( opacket == 0 ) break; // queue is empty. all workers exited + + out_size += opacket->size; + + if( outfd >= 0 ) + { + const int wr = writeblock( outfd, opacket->data, opacket->size ); + if( wr != opacket->size ) + { show_error( "write", errno ); fatal(); } + } + delete[] opacket->data; + delete opacket; + } + } + +} // end namespace + + + // init the courier, then start the splitter and the workers and + // call the muxer. +int compress( const int data_size, const int dictionary_size, + const int match_len_limit, const int num_workers, + const int num_slots, const int infd, const int outfd, + const int debug_level ) + { + in_size = 0; + out_size = 0; + Packet_courier courier( num_workers, num_slots ); + + Splitter_arg splitter_arg; + splitter_arg.courier = &courier; + splitter_arg.infd = infd; + splitter_arg.data_size = data_size; + + pthread_t splitter_thread; + xcreate( &splitter_thread, splitter, &splitter_arg ); + + Worker_arg worker_arg; + worker_arg.dictionary_size = dictionary_size; + worker_arg.match_len_limit = match_len_limit; + worker_arg.courier = &courier; + + pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; + if( worker_threads == 0 ) + { show_error( "not enough memory" ); fatal(); } + for( int i = 0; i < num_workers; ++i ) + xcreate( &worker_threads[i], worker, &worker_arg ); + + muxer( courier, outfd ); + + for( int i = num_workers - 1; i >= 0; --i ) + xjoin(worker_threads[i]); + delete[] worker_threads; worker_threads = 0; + + xjoin( splitter_thread ); + + if( verbosity >= 1 ) + { + if( in_size <= 0 || out_size <= 0 ) + std::fprintf( stderr, "no data compressed.\n" ); + else + std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, " + "%5.2f%% saved, %lld in, %lld out.\n", + (double)in_size / out_size, + ( 8.0 * out_size ) / in_size, + 100.0 * ( 1.0 - ( (double)out_size / in_size ) ), + in_size, out_size ); + } + + if( debug_level & 1 ) + std::fprintf( stderr, + "splitter tried to send a packet %8lu times\n" + "splitter had to wait %8lu times\n" + "any worker tried to consume from splitter %8lu times\n" + "any worker had to wait %8lu times\n" + "muxer tried to consume from workers %8lu times\n" + "muxer had to wait %8lu times\n", + courier.tally().check_counter, + courier.tally().wait_counter, + courier.icheck_counter, + courier.iwait_counter, + courier.ocheck_counter, + courier.owait_counter ); + + assert( courier.finished() ); + return 0; + } diff --git a/configure b/configure index 3cd1dc8..e9ccd08 100755 --- a/configure +++ b/configure @@ -1,16 +1,16 @@ #! /bin/sh -# configure script for Plzip - A parallel version of the lzip data compressor +# configure script for Plzip - A parallel compressor compatible with lzip # Copyright (C) 2009, 2010 Antonio Diaz Diaz. # # This configure script is free software: you have unlimited permission # to copy, distribute and modify it. # -# Date of this version: 2010-01-24 +# Date of this version: 2010-01-31 args= no_create= pkgname=plzip -pkgversion=0.3 +pkgversion=0.4 progname=plzip srctrigger=plzip.h @@ -135,7 +135,7 @@ if [ -z "${CXX}" ] ; then # Let the user override the test. fi echo -if [ -z ${no_create} ] ; then +if [ -z "${no_create}" ] ; then echo "creating config.status" rm -f config.status cat > config.status << EOF @@ -166,7 +166,7 @@ echo "CXXFLAGS = ${CXXFLAGS}" echo "LDFLAGS = ${LDFLAGS}" rm -f Makefile cat > Makefile << EOF -# Makefile for Plzip - A parallel version of the lzip data compressor +# Makefile for Plzip - A parallel compressor compatible with lzip # Copyright (C) 2009, 2010 Antonio Diaz Diaz. # This file was generated automatically by configure. Do not edit. # diff --git a/decompress.cc b/decompress.cc new file mode 100644 index 0000000..0a125d0 --- /dev/null +++ b/decompress.cc @@ -0,0 +1,123 @@ +/* Plzip - A parallel compressor compatible with lzip + Copyright (C) 2009 Laszlo Ersek. + Copyright (C) 2009, 2010 Antonio Diaz Diaz. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#define _FILE_OFFSET_BITS 64 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "plzip.h" + + +namespace { + +int do_decompress( LZ_Decoder * const decoder, const int infd, const int outfd, + const Pretty_print & pp, const bool testing ) + { + const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size; + uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size]; + + while( true ) + { + int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size ); + if( in_size > 0 ) + { + const int max_in_size = in_size; + in_size = readblock( infd, in_buffer, max_in_size ); + if( in_size != max_in_size && errno ) + { pp(); show_error( "read error", errno ); return 1; } + if( in_size == 0 ) LZ_decompress_finish( decoder ); + else if( in_size != LZ_decompress_write( decoder, in_buffer, in_size ) ) + internal_error( "library error (LZ_decompress_write)" ); + } + int out_size = LZ_decompress_read( decoder, out_buffer, out_buffer_size ); +// std::fprintf( stderr, "%5d in_size, %6d out_size.\n", in_size, out_size ); + if( out_size < 0 ) + { + const LZ_Errno lz_errno = LZ_decompress_errno( decoder ); + if( lz_errno == LZ_header_error ) + { + if( LZ_decompress_total_out_size( decoder ) > 0 ) + break; // trailing garbage + pp( "error reading member header" ); + return 1; + } + if( lz_errno == LZ_mem_error ) + { + pp( "not enough memory. Find a machine with more memory" ); + return 1; + } + pp(); + if( lz_errno == LZ_unexpected_eof ) + { + if( verbosity >= 0 ) + std::fprintf( stderr, "file ends unexpectedly at pos %lld\n", + LZ_decompress_total_in_size( decoder ) ); + return 2; + } + if( verbosity >= 0 ) + std::fprintf( stderr, "LZ_decompress_read error: %s.\n", + LZ_strerror( LZ_decompress_errno( decoder ) ) ); + return 1; + } + else if( out_size > 0 && outfd >= 0 ) + { + const int wr = writeblock( outfd, out_buffer, out_size ); + if( wr != out_size ) + { pp(); show_error( "write error", errno ); return 1; } + } + if( LZ_decompress_finished( decoder ) == 1 ) break; + if( in_size == 0 && out_size == 0 ) + internal_error( "library error (LZ_decompress_read)" ); + } + if( verbosity >= 1 ) + { if( testing ) std::fprintf( stderr, "ok\n" ); + else std::fprintf( stderr, "done\n" ); } + return 0; + } + +} // end namespace + + +int decompress( const int infd, const int outfd, const Pretty_print & pp, + const bool testing ) + { + LZ_Decoder * const decoder = LZ_decompress_open(); + int retval; + + if( !decoder || LZ_decompress_errno( decoder ) != LZ_ok ) + { + pp( "not enough memory" ); + retval = 1; + } + else retval = do_decompress( decoder, infd, outfd, pp, testing ); + LZ_decompress_close( decoder ); + return retval; + } diff --git a/doc/plzip.1 b/doc/plzip.1 index feb0aa1..148c95b 100644 --- a/doc/plzip.1 +++ b/doc/plzip.1 @@ -1,12 +1,12 @@ .\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.36. -.TH PLZIP "1" "January 2010" "Plzip 0.3" "User Commands" +.TH PLZIP "1" "January 2010" "Plzip 0.4" "User Commands" .SH NAME Plzip \- data compressor based on the LZMA algorithm .SH SYNOPSIS .B plzip [\fIoptions\fR] [\fIfiles\fR] .SH DESCRIPTION -Plzip \- A parallel version of the lzip data compressor. +Plzip \- A parallel compressor compatible with lzip. .SH OPTIONS .TP \fB\-h\fR, \fB\-\-help\fR @@ -71,6 +71,7 @@ Plzip home page: http://www.nongnu.org/lzip/plzip.html Copyright \(co 2009 Laszlo Ersek. .br Copyright \(co 2010 Antonio Diaz Diaz. +Using Lzlib 0.9-rc1 License GPLv3+: GNU GPL version 3 or later .br This is free software: you are free to change and redistribute it. diff --git a/doc/plzip.info b/doc/plzip.info index 9a48fcb..c4dc967 100644 --- a/doc/plzip.info +++ b/doc/plzip.info @@ -3,7 +3,7 @@ plzip.texinfo. INFO-DIR-SECTION Data Compression START-INFO-DIR-ENTRY -* Plzip: (plzip). Parallel version of the lzip data compressor +* Plzip: (plzip). Parallel compressor compatible with lzip END-INFO-DIR-ENTRY  @@ -12,7 +12,7 @@ File: plzip.info, Node: Top, Next: Introduction, Up: (dir) Plzip Manual ************ -This manual is for Plzip (version 0.3, 24 January 2010). +This manual is for Plzip (version 0.4, 31 January 2010). * Menu: @@ -34,11 +34,13 @@ File: plzip.info, Node: Introduction, Next: Invoking Plzip, Prev: Top, Up: T 1 Introduction ************** -Plzip is a parallel version of the lzip data compressor. The files -produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is -intended for faster compression/decompression of big files on -multiprocessor machines. Currently only compression is performed in -parallel. Parallel decompression is planned to be implemented later. +Plzip is a massively parallel (multithreaded) data compressor compatible +with the lzip file format. The files produced by plzip are fully +compatible with lzip-1.4 or newer. Plzip is intended for faster +compression/decompression of big files on multiprocessor machines. On +files big enough, plzip can use hundreds of processors. Currently only +compression is performed in parallel. Parallel decompression is planned +to be implemented later. Lzip is a lossless data compressor based on the LZMA algorithm, with very safe integrity checking and a user interface similar to the one of @@ -303,11 +305,11 @@ Concept Index  Tag Table: -Node: Top227 -Node: Introduction750 -Node: Invoking Plzip3571 -Node: File Format7260 -Node: Problems9216 -Node: Concept Index9745 +Node: Top223 +Node: Introduction746 +Node: Invoking Plzip3669 +Node: File Format7358 +Node: Problems9314 +Node: Concept Index9843  End Tag Table diff --git a/doc/plzip.texinfo b/doc/plzip.texinfo index 04bf822..12ac4c8 100644 --- a/doc/plzip.texinfo +++ b/doc/plzip.texinfo @@ -5,12 +5,12 @@ @finalout @c %**end of header -@set UPDATED 24 January 2010 -@set VERSION 0.3 +@set UPDATED 31 January 2010 +@set VERSION 0.4 @dircategory Data Compression @direntry -* Plzip: (plzip). Parallel version of the lzip data compressor +* Plzip: (plzip). Parallel compressor compatible with lzip @end direntry @@ -50,11 +50,13 @@ to copy, distribute and modify it. @chapter Introduction @cindex introduction -Plzip is a parallel version of the lzip data compressor. The files -produced by plzip are fully compatible with lzip-1.4 or newer. Plzip is -intended for faster compression/decompression of big files on -multiprocessor machines. Currently only compression is performed in -parallel. Parallel decompression is planned to be implemented later. +Plzip is a massively parallel (multithreaded) data compressor compatible +with the lzip file format. The files produced by plzip are fully +compatible with lzip-1.4 or newer. Plzip is intended for faster +compression/decompression of big files on multiprocessor machines. On +files big enough, plzip can use hundreds of processors. Currently only +compression is performed in parallel. Parallel decompression is planned +to be implemented later. Lzip is a lossless data compressor based on the LZMA algorithm, with very safe integrity checking and a user interface similar to the one of diff --git a/main.cc b/main.cc index f3f12e0..db70e00 100644 --- a/main.cc +++ b/main.cc @@ -1,4 +1,4 @@ -/* Plzip - A parallel version of the lzip data compressor +/* Plzip - A parallel compressor compatible with lzip Copyright (C) 2009 Laszlo Ersek. Copyright (C) 2009, 2010 Antonio Diaz Diaz. @@ -58,8 +58,6 @@ #define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL #endif -void internal_error( const char * msg ); - namespace { @@ -93,44 +91,10 @@ bool delete_output_on_interrupt = false; pthread_t main_thread; pid_t main_thread_pid; -class Pretty_print - { - const char * const stdin_name; - const unsigned int stdin_name_len; - unsigned int longest_name; - std::string name_; - mutable bool first_post; - -public: - Pretty_print( const std::vector< std::string > & filenames ) - : stdin_name( "(stdin)" ), stdin_name_len( std::strlen( stdin_name ) ), - longest_name( 0 ), first_post( false ) - { - for( unsigned int i = 0; i < filenames.size(); ++i ) - { - const std::string & s = filenames[i]; - const unsigned int len = ( ( s == "-" ) ? stdin_name_len : s.size() ); - if( len > longest_name ) longest_name = len; - } - if( longest_name == 0 ) longest_name = stdin_name_len; - } - - void set_name( const std::string & filename ) - { - if( filename.size() && filename != "-" ) name_ = filename; - else name_ = stdin_name; - first_post = true; - } - - void reset() const throw() { if( name_.size() ) first_post = true; } - const char * name() const throw() { return name_.c_str(); } - void operator()( const char * const msg = 0 ) const throw(); - }; - void show_help() throw() { - std::printf( "%s - A parallel version of the lzip data compressor.\n", Program_name ); + std::printf( "%s - A parallel compressor compatible with lzip.\n", Program_name ); std::printf( "\nUsage: %s [options] [files]\n", invocation_name ); std::printf( "\nOptions:\n" ); std::printf( " -h, --help display this help and exit\n" ); @@ -154,7 +118,7 @@ void show_help() throw() std::printf( " --best alias for -9\n" ); if( verbosity > 0 ) { - std::printf( " -D, --debug= (0-3) print debug statistics to stderr\n" ); + std::printf( " -D, --debug= (0-1) print debug statistics to stderr\n" ); } std::printf( "If no file names are given, %s compresses or decompresses\n", program_name ); std::printf( "from standard input to standard output.\n" ); @@ -170,6 +134,7 @@ void show_version() throw() std::printf( "%s %s\n", Program_name, PROGVERSION ); std::printf( "Copyright (C) 2009 Laszlo Ersek.\n" ); std::printf( "Copyright (C) %s Antonio Diaz Diaz.\n", program_year ); + std::printf( "Using Lzlib %s\n", LZ_version() ); std::printf( "License GPLv3+: GNU GPL version 3 or later \n" ); std::printf( "This is free software: you are free to change and redistribute it.\n" ); std::printf( "There is NO WARRANTY, to the extent permitted by law.\n" ); @@ -413,89 +378,6 @@ void close_and_set_permissions( const struct stat * const in_statsp ) } -int do_decompress( LZ_Decoder * const decoder, const int inhandle, - const Pretty_print & pp, const bool testing ) - { - const int in_buffer_size = 65536, out_buffer_size = 8 * in_buffer_size; - uint8_t in_buffer[in_buffer_size], out_buffer[out_buffer_size]; - - while( true ) - { - int in_size = std::min( LZ_decompress_write_size( decoder ), in_buffer_size ); - if( in_size > 0 ) - { - const int max_in_size = in_size; - in_size = readblock( inhandle, in_buffer, max_in_size ); - if( in_size != max_in_size && errno ) - { pp(); show_error( "read error", errno ); return 1; } - if( in_size == 0 ) LZ_decompress_finish( decoder ); - else if( in_size != LZ_decompress_write( decoder, in_buffer, in_size ) ) - internal_error( "library error (LZ_decompress_write)" ); - } - int out_size = LZ_decompress_read( decoder, out_buffer, out_buffer_size ); -// std::fprintf( stderr, "%5d in_size, %6d out_size.\n", in_size, out_size ); - if( out_size < 0 ) - { - const LZ_Errno lz_errno = LZ_decompress_errno( decoder ); - if( lz_errno == LZ_header_error ) - { - if( LZ_decompress_total_out_size( decoder ) > 0 ) - break; // trailing garbage - pp( "error reading member header" ); - return 1; - } - if( lz_errno == LZ_mem_error ) - { - pp( "not enough memory. Find a machine with more memory" ); - return 1; - } - pp(); - if( lz_errno == LZ_unexpected_eof ) - { - if( verbosity >= 0 ) - std::fprintf( stderr, "file ends unexpectedly at pos %lld\n", - LZ_decompress_total_in_size( decoder ) ); - return 2; - } - if( verbosity >= 0 ) - std::fprintf( stderr, "LZ_decompress_read error: %s.\n", - LZ_strerror( LZ_decompress_errno( decoder ) ) ); - return 1; - } - else if( out_size > 0 && outhandle >= 0 ) - { - const int wr = writeblock( outhandle, out_buffer, out_size ); - if( wr != out_size ) - { pp(); show_error( "write error", errno ); return 1; } - } - if( LZ_decompress_finished( decoder ) == 1 ) break; - if( in_size == 0 && out_size == 0 ) - internal_error( "library error (LZ_decompress_read)" ); - } - if( verbosity >= 1 ) - { if( testing ) std::fprintf( stderr, "ok\n" ); - else std::fprintf( stderr, "done\n" ); } - return 0; - } - - -int decompress( const int inhandle, const Pretty_print & pp, - const bool testing ) - { - LZ_Decoder * const decoder = LZ_decompress_open(); - int retval; - - if( !decoder || LZ_decompress_errno( decoder ) != LZ_ok ) - { - pp( "not enough memory" ); - retval = 1; - } - else retval = do_decompress( decoder, inhandle, pp, testing ); - LZ_decompress_close( decoder ); - return retval; - } - - extern "C" void signal_handler( int sig ) throw() { if( !pthread_equal( pthread_self(), main_thread ) ) @@ -810,7 +692,7 @@ int main( const int argc, const char * argv[] ) encoder_options.match_len_limit, num_workers, num_slots, inhandle, outhandle, debug_level ); else - tmp = decompress( inhandle, pp, program_mode == m_test ); + tmp = decompress( inhandle, outhandle, pp, program_mode == m_test ); if( tmp > retval ) retval = tmp; if( tmp && program_mode != m_test ) cleanup_and_fail( retval ); diff --git a/plzip.cc b/plzip.cc deleted file mode 100644 index dcae860..0000000 --- a/plzip.cc +++ /dev/null @@ -1,548 +0,0 @@ -/* Plzip - A parallel version of the lzip data compressor - Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009, 2010 Antonio Diaz Diaz. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . -*/ - -#define _FILE_OFFSET_BITS 64 - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "plzip.h" - -#ifndef LLONG_MAX -#define LLONG_MAX 0x7FFFFFFFFFFFFFFFLL -#endif -#ifndef LLONG_MIN -#define LLONG_MIN (-LLONG_MAX - 1LL) -#endif -#ifndef ULLONG_MAX -#define ULLONG_MAX 0xFFFFFFFFFFFFFFFFULL -#endif - - -namespace { - -long long in_size = 0; -long long out_size = 0; - -void *(*mallocf)( size_t size ); -void (*freef)( void *ptr ); - - -void * trace_malloc( size_t size ) - { - int save_errno = 0; - - void * ret = malloc( size ); - if( ret == 0 ) save_errno = errno; - std::fprintf( stderr, "malloc(%lu) == %p\n", (unsigned long)size, ret ); - if( ret == 0 ) errno = save_errno; - return ret; - } - - -void trace_free( void *ptr ) - { - std::fprintf( stderr, "free(%p)\n", ptr ); - free( ptr ); - } - - -void * xalloc( size_t size ) - { - void *ret = (*mallocf)( size ); - if( ret == 0 ) { show_error( "not enough memory", errno ); fatal(); } - return ret; - } - - -void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex ) - { - int ret = pthread_mutex_init( mutex, 0 ); - if( ret != 0 ) { show_error( "pthread_mutex_init", ret ); fatal(); } - - ret = pthread_cond_init( cond, 0 ); - if( ret != 0 ) { show_error( "pthread_cond_init", ret ); fatal(); } - } - - -void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex ) - { - int ret = pthread_cond_destroy( cond ); - if( ret != 0 ) { show_error( "pthread_cond_destroy", ret ); fatal(); } - - ret = pthread_mutex_destroy( mutex ); - if( ret != 0 ) { show_error( "pthread_mutex_destroy", ret ); fatal(); } - } - - -void xlock( pthread_mutex_t * mutex ) - { - int ret = pthread_mutex_lock( mutex ); - if( ret != 0 ) { show_error( "pthread_mutex_lock", ret ); fatal(); } - } - - -void xunlock( pthread_mutex_t * mutex ) - { - int ret = pthread_mutex_unlock( mutex ); - if( ret != 0 ) { show_error( "pthread_mutex_unlock", ret ); fatal(); } - } - - -void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex ) - { - int ret = pthread_cond_wait( cond, mutex ); - if( ret != 0 ) { show_error( "pthread_cond_wait", ret ); fatal(); } - } - - -void xsignal( pthread_cond_t * cond ) - { - int ret = pthread_cond_signal( cond ); - if( ret != 0 ) { show_error( "pthread_cond_signal", ret ); fatal(); } - } - - -void xbroadcast( pthread_cond_t * cond ) - { - int ret = pthread_cond_broadcast( cond ); - if( ret != 0 ) { show_error( "pthread_cond_broadcast", ret ); fatal(); } - } - - -void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg ) - { - int ret = pthread_create( thread, 0, routine, arg ); - if( ret != 0 ) { show_error( "pthread_create", ret ); fatal(); } - } - - -void xjoin( pthread_t thread ) - { - int ret = pthread_join( thread, 0 ); - if( ret != 0 ) { show_error( "pthread_join", ret ); fatal(); } - } - - -struct Slot_tally // Synchronizes splitter to muxer - { - unsigned long check_counter; - unsigned long wait_counter; - int num_free; // Number of free slots - pthread_mutex_t mutex; - pthread_cond_t slot_av; // Free slot available - - Slot_tally( const int slots ) - : check_counter( 0 ), wait_counter( 0 ), num_free( slots ) - { xinit( &slot_av, &mutex ); } - - ~Slot_tally() { xdestroy( &slot_av, &mutex ); } - }; - - -struct S2w_blk // Splitter to worker data block - { - unsigned long long id; // Block serial number as read from infd - S2w_blk *next; // Next in queue - int loaded; // # of bytes in plain, may be 0 for 1st - uint8_t plain[1]; // Data read from infd, allocated: data_size - }; - - -struct S2w_queue - { - S2w_blk * head; // Next ready worker shall compress this - S2w_blk * tail; // Splitter will append here - unsigned long check_counter; - unsigned long wait_counter; - pthread_mutex_t mutex; - pthread_cond_t av_or_eof; // New block available or splitter done - bool eof; // Splitter done - - S2w_queue() - : head( 0 ), tail( 0 ), check_counter( 0 ), wait_counter( 0 ), eof( false ) - { xinit( &av_or_eof, &mutex ); } - - ~S2w_queue() { xdestroy( &av_or_eof, &mutex ); } - }; - - -struct W2m_blk // Worker to muxer data block - { - unsigned long long id; // Block index as read from infd - W2m_blk *next; // Next block in list (unordered) - int produced; // Number of bytes in compr - uint8_t compr[1]; // Data to write to outfd, alloc.: compr_size - }; - - -struct W2m_queue - { - unsigned long long needed_id; // Block needed for resuming writing - W2m_blk *head; // Block list (unordered) - unsigned long check_counter; - unsigned long wait_counter; - int num_working; // Number of workers still running - pthread_mutex_t mutex; - pthread_cond_t av_or_exit; // New block available or all workers exited - - W2m_queue( const int num_workers ) - : needed_id( 0 ), head( 0 ), check_counter( 0 ), wait_counter( 0 ), - num_working( num_workers ) - { xinit( &av_or_exit, &mutex ); } - - ~W2m_queue() { xdestroy( &av_or_exit, &mutex ); } - }; - - -struct Splitter_arg - { - Slot_tally * slot_tally; - S2w_queue * s2w_queue; - int infd; - int data_size; - int s2w_blk_size; - }; - - -void * splitter( void * arg ) - { - const Splitter_arg & tmp = *(Splitter_arg *)arg; - Slot_tally & slot_tally = *tmp.slot_tally; - S2w_queue & s2w_queue = *tmp.s2w_queue; - const int infd = tmp.infd; - const int data_size = tmp.data_size; - const int s2w_blk_size = tmp.s2w_blk_size; - - for( unsigned long long id = 0; ; ++id ) - { - S2w_blk * s2w_blk = (S2w_blk *)xalloc( s2w_blk_size ); - - // Fill block - const int rd = readblock( infd, s2w_blk->plain, data_size ); - if( rd != data_size && errno ) { show_error( "read", errno ); fatal(); } - - if( rd > 0 || id == 0 ) // first block can be empty - { - s2w_blk->id = id; - s2w_blk->next = 0; - s2w_blk->loaded = rd; - in_size += rd; - xlock( &slot_tally.mutex ); // Grab a free slot - ++slot_tally.check_counter; - while( slot_tally.num_free == 0 ) - { - ++slot_tally.wait_counter; - xwait( &slot_tally.slot_av, &slot_tally.mutex ); - } - --slot_tally.num_free; - xunlock( &slot_tally.mutex ); - } - else - { (*freef)( s2w_blk ); s2w_blk = 0; } - - xlock( &s2w_queue.mutex ); - if( s2w_blk != 0 ) - { - if( s2w_queue.tail == 0 ) s2w_queue.head = s2w_blk; - else s2w_queue.tail->next = s2w_blk; - s2w_queue.tail = s2w_blk; - xsignal( &s2w_queue.av_or_eof ); - } - else - { - s2w_queue.eof = true; - xbroadcast( &s2w_queue.av_or_eof ); - } - xunlock( &s2w_queue.mutex ); - - if( s2w_blk == 0 ) break; - } - return 0; - } - - -void work_compr( const int dictionary_size, const int match_len_limit, - const S2w_blk & s2w_blk, W2m_queue & w2m_queue, - const int compr_size, const int w2m_blk_size ) - { - assert( s2w_blk.loaded > 0 || s2w_blk.id == 0 ); - - W2m_blk * w2m_blk = (W2m_blk *)xalloc( w2m_blk_size ); - - const int dict_size = std::max( LZ_min_dictionary_size(), - std::min( dictionary_size, s2w_blk.loaded ) ); - LZ_Encoder * const encoder = - LZ_compress_open( dict_size, match_len_limit, LLONG_MAX ); - if( !encoder || LZ_compress_errno( encoder ) != LZ_ok ) - { show_error( "LZ_compress_open failed." ); fatal(); } - - int written = 0; - w2m_blk->produced = 0; - while( true ) - { - if( LZ_compress_write_size( encoder ) > 0 ) - { - if( written < s2w_blk.loaded ) - { - const int wr = LZ_compress_write( encoder, s2w_blk.plain + written, - s2w_blk.loaded - written ); - if( wr < 0 ) { show_error( "LZ_compress_write failed." ); fatal(); } - written += wr; - } - if( written >= s2w_blk.loaded ) LZ_compress_finish( encoder ); - } - assert( w2m_blk->produced < compr_size ); - const int rd = LZ_compress_read( encoder, w2m_blk->compr + w2m_blk->produced, - compr_size - w2m_blk->produced ); - if( rd < 0 ) { show_error( "LZ_compress_read failed." ); fatal(); } - w2m_blk->produced += rd; - if( LZ_compress_finished( encoder ) == 1 ) break; - } - - if( LZ_compress_close( encoder ) < 0 ) - { show_error( "LZ_compress_close failed." ); fatal(); } - - w2m_blk->id = s2w_blk.id; - - // Push block to muxer queue - xlock( &w2m_queue.mutex ); - w2m_blk->next = w2m_queue.head; - w2m_queue.head = w2m_blk; - if( w2m_blk->id == w2m_queue.needed_id ) xsignal( &w2m_queue.av_or_exit ); - xunlock( &w2m_queue.mutex ); - } - - -struct Worker_arg - { - int dictionary_size; - int match_len_limit; - S2w_queue * s2w_queue; - W2m_queue * w2m_queue; - int compr_size; - int w2m_blk_size; - }; - - -void * worker( void * arg ) - { - const Worker_arg & tmp = *(Worker_arg *)arg; - const int dictionary_size = tmp.dictionary_size; - const int match_len_limit = tmp.match_len_limit; - S2w_queue & s2w_queue = *tmp.s2w_queue; - W2m_queue & w2m_queue = *tmp.w2m_queue; - const int compr_size = tmp.compr_size; - const int w2m_blk_size = tmp.w2m_blk_size; - - while( true ) - { - S2w_blk *s2w_blk; - - // Grab a block to work on - xlock( &s2w_queue.mutex ); - ++s2w_queue.check_counter; - while( s2w_queue.head == 0 && !s2w_queue.eof ) - { - ++s2w_queue.wait_counter; - xwait( &s2w_queue.av_or_eof, &s2w_queue.mutex ); - } - if( s2w_queue.head == 0 ) // No blocks available and splitter exited - { - xunlock( &s2w_queue.mutex ); - break; - } - s2w_blk = s2w_queue.head; - s2w_queue.head = s2w_blk->next; - if( s2w_queue.head == 0 ) s2w_queue.tail = 0; - xunlock( &s2w_queue.mutex ); - - work_compr( dictionary_size, match_len_limit, *s2w_blk, w2m_queue, - compr_size, w2m_blk_size ); - (*freef)( s2w_blk ); - } - - // Notify muxer when last worker exits - xlock( &w2m_queue.mutex ); - if( --w2m_queue.num_working == 0 && w2m_queue.head == 0 ) - xsignal( &w2m_queue.av_or_exit ); - xunlock( &w2m_queue.mutex ); - return 0; - } - - -void muxer( Slot_tally & slot_tally, W2m_queue & w2m_queue, - const int num_slots, const int outfd ) - { - unsigned long long needed_id = 0; - std::vector< W2m_blk * > circular_buffer( num_slots, (W2m_blk *)0 ); - - xlock( &w2m_queue.mutex ); - while( true ) - { - // Grab all available compressed blocks in one step - ++w2m_queue.check_counter; - while( w2m_queue.head == 0 && w2m_queue.num_working > 0 ) - { - ++w2m_queue.wait_counter; - xwait( &w2m_queue.av_or_exit, &w2m_queue.mutex ); - } - if( w2m_queue.head == 0 ) break; // queue is empty. all workers exited - - W2m_blk * w2m_blk = w2m_queue.head; - w2m_queue.head = 0; - xunlock( &w2m_queue.mutex ); - - // Merge blocks fetched this time into circular buffer - do { - // id collision shouldn't happen - assert( circular_buffer[w2m_blk->id%num_slots] == 0 ); - circular_buffer[w2m_blk->id%num_slots] = w2m_blk; - W2m_blk * next = w2m_blk->next; - w2m_blk->next = 0; - w2m_blk = next; - } while( w2m_blk != 0 ); - - // Write out initial continuous sequence of reordered blocks - while( true ) - { - w2m_blk = circular_buffer[needed_id%num_slots]; - if( w2m_blk == 0 ) break; - - out_size += w2m_blk->produced; - - if( outfd >= 0 ) - { - const int wr = writeblock( outfd, w2m_blk->compr, w2m_blk->produced ); - if( wr != w2m_blk->produced ) - { show_error( "write", errno ); fatal(); } - } - circular_buffer[needed_id%num_slots] = 0; - ++needed_id; - - xlock( &slot_tally.mutex ); - if( slot_tally.num_free++ == 0 ) xsignal( &slot_tally.slot_av ); - xunlock( &slot_tally.mutex ); - - (*freef)( w2m_blk ); - } - - xlock( &w2m_queue.mutex ); - w2m_queue.needed_id = needed_id; - } - xunlock( &w2m_queue.mutex ); - - for( int i = 0; i < num_slots; ++i ) - if( circular_buffer[i] != 0 ) - { show_error( "circular buffer not empty" ); fatal(); } - } - -} // end namespace - - -int compress( const int data_size, const int dictionary_size, - const int match_len_limit, const int num_workers, - const int num_slots, const int infd, const int outfd, - const int debug_level ) - { - if( debug_level & 2 ) { mallocf = trace_malloc; freef = trace_free; } - else { mallocf = malloc; freef = free; } - - Slot_tally slot_tally( num_slots ); - S2w_queue s2w_queue; - W2m_queue w2m_queue( num_workers ); - - Splitter_arg splitter_arg; - splitter_arg.slot_tally = &slot_tally; - splitter_arg.s2w_queue = &s2w_queue; - splitter_arg.infd = infd; - splitter_arg.data_size = data_size; - splitter_arg.s2w_blk_size = sizeof (S2w_blk) + data_size - 1; - - pthread_t splitter_thread; - xcreate( &splitter_thread, splitter, &splitter_arg ); - - Worker_arg worker_arg; - worker_arg.dictionary_size = dictionary_size; - worker_arg.match_len_limit = match_len_limit; - worker_arg.s2w_queue = &s2w_queue; - worker_arg.w2m_queue = &w2m_queue; - worker_arg.compr_size = 6 + 20 + ( ( data_size / 8 ) * 9 ); - worker_arg.w2m_blk_size = sizeof (W2m_blk) + worker_arg.compr_size - 1; - - pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; - if( worker_threads == 0 ) - { show_error( "not enough memory.", errno ); fatal(); } - for( int i = 0; i < num_workers; ++i ) - xcreate( &worker_threads[i], worker, &worker_arg ); - - muxer( slot_tally, w2m_queue, num_slots, outfd ); - - for( int i = num_workers - 1; i >= 0; --i ) - xjoin(worker_threads[i]); - delete[] worker_threads; worker_threads = 0; - - xjoin( splitter_thread ); - - if( verbosity >= 1 ) - { - if( in_size <= 0 || out_size <= 0 ) - std::fprintf( stderr, "no data compressed.\n" ); - else - std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, " - "%5.2f%% saved, %lld in, %lld out.\n", - (double)in_size / out_size, - ( 8.0 * out_size ) / in_size, - 100.0 * ( 1.0 - ( (double)out_size / in_size ) ), - in_size, out_size ); - } - - const int FW = ( sizeof (unsigned long) * 8 ) / 3 + 1; - if( debug_level & 1 ) - std::fprintf( stderr, - "any worker tried to consume from splitter: %*lu\n" - "any worker stalled : %*lu\n" - "muxer tried to consume from workers : %*lu\n" - "muxer stalled : %*lu\n" - "splitter tried to fill a block : %*lu\n" - "splitter stalled : %*lu\n", - FW, s2w_queue.check_counter, - FW, s2w_queue.wait_counter, - FW, w2m_queue.check_counter, - FW, w2m_queue.wait_counter, - FW, slot_tally.check_counter, - FW, slot_tally.wait_counter ); - - assert( slot_tally.num_free == num_slots ); - assert( s2w_queue.eof ); - assert( s2w_queue.head == 0 ); - assert( s2w_queue.tail == 0 ); - assert( w2m_queue.num_working == 0 ); - assert( w2m_queue.head == 0 ); - return 0; - } diff --git a/plzip.h b/plzip.h index d7bb760..6615fc1 100644 --- a/plzip.h +++ b/plzip.h @@ -1,4 +1,4 @@ -/* Plzip - A parallel version of the lzip data compressor +/* Plzip - A parallel compressor compatible with lzip Copyright (C) 2009 Laszlo Ersek. Copyright (C) 2009, 2010 Antonio Diaz Diaz. @@ -16,19 +16,113 @@ along with this program. If not, see . */ +class Pretty_print + { + const char * const stdin_name; + const unsigned int stdin_name_len; + unsigned int longest_name; + std::string name_; + mutable bool first_post; + +public: + Pretty_print( const std::vector< std::string > & filenames ) + : stdin_name( "(stdin)" ), stdin_name_len( std::strlen( stdin_name ) ), + longest_name( 0 ), first_post( false ) + { + for( unsigned int i = 0; i < filenames.size(); ++i ) + { + const std::string & s = filenames[i]; + const unsigned int len = ( ( s == "-" ) ? stdin_name_len : s.size() ); + if( len > longest_name ) longest_name = len; + } + if( longest_name == 0 ) longest_name = stdin_name_len; + } + + void set_name( const std::string & filename ) + { + if( filename.size() && filename != "-" ) name_ = filename; + else name_ = stdin_name; + first_post = true; + } + + void reset() const throw() { if( name_.size() ) first_post = true; } + const char * name() const throw() { return name_.c_str(); } + void operator()( const char * const msg = 0 ) const throw(); + }; + + +/*--------------------- Defined in compress.cc ---------------------*/ + +void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex ); +void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex ); +void xlock( pthread_mutex_t * mutex ); +void xunlock( pthread_mutex_t * mutex ); +void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex ); +void xsignal( pthread_cond_t * cond ); +void xbroadcast( pthread_cond_t * cond ); +void xcreate( pthread_t *thread, void *(*routine)(void *), void *arg ); +void xjoin( pthread_t thread ); + + +class Slot_tally + { +public: + unsigned long check_counter; + unsigned long wait_counter; +private: + const int num_slots; // total slots + int num_free; // remaining free slots + pthread_mutex_t mutex; + pthread_cond_t slot_av; // free slot available + +public: + Slot_tally( const int slots ) + : check_counter( 0 ), wait_counter( 0 ), + num_slots( slots ), num_free( slots ) + { xinit( &slot_av, &mutex ); } + + ~Slot_tally() { xdestroy( &slot_av, &mutex ); } + + bool all_free() { return ( num_free == num_slots ); } + + void get_slot() // wait for a free slot + { + xlock( &mutex ); + ++check_counter; + while( num_free == 0 ) + { ++wait_counter; xwait( &slot_av, &mutex ); } + --num_free; + xunlock( &mutex ); + } + + void leave_slot() // return a slot to the tally + { + xlock( &mutex ); + if( num_free++ == 0 ) xsignal( &slot_av ); + xunlock( &mutex ); + } + }; + + int compress( const int data_size, const int dictionary_size, const int match_len_limit, const int num_workers, const int num_slots, const int infd, const int outfd, const int debug_level ); +/*-------------------- Defined in decompress.cc --------------------*/ + +int decompress( const int infd, const int outfd, const Pretty_print & pp, + const bool testing ); + + /*----------------------- Defined in main.cc -----------------------*/ +extern int verbosity; + void show_error( const char * msg, const int errcode = 0, const bool help = false ) throw(); +void internal_error( const char * msg ); int readblock( const int fd, uint8_t * buf, const int size ) throw(); int writeblock( const int fd, const uint8_t * buf, const int size ) throw(); - void fatal(); // Terminate the process - -extern int verbosity; diff --git a/testsuite/check.sh b/testsuite/check.sh index 409e563..f71faca 100755 --- a/testsuite/check.sh +++ b/testsuite/check.sh @@ -1,5 +1,5 @@ #! /bin/sh -# check script for Plzip - A parallel version of the lzip data compressor +# check script for Plzip - A parallel compressor compatible with lzip # Copyright (C) 2009, 2010 Antonio Diaz Diaz. # # This script is free software: you have unlimited permission -- cgit v1.2.3