diff options
Diffstat (limited to 'src/test/subscription/t/022_twophase_cascade.pl')
-rw-r--r-- | src/test/subscription/t/022_twophase_cascade.pl | 461 |
1 files changed, 461 insertions, 0 deletions
diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl new file mode 100644 index 0000000..7a797f3 --- /dev/null +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -0,0 +1,461 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Test cascading logical replication of 2PC. +# +# Includes tests for options 2PC (not-streaming) and also for 2PC (streaming). +# +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +############################### +# Setup a cascade of pub/sub nodes. +# node_A -> node_B -> node_C +############################### + +# Initialize nodes +# node_A +my $node_A = PostgreSQL::Test::Cluster->new('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_A->start; +# node_B +my $node_B = PostgreSQL::Test::Cluster->new('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_B->start; +# node_C +my $node_C = PostgreSQL::Test::Cluster->new('node_C'); +$node_C->init(allows_streaming => 'logical'); +$node_C->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_C->start; + +# Create some pre-existing content on node_A +$node_A->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_A->safe_psql( + 'postgres', " + INSERT INTO tab_full SELECT generate_series(1,10);"); + +# Create the same tables on node_B and node_C +$node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); +$node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Create some pre-existing content on node_A (for streaming tests) +$node_A->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_A->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Create the same tables on node_B and node_C +# columns a and b are compatible with same table name on node_A +$node_B->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" +); +$node_C->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" +); + +# Setup logical replication + +# ----------------------- +# 2PC NON-STREAMING TESTS +# ----------------------- + +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full, test_tab"); +my $appname_B = 'tap_sub_B'; +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_B + CONNECTION '$node_A_connstr application_name=$appname_B' + PUBLICATION tap_pub_A + WITH (two_phase = on)"); + +# node_B (pub) -> node_C (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full, test_tab"); +my $appname_C = 'tap_sub_C'; +$node_C->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_C + CONNECTION '$node_B_connstr application_name=$appname_C' + PUBLICATION tap_pub_B + WITH (two_phase = on)"); + +# Wait for subscribers to finish initialization +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# Also wait for two-phase to be enabled +my $twophase_query = + "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"; +$node_B->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; +$node_C->poll_query_until('postgres', $twophase_query) + or die "Timed out while waiting for subscriber to enable twophase"; + +is(1, 1, "Cascade setup is complete"); + +my $result; + +############################### +# check that 2PC gets replicated to subscriber(s) +# then COMMIT PREPARED +############################### + +# 2PC PREPARE +$node_A->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (11); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is prepared on subscriber(s) +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT +$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check that transaction was committed on subscriber(s) +$result = $node_B->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC has committed on subscriber B'); +$result = $node_C->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a = 11;"); +is($result, qq(1), 'Row inserted via 2PC has committed on subscriber C'); + +# check the transaction state is ended on subscriber(s) +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber C'); + +############################### +# check that 2PC gets replicated to subscriber(s) +# then ROLLBACK PREPARED +############################### + +# 2PC PREPARE +$node_A->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (12); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is prepared on subscriber(s) +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC ROLLBACK +$node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check that transaction is aborted on subscriber(s) +$result = $node_B->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber B'); +$result = $node_C->safe_psql('postgres', + "SELECT count(*) FROM tab_full where a = 12;"); +is($result, qq(0), 'Row inserted via 2PC is not present on subscriber C'); + +# check the transaction state is ended on subscriber(s) +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber C'); + +############################### +# Test nested transactions with 2PC +############################### + +# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT +$node_A->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (21); + SAVEPOINT sp_inner; + INSERT INTO tab_full VALUES (22); + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state prepared on subscriber(s) +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT +$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is ended on subscriber +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber C'); + +# check inserts are visible at subscriber(s). +# 22 should be rolled back. +# 21 should be committed. +$result = $node_B->safe_psql('postgres', + "SELECT a FROM tab_full where a IN (21,22);"); +is($result, qq(21), 'Rows committed are present on subscriber B'); +$result = $node_C->safe_psql('postgres', + "SELECT a FROM tab_full where a IN (21,22);"); +is($result, qq(21), 'Rows committed are present on subscriber C'); + +# --------------------- +# 2PC + STREAMING TESTS +# --------------------- + +my $oldpid_B = $node_A->safe_psql( + 'postgres', " + SELECT pid FROM pg_stat_replication + WHERE application_name = '$appname_B' AND state = 'streaming';"); +my $oldpid_C = $node_B->safe_psql( + 'postgres', " + SELECT pid FROM pg_stat_replication + WHERE application_name = '$appname_C' AND state = 'streaming';"); + +# Setup logical replication (streaming = on) + +$node_B->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_B + SET (streaming = on);"); +$node_C->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_C + SET (streaming = on)"); + +# Wait for subscribers to finish initialization + +$node_A->poll_query_until( + 'postgres', " + SELECT pid != $oldpid_B FROM pg_stat_replication + WHERE application_name = '$appname_B' AND state = 'streaming';" +) or die "Timed out while waiting for apply to restart"; +$node_B->poll_query_until( + 'postgres', " + SELECT pid != $oldpid_C FROM pg_stat_replication + WHERE application_name = '$appname_C' AND state = 'streaming';" +) or die "Timed out while waiting for apply to restart"; + +############################### +# Test 2PC PREPARE / COMMIT PREPARED. +# 1. Data is streamed as a 2PC transaction. +# 2. Then do commit prepared. +# +# Expect all data is replicated on subscriber(s) after the commit. +############################### + +# Insert, update and delete enough rows to exceed the 64kB limit. +# Then 2PC PREPARE +$node_A->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is prepared on subscriber(s) +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT +$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check that transaction was committed on subscriber(s) +$result = $node_B->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), + 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults' +); +$result = $node_C->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), + 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults' +); + +# check the transaction state is ended on subscriber(s) +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is committed on subscriber C'); + +############################### +# Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT. +# 0. Cleanup from previous test leaving only 2 rows. +# 1. Insert one more row. +# 2. Record a SAVEPOINT. +# 3. Data is streamed using 2PC. +# 4. Do rollback to SAVEPOINT prior to the streamed inserts. +# 5. Then COMMIT PREPARED. +# +# Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1). +############################### + +# First, delete the data except for 2 rows (delete will be replicated) +$node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + +# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT +$node_A->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO test_tab VALUES (9999, 'foobar'); + SAVEPOINT sp_inner; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state prepared on subscriber(s) +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber C'); + +# 2PC COMMIT +$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); + +$node_A->wait_for_catchup($appname_B); +$node_B->wait_for_catchup($appname_C); + +# check the transaction state is ended on subscriber +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber B'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'transaction is ended on subscriber C'); + +# check inserts are visible at subscriber(s). +# All the streamed data (prior to the SAVEPOINT) should be rolled back. +# (9999, 'foobar') should be committed. +$result = $node_B->safe_psql('postgres', + "SELECT count(*) FROM test_tab where b = 'foobar';"); +is($result, qq(1), 'Rows committed are present on subscriber B'); +$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); +is($result, qq(3), 'Rows committed are present on subscriber B'); +$result = $node_C->safe_psql('postgres', + "SELECT count(*) FROM test_tab where b = 'foobar';"); +is($result, qq(1), 'Rows committed are present on subscriber C'); +$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); +is($result, qq(3), 'Rows committed are present on subscriber C'); + +############################### +# check all the cleanup +############################### + +# cleanup the node_B => node_C pub/sub +$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C"); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber node C'); +$result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber node C'); +$result = $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), + 'check replication origin was dropped on subscriber node C'); +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher node B'); + +# cleanup the node_A => node_B pub/sub +$node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B"); +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber node B'); +$result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber node B'); +$result = $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), + 'check replication origin was dropped on subscriber node B'); +$result = + $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher node A'); + +# shutdown +$node_C->stop('fast'); +$node_B->stop('fast'); +$node_A->stop('fast'); + +done_testing(); |