summaryrefslogtreecommitdiffstats
path: root/scripts/sql/radsqlrelay
diff options
context:
space:
mode:
Diffstat (limited to '')
-rwxr-xr-xscripts/sql/radsqlrelay342
1 files changed, 342 insertions, 0 deletions
diff --git a/scripts/sql/radsqlrelay b/scripts/sql/radsqlrelay
new file mode 100755
index 0000000..74531ba
--- /dev/null
+++ b/scripts/sql/radsqlrelay
@@ -0,0 +1,342 @@
+#!/usr/bin/perl -w
+##
+## radsqlrelay.pl This program tails a SQL logfile and forwards
+## the queries to a database server. Used to
+## replicate accounting records to one (central)
+## database, even if the database has extended
+## downtime.
+##
+## Version: $Id$
+##
+## Author: Nicolas Baradakis <nicolas.baradakis@cegetel.net>
+##
+## Copyright (C) 2005 Cegetel
+## Copyright (C) 2019 Network RADIUS
+##
+## This program is free software; you can redistribute it and/or
+## modify it under the terms of the GNU General Public License
+## as published by the Free Software Foundation; either version 2
+## of the License, or (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, write to the Free Software
+## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+##
+
+use strict;
+
+use DBI;
+use Fcntl;
+use Getopt::Std;
+use POSIX qw(:unistd_h :errno_h);
+use Time::HiRes qw(clock_gettime usleep CLOCK_MONOTONIC);
+
+# We send watchdogs at half the indicated interval if the
+# Linux::Systemd::Daemon module is available and the WATCHDOG_USEC environment
+# variable is set by the systemd service manager.
+my $watchdog_usec;
+my $next_watchdog;
+eval {
+ require Linux::Systemd::Daemon;
+ Linux::Systemd::Daemon->import();
+ $watchdog_usec = $ENV{'WATCHDOG_USEC'} || undef;
+};
+
+# To synthetically test the watchdog functionality then uncomment these next
+# lines:
+#
+# $watchdog_usec = 30 *1000*1000;
+# sub sd_notify {}
+
+my $maxcollect = 100; # tunable, works for MySQL!
+
+my $lastinsert;
+my @values;
+
+my $need_exit = 0;
+my $debug = 0;
+
+sub got_signal()
+{
+ $need_exit = 1;
+ sd_notify(stopping => 1, status => 'Signalled. Shutting down.') if $watchdog_usec;
+}
+
+sub debug
+{
+ print shift if $debug;
+}
+
+# /!\ OS-dependent structure
+# Linux struct flock
+# short l_type;
+# short l_whence;
+# off_t l_start;
+# off_t l_len;
+# pid_t l_pid;
+# c2ph says: typedef='s2 l2 i', sizeof=16
+my $FLOCK_STRUCT = 's2l2i';
+
+sub setlock($;$$)
+{
+ my ($fh, $start, $len) = @_;
+ $start = 0 unless defined $start;
+ $len = 0 unless defined $len;
+
+ #type whence start till pid
+ my $packed = pack($FLOCK_STRUCT, F_WRLCK, SEEK_SET, $start, $len, 0);
+ if (fcntl($fh, F_SETLKW, $packed)) { return 1 }
+ else { return 0 }
+}
+
+sub usage()
+{
+ print STDERR <<HERE;
+usage: radsqlrelay [options] file_path
+options:
+ -? Print this help message.
+ -1 One-shot mode: push the file to database and exit.
+ -b database Name of the database to use.
+ -d sql_driver Driver to use: mysql, pg, oracle.
+ -f file Read password from file, instead of command line.
+ -h host Connect to host.
+ -P port Port number to use for connection.
+ -p password Password to use when connecting to server.
+ -u user User for login.
+ -x Turn on debugging.
+HERE
+}
+
+
+# Sleep for given amount of time, but don't oversleep the watchdog interval.
+# Send a watchdog notification if it is due.
+# interval can be given as 0 to just check whether a watchdog is due and send
+# it as necessary, in which case we do not yield.
+sub sleep_for ($)
+{
+ my $interval=shift;
+ $interval*=1000*1000;
+ if ($watchdog_usec) {
+ my $now=clock_gettime(CLOCK_MONOTONIC)*1000*1000;
+ if ($now >= $next_watchdog) {
+ $next_watchdog=$now+($watchdog_usec / 2);
+ debug "Sending watchdog\n";
+ sd_notify(watchdog => 1);
+ debug "Next watchdog due in ".(($next_watchdog-$now)/1000/1000)." secs.\n";
+ }
+ # Don't oversleep!
+ $interval=$next_watchdog-$now if $next_watchdog-$now < $interval;
+ }
+ return unless $interval; # Don't yield if we are not asked to sleep
+ debug "Sleeping for ".($interval/1000/1000)." secs.\n";
+ usleep ($interval);
+}
+
+
+sub connect_wait($)
+{
+ my $dbinfo = shift;
+ my $dbh;
+ debug "Connecting to " . $dbinfo->{base};
+ while (!$dbh) {
+ debug ".";
+ $dbh = DBI->connect($dbinfo->{base}, $dbinfo->{user}, $dbinfo->{pass},
+ { RaiseError => 0, PrintError => 0,
+ AutoCommit => 1 });
+ sleep_for (1) if !$dbh;
+ exit if $need_exit;
+ }
+ debug "\n";
+ $dbinfo->{handle} = $dbh;
+}
+
+
+
+sub process_file($$)
+{
+ my ($dbinfo, $path) = @_;
+
+ sub do_inserts($) {
+ my $dbinfo = shift;
+ debug "I";
+ if (scalar(@values) > 0) {
+ my $query = $lastinsert . " ";
+ $query .= join(" ), ( ",@values);
+ $query .= " );";
+ do_query($dbinfo,$query);
+ }
+ @values = ();
+ }
+
+ sub do_query($$) {
+ my ($dbinfo,$query) = @_;
+ debug ">";
+ until ($dbinfo->{handle}->do($query)) {
+ # If an error occured and we're disconnected then try to recomnnect
+ # and redo the query, otherwise give up so we don't become stuck.
+ print $dbinfo->{handle}->errstr."\n";
+ if ($dbinfo->{handle}->ping) {
+ sleep_for (1);
+ last;
+ } else {
+ print "error: Lost connection to database\n";
+ $dbinfo->{handle}->disconnect;
+ connect_wait($dbinfo);
+ }
+ }
+ sleep_for(0) if $watchdog_usec; # Send a watchdog if it is due
+ }
+
+ unless (-e $path.'.work') {
+ debug "Waiting for $path\n";
+ until (rename($path, $path.'.work')) {
+ if ($! == ENOENT) {
+ sleep_for(1);
+ return if $need_exit;
+ } else {
+ print STDERR "error: Couldn't move $path to $path.work: $!\n";
+ exit 1;
+ }
+ }
+ debug "Renamed $path to $path.work\n";
+ }
+
+ debug "\nOpening $path.work\n";
+ open(FILE, "+< $path.work") or die "error: Couldn't open $path.work: $!\n";
+ debug "Getting file lock\n";
+ setlock(\*FILE) or die "error: Couldn't lock $path.work: $!\n";
+
+ $lastinsert = "";
+ @values = ();
+
+ debug "Reading: ";
+ my $lines = 0;
+ while (<FILE>) {
+ chomp (my $line = $_);
+ $lines++;
+
+ if (!($line =~ /^\s*insert\s+into\s+`?\w+`?\s+(?:\(.*?\)\s+)?
+ values\s*\(.*\)\s*;\s*$/ix)) {
+ # This is no INSERT, so start new collection
+ do_inserts($dbinfo);
+ debug ".";
+ $lastinsert = "";
+ # must output this line
+ do_query($dbinfo, "$line");
+
+ } else {
+ # This is an INSERT, so collect it
+ debug "+";
+ my $insert = $line;
+ my $values = $line;
+ $insert =~ s/^\s*(insert\s+into\s+`?\w+`?\s+(?:\(.*?\)\s+)?
+ values\s*\().*\)\s*;\s*$/$1/ix;
+ $values =~ s/^\s*insert\s+into\s+`?\w+`?\s+(?:\(.*?\)\s+)?
+ values\s*\((.*)\)\s*;\s*$/$1/ix;
+
+ if (($lastinsert ne "") && ($insert ne $lastinsert)) {
+ # This is different from the last one
+ do_inserts($dbinfo);
+ }
+ push(@values, $values);
+ $lastinsert = $insert; # start new collection
+ }
+
+ # limit to $maxcollect collected lines
+ if (scalar(@values) >= $maxcollect) {
+ debug "hit maxcollect limit, doing inserts";
+ do_inserts($dbinfo);
+ }
+ }
+
+ # Cleanup
+ debug "\nNo more lines to read, doing any final inserts: ";
+ do_inserts($dbinfo);
+ debug "\n";
+
+ debug "Processed $lines lines\n";
+ debug "Removing and closing $path.work\n\n";
+ unlink($path.'.work');
+ close(FILE); # and unlock
+}
+
+# sub main()
+
+my %args = (
+ b => 'radius',
+ d => 'mysql',
+ h => 'localhost',
+ p => 'radius',
+ u => 'radius',
+);
+my $ret = getopts("b:d:f:h:P:p:u:x1?", \%args);
+if (!$ret or @ARGV != 1) {
+ usage();
+ exit 1;
+}
+if ($args{'?'}) {
+ usage();
+ exit 0;
+}
+$debug = 1 if $args{'x'};
+
+my $data_source;
+if (lc($args{d}) eq 'mysql') {
+ $data_source = "DBI:mysql:database=$args{b};host=$args{h}";
+} elsif (lc($args{d}) eq 'pg') {
+ $data_source = "DBI:Pg:dbname=$args{b};host=$args{h}";
+} elsif (lc($args{d}) eq 'oracle') {
+ $data_source = "DBI:Oracle:$args{b}";
+ # Oracle does not conform to the SQL standard for multirow INSERTs
+ $maxcollect = 1;
+} else {
+ print STDERR "error: SQL driver not supported yet: $args{d}\n";
+ exit 1;
+}
+$data_source .= ";port=$args{P}" if $args{'P'};
+
+my $pw;
+if($args{f}) {
+ open(FILE, "< $args{f}") or die "error: Couldn't open $args{f}: $!\n";
+ $pw = <FILE>;
+ chomp($pw);
+ close(FILE);
+} else {
+ # args{p} is always defined.
+ $pw = $args{p};
+}
+
+$SIG{INT} = \&got_signal;
+$SIG{TERM} = \&got_signal;
+
+if ($watchdog_usec) {
+ debug "Watchdog set to $watchdog_usec\n";
+ my $now=clock_gettime(CLOCK_MONOTONIC)*1000*1000;
+ $next_watchdog=$now+($watchdog_usec / 2);
+ sd_notify(ready => 1, status => 'Started');
+}
+
+my %dbinfo = (
+ base => $data_source,
+ user => $args{u},
+ pass => $pw,
+);
+connect_wait(\%dbinfo);
+
+my $path = shift @ARGV;
+
+until ($need_exit) {
+ process_file(\%dbinfo, $path);
+ last if ($args{1} || $need_exit);
+ debug "Sleeping\n";
+ sleep_for(10);
+}
+
+debug "Disconnecting from database\n";
+$dbinfo{handle}->disconnect;
+