diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 09:43:15 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 09:43:15 +0000 |
commit | 4fd40781fe3edc4ae5895e3cfddfbfea328ac0e8 (patch) | |
tree | efb19f2d9385f548990904757cdf44f3b9546fca /src/parcat | |
parent | Initial commit. (diff) | |
download | parallel-4fd40781fe3edc4ae5895e3cfddfbfea328ac0e8.tar.xz parallel-4fd40781fe3edc4ae5895e3cfddfbfea328ac0e8.zip |
Adding upstream version 20240222+ds.upstream/20240222+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/parcat')
-rwxr-xr-x | src/parcat | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/src/parcat b/src/parcat new file mode 100755 index 0000000..b4956b0 --- /dev/null +++ b/src/parcat @@ -0,0 +1,194 @@ +#!/usr/bin/perl + +# Copyright (C) 2016-2024 Ole Tange, http://ole.tange.dk and Free +# Software Foundation, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, see <https://www.gnu.org/licenses/> +# or write to the Free Software Foundation, Inc., 51 Franklin St, +# Fifth Floor, Boston, MA 02110-1301 USA +# +# SPDX-FileCopyrightText: 2021-2024 Ole Tange, http://ole.tange.dk and Free Software and Foundation, Inc. +# SPDX-License-Identifier: GPL-3.0-or-later + +use Symbol qw(gensym); +use IPC::Open3; +use POSIX qw(:errno_h); +use IO::Select; +use strict; +use threads; +use threads::shared; +use Thread::Queue; + + +my $opened :shared; +my $q = Thread::Queue->new(); +my $okq = Thread::Queue->new(); +my @producers; + +if(not @ARGV) { + if(-t *STDIN) { + print "Usage:\n"; + print " parcat file(s)\n"; + print " cat argfile | parcat\n"; + } else { + # Read arguments from stdin + chomp(@ARGV = <STDIN>); + } +} + +my $files_to_open = 0; +# Default: fd = stdout +my $fd = 1; +for (@ARGV) { + # --rm = remove file when opened + /^--rm$/ and do { $opt::rm = 1; next; }; + # -1 = output to fd 1, -2 = output to fd 2 + /^-(\d+)$/ and do { $fd = $1; next; }; + push @producers, threads->create('producer', $_, $fd); + $files_to_open++; +} + +sub producer { + # Open a file/fifo, set non blocking, enqueue fileno of the file handle + my $file = shift; + my $output_fd = shift; + open(my $fh, "<", $file) || do { + print STDERR "parcat: Cannot open $file\n"; + exit(1); + }; + # Remove file when it has been opened + if($opt::rm) { + unlink $file; + } + set_fh_non_blocking($fh); + $opened++; + # Pass the fileno to parent + $q->enqueue(fileno($fh),$output_fd); + # Get an OK that the $fh is opened and we can release the $fh + while(1) { + my $ok = $okq->dequeue(); + if($ok == fileno($fh)) { last; } + # Not ours - very unlikely to happen + $okq->enqueue($ok); + } + return; +} + +my $s = IO::Select->new(); +my %buffer; + +sub add_file { + my $infd = shift; + my $outfd = shift; + open(my $infh, "<&=", $infd) || die; + open(my $outfh, ">&=", $outfd) || die; + $s->add($infh); + # Tell the producer now opened here and can be released + $okq->enqueue($infd); + # Initialize the buffer + @{$buffer{$infh}{$outfd}} = (); + $Global::fh{$outfd} = $outfh; +} + +sub add_files { + # Non-blocking dequeue + my ($infd,$outfd); + do { + ($infd,$outfd) = $q->dequeue_nb(2); + if(defined($outfd)) { + add_file($infd,$outfd); + } + } while(defined($outfd)); +} + +sub add_files_block { + # Blocking dequeue + my ($infd,$outfd) = $q->dequeue(2); + add_file($infd,$outfd); +} + + +my $fd; +my (@ready,$infh,$rv,$buf); +do { + # Wait until at least one file is opened + add_files_block(); + while($q->pending or keys %buffer) { + add_files(); + while(keys %buffer) { + @ready = $s->can_read(0.01); + if(not @ready) { + add_files(); + } + for $infh (@ready) { + # There is only one key, namely the output file descriptor + for my $outfd (keys %{$buffer{$infh}}) { + $rv = sysread($infh, $buf, 65536); + if (!$rv) { + if($! == EAGAIN) { + # Would block: Nothing read + next; + } else { + # Nothing read, but would not block: + # This file is done + $s->remove($infh); + for(@{$buffer{$infh}{$outfd}}) { + syswrite($Global::fh{$outfd},$_); + } + delete $buffer{$infh}; + # Closing the $infh causes it to block + # close $infh; + add_files(); + next; + } + } + # Something read. + # Find \n or \r for full line + my $i = (rindex($buf,"\n")+1); + if($i) { + # Print full line + for(@{$buffer{$infh}{$outfd}}, substr($buf,0,$i)) { + syswrite($Global::fh{$outfd},$_); + } + # @buffer = remaining half line + $buffer{$infh}{$outfd} = [substr($buf,$i,$rv-$i)]; + } else { + # Something read, but not a full line + push @{$buffer{$infh}{$outfd}}, $buf; + } + redo; + } + } + } + } +} while($opened < $files_to_open); + + +for (@producers) { + $_->join(); +} + +sub set_fh_non_blocking { + # Set filehandle as non-blocking + # Inputs: + # $fh = filehandle to be blocking + # Returns: + # N/A + my $fh = shift; + $Global::use{"Fcntl"} ||= eval "use Fcntl qw(:DEFAULT :flock); 1;"; + my $flags; + fcntl($fh, &F_GETFL, $flags) || die $!; # Get the current flags on the filehandle + $flags |= &O_NONBLOCK; # Add non-blocking to the flags + fcntl($fh, &F_SETFL, $flags) || die $!; # Set the flags on the filehandle +} |