summaryrefslogtreecommitdiffstats
path: root/src/parcat
diff options
context:
space:
mode:
Diffstat (limited to '')
-rwxr-xr-xsrc/parcat194
-rw-r--r--src/parcat.pod191
2 files changed, 385 insertions, 0 deletions
diff --git a/src/parcat b/src/parcat
new file mode 100755
index 0000000..b4956b0
--- /dev/null
+++ b/src/parcat
@@ -0,0 +1,194 @@
+#!/usr/bin/perl
+
+# Copyright (C) 2016-2024 Ole Tange, http://ole.tange.dk and Free
+# Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, see <https://www.gnu.org/licenses/>
+# or write to the Free Software Foundation, Inc., 51 Franklin St,
+# Fifth Floor, Boston, MA 02110-1301 USA
+#
+# SPDX-FileCopyrightText: 2021-2024 Ole Tange, http://ole.tange.dk and Free Software and Foundation, Inc.
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+use Symbol qw(gensym);
+use IPC::Open3;
+use POSIX qw(:errno_h);
+use IO::Select;
+use strict;
+use threads;
+use threads::shared;
+use Thread::Queue;
+
+
+my $opened :shared;
+my $q = Thread::Queue->new();
+my $okq = Thread::Queue->new();
+my @producers;
+
+if(not @ARGV) {
+ if(-t *STDIN) {
+ print "Usage:\n";
+ print " parcat file(s)\n";
+ print " cat argfile | parcat\n";
+ } else {
+ # Read arguments from stdin
+ chomp(@ARGV = <STDIN>);
+ }
+}
+
+my $files_to_open = 0;
+# Default: fd = stdout
+my $fd = 1;
+for (@ARGV) {
+ # --rm = remove file when opened
+ /^--rm$/ and do { $opt::rm = 1; next; };
+ # -1 = output to fd 1, -2 = output to fd 2
+ /^-(\d+)$/ and do { $fd = $1; next; };
+ push @producers, threads->create('producer', $_, $fd);
+ $files_to_open++;
+}
+
+sub producer {
+ # Open a file/fifo, set non blocking, enqueue fileno of the file handle
+ my $file = shift;
+ my $output_fd = shift;
+ open(my $fh, "<", $file) || do {
+ print STDERR "parcat: Cannot open $file\n";
+ exit(1);
+ };
+ # Remove file when it has been opened
+ if($opt::rm) {
+ unlink $file;
+ }
+ set_fh_non_blocking($fh);
+ $opened++;
+ # Pass the fileno to parent
+ $q->enqueue(fileno($fh),$output_fd);
+ # Get an OK that the $fh is opened and we can release the $fh
+ while(1) {
+ my $ok = $okq->dequeue();
+ if($ok == fileno($fh)) { last; }
+ # Not ours - very unlikely to happen
+ $okq->enqueue($ok);
+ }
+ return;
+}
+
+my $s = IO::Select->new();
+my %buffer;
+
+sub add_file {
+ my $infd = shift;
+ my $outfd = shift;
+ open(my $infh, "<&=", $infd) || die;
+ open(my $outfh, ">&=", $outfd) || die;
+ $s->add($infh);
+ # Tell the producer now opened here and can be released
+ $okq->enqueue($infd);
+ # Initialize the buffer
+ @{$buffer{$infh}{$outfd}} = ();
+ $Global::fh{$outfd} = $outfh;
+}
+
+sub add_files {
+ # Non-blocking dequeue
+ my ($infd,$outfd);
+ do {
+ ($infd,$outfd) = $q->dequeue_nb(2);
+ if(defined($outfd)) {
+ add_file($infd,$outfd);
+ }
+ } while(defined($outfd));
+}
+
+sub add_files_block {
+ # Blocking dequeue
+ my ($infd,$outfd) = $q->dequeue(2);
+ add_file($infd,$outfd);
+}
+
+
+my $fd;
+my (@ready,$infh,$rv,$buf);
+do {
+ # Wait until at least one file is opened
+ add_files_block();
+ while($q->pending or keys %buffer) {
+ add_files();
+ while(keys %buffer) {
+ @ready = $s->can_read(0.01);
+ if(not @ready) {
+ add_files();
+ }
+ for $infh (@ready) {
+ # There is only one key, namely the output file descriptor
+ for my $outfd (keys %{$buffer{$infh}}) {
+ $rv = sysread($infh, $buf, 65536);
+ if (!$rv) {
+ if($! == EAGAIN) {
+ # Would block: Nothing read
+ next;
+ } else {
+ # Nothing read, but would not block:
+ # This file is done
+ $s->remove($infh);
+ for(@{$buffer{$infh}{$outfd}}) {
+ syswrite($Global::fh{$outfd},$_);
+ }
+ delete $buffer{$infh};
+ # Closing the $infh causes it to block
+ # close $infh;
+ add_files();
+ next;
+ }
+ }
+ # Something read.
+ # Find \n or \r for full line
+ my $i = (rindex($buf,"\n")+1);
+ if($i) {
+ # Print full line
+ for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) {
+ syswrite($Global::fh{$outfd},$_);
+ }
+ # @buffer = remaining half line
+ $buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)];
+ } else {
+ # Something read, but not a full line
+ push @{$buffer{$infh}{$outfd}}, $buf;
+ }
+ redo;
+ }
+ }
+ }
+ }
+} while($opened < $files_to_open);
+
+
+for (@producers) {
+ $_->join();
+}
+
+sub set_fh_non_blocking {
+ # Set filehandle as non-blocking
+ # Inputs:
+ # $fh = filehandle to be blocking
+ # Returns:
+ # N/A
+ my $fh = shift;
+ $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;";
+ my $flags;
+ fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle
+ $flags |= &O_NONBLOCK; # Add non-blocking to the flags
+ fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle
+}
diff --git a/src/parcat.pod b/src/parcat.pod
new file mode 100644
index 0000000..f41e530
--- /dev/null
+++ b/src/parcat.pod
@@ -0,0 +1,191 @@
+#!/usr/bin/perl
+
+# SPDX-FileCopyrightText: 2021-2024 Ole Tange, http://ole.tange.dk and Free Software and Foundation, Inc.
+# SPDX-License-Identifier: GFDL-1.3-or-later
+# SPDX-License-Identifier: CC-BY-SA-4.0
+
+=head1 NAME
+
+parcat - cat files or fifos in parallel
+
+=head1 SYNOPSIS
+
+B<parcat> [--rm] [-#] file(s) [-#] file(s)
+
+=head1 DESCRIPTION
+
+GNU B<parcat> reads files or fifos in parallel. It writes full lines
+so there will be no problem with mixed-half-lines which you risk if
+you use:
+
+ (cat file1 & cat file2 &) | ...
+
+It is faster than doing:
+
+ parallel -j0 --lb cat ::: file*
+
+Arguments can be given on the command line or passed in on stdin
+(standard input).
+
+=head1 OPTIONS
+
+=over 9
+
+=item -B<#>
+
+Arguments following this will be sent to the file descriptor B<#>. E.g.
+
+ parcat -1 stdout1 stdout2 -2 stderr1 stderr2
+
+will send I<stdout1> and I<stdout2> to stdout (standard output = file
+descriptor 1), and send I<stderr1> and I<stderr2> to stderr (standard
+error = file descriptor 2).
+
+=item --rm
+
+Remove files after opening. As soon as the files are opened, unlink
+the files.
+
+=back
+
+=head1 EXAMPLES
+
+=head2 Simple line buffered output
+
+B<traceroute> will often print half a line. If run in parallel, two
+instances may half-lines of their output. This can be avoided by
+saving the output to a fifo and then using B<parcat> to read the two
+fifos in parallel:
+
+ mkfifo freenetproject.org.fifo tange.dk.fifo
+ traceroute freenetproject.org > freenetproject.org.fifo &
+ traceroute tange.dk > tange.dk.fifo &
+ parcat --rm *fifo
+
+
+=head1 REPORTING BUGS
+
+GNU B<parcat> is part of GNU B<parallel>. Report bugs to
+<bug-parallel@gnu.org>.
+
+
+=head1 AUTHOR
+
+Copyright (C) 2016-2024 Ole Tange, http://ole.tange.dk and Free
+Software Foundation, Inc.
+
+=head1 LICENSE
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; either version 3 of the License, or
+at your option any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+=head2 Documentation license I
+
+Permission is granted to copy, distribute and/or modify this
+documentation under the terms of the GNU Free Documentation License,
+Version 1.3 or any later version published by the Free Software
+Foundation; with no Invariant Sections, with no Front-Cover Texts, and
+with no Back-Cover Texts. A copy of the license is included in the
+file LICENSES/GFDL-1.3-or-later.txt.
+
+=head2 Documentation license II
+
+You are free:
+
+=over 9
+
+=item B<to Share>
+
+to copy, distribute and transmit the work
+
+=item B<to Remix>
+
+to adapt the work
+
+=back
+
+Under the following conditions:
+
+=over 9
+
+=item B<Attribution>
+
+You must attribute the work in the manner specified by the author or
+licensor (but not in any way that suggests that they endorse you or
+your use of the work).
+
+=item B<Share Alike>
+
+If you alter, transform, or build upon this work, you may distribute
+the resulting work only under the same, similar or a compatible
+license.
+
+=back
+
+With the understanding that:
+
+=over 9
+
+=item B<Waiver>
+
+Any of the above conditions can be waived if you get permission from
+the copyright holder.
+
+=item B<Public Domain>
+
+Where the work or any of its elements is in the public domain under
+applicable law, that status is in no way affected by the license.
+
+=item B<Other Rights>
+
+In no way are any of the following rights affected by the license:
+
+=over 9
+
+=item *
+
+Your fair dealing or fair use rights, or other applicable
+copyright exceptions and limitations;
+
+=item *
+
+The author's moral rights;
+
+=item *
+
+Rights other persons may have either in the work itself or in
+how the work is used, such as publicity or privacy rights.
+
+=back
+
+=item B<Notice>
+
+For any reuse or distribution, you must make clear to others the
+license terms of this work.
+
+=back
+
+A copy of the full license is included in the file as
+LICENCES/CC-BY-SA-4.0.txt
+
+
+=head1 DEPENDENCIES
+
+GNU B<parcat> uses Perl.
+
+
+=head1 SEE ALSO
+
+B<cat>(1), B<parallel>(1)
+
+=cut