summaryrefslogtreecommitdiffstats
path: root/src/parsort
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 09:43:15 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-19 09:43:15 +0000
commit4fd40781fe3edc4ae5895e3cfddfbfea328ac0e8 (patch)
treeefb19f2d9385f548990904757cdf44f3b9546fca /src/parsort
parentInitial commit. (diff)
downloadparallel-4fd40781fe3edc4ae5895e3cfddfbfea328ac0e8.tar.xz
parallel-4fd40781fe3edc4ae5895e3cfddfbfea328ac0e8.zip
Adding upstream version 20240222+ds.upstream/20240222+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/parsort')
-rwxr-xr-xsrc/parsort459
1 files changed, 459 insertions, 0 deletions
diff --git a/src/parsort b/src/parsort
new file mode 100755
index 0000000..41a75da
--- /dev/null
+++ b/src/parsort
@@ -0,0 +1,459 @@
+#!/usr/bin/perl
+
+# SPDX-FileCopyrightText: 2021-2024 Ole Tange, http://ole.tange.dk and Free Software and Foundation, Inc.
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+=pod
+
+=head1 NAME
+
+parsort - Sort (big files) in parallel
+
+
+=head1 SYNOPSIS
+
+B<parsort> I<options for sort>
+
+
+=head1 DESCRIPTION
+
+B<parsort> uses GNU B<sort> to sort in parallel. It works just like
+B<sort> but faster on inputs with more than 1 M lines, if you have a
+multicore machine.
+
+Hopefully these ideas will make it into GNU B<sort> in the future.
+
+
+=head1 OPTIONS
+
+Same as B<sort>. Except:
+
+=over 4
+
+=item B<--parallel=>I<N>
+
+Change the number of sorts run concurrently to I<N>. I<N> will be
+increased to number of files if B<parsort> is given more than I<N>
+files.
+
+=back
+
+
+=head1 EXAMPLE
+
+Sort files:
+
+ parsort *.txt > sorted.txt
+
+Sort stdin (standard input) numerically:
+
+ cat numbers | parsort -n > sorted.txt
+
+
+=head1 PERFORMANCE
+
+B<parsort> is faster on files than on stdin (standard input), because
+different parts of a file can be read in parallel.
+
+On a 48 core machine you should see a speedup of 3x over B<sort>.
+
+
+=head1 AUTHOR
+
+Copyright (C) 2020-2024 Ole Tange,
+http://ole.tange.dk and Free Software Foundation, Inc.
+
+
+=head1 LICENSE
+
+Copyright (C) 2012 Free Software Foundation, Inc.
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 3 of the License, or
+at your option any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+=head1 DEPENDENCIES
+
+B<parsort> uses B<sort>, B<bash>, and B<parallel>.
+
+
+=head1 SEE ALSO
+
+B<sort>
+
+
+=cut
+
+use strict;
+use Getopt::Long;
+use POSIX qw(mkfifo);
+
+Getopt::Long::Configure("bundling","require_order");
+
+my @ARGV_before = @ARGV;
+
+GetOptions(
+ "debug|D" => \$opt::D,
+ "version" => \$opt::version,
+ "verbose|v" => \$opt::verbose,
+ "b|ignore-leading-blanks" => \$opt::ignore_leading_blanks,
+ "d|dictionary-order" => \$opt::dictionary_order,
+ "f|ignore-case" => \$opt::ignore_case,
+ "g|general-numeric-sort" => \$opt::general_numeric_sort,
+ "i|ignore-nonprinting" => \$opt::ignore_nonprinting,
+ "M|month-sort" => \$opt::month_sort,
+ "h|human-numeric-sort" => \$opt::human_numeric_sort,
+ "n|numeric-sort" => \$opt::numeric_sort,
+ "N|numascii" => \$opt::numascii,
+ "r|reverse" => \$opt::reverse,
+ "R|random-sort" => \$opt::random_sort,
+ "sort=s" => \$opt::sort,
+ "V|version-sort" => \$opt::version_sort,
+ "k|key=s" => \@opt::key,
+ "t|field-separator=s" => \$opt::field_separator,
+ "z|zero-terminated" => \$opt::zero_terminated,
+ "files0-from=s" => \$opt::files0_from,
+ "random-source=s" => \$opt::dummy,
+ "batch-size=s" => \$opt::dummy,
+ "check=s" => \$opt::dummy,
+ "c" => \$opt::dummy,
+ "C" => \$opt::dummy,
+ "compress-program=s" => \$opt::dummy,
+ "T|temporary-directory=s" => \$opt::dummy,
+ "parallel=s" => \$opt::parallel,
+ "u|unique" => \$opt::dummy,
+ "S|buffer-size=s" => \$opt::dummy,
+ "s|stable" => \$opt::dummy,
+ "help" => \$opt::dummy,
+ ) || exit(255);
+$Global::progname = ($0 =~ m:(^|/)([^/]+)$:)[1];
+$Global::version = 20240222;
+if($opt::version) { version(); exit 0; }
+# Remove -D and --parallel=N
+my @s = (grep { ! /^-D$|^--parallel=\S+$/ }
+ @ARGV_before[0..($#ARGV_before-$#ARGV-1)]);
+my @sortoptions;
+while(@s) {
+ my $o = shift @s;
+ # Remove '--parallel N'
+ if($o eq "--parallel") {
+ $o = shift @s;
+ } else {
+ push @sortoptions, $o;
+ }
+}
+@Global::sortoptions = shell_quote(@sortoptions);
+$ENV{'TMPDIR'} ||= "/tmp";
+
+sub merge {
+ # Input:
+ # @cmd = commands to 'cat' (part of) a file
+ # 'cat a' 'cat b' 'cat c' =>
+ # sort -m <(sort -m <(cat a) <(cat b)) <(sort -m <(cat c))
+ my @cmd = @_;
+ chomp(@cmd);
+ while($#cmd > 0) {
+ my @tmp;
+ while($#cmd >= 0) {
+ my $a = shift @cmd;
+ my $b = shift @cmd;
+ $a &&= "<($a)";
+ $b &&= "<($b)";
+ # This looks like useless use of 'cat', but contrary to
+ # naive belief it increases performance dramatically.
+ push @tmp, "sort -m @Global::sortoptions $a $b | cat"
+ }
+ @cmd = @tmp;
+ }
+ return @cmd;
+}
+
+sub sort_files {
+ # Input is files
+ my @files = @_;
+ # Let GNU Parallel generate the commands to read parts of files
+ # The commands split at \n (or \0)
+ # and there will be at least one for each CPU thread
+ my @subopt;
+ if($opt::zero_terminated) { push @subopt, qw(--recend "\0"); }
+ if($opt::parallel) { push @subopt, qw(--jobs), $opt::parallel; }
+ # $uniq is needed because @files could contain \n
+ my $uniq = join "", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..20);
+ open(my $par,"-|",qw(parallel), @subopt,
+ qw(--pipepart --block -1 --dryrun -vv sort),
+ @Global::sortoptions, $uniq, '::::', @files) || die;
+ # Generated commands:
+ # <file perl-catter | (sort ... $uniq )
+ # Use $uniq to split into commands
+ # (We cannot use \n because 'file' may contain newline)
+ my @cmd = map { "$_)\n" } split(/$uniq[)]\n/, join("",<$par>));
+ debug(1,@cmd);
+ close $par;
+ @cmd = merge(@cmd);
+ # The command uses <(...) so it is incompatible with /bin/sh
+ open(my $bash,"|-","bash") || die;
+ print $bash @cmd;
+ close $bash;
+}
+
+sub sort_stdin {
+ # Input is stdin
+ # Spread the input between n processes that each sort
+ # n = number of CPU threads
+ my $numthreads;
+ chomp($numthreads = $opt::parallel || `parallel --number-of-threads`);
+ my @fifos = map { tmpfifo() } 1..$numthreads;
+ map { mkfifo($_,0600) } @fifos;
+ # This trick removes the fifo as soon as it is connected in the other end
+ # (rm fifo; ...) < fifo
+ my @cmd = (map { "(rm $_; sort @Global::sortoptions) < $_" }
+ map { Q($_) } @fifos);
+ @cmd = merge(@cmd);
+ if(fork) {
+ } else {
+ my @subopt = $opt::zero_terminated ? qw(--recend "\0") : ();
+ exec(qw(parallel -0 -j), $numthreads, @subopt,
+ # 286k is the best mean value after testing 250..350
+ qw(--block 286k --pipe --roundrobin cat > {} :::),@fifos);
+ }
+ # The command uses <(...) so it is incompatible with /bin/sh
+ open(my $bash,"|-","bash") || die;
+ print $bash @cmd;
+ close $bash;
+}
+
+sub tmpname {
+ # Select a name that does not exist
+ # Do not create the file as it may be used for creating a socket (by tmux)
+ # Remember the name in $Global::unlink to avoid hitting the same name twice
+ my $name = shift;
+ my($tmpname);
+ if(not -w $ENV{'TMPDIR'}) {
+ if(not -e $ENV{'TMPDIR'}) {
+ ::error("Tmpdir '$ENV{'TMPDIR'}' does not exist.","Try 'mkdir ".
+ Q($ENV{'TMPDIR'})."'");
+ } else {
+ ::error("Tmpdir '$ENV{'TMPDIR'}' is not writable.","Try 'chmod +w ".
+ Q($ENV{'TMPDIR'})."'");
+ }
+ exit(255);
+ }
+ do {
+ $tmpname = $ENV{'TMPDIR'}."/".$name.
+ join"", map { (0..9,"a".."z","A".."Z")[rand(62)] } (1..5);
+ } while(-e $tmpname);
+ return $tmpname;
+}
+
+sub tmpfifo {
+ # Find an unused name and mkfifo on it
+ my $tmpfifo = tmpname("psort");
+ mkfifo($tmpfifo,0600);
+ return $tmpfifo;
+}
+
+sub debug {
+ # Returns: N/A
+ $opt::D or return;
+ @_ = grep { defined $_ ? $_ : "" } @_;
+ print STDERR @_[1..$#_];
+}
+
+sub version() {
+ # Returns: N/A
+ print join
+ ("\n",
+ "GNU $Global::progname $Global::version",
+ "Copyright (C) 2020-2024 Ole Tange, http://ole.tange.dk and Free Software",
+ "Foundation, Inc.",
+ "License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>",
+ "This is free software: you are free to change and redistribute it.",
+ "GNU $Global::progname comes with no warranty.",
+ "",
+ "Web site: https://www.gnu.org/software/parallel\n",
+ );
+}
+
+sub shell_quote(@) {
+ # Input:
+ # @strings = strings to be quoted
+ # Returns:
+ # @shell_quoted_strings = string quoted as needed by the shell
+ return wantarray ? (map { Q($_) } @_) : (join" ",map { Q($_) } @_);
+}
+
+sub shell_quote_scalar_rc($) {
+ # Quote for the rc-shell
+ my $a = $_[0];
+ if(defined $a) {
+ if(($a =~ s/'/''/g)
+ +
+ ($a =~ s/[\n\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\<\=\>\~\|\; \"\!\$\&\'\202-\377]+/'$&'/go)) {
+ # A string was replaced
+ # No need to test for "" or \0
+ } elsif($a eq "") {
+ $a = "''";
+ } elsif($a eq "\0") {
+ $a = "";
+ }
+ }
+ return $a;
+}
+
+sub shell_quote_scalar_csh($) {
+ # Quote for (t)csh
+ my $a = $_[0];
+ if(defined $a) {
+ # $a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\>\<\~\|\; \"\!\$\&\'\202-\377])/\\$1/g;
+ # This is 1% faster than the above
+ if(($a =~ s/[\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\^\*\<\=\>\~\|\; \"\!\$\&\'\202-\377]/\\$&/go)
+ +
+ # quote newline in csh as \\\n
+ ($a =~ s/[\n]/"\\\n"/go)) {
+ # A string was replaced
+ # No need to test for "" or \0
+ } elsif($a eq "") {
+ $a = "''";
+ } elsif($a eq "\0") {
+ $a = "";
+ }
+ }
+ return $a;
+}
+
+sub shell_quote_scalar_default($) {
+ # Quote for other shells (Bourne compatibles)
+ # Inputs:
+ # $string = string to be quoted
+ # Returns:
+ # $shell_quoted = string quoted as needed by the shell
+ my $s = $_[0];
+ if($s =~ /[^-_.+a-z0-9\/]/i) {
+ $s =~ s/'/'"'"'/g; # "-quote single quotes
+ $s = "'$s'"; # '-quote entire string
+ $s =~ s/^''//; # Remove unneeded '' at ends
+ $s =~ s/''$//; # (faster than s/^''|''$//g)
+ return $s;
+ } elsif ($s eq "") {
+ return "''";
+ } else {
+ # No quoting needed
+ return $s;
+ }
+}
+
+sub shell_quote_scalar($) {
+ # Quote the string so the shell will not expand any special chars
+ # Inputs:
+ # $string = string to be quoted
+ # Returns:
+ # $shell_quoted = string quoted as needed by the shell
+
+ # Speed optimization: Choose the correct shell_quote_scalar_*
+ # and call that directly from now on
+ no warnings 'redefine';
+ if($Global::cshell) {
+ # (t)csh
+ *shell_quote_scalar = \&shell_quote_scalar_csh;
+ } elsif($Global::shell =~ m:(^|/)rc$:) {
+ # rc-shell
+ *shell_quote_scalar = \&shell_quote_scalar_rc;
+ } else {
+ # other shells
+ *shell_quote_scalar = \&shell_quote_scalar_default;
+ }
+ # The sub is now redefined. Call it
+ return shell_quote_scalar($_[0]);
+}
+
+sub Q($) {
+ # Q alias for ::shell_quote_scalar
+ my $ret = shell_quote_scalar($_[0]);
+ no warnings 'redefine';
+ *Q = \&::shell_quote_scalar;
+ return $ret;
+}
+
+
+sub status(@) {
+ my @w = @_;
+ my $fh = $Global::status_fd || *STDERR;
+ print $fh map { ($_, "\n") } @w;
+ flush $fh;
+}
+
+sub status_no_nl(@) {
+ my @w = @_;
+ my $fh = $Global::status_fd || *STDERR;
+ print $fh @w;
+ flush $fh;
+}
+
+sub warning(@) {
+ my @w = @_;
+ my $prog = $Global::progname || "parsort";
+ status_no_nl(map { ($prog, ": Warning: ", $_, "\n"); } @w);
+}
+
+{
+ my %warnings;
+ sub warning_once(@) {
+ my @w = @_;
+ my $prog = $Global::progname || "parsort";
+ $warnings{@w}++ or
+ status_no_nl(map { ($prog, ": Warning: ", $_, "\n"); } @w);
+ }
+}
+
+sub error(@) {
+ my @w = @_;
+ my $prog = $Global::progname || "parsort";
+ status(map { ($prog.": Error: ". $_); } @w);
+}
+
+sub die_bug($) {
+ my $bugid = shift;
+ print STDERR
+ ("$Global::progname: This should not happen. You have found a bug. ",
+ "Please follow\n",
+ "https://www.gnu.org/software/parallel/man.html#REPORTING-BUGS\n",
+ "\n",
+ "Include this in the report:\n",
+ "* The version number: $Global::version\n",
+ "* The bugid: $bugid\n",
+ "* The command line being run\n",
+ "* The files being read (put the files on a webserver if they are big)\n",
+ "\n",
+ "If you get the error on smaller/fewer files, please include those instead.\n");
+ exit(255);
+}
+
+if(@ARGV) {
+ sort_files(@ARGV);
+} elsif(length $opt::files0_from) {
+ $/="\0";
+ open(my $fh,"<",$opt::files0_from) || die;
+ my @files = <$fh>;
+ chomp(@files);
+ sort_files(@files);
+} else {
+ sort_stdin();
+}
+
+# Test
+# -z
+# OK: cat bigfile | parsort
+# OK: parsort -k4n files*.txt
+# OK: parsort files*.txt
+# OK: parsort "file with space"
+