summaryrefslogtreecommitdiffstats
path: root/src/test/recovery/t/006_logical_decoding.pl
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/recovery/t/006_logical_decoding.pl')
-rw-r--r--src/test/recovery/t/006_logical_decoding.pl190
1 files changed, 190 insertions, 0 deletions
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
new file mode 100644
index 0000000..78229a7
--- /dev/null
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -0,0 +1,190 @@
+# Testing of logical decoding using SQL interface and/or pg_recvlogical
+#
+# Most logical decoding tests are in contrib/test_decoding. This module
+# is for work that doesn't fit well there, like where server restarts
+# are required.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 14;
+use Config;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->append_conf(
+ 'postgresql.conf', qq(
+wal_level = logical
+));
+$node_master->start;
+my $backup_name = 'master_backup';
+
+$node_master->safe_psql('postgres',
+ qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+$node_master->safe_psql('postgres',
+ qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]
+);
+
+# Cover walsender error shutdown code
+my ($result, $stdout, $stderr) = $node_master->psql(
+ 'template1',
+ qq[START_REPLICATION SLOT test_slot LOGICAL 0/0],
+ replication => 'database');
+ok( $stderr =~
+ m/replication slot "test_slot" was not created in this database/,
+ "Logical decoding correctly fails to start");
+
+# Check case of walsender not using a database connection. Logical
+# decoding should not be allowed.
+($result, $stdout, $stderr) = $node_master->psql(
+ 'template1',
+ qq[START_REPLICATION SLOT s1 LOGICAL 0/1],
+ replication => 'true');
+ok($stderr =~ /ERROR: logical decoding requires a database connection/,
+ "Logical decoding fails on non-database connection");
+
+$node_master->safe_psql('postgres',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
+);
+
+# Basic decoding works
+$result = $node_master->safe_psql('postgres',
+ qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
+is(scalar(my @foobar = split /^/m, $result),
+ 12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
+
+# If we immediately crash the server we might lose the progress we just made
+# and replay the same changes again. But a clean shutdown should never repeat
+# the same changes when we use the SQL decoding interface.
+$node_master->restart('fast');
+
+# There are no new writes, so the result should be empty.
+$result = $node_master->safe_psql('postgres',
+ qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
+chomp($result);
+is($result, '', 'Decoding after fast restart repeats no rows');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_master->safe_psql('postgres',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+my $stdout_sql = $node_master->safe_psql('postgres',
+ qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_master->safe_psql('postgres',
+ "SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+);
+print "waiting to replay $endpos\n";
+
+# Insert some rows after $endpos, which we won't read.
+$node_master->safe_psql('postgres',
+ qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
+);
+
+my $stdout_recv = $node_master->pg_recvlogical_upto(
+ 'postgres', 'test_slot', $endpos, 180,
+ 'include-xids' => '0',
+ 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+ 'got same expected output from pg_recvlogical decoding session');
+
+$node_master->poll_query_until('postgres',
+ "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'test_slot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_master->pg_recvlogical_upto(
+ 'postgres', 'test_slot', $endpos, 180,
+ 'include-xids' => '0',
+ 'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_master->psql(
+ 'otherdb',
+ "SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+ ),
+ 3,
+ 'replaying logical slot from another database fails');
+
+$node_master->safe_psql('otherdb',
+ qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]
+);
+
+# make sure you can't drop a slot while active
+SKIP:
+{
+
+ # some Windows Perls at least don't like IPC::Run's start/kill_kill regime.
+ skip "Test fails on Windows perl", 2 if $Config{osname} eq 'MSWin32';
+
+ my $pg_recvlogical = IPC::Run::start(
+ [
+ 'pg_recvlogical', '-d', $node_master->connstr('otherdb'),
+ '-S', 'otherdb_slot', '-f', '-', '--start'
+ ]);
+ $node_master->poll_query_until('otherdb',
+ "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)"
+ ) or die "slot never became active";
+ is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
+ 3, 'dropping a DB with active logical slots fails');
+ $pg_recvlogical->kill_kill;
+ is($node_master->slot('otherdb_slot')->{'slot_name'},
+ undef, 'logical slot still exists');
+}
+
+$node_master->poll_query_until('otherdb',
+ "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
+ 0, 'dropping a DB with inactive logical slots succeeds');
+is($node_master->slot('otherdb_slot')->{'slot_name'},
+ undef, 'logical slot was actually dropped with DB');
+
+# Test logical slot advancing and its durability.
+my $logical_slot = 'logical_slot';
+$node_master->safe_psql('postgres',
+ "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);"
+);
+$node_master->psql(
+ 'postgres', "
+ CREATE TABLE tab_logical_slot (a int);
+ INSERT INTO tab_logical_slot VALUES (generate_series(1,10));");
+my $current_lsn =
+ $node_master->safe_psql('postgres', "SELECT pg_current_wal_lsn();");
+chomp($current_lsn);
+my $psql_rc = $node_master->psql('postgres',
+ "SELECT pg_replication_slot_advance('$logical_slot', '$current_lsn'::pg_lsn);"
+);
+is($psql_rc, '0', 'slot advancing with logical slot');
+my $logical_restart_lsn_pre = $node_master->safe_psql('postgres',
+ "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';"
+);
+chomp($logical_restart_lsn_pre);
+# Slot advance should persist across clean restarts.
+$node_master->restart;
+my $logical_restart_lsn_post = $node_master->safe_psql('postgres',
+ "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';"
+);
+chomp($logical_restart_lsn_post);
+ok(($logical_restart_lsn_pre cmp $logical_restart_lsn_post) == 0,
+ "logical slot advance persists across restarts");
+
+# done with the node
+$node_master->stop;