#!/usr/bin/perl -w ## ## radsqlrelay.pl This program tails a SQL logfile and forwards ## the queries to a database server. Used to ## replicate accounting records to one (central) ## database, even if the database has extended ## downtime. ## ## Version: $Id$ ## ## Author: Nicolas Baradakis ## ## Copyright (C) 2005 Cegetel ## Copyright (C) 2019 Network RADIUS ## ## This program is free software; you can redistribute it and/or ## modify it under the terms of the GNU General Public License ## as published by the Free Software Foundation; either version 2 ## of the License, or (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, write to the Free Software ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA ## use strict; use DBI; use Fcntl; use Getopt::Std; use POSIX qw(:unistd_h :errno_h); use Time::HiRes qw(clock_gettime usleep CLOCK_MONOTONIC); # We send watchdogs at half the indicated interval if the # Linux::Systemd::Daemon module is available and the WATCHDOG_USEC environment # variable is set by the systemd service manager. my $watchdog_usec; my $next_watchdog; eval { require Linux::Systemd::Daemon; Linux::Systemd::Daemon->import(); $watchdog_usec = $ENV{'WATCHDOG_USEC'} || undef; }; # To synthetically test the watchdog functionality then uncomment these next # lines: # # $watchdog_usec = 30 *1000*1000; # sub sd_notify {} my $maxcollect = 100; # tunable, works for MySQL! my $lastinsert; my @values; my $need_exit = 0; my $debug = 0; sub got_signal() { $need_exit = 1; sd_notify(stopping => 1, status => 'Signalled. Shutting down.') if $watchdog_usec; } sub debug { print shift if $debug; } # /!\ OS-dependent structure # Linux struct flock # short l_type; # short l_whence; # off_t l_start; # off_t l_len; # pid_t l_pid; # c2ph says: typedef='s2 l2 i', sizeof=16 my $FLOCK_STRUCT = 's2l2i'; sub setlock($;$$) { my ($fh, $start, $len) = @_; $start = 0 unless defined $start; $len = 0 unless defined $len; #type whence start till pid my $packed = pack($FLOCK_STRUCT, F_WRLCK, SEEK_SET, $start, $len, 0); if (fcntl($fh, F_SETLKW, $packed)) { return 1 } else { return 0 } } sub usage() { print STDERR <= $next_watchdog) { $next_watchdog=$now+($watchdog_usec / 2); debug "Sending watchdog\n"; sd_notify(watchdog => 1); debug "Next watchdog due in ".(($next_watchdog-$now)/1000/1000)." secs.\n"; } # Don't oversleep! $interval=$next_watchdog-$now if $next_watchdog-$now < $interval; } return unless $interval; # Don't yield if we are not asked to sleep debug "Sleeping for ".($interval/1000/1000)." secs.\n"; usleep ($interval); } sub connect_wait($) { my $dbinfo = shift; my $dbh; debug "Connecting to " . $dbinfo->{base}; while (!$dbh) { debug "."; $dbh = DBI->connect($dbinfo->{base}, $dbinfo->{user}, $dbinfo->{pass}, { RaiseError => 0, PrintError => 0, AutoCommit => 1 }); sleep_for (1) if !$dbh; exit if $need_exit; } debug "\n"; $dbinfo->{handle} = $dbh; } sub process_file($$) { my ($dbinfo, $path) = @_; sub do_inserts($) { my $dbinfo = shift; debug "I"; if (scalar(@values) > 0) { my $query = $lastinsert . " "; $query .= join(" ), ( ",@values); $query .= " );"; do_query($dbinfo,$query); } @values = (); } sub do_query($$) { my ($dbinfo,$query) = @_; debug ">"; until ($dbinfo->{handle}->do($query)) { # If an error occured and we're disconnected then try to recomnnect # and redo the query, otherwise give up so we don't become stuck. print $dbinfo->{handle}->errstr."\n"; if ($dbinfo->{handle}->ping) { sleep_for (1); last; } else { print "error: Lost connection to database\n"; $dbinfo->{handle}->disconnect; connect_wait($dbinfo); } } sleep_for(0) if $watchdog_usec; # Send a watchdog if it is due } unless (-e $path.'.work') { debug "Waiting for $path\n"; until (rename($path, $path.'.work')) { if ($! == ENOENT) { sleep_for(1); return if $need_exit; } else { print STDERR "error: Couldn't move $path to $path.work: $!\n"; exit 1; } } debug "Renamed $path to $path.work\n"; } debug "\nOpening $path.work\n"; open(FILE, "+< $path.work") or die "error: Couldn't open $path.work: $!\n"; debug "Getting file lock\n"; setlock(\*FILE) or die "error: Couldn't lock $path.work: $!\n"; $lastinsert = ""; @values = (); debug "Reading: "; my $lines = 0; while () { chomp (my $line = $_); $lines++; if (!($line =~ /^\s*insert\s+into\s+`?\w+`?\s+(?:\(.*?\)\s+)? values\s*\(.*\)\s*;\s*$/ix)) { # This is no INSERT, so start new collection do_inserts($dbinfo); debug "."; $lastinsert = ""; # must output this line do_query($dbinfo, "$line"); } else { # This is an INSERT, so collect it debug "+"; my $insert = $line; my $values = $line; $insert =~ s/^\s*(insert\s+into\s+`?\w+`?\s+(?:\(.*?\)\s+)? values\s*\().*\)\s*;\s*$/$1/ix; $values =~ s/^\s*insert\s+into\s+`?\w+`?\s+(?:\(.*?\)\s+)? values\s*\((.*)\)\s*;\s*$/$1/ix; if (($lastinsert ne "") && ($insert ne $lastinsert)) { # This is different from the last one do_inserts($dbinfo); } push(@values, $values); $lastinsert = $insert; # start new collection } # limit to $maxcollect collected lines if (scalar(@values) >= $maxcollect) { debug "hit maxcollect limit, doing inserts"; do_inserts($dbinfo); } } # Cleanup debug "\nNo more lines to read, doing any final inserts: "; do_inserts($dbinfo); debug "\n"; debug "Processed $lines lines\n"; debug "Removing and closing $path.work\n\n"; unlink($path.'.work'); close(FILE); # and unlock } # sub main() my %args = ( b => 'radius', d => 'mysql', h => 'localhost', p => 'radius', u => 'radius', ); my $ret = getopts("b:d:f:h:P:p:u:x1?", \%args); if (!$ret or @ARGV != 1) { usage(); exit 1; } if ($args{'?'}) { usage(); exit 0; } $debug = 1 if $args{'x'}; my $data_source; if (lc($args{d}) eq 'mysql') { $data_source = "DBI:mysql:database=$args{b};host=$args{h}"; } elsif (lc($args{d}) eq 'pg') { $data_source = "DBI:Pg:dbname=$args{b};host=$args{h}"; } elsif (lc($args{d}) eq 'oracle') { $data_source = "DBI:Oracle:$args{b}"; # Oracle does not conform to the SQL standard for multirow INSERTs $maxcollect = 1; } else { print STDERR "error: SQL driver not supported yet: $args{d}\n"; exit 1; } $data_source .= ";port=$args{P}" if $args{'P'}; my $pw; if($args{f}) { open(FILE, "< $args{f}") or die "error: Couldn't open $args{f}: $!\n"; $pw = ; chomp($pw); close(FILE); } else { # args{p} is always defined. $pw = $args{p}; } $SIG{INT} = \&got_signal; $SIG{TERM} = \&got_signal; if ($watchdog_usec) { debug "Watchdog set to $watchdog_usec\n"; my $now=clock_gettime(CLOCK_MONOTONIC)*1000*1000; $next_watchdog=$now+($watchdog_usec / 2); sd_notify(ready => 1, status => 'Started'); } my %dbinfo = ( base => $data_source, user => $args{u}, pass => $pw, ); connect_wait(\%dbinfo); my $path = shift @ARGV; until ($need_exit) { process_file(\%dbinfo, $path); last if ($args{1} || $need_exit); debug "Sleeping\n"; sleep_for(10); } debug "Disconnecting from database\n"; $dbinfo{handle}->disconnect;