diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 09:43:15 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 09:43:15 +0000 |
commit | 4fd40781fe3edc4ae5895e3cfddfbfea328ac0e8 (patch) | |
tree | efb19f2d9385f548990904757cdf44f3b9546fca /src/parsort | |
parent | Initial commit. (diff) | |
download | parallel-4fd40781fe3edc4ae5895e3cfddfbfea328ac0e8.tar.xz parallel-4fd40781fe3edc4ae5895e3cfddfbfea328ac0e8.zip |
Adding upstream version 20240222+ds.upstream/20240222+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/parsort')
-rwxr-xr-x | src/parsort | 459 |
1 files changed, 459 insertions, 0 deletions
diff --git a/src/parsort b/src/parsort new file mode 100755 index 0000000..41a75da --- /dev/null +++ b/src/parsort @@ -0,0 +1,459 @@ +#!/usr/bin/perl + +# SPDX-FileCopyrightText: 2021-2024 Ole Tange, http://ole.tange.dk and Free Software and Foundation, Inc. +# SPDX-License-Identifier: GPL-3.0-or-later + +=pod + +=head1 NAME + +parsort - Sort (big files) in parallel + + +=head1 SYNOPSIS + +B<parsort> I<options for sort> + + +=head1 DESCRIPTION + +B<parsort> uses GNU B<sort> to sort in parallel. It works just like +B<sort> but faster on inputs with more than 1 M lines, if you have a +multicore machine. + +Hopefully these ideas will make it into GNU B<sort> in the future. + + +=head1 OPTIONS + +Same as B<sort>. Except: + +=over 4 + +=item B<--parallel=>I<N> + +Change the number of sorts run concurrently to I<N>. I<N> will be +increased to number of files if B<parsort> is given more than I<N> +files. + +=back + + +=head1 EXAMPLE + +Sort files: + + parsort *.txt > sorted.txt + +Sort stdin (standard input) numerically: + + cat numbers | parsort -n > sorted.txt + + +=head1 PERFORMANCE + +B<parsort> is faster on files than on stdin (standard input), because +different parts of a file can be read in parallel. + +On a 48 core machine you should see a speedup of 3x over B<sort>. + + +=head1 AUTHOR + +Copyright (C) 2020-2024 Ole Tange, +http://ole.tange.dk and Free Software Foundation, Inc. + + +=head1 LICENSE + +Copyright (C) 2012 Free Software Foundation, Inc. + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 3 of the License, or +at your option any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. + + +=head1 DEPENDENCIES + +B<parsort> uses B<sort>, B<bash>, and B<parallel>. + + +=head1 SEE ALSO + +B<sort> + + +=cut + +use strict; +use Getopt::Long; +use POSIX qw(mkfifo); + +Getopt::Long::Configure("bundling","require_order"); + +my @ARGV_before = @ARGV; + +GetOptions( + "debug|D" => \$opt::D, + "version" => \$opt::version, + "verbose|v" => \$opt::verbose, + "b|ignore-leading-blanks" => \$opt::ignore_leading_blanks, + "d|dictionary-order" => \$opt::dictionary_order, + "f|ignore-case" => \$opt::ignore_case, + "g|general-numeric-sort" => \$opt::general_numeric_sort, + "i|ignore-nonprinting" => \$opt::ignore_nonprinting, + "M|month-sort" => \$opt::month_sort, + "h|human-numeric-sort" => \$opt::human_numeric_sort, + "n|numeric-sort" => \$opt::numeric_sort, + "N|numascii" => \$opt::numascii, + "r|reverse" => \$opt::reverse, + "R|random-sort" => \$opt::random_sort, + "sort=s" => \$opt::sort, + "V|version-sort" => \$opt::version_sort, + "k|key=s" => \@opt::key, + "t|field-separator=s" => \$opt::field_separator, + "z|zero-terminated" => \$opt::zero_terminated, + "files0-from=s" => \$opt::files0_from, + "random-source=s" => \$opt::dummy, + "batch-size=s" => \$opt::dummy, + "check=s" => \$opt::dummy, + "c" => \$opt::dummy, + "C" => \$opt::dummy, + "compress-program=s" => \$opt::dummy, + "T|temporary-directory=s" => \$opt::dummy, + "parallel=s" => \$opt::parallel, + "u|unique" => \$opt::dummy, + "S|buffer-size=s" => \$opt::dummy, + "s|stable" => \$opt::dummy, + "help" => \$opt::dummy, + ) || exit(255); +$Global::progname = ($0 =~ m:(^|/)([^/]+)$:)[1]; +$Global::version = 20240222; +if($opt::version) { version(); exit 0; } +# Remove -D and --parallel=N +my @s = (grep { ! /^-D$|^--parallel=\S+$/ } + @ARGV_before[0..($#ARGV_before-$#ARGV-1)]); +my @sortoptions; +while(@s) { + my $o = shift @s; + # Remove '--parallel N' + if($o eq "--parallel") { + $o = shift @s; + } else { + push @sortoptions, $o; + } +} +@Global::sortoptions = shell_quote(@sortoptions); +$ENV{'TMPDIR'} ||= "/tmp"; + +sub merge { + # Input: + # @cmd = commands to 'cat' (part of) a file + # 'cat a' 'cat b' 'cat c' => + # sort -m <(sort -m <(cat a) <(cat b)) <(sort -m <(cat c)) + my @cmd = @_; + chomp(@cmd); + while($#cmd > 0) { + my @tmp; + while($#cmd >= 0) { + my $a = shift @cmd; + my $b = shift @cmd; + $a &&= "<($a)"; + $b &&= "<($b)"; + # This looks like useless use of 'cat', but contrary to + # naive belief it increases performance dramatically. + push @tmp, "sort -m @Global::sortoptions $a $b | cat" + } + @cmd = @tmp; + } + return @cmd; +} + +sub sort_files { + # Input is files + my @files = @_; + # Let GNU Parallel generate the commands to read parts of files + # The commands split at \n (or \0) + # and there will be at least one for each CPU thread + my @subopt; + if($opt::zero_terminated) { push @subopt, qw(--recend "\0"); } + if($opt::parallel) { push @subopt, qw(--jobs), $opt::parallel; } + # $uniq is needed because @files could contain \n + my $uniq = join "", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..20); + open(my $par,"-|",qw(parallel), @subopt, + qw(--pipepart --block -1 --dryrun -vv sort), + @Global::sortoptions, $uniq, '::::', @files) || die; + # Generated commands: + # <file perl-catter | (sort ... $uniq ) + # Use $uniq to split into commands + # (We cannot use \n because 'file' may contain newline) + my @cmd = map { "$_)\n" } split(/$uniq[)]\n/, join("",<$par>)); + debug(1,@cmd); + close $par; + @cmd = merge(@cmd); + # The command uses <(...) so it is incompatible with /bin/sh + open(my $bash,"|-","bash") || die; + print $bash @cmd; + close $bash; +} + +sub sort_stdin { + # Input is stdin + # Spread the input between n processes that each sort + # n = number of CPU threads + my $numthreads; + chomp($numthreads = $opt::parallel || `parallel --number-of-threads`); + my @fifos = map { tmpfifo() } 1..$numthreads; + map { mkfifo($_,0600) } @fifos; + # This trick removes the fifo as soon as it is connected in the other end + # (rm fifo; ...) < fifo + my @cmd = (map { "(rm $_; sort @Global::sortoptions) < $_" } + map { Q($_) } @fifos); + @cmd = merge(@cmd); + if(fork) { + } else { + my @subopt = $opt::zero_terminated ? qw(--recend "\0") : (); + exec(qw(parallel -0 -j), $numthreads, @subopt, + # 286k is the best mean value after testing 250..350 + qw(--block 286k --pipe --roundrobin cat > {} :::),@fifos); + } + # The command uses <(...) so it is incompatible with /bin/sh + open(my $bash,"|-","bash") || die; + print $bash @cmd; + close $bash; +} + +sub tmpname { + # Select a name that does not exist + # Do not create the file as it may be used for creating a socket (by tmux) + # Remember the name in $Global::unlink to avoid hitting the same name twice + my $name = shift; + my($tmpname); + if(not -w $ENV{'TMPDIR'}) { + if(not -e $ENV{'TMPDIR'}) { + ::error("Tmpdir '$ENV{'TMPDIR'}' does not exist.","Try 'mkdir ". + Q($ENV{'TMPDIR'})."'"); + } else { + ::error("Tmpdir '$ENV{'TMPDIR'}' is not writable.","Try 'chmod +w ". + Q($ENV{'TMPDIR'})."'"); + } + exit(255); + } + do { + $tmpname = $ENV{'TMPDIR'}."/".$name. + join"", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..5); + } while(-e $tmpname); + return $tmpname; +} + +sub tmpfifo { + # Find an unused name and mkfifo on it + my $tmpfifo = tmpname("psort"); + mkfifo($tmpfifo,0600); + return $tmpfifo; +} + +sub debug { + # Returns: N/A + $opt::D or return; + @_ = grep { defined $_ ? $_ : "" } @_; + print STDERR @_[1..$#_]; +} + +sub version() { + # Returns: N/A + print join + ("\n", + "GNU $Global::progname $Global::version", + "Copyright (C) 2020-2024 Ole Tange, http://ole.tange.dk and Free Software", + "Foundation, Inc.", + "License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>", + "This is free software: you are free to change and redistribute it.", + "GNU $Global::progname comes with no warranty.", + "", + "Web site: https://www.gnu.org/software/parallel\n", + ); +} + +sub shell_quote(@) { + # Input: + # @strings = strings to be quoted + # Returns: + # @shell_quoted_strings = string quoted as needed by the shell + return wantarray ? (map { Q($_) } @_) : (join" ",map { Q($_) } @_); +} + +sub shell_quote_scalar_rc($) { + # Quote for the rc-shell + my $a = $_[0]; + if(defined $a) { + if(($a =~ s/'/''/g) + + + ($a =~ s/[\n\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\<\=\>\~\|\; \"\!\$\&\'\202-\377]+/'$&'/go)) { + # A string was replaced + # No need to test for "" or \0 + } elsif($a eq "") { + $a = "''"; + } elsif($a eq "\0") { + $a = ""; + } + } + return $a; +} + +sub shell_quote_scalar_csh($) { + # Quote for (t)csh + my $a = $_[0]; + if(defined $a) { + # $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g; + # This is 1% faster than the above + if(($a =~ s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\<\=\>\~\|\; \"\!\$\&\'\202-\377]/\\$&/go) + + + # quote newline in csh as \\\n + ($a =~ s/[\n]/"\\\n"/go)) { + # A string was replaced + # No need to test for "" or \0 + } elsif($a eq "") { + $a = "''"; + } elsif($a eq "\0") { + $a = ""; + } + } + return $a; +} + +sub shell_quote_scalar_default($) { + # Quote for other shells (Bourne compatibles) + # Inputs: + # $string = string to be quoted + # Returns: + # $shell_quoted = string quoted as needed by the shell + my $s = $_[0]; + if($s =~ /[^-_.+a-z0-9\/]/i) { + $s =~ s/'/'"'"'/g; # "-quote single quotes + $s = "'$s'"; # '-quote entire string + $s =~ s/^''//; # Remove unneeded '' at ends + $s =~ s/''$//; # (faster than s/^''|''$//g) + return $s; + } elsif ($s eq "") { + return "''"; + } else { + # No quoting needed + return $s; + } +} + +sub shell_quote_scalar($) { + # Quote the string so the shell will not expand any special chars + # Inputs: + # $string = string to be quoted + # Returns: + # $shell_quoted = string quoted as needed by the shell + + # Speed optimization: Choose the correct shell_quote_scalar_* + # and call that directly from now on + no warnings 'redefine'; + if($Global::cshell) { + # (t)csh + *shell_quote_scalar = \&shell_quote_scalar_csh; + } elsif($Global::shell =~ m:(^|/)rc$:) { + # rc-shell + *shell_quote_scalar = \&shell_quote_scalar_rc; + } else { + # other shells + *shell_quote_scalar = \&shell_quote_scalar_default; + } + # The sub is now redefined. Call it + return shell_quote_scalar($_[0]); +} + +sub Q($) { + # Q alias for ::shell_quote_scalar + my $ret = shell_quote_scalar($_[0]); + no warnings 'redefine'; + *Q = \&::shell_quote_scalar; + return $ret; +} + + +sub status(@) { + my @w = @_; + my $fh = $Global::status_fd || *STDERR; + print $fh map { ($_, "\n") } @w; + flush $fh; +} + +sub status_no_nl(@) { + my @w = @_; + my $fh = $Global::status_fd || *STDERR; + print $fh @w; + flush $fh; +} + +sub warning(@) { + my @w = @_; + my $prog = $Global::progname || "parsort"; + status_no_nl(map { ($prog, ": Warning: ", $_, "\n"); } @w); +} + +{ + my %warnings; + sub warning_once(@) { + my @w = @_; + my $prog = $Global::progname || "parsort"; + $warnings{@w}++ or + status_no_nl(map { ($prog, ": Warning: ", $_, "\n"); } @w); + } +} + +sub error(@) { + my @w = @_; + my $prog = $Global::progname || "parsort"; + status(map { ($prog.": Error: ". $_); } @w); +} + +sub die_bug($) { + my $bugid = shift; + print STDERR + ("$Global::progname: This should not happen. You have found a bug. ", + "Please follow\n", + "https://www.gnu.org/software/parallel/man.html#REPORTING-BUGS\n", + "\n", + "Include this in the report:\n", + "* The version number: $Global::version\n", + "* The bugid: $bugid\n", + "* The command line being run\n", + "* The files being read (put the files on a webserver if they are big)\n", + "\n", + "If you get the error on smaller/fewer files, please include those instead.\n"); + exit(255); +} + +if(@ARGV) { + sort_files(@ARGV); +} elsif(length $opt::files0_from) { + $/="\0"; + open(my $fh,"<",$opt::files0_from) || die; + my @files = <$fh>; + chomp(@files); + sort_files(@files); +} else { + sort_stdin(); +} + +# Test +# -z +# OK: cat bigfile | parsort +# OK: parsort -k4n files*.txt +# OK: parsort files*.txt +# OK: parsort "file with space" + |