summaryrefslogtreecommitdiffstats
path: root/src/test/subscription/t/022_twophase_cascade.pl
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:17:33 +0000
commit5e45211a64149b3c659b90ff2de6fa982a5a93ed (patch)
tree739caf8c461053357daa9f162bef34516c7bf452 /src/test/subscription/t/022_twophase_cascade.pl
parentInitial commit. (diff)
downloadpostgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.tar.xz
postgresql-15-5e45211a64149b3c659b90ff2de6fa982a5a93ed.zip
Adding upstream version 15.5.upstream/15.5
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/subscription/t/022_twophase_cascade.pl')
-rw-r--r--src/test/subscription/t/022_twophase_cascade.pl461
1 files changed, 461 insertions, 0 deletions
diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl
new file mode 100644
index 0000000..7a797f3
--- /dev/null
+++ b/src/test/subscription/t/022_twophase_cascade.pl
@@ -0,0 +1,461 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test cascading logical replication of 2PC.
+#
+# Includes tests for options 2PC (not-streaming) and also for 2PC (streaming).
+#
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup a cascade of pub/sub nodes.
+# node_A -> node_B -> node_C
+###############################
+
+# Initialize nodes
+# node_A
+my $node_A = PostgreSQL::Test::Cluster->new('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf(
+ 'postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_A->start;
+# node_B
+my $node_B = PostgreSQL::Test::Cluster->new('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf(
+ 'postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_B->start;
+# node_C
+my $node_C = PostgreSQL::Test::Cluster->new('node_C');
+$node_C->init(allows_streaming => 'logical');
+$node_C->append_conf(
+ 'postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_C->start;
+
+# Create some pre-existing content on node_A
+$node_A->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_A->safe_psql(
+ 'postgres', "
+ INSERT INTO tab_full SELECT generate_series(1,10);");
+
+# Create the same tables on node_B and node_C
+$node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+$node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Create some pre-existing content on node_A (for streaming tests)
+$node_A->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_A->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Create the same tables on node_B and node_C
+# columns a and b are compatible with same table name on node_A
+$node_B->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
+);
+$node_C->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
+);
+
+# Setup logical replication
+
+# -----------------------
+# 2PC NON-STREAMING TESTS
+# -----------------------
+
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full, test_tab");
+my $appname_B = 'tap_sub_B';
+$node_B->safe_psql(
+ 'postgres', "
+ CREATE SUBSCRIPTION tap_sub_B
+ CONNECTION '$node_A_connstr application_name=$appname_B'
+ PUBLICATION tap_pub_A
+ WITH (two_phase = on)");
+
+# node_B (pub) -> node_C (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full, test_tab");
+my $appname_C = 'tap_sub_C';
+$node_C->safe_psql(
+ 'postgres', "
+ CREATE SUBSCRIPTION tap_sub_C
+ CONNECTION '$node_B_connstr application_name=$appname_C'
+ PUBLICATION tap_pub_B
+ WITH (two_phase = on)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# Also wait for two-phase to be enabled
+my $twophase_query =
+ "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_B->poll_query_until('postgres', $twophase_query)
+ or die "Timed out while waiting for subscriber to enable twophase";
+$node_C->poll_query_until('postgres', $twophase_query)
+ or die "Timed out while waiting for subscriber to enable twophase";
+
+is(1, 1, "Cascade setup is complete");
+
+my $result;
+
+###############################
+# check that 2PC gets replicated to subscriber(s)
+# then COMMIT PREPARED
+###############################
+
+# 2PC PREPARE
+$node_A->safe_psql(
+ 'postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (11);
+ PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+ "SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber B');
+$result = $node_C->safe_psql('postgres',
+ "SELECT count(*) FROM tab_full where a = 11;");
+is($result, qq(1), 'Row inserted via 2PC has committed on subscriber C');
+
+# check the transaction state is ended on subscriber(s)
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber C');
+
+###############################
+# check that 2PC gets replicated to subscriber(s)
+# then ROLLBACK PREPARED
+###############################
+
+# 2PC PREPARE
+$node_A->safe_psql(
+ 'postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (12);
+ PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC ROLLBACK
+$node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction is aborted on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+ "SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber B');
+$result = $node_C->safe_psql('postgres',
+ "SELECT count(*) FROM tab_full where a = 12;");
+is($result, qq(0), 'Row inserted via 2PC is not present on subscriber C');
+
+# check the transaction state is ended on subscriber(s)
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+###############################
+# Test nested transactions with 2PC
+###############################
+
+# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
+$node_A->safe_psql(
+ 'postgres', "
+ BEGIN;
+ INSERT INTO tab_full VALUES (21);
+ SAVEPOINT sp_inner;
+ INSERT INTO tab_full VALUES (22);
+ ROLLBACK TO SAVEPOINT sp_inner;
+ PREPARE TRANSACTION 'outer';
+ ");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state prepared on subscriber(s)
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is ended on subscriber
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+# check inserts are visible at subscriber(s).
+# 22 should be rolled back.
+# 21 should be committed.
+$result = $node_B->safe_psql('postgres',
+ "SELECT a FROM tab_full where a IN (21,22);");
+is($result, qq(21), 'Rows committed are present on subscriber B');
+$result = $node_C->safe_psql('postgres',
+ "SELECT a FROM tab_full where a IN (21,22);");
+is($result, qq(21), 'Rows committed are present on subscriber C');
+
+# ---------------------
+# 2PC + STREAMING TESTS
+# ---------------------
+
+my $oldpid_B = $node_A->safe_psql(
+ 'postgres', "
+ SELECT pid FROM pg_stat_replication
+ WHERE application_name = '$appname_B' AND state = 'streaming';");
+my $oldpid_C = $node_B->safe_psql(
+ 'postgres', "
+ SELECT pid FROM pg_stat_replication
+ WHERE application_name = '$appname_C' AND state = 'streaming';");
+
+# Setup logical replication (streaming = on)
+
+$node_B->safe_psql(
+ 'postgres', "
+ ALTER SUBSCRIPTION tap_sub_B
+ SET (streaming = on);");
+$node_C->safe_psql(
+ 'postgres', "
+ ALTER SUBSCRIPTION tap_sub_C
+ SET (streaming = on)");
+
+# Wait for subscribers to finish initialization
+
+$node_A->poll_query_until(
+ 'postgres', "
+ SELECT pid != $oldpid_B FROM pg_stat_replication
+ WHERE application_name = '$appname_B' AND state = 'streaming';"
+) or die "Timed out while waiting for apply to restart";
+$node_B->poll_query_until(
+ 'postgres', "
+ SELECT pid != $oldpid_C FROM pg_stat_replication
+ WHERE application_name = '$appname_C' AND state = 'streaming';"
+) or die "Timed out while waiting for apply to restart";
+
+###############################
+# Test 2PC PREPARE / COMMIT PREPARED.
+# 1. Data is streamed as a 2PC transaction.
+# 2. Then do commit prepared.
+#
+# Expect all data is replicated on subscriber(s) after the commit.
+###############################
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+# Then 2PC PREPARE
+$node_A->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is prepared on subscriber(s)
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_B->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334),
+ 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults'
+);
+$result = $node_C->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334),
+ 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults'
+);
+
+# check the transaction state is ended on subscriber(s)
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber C');
+
+###############################
+# Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT.
+# 0. Cleanup from previous test leaving only 2 rows.
+# 1. Insert one more row.
+# 2. Record a SAVEPOINT.
+# 3. Data is streamed using 2PC.
+# 4. Do rollback to SAVEPOINT prior to the streamed inserts.
+# 5. Then COMMIT PREPARED.
+#
+# Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1).
+###############################
+
+# First, delete the data except for 2 rows (delete will be replicated)
+$node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT
+$node_A->safe_psql(
+ 'postgres', "
+ BEGIN;
+ INSERT INTO test_tab VALUES (9999, 'foobar');
+ SAVEPOINT sp_inner;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ ROLLBACK TO SAVEPOINT sp_inner;
+ PREPARE TRANSACTION 'outer';
+ ");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state prepared on subscriber(s)
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber C');
+
+# 2PC COMMIT
+$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_C);
+
+# check the transaction state is ended on subscriber
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber B');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is ended on subscriber C');
+
+# check inserts are visible at subscriber(s).
+# All the streamed data (prior to the SAVEPOINT) should be rolled back.
+# (9999, 'foobar') should be committed.
+$result = $node_B->safe_psql('postgres',
+ "SELECT count(*) FROM test_tab where b = 'foobar';");
+is($result, qq(1), 'Rows committed are present on subscriber B');
+$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
+is($result, qq(3), 'Rows committed are present on subscriber B');
+$result = $node_C->safe_psql('postgres',
+ "SELECT count(*) FROM test_tab where b = 'foobar';");
+is($result, qq(1), 'Rows committed are present on subscriber C');
+$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;");
+is($result, qq(3), 'Rows committed are present on subscriber C');
+
+###############################
+# check all the cleanup
+###############################
+
+# cleanup the node_B => node_C pub/sub
+$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C");
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node C');
+$result =
+ $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+ 'check subscription relation status was dropped on subscriber node C');
+$result = $node_C->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0),
+ 'check replication origin was dropped on subscriber node C');
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node B');
+
+# cleanup the node_A => node_B pub/sub
+$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B");
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber node B');
+$result =
+ $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+ 'check subscription relation status was dropped on subscriber node B');
+$result = $node_B->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0),
+ 'check replication origin was dropped on subscriber node B');
+$result =
+ $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher node A');
+
+# shutdown
+$node_C->stop('fast');
+$node_B->stop('fast');
+$node_A->stop('fast');
+
+done_testing();