diff options
Diffstat (limited to '')
-rwxr-xr-x | src/parcat | 194 | ||||
-rw-r--r-- | src/parcat.pod | 191 |
2 files changed, 385 insertions, 0 deletions
diff --git a/src/parcat b/src/parcat new file mode 100755 index 0000000..b4956b0 --- /dev/null +++ b/src/parcat @@ -0,0 +1,194 @@ +#!/usr/bin/perl + +# Copyright (C) 2016-2024 Ole Tange, http://ole.tange.dk and Free +# Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, see <https://www.gnu.org/licenses/> +# or write to the Free Software Foundation, Inc., 51 Franklin St, +# Fifth Floor, Boston, MA 02110-1301 USA +# +# SPDX-FileCopyrightText: 2021-2024 Ole Tange, http://ole.tange.dk and Free Software and Foundation, Inc. +# SPDX-License-Identifier: GPL-3.0-or-later + +use Symbol qw(gensym); +use IPC::Open3; +use POSIX qw(:errno_h); +use IO::Select; +use strict; +use threads; +use threads::shared; +use Thread::Queue; + + +my $opened :shared; +my $q = Thread::Queue->new(); +my $okq = Thread::Queue->new(); +my @producers; + +if(not @ARGV) { + if(-t *STDIN) { + print "Usage:\n"; + print " parcat file(s)\n"; + print " cat argfile | parcat\n"; + } else { + # Read arguments from stdin + chomp(@ARGV = <STDIN>); + } +} + +my $files_to_open = 0; +# Default: fd = stdout +my $fd = 1; +for (@ARGV) { + # --rm = remove file when opened + /^--rm$/ and do { $opt::rm = 1; next; }; + # -1 = output to fd 1, -2 = output to fd 2 + /^-(\d+)$/ and do { $fd = $1; next; }; + push @producers, threads->create('producer', $_, $fd); + $files_to_open++; +} + +sub producer { + # Open a file/fifo, set non blocking, enqueue fileno of the file handle + my $file = shift; + my $output_fd = shift; + open(my $fh, "<", $file) || do { + print STDERR "parcat: Cannot open $file\n"; + exit(1); + }; + # Remove file when it has been opened + if($opt::rm) { + unlink $file; + } + set_fh_non_blocking($fh); + $opened++; + # Pass the fileno to parent + $q->enqueue(fileno($fh),$output_fd); + # Get an OK that the $fh is opened and we can release the $fh + while(1) { + my $ok = $okq->dequeue(); + if($ok == fileno($fh)) { last; } + # Not ours - very unlikely to happen + $okq->enqueue($ok); + } + return; +} + +my $s = IO::Select->new(); +my %buffer; + +sub add_file { + my $infd = shift; + my $outfd = shift; + open(my $infh, "<&=", $infd) || die; + open(my $outfh, ">&=", $outfd) || die; + $s->add($infh); + # Tell the producer now opened here and can be released + $okq->enqueue($infd); + # Initialize the buffer + @{$buffer{$infh}{$outfd}} = (); + $Global::fh{$outfd} = $outfh; +} + +sub add_files { + # Non-blocking dequeue + my ($infd,$outfd); + do { + ($infd,$outfd) = $q->dequeue_nb(2); + if(defined($outfd)) { + add_file($infd,$outfd); + } + } while(defined($outfd)); +} + +sub add_files_block { + # Blocking dequeue + my ($infd,$outfd) = $q->dequeue(2); + add_file($infd,$outfd); +} + + +my $fd; +my (@ready,$infh,$rv,$buf); +do { + # Wait until at least one file is opened + add_files_block(); + while($q->pending or keys %buffer) { + add_files(); + while(keys %buffer) { + @ready = $s->can_read(0.01); + if(not @ready) { + add_files(); + } + for $infh (@ready) { + # There is only one key, namely the output file descriptor + for my $outfd (keys %{$buffer{$infh}}) { + $rv = sysread($infh, $buf, 65536); + if (!$rv) { + if($! == EAGAIN) { + # Would block: Nothing read + next; + } else { + # Nothing read, but would not block: + # This file is done + $s->remove($infh); + for(@{$buffer{$infh}{$outfd}}) { + syswrite($Global::fh{$outfd},$_); + } + delete $buffer{$infh}; + # Closing the $infh causes it to block + # close $infh; + add_files(); + next; + } + } + # Something read. + # Find \n or \r for full line + my $i = (rindex($buf,"\n")+1); + if($i) { + # Print full line + for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) { + syswrite($Global::fh{$outfd},$_); + } + # @buffer = remaining half line + $buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)]; + } else { + # Something read, but not a full line + push @{$buffer{$infh}{$outfd}}, $buf; + } + redo; + } + } + } + } +} while($opened < $files_to_open); + + +for (@producers) { + $_->join(); +} + +sub set_fh_non_blocking { + # Set filehandle as non-blocking + # Inputs: + # $fh = filehandle to be blocking + # Returns: + # N/A + my $fh = shift; + $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; + my $flags; + fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle + $flags |= &O_NONBLOCK; # Add non-blocking to the flags + fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle +} diff --git a/src/parcat.pod b/src/parcat.pod new file mode 100644 index 0000000..f41e530 --- /dev/null +++ b/src/parcat.pod @@ -0,0 +1,191 @@ +#!/usr/bin/perl + +# SPDX-FileCopyrightText: 2021-2024 Ole Tange, http://ole.tange.dk and Free Software and Foundation, Inc. +# SPDX-License-Identifier: GFDL-1.3-or-later +# SPDX-License-Identifier: CC-BY-SA-4.0 + +=head1 NAME + +parcat - cat files or fifos in parallel + +=head1 SYNOPSIS + +B<parcat> [--rm] [-#] file(s) [-#] file(s) + +=head1 DESCRIPTION + +GNU B<parcat> reads files or fifos in parallel. It writes full lines +so there will be no problem with mixed-half-lines which you risk if +you use: + + (cat file1 & cat file2 &) | ... + +It is faster than doing: + + parallel -j0 --lb cat ::: file* + +Arguments can be given on the command line or passed in on stdin +(standard input). + +=head1 OPTIONS + +=over 9 + +=item -B<#> + +Arguments following this will be sent to the file descriptor B<#>. E.g. + + parcat -1 stdout1 stdout2 -2 stderr1 stderr2 + +will send I<stdout1> and I<stdout2> to stdout (standard output = file +descriptor 1), and send I<stderr1> and I<stderr2> to stderr (standard +error = file descriptor 2). + +=item --rm + +Remove files after opening. As soon as the files are opened, unlink +the files. + +=back + +=head1 EXAMPLES + +=head2 Simple line buffered output + +B<traceroute> will often print half a line. If run in parallel, two +instances may half-lines of their output. This can be avoided by +saving the output to a fifo and then using B<parcat> to read the two +fifos in parallel: + + mkfifo freenetproject.org.fifo tange.dk.fifo + traceroute freenetproject.org > freenetproject.org.fifo & + traceroute tange.dk > tange.dk.fifo & + parcat --rm *fifo + + +=head1 REPORTING BUGS + +GNU B<parcat> is part of GNU B<parallel>. Report bugs to +<bug-parallel@gnu.org>. + + +=head1 AUTHOR + +Copyright (C) 2016-2024 Ole Tange, http://ole.tange.dk and Free +Software Foundation, Inc. + +=head1 LICENSE + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 3 of the License, or +at your option any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. + +=head2 Documentation license I + +Permission is granted to copy, distribute and/or modify this +documentation under the terms of the GNU Free Documentation License, +Version 1.3 or any later version published by the Free Software +Foundation; with no Invariant Sections, with no Front-Cover Texts, and +with no Back-Cover Texts. A copy of the license is included in the +file LICENSES/GFDL-1.3-or-later.txt. + +=head2 Documentation license II + +You are free: + +=over 9 + +=item B<to Share> + +to copy, distribute and transmit the work + +=item B<to Remix> + +to adapt the work + +=back + +Under the following conditions: + +=over 9 + +=item B<Attribution> + +You must attribute the work in the manner specified by the author or +licensor (but not in any way that suggests that they endorse you or +your use of the work). + +=item B<Share Alike> + +If you alter, transform, or build upon this work, you may distribute +the resulting work only under the same, similar or a compatible +license. + +=back + +With the understanding that: + +=over 9 + +=item B<Waiver> + +Any of the above conditions can be waived if you get permission from +the copyright holder. + +=item B<Public Domain> + +Where the work or any of its elements is in the public domain under +applicable law, that status is in no way affected by the license. + +=item B<Other Rights> + +In no way are any of the following rights affected by the license: + +=over 9 + +=item * + +Your fair dealing or fair use rights, or other applicable +copyright exceptions and limitations; + +=item * + +The author's moral rights; + +=item * + +Rights other persons may have either in the work itself or in +how the work is used, such as publicity or privacy rights. + +=back + +=item B<Notice> + +For any reuse or distribution, you must make clear to others the +license terms of this work. + +=back + +A copy of the full license is included in the file as +LICENCES/CC-BY-SA-4.0.txt + + +=head1 DEPENDENCIES + +GNU B<parcat> uses Perl. + + +=head1 SEE ALSO + +B<cat>(1), B<parallel>(1) + +=cut |