summaryrefslogtreecommitdiffstats
path: root/src/test/subscription/t/023_twophase_stream.pl
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/subscription/t/023_twophase_stream.pl')
-rw-r--r--src/test/subscription/t/023_twophase_stream.pl324
1 files changed, 324 insertions, 0 deletions
diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl
new file mode 100644
index 0000000..9b45410
--- /dev/null
+++ b/src/test/subscription/t/023_twophase_stream.pl
@@ -0,0 +1,324 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test logical replication of 2PC with streaming.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################
+# Setup
+###############################
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+ 'postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf(
+ 'postgresql.conf', qq(
+max_prepared_transactions = 10
+));
+$node_subscriber->start;
+
+# Create some pre-existing content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
+);
+
+# Setup logical replication (streaming = on)
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql(
+ 'postgres', "
+ CREATE SUBSCRIPTION tap_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION tap_pub
+ WITH (streaming = on, two_phase = on)");
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# Also wait for two-phase to be enabled
+my $twophase_query =
+ "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
+$node_subscriber->poll_query_until('postgres', $twophase_query)
+ or die "Timed out while waiting for subscriber to enable twophase";
+
+###############################
+# Check initial data was copied to subscriber
+###############################
+my $result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+###############################
+# Test 2PC PREPARE / COMMIT PREPARED.
+# 1. Data is streamed as a 2PC transaction.
+# 2. Then do commit prepared.
+#
+# Expect all data is replicated on subscriber side after the commit.
+###############################
+
+# check that 2PC gets replicated to subscriber
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres',
+ "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334),
+ 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
+);
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# Test 2PC PREPARE / ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. Do rollback prepared.
+#
+# Expect data rolls back leaving only the original 2 rows.
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# 2PC transaction gets aborted
+$node_publisher->safe_psql('postgres',
+ "ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2),
+ 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
+# 1. insert, update and delete enough rows to exceed the 64kB limit.
+# 2. Then server crashes before the 2PC transaction is committed.
+# 3. After servers are restarted the pending transaction is committed.
+#
+# Expect all data is replicated on subscriber side after the commit.
+# Note: both publisher and subscriber do crash/restart.
+###############################
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_subscriber->stop('immediate');
+$node_publisher->stop('immediate');
+
+$node_publisher->start;
+$node_subscriber->start;
+
+# commit post the restart
+$node_publisher->safe_psql('postgres',
+ "COMMIT PREPARED 'test_prepared_tab';");
+$node_publisher->wait_for_catchup($appname);
+
+# check inserts are visible
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334),
+ 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
+);
+
+###############################
+# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE.
+# 4. Then do a ROLLBACK PREPARED.
+#
+# Expect the 2PC data rolls back leaving only 3 rows on the subscriber
+# (the original 2 + inserted 1).
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC transaction)
+# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC transaction gets aborted
+$node_publisher->safe_psql('postgres',
+ "ROLLBACK PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is aborted on subscriber,
+# but the extra INSERT outside of the 2PC still was replicated
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3|3|3), 'check the outside insert was copied to subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is aborted on subscriber');
+
+###############################
+# Do INSERT after the PREPARE but before COMMIT PREPARED.
+# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+# 2. Data is streamed using 2PC.
+# 3. A single row INSERT is done which is after the PREPARE.
+# 4. Then do a COMMIT PREPARED.
+#
+# Expect 2PC data + the extra row are on the subscriber
+# (the 3334 + inserted 1 = 3335).
+###############################
+
+# First, delete the data except for 2 rows (will be replicated)
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
+
+# Then insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql(
+ 'postgres', q{
+ BEGIN;
+ INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+ UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+ DELETE FROM test_tab WHERE mod(a,3) = 0;
+ PREPARE TRANSACTION 'test_prepared_tab';});
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is in prepared state on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Insert a different record (now we are outside of the 2PC transaction)
+# Note: the 2PC transaction still holds row locks so make sure this insert is for a separare primary key
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+# 2PC transaction gets committed
+$node_publisher->safe_psql('postgres',
+ "COMMIT PREPARED 'test_prepared_tab';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3335|3335|3335),
+ 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'
+);
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
+
+###############################
+# check all the cleanup
+###############################
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+ 'check subscription relation status was dropped on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();