diff options
Diffstat (limited to 'scripts/sql/radsqlrelay')
-rwxr-xr-x | scripts/sql/radsqlrelay | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/scripts/sql/radsqlrelay b/scripts/sql/radsqlrelay new file mode 100755 index 0000000..74531ba --- /dev/null +++ b/scripts/sql/radsqlrelay @@ -0,0 +1,342 @@ +#!/usr/bin/perl -w +## +## radsqlrelay.pl This program tails a SQL logfile and forwards +## the queries to a database server. Used to +## replicate accounting records to one (central) +## database, even if the database has extended +## downtime. +## +## Version: $Id$ +## +## Author: Nicolas Baradakis <nicolas.baradakis@cegetel.net> +## +## Copyright (C) 2005 Cegetel +## Copyright (C) 2019 Network RADIUS +## +## This program is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License +## as published by the Free Software Foundation; either version 2 +## of the License, or (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, write to the Free Software +## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA +## + +use strict; + +use DBI; +use Fcntl; +use Getopt::Std; +use POSIX qw(:unistd_h :errno_h); +use Time::HiRes qw(clock_gettime usleep CLOCK_MONOTONIC); + +# We send watchdogs at half the indicated interval if the +# Linux::Systemd::Daemon module is available and the WATCHDOG_USEC environment +# variable is set by the systemd service manager. +my $watchdog_usec; +my $next_watchdog; +eval { + require Linux::Systemd::Daemon; + Linux::Systemd::Daemon->import(); + $watchdog_usec = $ENV{'WATCHDOG_USEC'} || undef; +}; + +# To synthetically test the watchdog functionality then uncomment these next +# lines: +# +# $watchdog_usec = 30 *1000*1000; +# sub sd_notify {} + +my $maxcollect = 100; # tunable, works for MySQL! + +my $lastinsert; +my @values; + +my $need_exit = 0; +my $debug = 0; + +sub got_signal() +{ + $need_exit = 1; + sd_notify(stopping => 1, status => 'Signalled. Shutting down.') if $watchdog_usec; +} + +sub debug +{ + print shift if $debug; +} + +# /!\ OS-dependent structure +# Linux struct flock +# short l_type; +# short l_whence; +# off_t l_start; +# off_t l_len; +# pid_t l_pid; +# c2ph says: typedef='s2 l2 i', sizeof=16 +my $FLOCK_STRUCT = 's2l2i'; + +sub setlock($;$$) +{ + my ($fh, $start, $len) = @_; + $start = 0 unless defined $start; + $len = 0 unless defined $len; + + #type whence start till pid + my $packed = pack($FLOCK_STRUCT, F_WRLCK, SEEK_SET, $start, $len, 0); + if (fcntl($fh, F_SETLKW, $packed)) { return 1 } + else { return 0 } +} + +sub usage() +{ + print STDERR <<HERE; +usage: radsqlrelay [options] file_path +options: + -? Print this help message. + -1 One-shot mode: push the file to database and exit. + -b database Name of the database to use. + -d sql_driver Driver to use: mysql, pg, oracle. + -f file Read password from file, instead of command line. + -h host Connect to host. + -P port Port number to use for connection. + -p password Password to use when connecting to server. + -u user User for login. + -x Turn on debugging. +HERE +} + + +# Sleep for given amount of time, but don't oversleep the watchdog interval. +# Send a watchdog notification if it is due. +# interval can be given as 0 to just check whether a watchdog is due and send +# it as necessary, in which case we do not yield. +sub sleep_for ($) +{ + my $interval=shift; + $interval*=1000*1000; + if ($watchdog_usec) { + my $now=clock_gettime(CLOCK_MONOTONIC)*1000*1000; + if ($now >= $next_watchdog) { + $next_watchdog=$now+($watchdog_usec / 2); + debug "Sending watchdog\n"; + sd_notify(watchdog => 1); + debug "Next watchdog due in ".(($next_watchdog-$now)/1000/1000)." secs.\n"; + } + # Don't oversleep! + $interval=$next_watchdog-$now if $next_watchdog-$now < $interval; + } + return unless $interval; # Don't yield if we are not asked to sleep + debug "Sleeping for ".($interval/1000/1000)." secs.\n"; + usleep ($interval); +} + + +sub connect_wait($) +{ + my $dbinfo = shift; + my $dbh; + debug "Connecting to " . $dbinfo->{base}; + while (!$dbh) { + debug "."; + $dbh = DBI->connect($dbinfo->{base}, $dbinfo->{user}, $dbinfo->{pass}, + { RaiseError => 0, PrintError => 0, + AutoCommit => 1 }); + sleep_for (1) if !$dbh; + exit if $need_exit; + } + debug "\n"; + $dbinfo->{handle} = $dbh; +} + + + +sub process_file($$) +{ + my ($dbinfo, $path) = @_; + + sub do_inserts($) { + my $dbinfo = shift; + debug "I"; + if (scalar(@values) > 0) { + my $query = $lastinsert . " "; + $query .= join(" ), ( ",@values); + $query .= " );"; + do_query($dbinfo,$query); + } + @values = (); + } + + sub do_query($$) { + my ($dbinfo,$query) = @_; + debug ">"; + until ($dbinfo->{handle}->do($query)) { + # If an error occured and we're disconnected then try to recomnnect + # and redo the query, otherwise give up so we don't become stuck. + print $dbinfo->{handle}->errstr."\n"; + if ($dbinfo->{handle}->ping) { + sleep_for (1); + last; + } else { + print "error: Lost connection to database\n"; + $dbinfo->{handle}->disconnect; + connect_wait($dbinfo); + } + } + sleep_for(0) if $watchdog_usec; # Send a watchdog if it is due + } + + unless (-e $path.'.work') { + debug "Waiting for $path\n"; + until (rename($path, $path.'.work')) { + if ($! == ENOENT) { + sleep_for(1); + return if $need_exit; + } else { + print STDERR "error: Couldn't move $path to $path.work: $!\n"; + exit 1; + } + } + debug "Renamed $path to $path.work\n"; + } + + debug "\nOpening $path.work\n"; + open(FILE, "+< $path.work") or die "error: Couldn't open $path.work: $!\n"; + debug "Getting file lock\n"; + setlock(\*FILE) or die "error: Couldn't lock $path.work: $!\n"; + + $lastinsert = ""; + @values = (); + + debug "Reading: "; + my $lines = 0; + while (<FILE>) { + chomp (my $line = $_); + $lines++; + + if (!($line =~ /^\s*insert\s+into\s+`?\w+`?\s+(?:\(.*?\)\s+)? + values\s*\(.*\)\s*;\s*$/ix)) { + # This is no INSERT, so start new collection + do_inserts($dbinfo); + debug "."; + $lastinsert = ""; + # must output this line + do_query($dbinfo, "$line"); + + } else { + # This is an INSERT, so collect it + debug "+"; + my $insert = $line; + my $values = $line; + $insert =~ s/^\s*(insert\s+into\s+`?\w+`?\s+(?:\(.*?\)\s+)? + values\s*\().*\)\s*;\s*$/$1/ix; + $values =~ s/^\s*insert\s+into\s+`?\w+`?\s+(?:\(.*?\)\s+)? + values\s*\((.*)\)\s*;\s*$/$1/ix; + + if (($lastinsert ne "") && ($insert ne $lastinsert)) { + # This is different from the last one + do_inserts($dbinfo); + } + push(@values, $values); + $lastinsert = $insert; # start new collection + } + + # limit to $maxcollect collected lines + if (scalar(@values) >= $maxcollect) { + debug "hit maxcollect limit, doing inserts"; + do_inserts($dbinfo); + } + } + + # Cleanup + debug "\nNo more lines to read, doing any final inserts: "; + do_inserts($dbinfo); + debug "\n"; + + debug "Processed $lines lines\n"; + debug "Removing and closing $path.work\n\n"; + unlink($path.'.work'); + close(FILE); # and unlock +} + +# sub main() + +my %args = ( + b => 'radius', + d => 'mysql', + h => 'localhost', + p => 'radius', + u => 'radius', +); +my $ret = getopts("b:d:f:h:P:p:u:x1?", \%args); +if (!$ret or @ARGV != 1) { + usage(); + exit 1; +} +if ($args{'?'}) { + usage(); + exit 0; +} +$debug = 1 if $args{'x'}; + +my $data_source; +if (lc($args{d}) eq 'mysql') { + $data_source = "DBI:mysql:database=$args{b};host=$args{h}"; +} elsif (lc($args{d}) eq 'pg') { + $data_source = "DBI:Pg:dbname=$args{b};host=$args{h}"; +} elsif (lc($args{d}) eq 'oracle') { + $data_source = "DBI:Oracle:$args{b}"; + # Oracle does not conform to the SQL standard for multirow INSERTs + $maxcollect = 1; +} else { + print STDERR "error: SQL driver not supported yet: $args{d}\n"; + exit 1; +} +$data_source .= ";port=$args{P}" if $args{'P'}; + +my $pw; +if($args{f}) { + open(FILE, "< $args{f}") or die "error: Couldn't open $args{f}: $!\n"; + $pw = <FILE>; + chomp($pw); + close(FILE); +} else { + # args{p} is always defined. + $pw = $args{p}; +} + +$SIG{INT} = \&got_signal; +$SIG{TERM} = \&got_signal; + +if ($watchdog_usec) { + debug "Watchdog set to $watchdog_usec\n"; + my $now=clock_gettime(CLOCK_MONOTONIC)*1000*1000; + $next_watchdog=$now+($watchdog_usec / 2); + sd_notify(ready => 1, status => 'Started'); +} + +my %dbinfo = ( + base => $data_source, + user => $args{u}, + pass => $pw, +); +connect_wait(\%dbinfo); + +my $path = shift @ARGV; + +until ($need_exit) { + process_file(\%dbinfo, $path); + last if ($args{1} || $need_exit); + debug "Sleeping\n"; + sleep_for(10); +} + +debug "Disconnecting from database\n"; +$dbinfo{handle}->disconnect; + |