summaryrefslogtreecommitdiffstats
path: root/src/test/perl/PostgresNode.pm
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/perl/PostgresNode.pm')
-rw-r--r--src/test/perl/PostgresNode.pm2150
1 files changed, 2150 insertions, 0 deletions
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
new file mode 100644
index 0000000..52f552b
--- /dev/null
+++ b/src/test/perl/PostgresNode.pm
@@ -0,0 +1,2150 @@
+
+=pod
+
+=head1 NAME
+
+PostgresNode - class representing PostgreSQL server instance
+
+=head1 SYNOPSIS
+
+ use PostgresNode;
+
+ my $node = PostgresNode->get_new_node('mynode');
+
+ # Create a data directory with initdb
+ $node->init();
+
+ # Start the PostgreSQL server
+ $node->start();
+
+ # Change a setting and restart
+ $node->append_conf('postgresql.conf', 'hot_standby = on');
+ $node->restart();
+
+ # run a query with psql, like:
+ # echo 'SELECT 1' | psql -qAXt postgres -v ON_ERROR_STOP=1
+ $psql_stdout = $node->safe_psql('postgres', 'SELECT 1');
+
+ # Run psql with a timeout, capturing stdout and stderr
+ # as well as the psql exit code. Pass some extra psql
+ # options. If there's an error from psql raise an exception.
+ my ($stdout, $stderr, $timed_out);
+ my $cmdret = $node->psql('postgres', 'SELECT pg_sleep(600)',
+ stdout => \$stdout, stderr => \$stderr,
+ timeout => 180, timed_out => \$timed_out,
+ extra_params => ['--single-transaction'],
+ on_error_die => 1)
+ print "Sleep timed out" if $timed_out;
+
+ # Similar thing, more convenient in common cases
+ my ($cmdret, $stdout, $stderr) =
+ $node->psql('postgres', 'SELECT 1');
+
+ # run query every second until it returns 't'
+ # or times out
+ $node->poll_query_until('postgres', q|SELECT random() < 0.1;|')
+ or die "timed out";
+
+ # Do an online pg_basebackup
+ my $ret = $node->backup('testbackup1');
+
+ # Take a backup of a running server
+ my $ret = $node->backup_fs_hot('testbackup2');
+
+ # Take a backup of a stopped server
+ $node->stop;
+ my $ret = $node->backup_fs_cold('testbackup3')
+
+ # Restore it to create a new independent node (not a replica)
+ my $replica = get_new_node('replica');
+ $replica->init_from_backup($node, 'testbackup');
+ $replica->start;
+
+ # Stop the server
+ $node->stop('fast');
+
+ # Find a free, unprivileged TCP port to bind some other service to
+ my $port = get_free_port();
+
+=head1 DESCRIPTION
+
+PostgresNode contains a set of routines able to work on a PostgreSQL node,
+allowing to start, stop, backup and initialize it with various options.
+The set of nodes managed by a given test is also managed by this module.
+
+In addition to node management, PostgresNode instances have some wrappers
+around Test::More functions to run commands with an environment set up to
+point to the instance.
+
+The IPC::Run module is required.
+
+=cut
+
+package PostgresNode;
+
+use strict;
+use warnings;
+
+use Carp;
+use Config;
+use Cwd;
+use Exporter 'import';
+use Fcntl qw(:mode);
+use File::Basename;
+use File::Path qw(rmtree);
+use File::Spec;
+use File::stat qw(stat);
+use File::Temp ();
+use IPC::Run;
+use RecursiveCopy;
+use Socket;
+use Test::More;
+use TestLib ();
+use Time::HiRes qw(usleep);
+use Scalar::Util qw(blessed);
+
+our @EXPORT = qw(
+ get_new_node
+ get_free_port
+);
+
+our ($use_tcp, $test_localhost, $test_pghost, $last_host_assigned,
+ $last_port_assigned, @all_nodes, $died);
+
+INIT
+{
+
+ # Set PGHOST for backward compatibility. This doesn't work for own_host
+ # nodes, so prefer to not rely on this when writing new tests.
+ $use_tcp = !$TestLib::use_unix_sockets;
+ $test_localhost = "127.0.0.1";
+ $last_host_assigned = 1;
+ $test_pghost = $use_tcp ? $test_localhost : TestLib::tempdir_short;
+ $ENV{PGHOST} = $test_pghost;
+ $ENV{PGDATABASE} = 'postgres';
+
+ # Tracking of last port value assigned to accelerate free port lookup.
+ $last_port_assigned = int(rand() * 16384) + 49152;
+}
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item PostgresNode::new($class, $name, $pghost, $pgport)
+
+Create a new PostgresNode instance. Does not initdb or start it.
+
+You should generally prefer to use get_new_node() instead since it takes care
+of finding port numbers, registering instances for cleanup, etc.
+
+=cut
+
+sub new
+{
+ my ($class, $name, $pghost, $pgport) = @_;
+ my $testname = basename($0);
+ $testname =~ s/\.[^.]+$//;
+ my $self = {
+ _port => $pgport,
+ _host => $pghost,
+ _basedir => "$TestLib::tmp_check/t_${testname}_${name}_data",
+ _name => $name,
+ _logfile_generation => 0,
+ _logfile_base => "$TestLib::log_path/${testname}_${name}",
+ _logfile => "$TestLib::log_path/${testname}_${name}.log"
+ };
+
+ bless $self, $class;
+ mkdir $self->{_basedir}
+ or
+ BAIL_OUT("could not create data directory \"$self->{_basedir}\": $!");
+ $self->dump_info;
+
+ return $self;
+}
+
+=pod
+
+=item $node->port()
+
+Get the port number assigned to the host. This won't necessarily be a TCP port
+open on the local host since we prefer to use unix sockets if possible.
+
+Use $node->connstr() if you want a connection string.
+
+=cut
+
+sub port
+{
+ my ($self) = @_;
+ return $self->{_port};
+}
+
+=pod
+
+=item $node->host()
+
+Return the host (like PGHOST) for this instance. May be a UNIX socket path.
+
+Use $node->connstr() if you want a connection string.
+
+=cut
+
+sub host
+{
+ my ($self) = @_;
+ return $self->{_host};
+}
+
+=pod
+
+=item $node->basedir()
+
+The directory all the node's files will be within - datadir, archive directory,
+backups, etc.
+
+=cut
+
+sub basedir
+{
+ my ($self) = @_;
+ return $self->{_basedir};
+}
+
+=pod
+
+=item $node->name()
+
+The name assigned to the node at creation time.
+
+=cut
+
+sub name
+{
+ my ($self) = @_;
+ return $self->{_name};
+}
+
+=pod
+
+=item $node->logfile()
+
+Path to the PostgreSQL log file for this instance.
+
+=cut
+
+sub logfile
+{
+ my ($self) = @_;
+ return $self->{_logfile};
+}
+
+=pod
+
+=item $node->connstr()
+
+Get a libpq connection string that will establish a connection to
+this node. Suitable for passing to psql, DBD::Pg, etc.
+
+=cut
+
+sub connstr
+{
+ my ($self, $dbname) = @_;
+ my $pgport = $self->port;
+ my $pghost = $self->host;
+ if (!defined($dbname))
+ {
+ return "port=$pgport host=$pghost";
+ }
+
+ # Escape properly the database string before using it, only
+ # single quotes and backslashes need to be treated this way.
+ $dbname =~ s#\\#\\\\#g;
+ $dbname =~ s#\'#\\\'#g;
+
+ return "port=$pgport host=$pghost dbname='$dbname'";
+}
+
+=pod
+
+=item $node->group_access()
+
+Does the data dir allow group access?
+
+=cut
+
+sub group_access
+{
+ my ($self) = @_;
+
+ my $dir_stat = stat($self->data_dir);
+
+ defined($dir_stat)
+ or die('unable to stat ' . $self->data_dir);
+
+ return (S_IMODE($dir_stat->mode) == 0750);
+}
+
+=pod
+
+=item $node->data_dir()
+
+Returns the path to the data directory. postgresql.conf and pg_hba.conf are
+always here.
+
+=cut
+
+sub data_dir
+{
+ my ($self) = @_;
+ my $res = $self->basedir;
+ return "$res/pgdata";
+}
+
+=pod
+
+=item $node->archive_dir()
+
+If archiving is enabled, WAL files go here.
+
+=cut
+
+sub archive_dir
+{
+ my ($self) = @_;
+ my $basedir = $self->basedir;
+ return "$basedir/archives";
+}
+
+=pod
+
+=item $node->backup_dir()
+
+The output path for backups taken with $node->backup()
+
+=cut
+
+sub backup_dir
+{
+ my ($self) = @_;
+ my $basedir = $self->basedir;
+ return "$basedir/backup";
+}
+
+=pod
+
+=item $node->info()
+
+Return a string containing human-readable diagnostic information (paths, etc)
+about this node.
+
+=cut
+
+sub info
+{
+ my ($self) = @_;
+ my $_info = '';
+ open my $fh, '>', \$_info or die;
+ print $fh "Name: " . $self->name . "\n";
+ print $fh "Data directory: " . $self->data_dir . "\n";
+ print $fh "Backup directory: " . $self->backup_dir . "\n";
+ print $fh "Archive directory: " . $self->archive_dir . "\n";
+ print $fh "Connection string: " . $self->connstr . "\n";
+ print $fh "Log file: " . $self->logfile . "\n";
+ close $fh or die;
+ return $_info;
+}
+
+=pod
+
+=item $node->dump_info()
+
+Print $node->info()
+
+=cut
+
+sub dump_info
+{
+ my ($self) = @_;
+ print $self->info;
+ return;
+}
+
+
+# Internal method to set up trusted pg_hba.conf for replication. Not
+# documented because you shouldn't use it, it's called automatically if needed.
+sub set_replication_conf
+{
+ my ($self) = @_;
+ my $pgdata = $self->data_dir;
+
+ $self->host eq $test_pghost
+ or croak "set_replication_conf only works with the default host";
+
+ open my $hba, '>>', "$pgdata/pg_hba.conf";
+ print $hba "\n# Allow replication (set up by PostgresNode.pm)\n";
+ if ($TestLib::windows_os && !$TestLib::use_unix_sockets)
+ {
+ print $hba
+ "host replication all $test_localhost/32 sspi include_realm=1 map=regress\n";
+ }
+ close $hba;
+ return;
+}
+
+=pod
+
+=item $node->init(...)
+
+Initialize a new cluster for testing.
+
+Authentication is set up so that only the current OS user can access the
+cluster. On Unix, we use Unix domain socket connections, with the socket in
+a directory that's only accessible to the current user to ensure that.
+On Windows, we use SSPI authentication to ensure the same (by pg_regress
+--config-auth).
+
+WAL archiving can be enabled on this node by passing the keyword parameter
+has_archiving => 1. This is disabled by default.
+
+postgresql.conf can be set up for replication by passing the keyword
+parameter allows_streaming => 'logical' or 'physical' (passing 1 will also
+suffice for physical replication) depending on type of replication that
+should be enabled. This is disabled by default.
+
+The new node is set up in a fast but unsafe configuration where fsync is
+disabled.
+
+=cut
+
+sub init
+{
+ my ($self, %params) = @_;
+ my $port = $self->port;
+ my $pgdata = $self->data_dir;
+ my $host = $self->host;
+
+ $params{allows_streaming} = 0 unless defined $params{allows_streaming};
+ $params{has_archiving} = 0 unless defined $params{has_archiving};
+
+ mkdir $self->backup_dir;
+ mkdir $self->archive_dir;
+
+ TestLib::system_or_bail('initdb', '-D', $pgdata, '-A', 'trust', '-N',
+ @{ $params{extra} });
+ TestLib::system_or_bail($ENV{PG_REGRESS}, '--config-auth', $pgdata,
+ @{ $params{auth_extra} });
+
+ open my $conf, '>>', "$pgdata/postgresql.conf";
+ print $conf "\n# Added by PostgresNode.pm\n";
+ print $conf "fsync = off\n";
+ print $conf "restart_after_crash = off\n";
+ print $conf "log_line_prefix = '%m [%p] %q%a '\n";
+ print $conf "log_statement = all\n";
+ print $conf "log_replication_commands = on\n";
+ print $conf "wal_retrieve_retry_interval = '500ms'\n";
+
+ # If a setting tends to affect whether tests pass or fail, print it after
+ # TEMP_CONFIG. Otherwise, print it before TEMP_CONFIG, thereby permitting
+ # overrides. Settings that merely improve performance or ease debugging
+ # belong before TEMP_CONFIG.
+ print $conf TestLib::slurp_file($ENV{TEMP_CONFIG})
+ if defined $ENV{TEMP_CONFIG};
+
+ # XXX Neutralize any stats_temp_directory in TEMP_CONFIG. Nodes running
+ # concurrently must not share a stats_temp_directory.
+ print $conf "stats_temp_directory = 'pg_stat_tmp'\n";
+
+ if ($params{allows_streaming})
+ {
+ if ($params{allows_streaming} eq "logical")
+ {
+ print $conf "wal_level = logical\n";
+ }
+ else
+ {
+ print $conf "wal_level = replica\n";
+ }
+ print $conf "max_wal_senders = 10\n";
+ print $conf "max_replication_slots = 10\n";
+ print $conf "wal_log_hints = on\n";
+ print $conf "hot_standby = on\n";
+ # conservative settings to ensure we can run multiple postmasters:
+ print $conf "shared_buffers = 1MB\n";
+ print $conf "max_connections = 10\n";
+ # limit disk space consumption, too:
+ print $conf "max_wal_size = 128MB\n";
+ }
+ else
+ {
+ print $conf "wal_level = minimal\n";
+ print $conf "max_wal_senders = 0\n";
+ }
+
+ print $conf "port = $port\n";
+ if ($use_tcp)
+ {
+ print $conf "unix_socket_directories = ''\n";
+ print $conf "listen_addresses = '$host'\n";
+ }
+ else
+ {
+ print $conf "unix_socket_directories = '$host'\n";
+ print $conf "listen_addresses = ''\n";
+ }
+ close $conf;
+
+ chmod($self->group_access ? 0640 : 0600, "$pgdata/postgresql.conf")
+ or die("unable to set permissions for $pgdata/postgresql.conf");
+
+ $self->set_replication_conf if $params{allows_streaming};
+ $self->enable_archiving if $params{has_archiving};
+ return;
+}
+
+=pod
+
+=item $node->append_conf(filename, str)
+
+A shortcut method to append to files like pg_hba.conf and postgresql.conf.
+
+Does no validation or sanity checking. Does not reload the configuration
+after writing.
+
+A newline is automatically appended to the string.
+
+=cut
+
+sub append_conf
+{
+ my ($self, $filename, $str) = @_;
+
+ my $conffile = $self->data_dir . '/' . $filename;
+
+ TestLib::append_to_file($conffile, $str . "\n");
+
+ chmod($self->group_access() ? 0640 : 0600, $conffile)
+ or die("unable to set permissions for $conffile");
+
+ return;
+}
+
+=pod
+
+=item $node->backup(backup_name)
+
+Create a hot backup with B<pg_basebackup> in subdirectory B<backup_name> of
+B<< $node->backup_dir >>, including the WAL.
+
+By default, WAL files are fetched at the end of the backup, not streamed.
+You can adjust that and other things by passing an array of additional
+B<pg_basebackup> command line options in the keyword parameter backup_options.
+
+You'll have to configure a suitable B<max_wal_senders> on the
+target server since it isn't done by default.
+
+=cut
+
+sub backup
+{
+ my ($self, $backup_name, %params) = @_;
+ my $backup_path = $self->backup_dir . '/' . $backup_name;
+ my $name = $self->name;
+
+ print "# Taking pg_basebackup $backup_name from node \"$name\"\n";
+ TestLib::system_or_bail(
+ 'pg_basebackup', '-D', $backup_path, '-h',
+ $self->host, '-p', $self->port, '--checkpoint',
+ 'fast', '--no-sync',
+ @{ $params{backup_options} });
+ print "# Backup finished\n";
+ return;
+}
+
+=item $node->backup_fs_hot(backup_name)
+
+Create a backup with a filesystem level copy in subdirectory B<backup_name> of
+B<< $node->backup_dir >>, including WAL.
+
+Archiving must be enabled, as B<pg_start_backup()> and B<pg_stop_backup()> are
+used. This is not checked or enforced.
+
+The backup name is passed as the backup label to B<pg_start_backup()>.
+
+=cut
+
+sub backup_fs_hot
+{
+ my ($self, $backup_name) = @_;
+ $self->_backup_fs($backup_name, 1);
+ return;
+}
+
+=item $node->backup_fs_cold(backup_name)
+
+Create a backup with a filesystem level copy in subdirectory B<backup_name> of
+B<< $node->backup_dir >>, including WAL. The server must be
+stopped as no attempt to handle concurrent writes is made.
+
+Use B<backup> or B<backup_fs_hot> if you want to back up a running server.
+
+=cut
+
+sub backup_fs_cold
+{
+ my ($self, $backup_name) = @_;
+ $self->_backup_fs($backup_name, 0);
+ return;
+}
+
+
+# Common sub of backup_fs_hot and backup_fs_cold
+sub _backup_fs
+{
+ my ($self, $backup_name, $hot) = @_;
+ my $backup_path = $self->backup_dir . '/' . $backup_name;
+ my $port = $self->port;
+ my $name = $self->name;
+
+ print "# Taking filesystem backup $backup_name from node \"$name\"\n";
+
+ if ($hot)
+ {
+ my $stdout = $self->safe_psql('postgres',
+ "SELECT * FROM pg_start_backup('$backup_name');");
+ print "# pg_start_backup: $stdout\n";
+ }
+
+ RecursiveCopy::copypath(
+ $self->data_dir,
+ $backup_path,
+ filterfn => sub {
+ my $src = shift;
+ return ($src ne 'log' and $src ne 'postmaster.pid');
+ });
+
+ if ($hot)
+ {
+
+ # We ignore pg_stop_backup's return value. We also assume archiving
+ # is enabled; otherwise the caller will have to copy the remaining
+ # segments.
+ my $stdout =
+ $self->safe_psql('postgres', 'SELECT * FROM pg_stop_backup();');
+ print "# pg_stop_backup: $stdout\n";
+ }
+
+ print "# Backup finished\n";
+ return;
+}
+
+
+
+=pod
+
+=item $node->init_from_backup(root_node, backup_name)
+
+Initialize a node from a backup, which may come from this node or a different
+node. root_node must be a PostgresNode reference, backup_name the string name
+of a backup previously created on that node with $node->backup.
+
+Does not start the node after initializing it.
+
+Streaming replication can be enabled on this node by passing the keyword
+parameter has_streaming => 1. This is disabled by default.
+
+Restoring WAL segments from archives using restore_command can be enabled
+by passing the keyword parameter has_restoring => 1. This is disabled by
+default.
+
+If has_restoring is used, standby mode is used by default. To use
+recovery mode instead, pass the keyword parameter standby => 0.
+
+The backup is copied, leaving the original unmodified. pg_hba.conf is
+unconditionally set to enable replication connections.
+
+=cut
+
+sub init_from_backup
+{
+ my ($self, $root_node, $backup_name, %params) = @_;
+ my $backup_path = $root_node->backup_dir . '/' . $backup_name;
+ my $host = $self->host;
+ my $port = $self->port;
+ my $node_name = $self->name;
+ my $root_name = $root_node->name;
+
+ $params{has_streaming} = 0 unless defined $params{has_streaming};
+ $params{has_restoring} = 0 unless defined $params{has_restoring};
+ $params{standby} = 1 unless defined $params{standby};
+
+ print
+ "# Initializing node \"$node_name\" from backup \"$backup_name\" of node \"$root_name\"\n";
+ croak "Backup \"$backup_name\" does not exist at $backup_path"
+ unless -d $backup_path;
+
+ mkdir $self->backup_dir;
+ mkdir $self->archive_dir;
+
+ my $data_path = $self->data_dir;
+ rmdir($data_path);
+ RecursiveCopy::copypath($backup_path, $data_path);
+ chmod(0700, $data_path);
+
+ # Base configuration for this node
+ $self->append_conf(
+ 'postgresql.conf',
+ qq(
+port = $port
+));
+ if ($use_tcp)
+ {
+ $self->append_conf('postgresql.conf', "listen_addresses = '$host'");
+ }
+ else
+ {
+ $self->append_conf('postgresql.conf',
+ "unix_socket_directories = '$host'");
+ }
+ $self->enable_streaming($root_node) if $params{has_streaming};
+ $self->enable_restoring($root_node, $params{standby})
+ if $params{has_restoring};
+ return;
+}
+
+=pod
+
+=item $node->rotate_logfile()
+
+Switch to a new PostgreSQL log file. This does not alter any running
+PostgreSQL process. Subsequent method calls, including pg_ctl invocations,
+will use the new name. Return the new name.
+
+=cut
+
+sub rotate_logfile
+{
+ my ($self) = @_;
+ $self->{_logfile} = sprintf('%s_%d.log',
+ $self->{_logfile_base},
+ ++$self->{_logfile_generation});
+ return $self->{_logfile};
+}
+
+=pod
+
+=item $node->start(%params) => success_or_failure
+
+Wrapper for pg_ctl start
+
+Start the node and wait until it is ready to accept connections.
+
+=over
+
+=item fail_ok => 1
+
+By default, failure terminates the entire F<prove> invocation. If given,
+instead return a true or false value to indicate success or failure.
+
+=back
+
+=cut
+
+sub start
+{
+ my ($self, %params) = @_;
+ my $port = $self->port;
+ my $pgdata = $self->data_dir;
+ my $name = $self->name;
+ my $ret;
+
+ BAIL_OUT("node \"$name\" is already running") if defined $self->{_pid};
+
+ print("### Starting node \"$name\"\n");
+
+ {
+ # Temporarily unset PGAPPNAME so that the server doesn't
+ # inherit it. Otherwise this could affect libpqwalreceiver
+ # connections in confusing ways.
+ local %ENV = %ENV;
+ delete $ENV{PGAPPNAME};
+
+ # Note: We set the cluster_name here, not in postgresql.conf (in
+ # sub init) so that it does not get copied to standbys.
+ $ret = TestLib::system_log('pg_ctl', '-D', $self->data_dir, '-l',
+ $self->logfile, '-o', "--cluster-name=$name", 'start');
+ }
+
+ if ($ret != 0)
+ {
+ print "# pg_ctl start failed; logfile:\n";
+ print TestLib::slurp_file($self->logfile);
+ BAIL_OUT("pg_ctl start failed") unless $params{fail_ok};
+ return 0;
+ }
+
+ $self->_update_pid(1);
+ return 1;
+}
+
+=pod
+
+=item $node->kill9()
+
+Send SIGKILL (signal 9) to the postmaster.
+
+Note: if the node is already known stopped, this does nothing.
+However, if we think it's running and it's not, it's important for
+this to fail. Otherwise, tests might fail to detect server crashes.
+
+=cut
+
+sub kill9
+{
+ my ($self) = @_;
+ my $name = $self->name;
+ return unless defined $self->{_pid};
+ print "### Killing node \"$name\" using signal 9\n";
+ # kill(9, ...) fails under msys Perl 5.8.8, so fall back on pg_ctl.
+ kill(9, $self->{_pid})
+ or TestLib::system_or_bail('pg_ctl', 'kill', 'KILL', $self->{_pid});
+ $self->{_pid} = undef;
+ return;
+}
+
+=pod
+
+=item $node->stop(mode)
+
+Stop the node using pg_ctl -m $mode and wait for it to stop.
+
+Note: if the node is already known stopped, this does nothing.
+However, if we think it's running and it's not, it's important for
+this to fail. Otherwise, tests might fail to detect server crashes.
+
+=cut
+
+sub stop
+{
+ my ($self, $mode) = @_;
+ my $port = $self->port;
+ my $pgdata = $self->data_dir;
+ my $name = $self->name;
+ $mode = 'fast' unless defined $mode;
+ return unless defined $self->{_pid};
+ print "### Stopping node \"$name\" using mode $mode\n";
+ TestLib::system_or_bail('pg_ctl', '-D', $pgdata, '-m', $mode, 'stop');
+ $self->_update_pid(0);
+ return;
+}
+
+=pod
+
+=item $node->reload()
+
+Reload configuration parameters on the node.
+
+=cut
+
+sub reload
+{
+ my ($self) = @_;
+ my $port = $self->port;
+ my $pgdata = $self->data_dir;
+ my $name = $self->name;
+ print "### Reloading node \"$name\"\n";
+ TestLib::system_or_bail('pg_ctl', '-D', $pgdata, 'reload');
+ return;
+}
+
+=pod
+
+=item $node->restart()
+
+Wrapper for pg_ctl restart
+
+=cut
+
+sub restart
+{
+ my ($self) = @_;
+ my $port = $self->port;
+ my $pgdata = $self->data_dir;
+ my $logfile = $self->logfile;
+ my $name = $self->name;
+
+ print "### Restarting node \"$name\"\n";
+
+ {
+ local %ENV = %ENV;
+ delete $ENV{PGAPPNAME};
+
+ TestLib::system_or_bail('pg_ctl', '-D', $pgdata, '-l', $logfile,
+ 'restart');
+ }
+
+ $self->_update_pid(1);
+ return;
+}
+
+=pod
+
+=item $node->promote()
+
+Wrapper for pg_ctl promote
+
+=cut
+
+sub promote
+{
+ my ($self) = @_;
+ my $port = $self->port;
+ my $pgdata = $self->data_dir;
+ my $logfile = $self->logfile;
+ my $name = $self->name;
+ print "### Promoting node \"$name\"\n";
+ TestLib::system_or_bail('pg_ctl', '-D', $pgdata, '-l', $logfile,
+ 'promote');
+ return;
+}
+
+=pod
+
+=item $node->logrotate()
+
+Wrapper for pg_ctl logrotate
+
+=cut
+
+sub logrotate
+{
+ my ($self) = @_;
+ my $port = $self->port;
+ my $pgdata = $self->data_dir;
+ my $logfile = $self->logfile;
+ my $name = $self->name;
+ print "### Rotating log in node \"$name\"\n";
+ TestLib::system_or_bail('pg_ctl', '-D', $pgdata, '-l', $logfile,
+ 'logrotate');
+ return;
+}
+
+# Internal routine to enable streaming replication on a standby node.
+sub enable_streaming
+{
+ my ($self, $root_node) = @_;
+ my $root_connstr = $root_node->connstr;
+ my $name = $self->name;
+
+ print "### Enabling streaming replication for node \"$name\"\n";
+ $self->append_conf(
+ 'postgresql.conf', qq(
+primary_conninfo='$root_connstr'
+));
+ $self->set_standby_mode();
+ return;
+}
+
+# Internal routine to enable archive recovery command on a standby node
+sub enable_restoring
+{
+ my ($self, $root_node, $standby) = @_;
+ my $path = TestLib::perl2host($root_node->archive_dir);
+ my $name = $self->name;
+
+ print "### Enabling WAL restore for node \"$name\"\n";
+
+ # On Windows, the path specified in the restore command needs to use
+ # double back-slashes to work properly and to be able to detect properly
+ # the file targeted by the copy command, so the directory value used
+ # in this routine, using only one back-slash, need to be properly changed
+ # first. Paths also need to be double-quoted to prevent failures where
+ # the path contains spaces.
+ $path =~ s{\\}{\\\\}g if ($TestLib::windows_os);
+ my $copy_command =
+ $TestLib::windows_os
+ ? qq{copy "$path\\\\%f" "%p"}
+ : qq{cp "$path/%f" "%p"};
+
+ $self->append_conf(
+ 'postgresql.conf', qq(
+restore_command = '$copy_command'
+));
+ if ($standby)
+ {
+ $self->set_standby_mode();
+ }
+ else
+ {
+ $self->set_recovery_mode();
+ }
+ return;
+}
+
+=pod
+
+=item $node->set_recovery_mode()
+
+Place recovery.signal file.
+
+=cut
+
+sub set_recovery_mode
+{
+ my ($self) = @_;
+
+ $self->append_conf('recovery.signal', '');
+ return;
+}
+
+=pod
+
+=item $node->set_standby_mode()
+
+Place standby.signal file.
+
+=cut
+
+sub set_standby_mode
+{
+ my ($self) = @_;
+
+ $self->append_conf('standby.signal', '');
+ return;
+}
+
+# Internal routine to enable archiving
+sub enable_archiving
+{
+ my ($self) = @_;
+ my $path = TestLib::perl2host($self->archive_dir);
+ my $name = $self->name;
+
+ print "### Enabling WAL archiving for node \"$name\"\n";
+
+ # On Windows, the path specified in the restore command needs to use
+ # double back-slashes to work properly and to be able to detect properly
+ # the file targeted by the copy command, so the directory value used
+ # in this routine, using only one back-slash, need to be properly changed
+ # first. Paths also need to be double-quoted to prevent failures where
+ # the path contains spaces.
+ $path =~ s{\\}{\\\\}g if ($TestLib::windows_os);
+ my $copy_command =
+ $TestLib::windows_os
+ ? qq{copy "%p" "$path\\\\%f"}
+ : qq{cp "%p" "$path/%f"};
+
+ # Enable archive_mode and archive_command on node
+ $self->append_conf(
+ 'postgresql.conf', qq(
+archive_mode = on
+archive_command = '$copy_command'
+));
+ return;
+}
+
+# Internal method
+sub _update_pid
+{
+ my ($self, $is_running) = @_;
+ my $name = $self->name;
+
+ # If we can open the PID file, read its first line and that's the PID we
+ # want.
+ if (open my $pidfile, '<', $self->data_dir . "/postmaster.pid")
+ {
+ chomp($self->{_pid} = <$pidfile>);
+ print "# Postmaster PID for node \"$name\" is $self->{_pid}\n";
+ close $pidfile;
+
+ # If we found a pidfile when there shouldn't be one, complain.
+ BAIL_OUT("postmaster.pid unexpectedly present") unless $is_running;
+ return;
+ }
+
+ $self->{_pid} = undef;
+ print "# No postmaster PID for node \"$name\"\n";
+
+ # Complain if we expected to find a pidfile.
+ BAIL_OUT("postmaster.pid unexpectedly not present") if $is_running;
+ return;
+}
+
+=pod
+
+=item PostgresNode->get_new_node(node_name, %params)
+
+Build a new object of class C<PostgresNode> (or of a subclass, if you have
+one), assigning a free port number. Remembers the node, to prevent its port
+number from being reused for another node, and to ensure that it gets
+shut down when the test script exits.
+
+You should generally use this instead of C<PostgresNode::new(...)>.
+
+=over
+
+=item port => [1,65535]
+
+By default, this function assigns a port number to each node. Specify this to
+force a particular port number. The caller is responsible for evaluating
+potential conflicts and privilege requirements.
+
+=item own_host => 1
+
+By default, all nodes use the same PGHOST value. If specified, generate a
+PGHOST specific to this node. This allows multiple nodes to use the same
+port.
+
+=back
+
+For backwards compatibility, it is also exported as a standalone function,
+which can only create objects of class C<PostgresNode>.
+
+=cut
+
+sub get_new_node
+{
+ my $class = 'PostgresNode';
+ $class = shift if scalar(@_) % 2 != 1;
+ my ($name, %params) = @_;
+
+ # Select a port.
+ my $port;
+ if (defined $params{port})
+ {
+ $port = $params{port};
+ }
+ else
+ {
+ # When selecting a port, we look for an unassigned TCP port number,
+ # even if we intend to use only Unix-domain sockets. This is clearly
+ # necessary on $use_tcp (Windows) configurations, and it seems like a
+ # good idea on Unixen as well.
+ $port = get_free_port();
+ }
+
+ # Select a host.
+ my $host = $test_pghost;
+ if ($params{own_host})
+ {
+ if ($use_tcp)
+ {
+ $last_host_assigned++;
+ $last_host_assigned > 254 and BAIL_OUT("too many own_host nodes");
+ $host = '127.0.0.' . $last_host_assigned;
+ }
+ else
+ {
+ $host = "$test_pghost/$name"; # Assume $name =~ /^[-_a-zA-Z0-9]+$/
+ mkdir $host;
+ }
+ }
+
+ # Lock port number found by creating a new node
+ my $node = $class->new($name, $host, $port);
+
+ # Add node to list of nodes
+ push(@all_nodes, $node);
+
+ return $node;
+}
+
+=pod
+
+=item get_free_port()
+
+Locate an unprivileged (high) TCP port that's not currently bound to
+anything. This is used by get_new_node, and is also exported for use
+by test cases that need to start other, non-Postgres servers.
+
+Ports assigned to existing PostgresNode objects are automatically
+excluded, even if those servers are not currently running.
+
+XXX A port available now may become unavailable by the time we start
+the desired service.
+
+=cut
+
+sub get_free_port
+{
+ my $found = 0;
+ my $port = $last_port_assigned;
+
+ while ($found == 0)
+ {
+
+ # advance $port, wrapping correctly around range end
+ $port = 49152 if ++$port >= 65536;
+ print "# Checking port $port\n";
+
+ # Check first that candidate port number is not included in
+ # the list of already-registered nodes.
+ $found = 1;
+ foreach my $node (@all_nodes)
+ {
+ $found = 0 if ($node->port == $port);
+ }
+
+ # Check to see if anything else is listening on this TCP port.
+ # Seek a port available for all possible listen_addresses values,
+ # so callers can harness this port for the widest range of purposes.
+ # The 0.0.0.0 test achieves that for MSYS, which automatically sets
+ # SO_EXCLUSIVEADDRUSE. Testing 0.0.0.0 is insufficient for Windows
+ # native Perl (https://stackoverflow.com/a/14388707), so we also
+ # have to test individual addresses. Doing that for 127.0.0/24
+ # addresses other than 127.0.0.1 might fail with EADDRNOTAVAIL on
+ # non-Linux, non-Windows kernels.
+ #
+ # Thus, 0.0.0.0 and individual 127.0.0/24 addresses are tested
+ # only on Windows and only when TCP usage is requested.
+ if ($found == 1)
+ {
+ foreach my $addr (qw(127.0.0.1),
+ ($use_tcp && $TestLib::windows_os)
+ ? qw(127.0.0.2 127.0.0.3 0.0.0.0)
+ : ())
+ {
+ if (!can_bind($addr, $port))
+ {
+ $found = 0;
+ last;
+ }
+ }
+ }
+ }
+
+ print "# Found port $port\n";
+
+ # Update port for next time
+ $last_port_assigned = $port;
+
+ return $port;
+}
+
+# Internal routine to check whether a host:port is available to bind
+sub can_bind
+{
+ my ($host, $port) = @_;
+ my $iaddr = inet_aton($host);
+ my $paddr = sockaddr_in($port, $iaddr);
+ my $proto = getprotobyname("tcp");
+
+ socket(SOCK, PF_INET, SOCK_STREAM, $proto)
+ or die "socket failed: $!";
+
+ # As in postmaster, don't use SO_REUSEADDR on Windows
+ setsockopt(SOCK, SOL_SOCKET, SO_REUSEADDR, pack("l", 1))
+ unless $TestLib::windows_os;
+ my $ret = bind(SOCK, $paddr) && listen(SOCK, SOMAXCONN);
+ close(SOCK);
+ return $ret;
+}
+
+# Automatically shut down any still-running nodes when the test script exits.
+# Note that this just stops the postmasters (in the same order the nodes were
+# created in). Any temporary directories are deleted, in an unspecified
+# order, later when the File::Temp objects are destroyed.
+END
+{
+
+ # take care not to change the script's exit value
+ my $exit_code = $?;
+
+ foreach my $node (@all_nodes)
+ {
+ $node->teardown_node;
+
+ # skip clean if we are requested to retain the basedir
+ next if defined $ENV{'PG_TEST_NOCLEAN'};
+
+ # clean basedir on clean test invocation
+ $node->clean_node if $exit_code == 0 && TestLib::all_tests_passing();
+ }
+
+ $? = $exit_code;
+}
+
+=pod
+
+=item $node->teardown_node()
+
+Do an immediate stop of the node
+
+=cut
+
+sub teardown_node
+{
+ my $self = shift;
+
+ $self->stop('immediate');
+ return;
+}
+
+=pod
+
+=item $node->clean_node()
+
+Remove the base directory of the node if the node has been stopped.
+
+=cut
+
+sub clean_node
+{
+ my $self = shift;
+
+ rmtree $self->{_basedir} unless defined $self->{_pid};
+ return;
+}
+
+=pod
+
+=item $node->safe_psql($dbname, $sql) => stdout
+
+Invoke B<psql> to run B<sql> on B<dbname> and return its stdout on success.
+Die if the SQL produces an error. Runs with B<ON_ERROR_STOP> set.
+
+Takes optional extra params like timeout and timed_out parameters with the same
+options as psql.
+
+=cut
+
+sub safe_psql
+{
+ my ($self, $dbname, $sql, %params) = @_;
+
+ my ($stdout, $stderr);
+
+ my $ret = $self->psql(
+ $dbname, $sql,
+ %params,
+ stdout => \$stdout,
+ stderr => \$stderr,
+ on_error_die => 1,
+ on_error_stop => 1);
+
+ # psql can emit stderr from NOTICEs etc
+ if ($stderr ne "")
+ {
+ print "#### Begin standard error\n";
+ print $stderr;
+ print "\n#### End standard error\n";
+ }
+
+ return $stdout;
+}
+
+=pod
+
+=item $node->psql($dbname, $sql, %params) => psql_retval
+
+Invoke B<psql> to execute B<$sql> on B<$dbname> and return the return value
+from B<psql>, which is run with on_error_stop by default so that it will
+stop running sql and return 3 if the passed SQL results in an error.
+
+As a convenience, if B<psql> is called in array context it returns an
+array containing ($retval, $stdout, $stderr).
+
+psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc>
+disabled. That may be overridden by passing extra psql parameters.
+
+stdout and stderr are transformed to UNIX line endings if on Windows. Any
+trailing newline is removed.
+
+Dies on failure to invoke psql but not if psql exits with a nonzero
+return code (unless on_error_die specified).
+
+If psql exits because of a signal, an exception is raised.
+
+=over
+
+=item stdout => \$stdout
+
+B<stdout>, if given, must be a scalar reference to which standard output is
+written. If not given, standard output is not redirected and will be printed
+unless B<psql> is called in array context, in which case it's captured and
+returned.
+
+=item stderr => \$stderr
+
+Same as B<stdout> but gets standard error. If the same scalar is passed for
+both B<stdout> and B<stderr> the results may be interleaved unpredictably.
+
+=item on_error_stop => 1
+
+By default, the B<psql> method invokes the B<psql> program with ON_ERROR_STOP=1
+set, so SQL execution is stopped at the first error and exit code 2 is
+returned. Set B<on_error_stop> to 0 to ignore errors instead.
+
+=item on_error_die => 0
+
+By default, this method returns psql's result code. Pass on_error_die to
+instead die with an informative message.
+
+=item timeout => 'interval'
+
+Set a timeout for the psql call as an interval accepted by B<IPC::Run::timer>
+(integer seconds is fine). This method raises an exception on timeout, unless
+the B<timed_out> parameter is also given.
+
+=item timed_out => \$timed_out
+
+If B<timeout> is set and this parameter is given, the scalar it references
+is set to true if the psql call times out.
+
+=item replication => B<value>
+
+If set, add B<replication=value> to the conninfo string.
+Passing the literal value C<database> results in a logical replication
+connection.
+
+=item extra_params => ['--single-transaction']
+
+If given, it must be an array reference containing additional parameters to B<psql>.
+
+=back
+
+e.g.
+
+ my ($stdout, $stderr, $timed_out);
+ my $cmdret = $node->psql('postgres', 'SELECT pg_sleep(600)',
+ stdout => \$stdout, stderr => \$stderr,
+ timeout => 180, timed_out => \$timed_out,
+ extra_params => ['--single-transaction'])
+
+will set $cmdret to undef and $timed_out to a true value.
+
+ $node->psql('postgres', $sql, on_error_die => 1);
+
+dies with an informative message if $sql fails.
+
+=cut
+
+sub psql
+{
+ my ($self, $dbname, $sql, %params) = @_;
+
+ my $stdout = $params{stdout};
+ my $stderr = $params{stderr};
+ my $replication = $params{replication};
+ my $timeout = undef;
+ my $timeout_exception = 'psql timed out';
+ my @psql_params = (
+ 'psql',
+ '-XAtq',
+ '-d',
+ $self->connstr($dbname)
+ . (defined $replication ? " replication=$replication" : ""),
+ '-f',
+ '-');
+
+ # If the caller wants an array and hasn't passed stdout/stderr
+ # references, allocate temporary ones to capture them so we
+ # can return them. Otherwise we won't redirect them at all.
+ if (wantarray)
+ {
+ if (!defined($stdout))
+ {
+ my $temp_stdout = "";
+ $stdout = \$temp_stdout;
+ }
+ if (!defined($stderr))
+ {
+ my $temp_stderr = "";
+ $stderr = \$temp_stderr;
+ }
+ }
+
+ $params{on_error_stop} = 1 unless defined $params{on_error_stop};
+ $params{on_error_die} = 0 unless defined $params{on_error_die};
+
+ push @psql_params, '-v', 'ON_ERROR_STOP=1' if $params{on_error_stop};
+ push @psql_params, @{ $params{extra_params} }
+ if defined $params{extra_params};
+
+ $timeout =
+ IPC::Run::timeout($params{timeout}, exception => $timeout_exception)
+ if (defined($params{timeout}));
+
+ ${ $params{timed_out} } = 0 if defined $params{timed_out};
+
+ # IPC::Run would otherwise append to existing contents:
+ $$stdout = "" if ref($stdout);
+ $$stderr = "" if ref($stderr);
+
+ my $ret;
+
+ # Run psql and capture any possible exceptions. If the exception is
+ # because of a timeout and the caller requested to handle that, just return
+ # and set the flag. Otherwise, and for any other exception, rethrow.
+ #
+ # For background, see
+ # https://metacpan.org/pod/release/ETHER/Try-Tiny-0.24/lib/Try/Tiny.pm
+ do
+ {
+ local $@;
+ eval {
+ my @ipcrun_opts = (\@psql_params, '<', \$sql);
+ push @ipcrun_opts, '>', $stdout if defined $stdout;
+ push @ipcrun_opts, '2>', $stderr if defined $stderr;
+ push @ipcrun_opts, $timeout if defined $timeout;
+
+ IPC::Run::run @ipcrun_opts;
+ $ret = $?;
+ };
+ my $exc_save = $@;
+ if ($exc_save)
+ {
+
+ # IPC::Run::run threw an exception. re-throw unless it's a
+ # timeout, which we'll handle by testing is_expired
+ die $exc_save
+ if (blessed($exc_save)
+ || $exc_save !~ /^\Q$timeout_exception\E/);
+
+ $ret = undef;
+
+ die "Got timeout exception '$exc_save' but timer not expired?!"
+ unless $timeout->is_expired;
+
+ if (defined($params{timed_out}))
+ {
+ ${ $params{timed_out} } = 1;
+ }
+ else
+ {
+ die "psql timed out: stderr: '$$stderr'\n"
+ . "while running '@psql_params'";
+ }
+ }
+ };
+
+ # Note: on Windows, IPC::Run seems to convert \r\n to \n in program output
+ # if we're using native Perl, but not if we're using MSys Perl. So do it
+ # by hand in the latter case, here and elsewhere.
+
+ if (defined $$stdout)
+ {
+ $$stdout =~ s/\r\n/\n/g if $Config{osname} eq 'msys';
+ chomp $$stdout;
+ }
+
+ if (defined $$stderr)
+ {
+ $$stderr =~ s/\r\n/\n/g if $Config{osname} eq 'msys';
+ chomp $$stderr;
+ }
+
+ # See http://perldoc.perl.org/perlvar.html#%24CHILD_ERROR
+ # We don't use IPC::Run::Simple to limit dependencies.
+ #
+ # We always die on signal.
+ my $core = $ret & 128 ? " (core dumped)" : "";
+ die "psql exited with signal "
+ . ($ret & 127)
+ . "$core: '$$stderr' while running '@psql_params'"
+ if $ret & 127;
+ $ret = $ret >> 8;
+
+ if ($ret && $params{on_error_die})
+ {
+ die "psql error: stderr: '$$stderr'\nwhile running '@psql_params'"
+ if $ret == 1;
+ die "connection error: '$$stderr'\nwhile running '@psql_params'"
+ if $ret == 2;
+ die
+ "error running SQL: '$$stderr'\nwhile running '@psql_params' with sql '$sql'"
+ if $ret == 3;
+ die "psql returns $ret: '$$stderr'\nwhile running '@psql_params'";
+ }
+
+ if (wantarray)
+ {
+ return ($ret, $$stdout, $$stderr);
+ }
+ else
+ {
+ return $ret;
+ }
+}
+
+=pod
+
+=item $node->interactive_psql($dbname, \$stdin, \$stdout, $timer, %params) => harness
+
+Invoke B<psql> on B<$dbname> and return an IPC::Run harness object,
+which the caller may use to send interactive input to B<psql>.
+The process's stdin is sourced from the $stdin scalar reference,
+and its stdout and stderr go to the $stdout scalar reference.
+ptys are used so that psql thinks it's being called interactively.
+
+The specified timer object is attached to the harness, as well.
+It's caller's responsibility to select the timeout length, and to
+restart the timer after each command if the timeout is per-command.
+
+psql is invoked in tuples-only unaligned mode with reading of B<.psqlrc>
+disabled. That may be overridden by passing extra psql parameters.
+
+Dies on failure to invoke psql, or if psql fails to connect.
+Errors occurring later are the caller's problem.
+
+Be sure to "finish" the harness when done with it.
+
+The only extra parameter currently accepted is
+
+=over
+
+=item extra_params => ['--single-transaction']
+
+If given, it must be an array reference containing additional parameters to B<psql>.
+
+=back
+
+This requires IO::Pty in addition to IPC::Run.
+
+=cut
+
+sub interactive_psql
+{
+ my ($self, $dbname, $stdin, $stdout, $timer, %params) = @_;
+
+ my @psql_params = ('psql', '-XAt', '-d', $self->connstr($dbname));
+
+ push @psql_params, @{ $params{extra_params} }
+ if defined $params{extra_params};
+
+ # Ensure there is no data waiting to be sent:
+ $$stdin = "" if ref($stdin);
+ # IPC::Run would otherwise append to existing contents:
+ $$stdout = "" if ref($stdout);
+
+ my $harness = IPC::Run::start \@psql_params,
+ '<pty<', $stdin, '>pty>', $stdout, $timer;
+
+ # Pump until we see psql's help banner. This ensures that callers
+ # won't write anything to the pty before it's ready, avoiding an
+ # implementation issue in IPC::Run. Also, it means that psql
+ # connection failures are caught here, relieving callers of
+ # the need to handle those. (Right now, we have no particularly
+ # good handling for errors anyway, but that might be added later.)
+ pump $harness
+ until $$stdout =~ /Type "help" for help/ || $timer->is_expired;
+
+ die "psql startup timed out" if $timer->is_expired;
+
+ return $harness;
+}
+
+=pod
+
+=item $node->poll_query_until($dbname, $query [, $expected ])
+
+Run B<$query> repeatedly, until it returns the B<$expected> result
+('t', or SQL boolean true, by default).
+Continues polling if B<psql> returns an error result.
+Times out after 180 seconds.
+Returns 1 if successful, 0 if timed out.
+
+=cut
+
+sub poll_query_until
+{
+ my ($self, $dbname, $query, $expected) = @_;
+
+ $expected = 't' unless defined($expected); # default value
+
+ my $cmd = [ 'psql', '-XAt', '-d', $self->connstr($dbname) ];
+ my ($stdout, $stderr);
+ my $max_attempts = 180 * 10;
+ my $attempts = 0;
+
+ while ($attempts < $max_attempts)
+ {
+ my $result = IPC::Run::run $cmd, '<', \$query,
+ '>', \$stdout, '2>', \$stderr;
+
+ $stdout =~ s/\r\n/\n/g if $Config{osname} eq 'msys';
+ chomp($stdout);
+ $stderr =~ s/\r\n/\n/g if $Config{osname} eq 'msys';
+ chomp($stderr);
+
+ if ($stdout eq $expected && $stderr eq '')
+ {
+ return 1;
+ }
+
+ # Wait 0.1 second before retrying.
+ usleep(100_000);
+
+ $attempts++;
+ }
+
+ # The query result didn't change in 180 seconds. Give up. Print the
+ # output from the last attempt, hopefully that's useful for debugging.
+ diag qq(poll_query_until timed out executing this query:
+$query
+expecting this output:
+$expected
+last actual query output:
+$stdout
+with stderr:
+$stderr);
+ return 0;
+}
+
+=pod
+
+=item $node->command_ok(...)
+
+Runs a shell command like TestLib::command_ok, but with PGHOST and PGPORT set
+so that the command will default to connecting to this PostgresNode.
+
+=cut
+
+sub command_ok
+{
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+ my $self = shift;
+
+ local $ENV{PGHOST} = $self->host;
+ local $ENV{PGPORT} = $self->port;
+
+ TestLib::command_ok(@_);
+ return;
+}
+
+=pod
+
+=item $node->command_fails(...)
+
+TestLib::command_fails with our connection parameters. See command_ok(...)
+
+=cut
+
+sub command_fails
+{
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+ my $self = shift;
+
+ local $ENV{PGHOST} = $self->host;
+ local $ENV{PGPORT} = $self->port;
+
+ TestLib::command_fails(@_);
+ return;
+}
+
+=pod
+
+=item $node->command_like(...)
+
+TestLib::command_like with our connection parameters. See command_ok(...)
+
+=cut
+
+sub command_like
+{
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+ my $self = shift;
+
+ local $ENV{PGHOST} = $self->host;
+ local $ENV{PGPORT} = $self->port;
+
+ TestLib::command_like(@_);
+ return;
+}
+
+=pod
+
+=item $node->command_checks_all(...)
+
+TestLib::command_checks_all with our connection parameters. See
+command_ok(...)
+
+=cut
+
+sub command_checks_all
+{
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+ my $self = shift;
+
+ local $ENV{PGHOST} = $self->host;
+ local $ENV{PGPORT} = $self->port;
+
+ TestLib::command_checks_all(@_);
+ return;
+}
+
+=pod
+
+=item $node->issues_sql_like(cmd, expected_sql, test_name)
+
+Run a command on the node, then verify that $expected_sql appears in the
+server log file.
+
+=cut
+
+sub issues_sql_like
+{
+ local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+ my ($self, $cmd, $expected_sql, $test_name) = @_;
+
+ local $ENV{PGHOST} = $self->host;
+ local $ENV{PGPORT} = $self->port;
+
+ my $log_location = -s $self->logfile;
+
+ my $result = TestLib::run_log($cmd);
+ ok($result, "@$cmd exit code 0");
+ my $log = TestLib::slurp_file($self->logfile, $log_location);
+ like($log, $expected_sql, "$test_name: SQL found in server log");
+ return;
+}
+
+=pod
+
+=item $node->run_log(...)
+
+Runs a shell command like TestLib::run_log, but with connection parameters set
+so that the command will default to connecting to this PostgresNode.
+
+=cut
+
+sub run_log
+{
+ my $self = shift;
+
+ local $ENV{PGHOST} = $self->host;
+ local $ENV{PGPORT} = $self->port;
+
+ TestLib::run_log(@_);
+ return;
+}
+
+=pod
+
+=item $node->lsn(mode)
+
+Look up WAL locations on the server:
+
+ * insert location (master only, error on replica)
+ * write location (master only, error on replica)
+ * flush location (master only, error on replica)
+ * receive location (always undef on master)
+ * replay location (always undef on master)
+
+mode must be specified.
+
+=cut
+
+sub lsn
+{
+ my ($self, $mode) = @_;
+ my %modes = (
+ 'insert' => 'pg_current_wal_insert_lsn()',
+ 'flush' => 'pg_current_wal_flush_lsn()',
+ 'write' => 'pg_current_wal_lsn()',
+ 'receive' => 'pg_last_wal_receive_lsn()',
+ 'replay' => 'pg_last_wal_replay_lsn()');
+
+ $mode = '<undef>' if !defined($mode);
+ croak "unknown mode for 'lsn': '$mode', valid modes are "
+ . join(', ', keys %modes)
+ if !defined($modes{$mode});
+
+ my $result = $self->safe_psql('postgres', "SELECT $modes{$mode}");
+ chomp($result);
+ if ($result eq '')
+ {
+ return;
+ }
+ else
+ {
+ return $result;
+ }
+}
+
+=pod
+
+=item $node->wait_for_catchup(standby_name, mode, target_lsn)
+
+Wait for the node with application_name standby_name (usually from node->name,
+also works for logical subscriptions)
+until its replication location in pg_stat_replication equals or passes the
+upstream's WAL insert point at the time this function is called. By default
+the replay_lsn is waited for, but 'mode' may be specified to wait for any of
+sent|write|flush|replay. The connection catching up must be in a streaming
+state.
+
+If there is no active replication connection from this peer, waits until
+poll_query_until timeout.
+
+Requires that the 'postgres' db exists and is accessible.
+
+target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
+If omitted, pg_current_wal_lsn() is used.
+
+This is not a test. It die()s on failure.
+
+=cut
+
+sub wait_for_catchup
+{
+ my ($self, $standby_name, $mode, $target_lsn) = @_;
+ $mode = defined($mode) ? $mode : 'replay';
+ my %valid_modes =
+ ('sent' => 1, 'write' => 1, 'flush' => 1, 'replay' => 1);
+ croak "unknown mode $mode for 'wait_for_catchup', valid modes are "
+ . join(', ', keys(%valid_modes))
+ unless exists($valid_modes{$mode});
+
+ # Allow passing of a PostgresNode instance as shorthand
+ if (blessed($standby_name) && $standby_name->isa("PostgresNode"))
+ {
+ $standby_name = $standby_name->name;
+ }
+ my $lsn_expr;
+ if (defined($target_lsn))
+ {
+ $lsn_expr = "'$target_lsn'";
+ }
+ else
+ {
+ $lsn_expr = 'pg_current_wal_lsn()';
+ }
+ print "Waiting for replication conn "
+ . $standby_name . "'s "
+ . $mode
+ . "_lsn to pass "
+ . $lsn_expr . " on "
+ . $self->name . "\n";
+ my $query =
+ qq[SELECT $lsn_expr <= ${mode}_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+ $self->poll_query_until('postgres', $query)
+ or croak "timed out waiting for catchup";
+ print "done\n";
+ return;
+}
+
+=pod
+
+=item $node->wait_for_slot_catchup(slot_name, mode, target_lsn)
+
+Wait for the named replication slot to equal or pass the supplied target_lsn.
+The location used is the restart_lsn unless mode is given, in which case it may
+be 'restart' or 'confirmed_flush'.
+
+Requires that the 'postgres' db exists and is accessible.
+
+This is not a test. It die()s on failure.
+
+If the slot is not active, will time out after poll_query_until's timeout.
+
+target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
+
+Note that for logical slots, restart_lsn is held down by the oldest in-progress tx.
+
+=cut
+
+sub wait_for_slot_catchup
+{
+ my ($self, $slot_name, $mode, $target_lsn) = @_;
+ $mode = defined($mode) ? $mode : 'restart';
+ if (!($mode eq 'restart' || $mode eq 'confirmed_flush'))
+ {
+ croak "valid modes are restart, confirmed_flush";
+ }
+ croak 'target lsn must be specified' unless defined($target_lsn);
+ print "Waiting for replication slot "
+ . $slot_name . "'s "
+ . $mode
+ . "_lsn to pass "
+ . $target_lsn . " on "
+ . $self->name . "\n";
+ my $query =
+ qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';];
+ $self->poll_query_until('postgres', $query)
+ or croak "timed out waiting for catchup";
+ print "done\n";
+ return;
+}
+
+=pod
+
+=item $node->query_hash($dbname, $query, @columns)
+
+Execute $query on $dbname, replacing any appearance of the string __COLUMNS__
+within the query with a comma-separated list of @columns.
+
+If __COLUMNS__ does not appear in the query, its result columns must EXACTLY
+match the order and number (but not necessarily alias) of supplied @columns.
+
+The query must return zero or one rows.
+
+Return a hash-ref representation of the results of the query, with any empty
+or null results as defined keys with an empty-string value. There is no way
+to differentiate between null and empty-string result fields.
+
+If the query returns zero rows, return a hash with all columns empty. There
+is no way to differentiate between zero rows returned and a row with only
+null columns.
+
+=cut
+
+sub query_hash
+{
+ my ($self, $dbname, $query, @columns) = @_;
+ croak 'calls in array context for multi-row results not supported yet'
+ if (wantarray);
+
+ # Replace __COLUMNS__ if found
+ substr($query, index($query, '__COLUMNS__'), length('__COLUMNS__')) =
+ join(', ', @columns)
+ if index($query, '__COLUMNS__') >= 0;
+ my $result = $self->safe_psql($dbname, $query);
+
+ # hash slice, see http://stackoverflow.com/a/16755894/398670 .
+ #
+ # Fills the hash with empty strings produced by x-operator element
+ # duplication if result is an empty row
+ #
+ my %val;
+ @val{@columns} =
+ $result ne '' ? split(qr/\|/, $result, -1) : ('',) x scalar(@columns);
+ return \%val;
+}
+
+=pod
+
+=item $node->slot(slot_name)
+
+Return hash-ref of replication slot data for the named slot, or a hash-ref with
+all values '' if not found. Does not differentiate between null and empty string
+for fields, no field is ever undef.
+
+The restart_lsn and confirmed_flush_lsn fields are returned verbatim, and also
+as a 2-list of [highword, lowword] integer. Since we rely on Perl 5.8.8 we can't
+"use bigint", it's from 5.20, and we can't assume we have Math::Bigint from CPAN
+either.
+
+=cut
+
+sub slot
+{
+ my ($self, $slot_name) = @_;
+ my @columns = (
+ 'plugin', 'slot_type', 'datoid', 'database',
+ 'active', 'active_pid', 'xmin', 'catalog_xmin',
+ 'restart_lsn');
+ return $self->query_hash(
+ 'postgres',
+ "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'",
+ @columns);
+}
+
+=pod
+
+=item $node->pg_recvlogical_upto(self, dbname, slot_name, endpos, timeout_secs, ...)
+
+Invoke pg_recvlogical to read from slot_name on dbname until LSN endpos, which
+corresponds to pg_recvlogical --endpos. Gives up after timeout (if nonzero).
+
+Disallows pg_recvlogical from internally retrying on error by passing --no-loop.
+
+Plugin options are passed as additional keyword arguments.
+
+If called in scalar context, returns stdout, and die()s on timeout or nonzero return.
+
+If called in array context, returns a tuple of (retval, stdout, stderr, timeout).
+timeout is the IPC::Run::Timeout object whose is_expired method can be tested
+to check for timeout. retval is undef on timeout.
+
+=cut
+
+sub pg_recvlogical_upto
+{
+ my ($self, $dbname, $slot_name, $endpos, $timeout_secs, %plugin_options)
+ = @_;
+ my ($stdout, $stderr);
+
+ my $timeout_exception = 'pg_recvlogical timed out';
+
+ croak 'slot name must be specified' unless defined($slot_name);
+ croak 'endpos must be specified' unless defined($endpos);
+
+ my @cmd = (
+ 'pg_recvlogical', '-S', $slot_name, '--dbname',
+ $self->connstr($dbname));
+ push @cmd, '--endpos', $endpos;
+ push @cmd, '-f', '-', '--no-loop', '--start';
+
+ while (my ($k, $v) = each %plugin_options)
+ {
+ croak "= is not permitted to appear in replication option name"
+ if ($k =~ qr/=/);
+ push @cmd, "-o", "$k=$v";
+ }
+
+ my $timeout;
+ $timeout =
+ IPC::Run::timeout($timeout_secs, exception => $timeout_exception)
+ if $timeout_secs;
+ my $ret = 0;
+
+ do
+ {
+ local $@;
+ eval {
+ IPC::Run::run(\@cmd, ">", \$stdout, "2>", \$stderr, $timeout);
+ $ret = $?;
+ };
+ my $exc_save = $@;
+ if ($exc_save)
+ {
+
+ # IPC::Run::run threw an exception. re-throw unless it's a
+ # timeout, which we'll handle by testing is_expired
+ die $exc_save
+ if (blessed($exc_save) || $exc_save !~ qr/$timeout_exception/);
+
+ $ret = undef;
+
+ die "Got timeout exception '$exc_save' but timer not expired?!"
+ unless $timeout->is_expired;
+
+ die
+ "$exc_save waiting for endpos $endpos with stdout '$stdout', stderr '$stderr'"
+ unless wantarray;
+ }
+ };
+
+ $stdout =~ s/\r\n/\n/g if $Config{osname} eq 'msys';
+ $stderr =~ s/\r\n/\n/g if $Config{osname} eq 'msys';
+
+ if (wantarray)
+ {
+ return ($ret, $stdout, $stderr, $timeout);
+ }
+ else
+ {
+ die
+ "pg_recvlogical exited with code '$ret', stdout '$stdout' and stderr '$stderr'"
+ if $ret;
+ return $stdout;
+ }
+}
+
+=pod
+
+=back
+
+=cut
+
+1;