# Copyright (c) 2021-2022, PostgreSQL Global Development Group # Test cascading logical replication of 2PC. # # Includes tests for options 2PC (not-streaming) and also for 2PC (streaming). # use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; ############################### # Setup a cascade of pub/sub nodes. # node_A -> node_B -> node_C ############################### # Initialize nodes # node_A my $node_A = PostgreSQL::Test::Cluster->new('node_A'); $node_A->init(allows_streaming => 'logical'); $node_A->append_conf( 'postgresql.conf', qq( max_prepared_transactions = 10 logical_decoding_work_mem = 64kB )); $node_A->start; # node_B my $node_B = PostgreSQL::Test::Cluster->new('node_B'); $node_B->init(allows_streaming => 'logical'); $node_B->append_conf( 'postgresql.conf', qq( max_prepared_transactions = 10 logical_decoding_work_mem = 64kB )); $node_B->start; # node_C my $node_C = PostgreSQL::Test::Cluster->new('node_C'); $node_C->init(allows_streaming => 'logical'); $node_C->append_conf( 'postgresql.conf', qq( max_prepared_transactions = 10 logical_decoding_work_mem = 64kB )); $node_C->start; # Create some pre-existing content on node_A $node_A->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); $node_A->safe_psql( 'postgres', " INSERT INTO tab_full SELECT generate_series(1,10);"); # Create the same tables on node_B and node_C $node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); $node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); # Create some pre-existing content on node_A (for streaming tests) $node_A->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)"); $node_A->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); # Create the same tables on node_B and node_C # columns a and b are compatible with same table name on node_A $node_B->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" ); $node_C->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" ); # Setup logical replication # ----------------------- # 2PC NON-STREAMING TESTS # ----------------------- # node_A (pub) -> node_B (sub) my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full, test_tab"); my $appname_B = 'tap_sub_B'; $node_B->safe_psql( 'postgres', " CREATE SUBSCRIPTION tap_sub_B CONNECTION '$node_A_connstr application_name=$appname_B' PUBLICATION tap_pub_A WITH (two_phase = on)"); # node_B (pub) -> node_C (sub) my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full, test_tab"); my $appname_C = 'tap_sub_C'; $node_C->safe_psql( 'postgres', " CREATE SUBSCRIPTION tap_sub_C CONNECTION '$node_B_connstr application_name=$appname_C' PUBLICATION tap_pub_B WITH (two_phase = on)"); # Wait for subscribers to finish initialization $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # Also wait for two-phase to be enabled my $twophase_query = "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"; $node_B->poll_query_until('postgres', $twophase_query) or die "Timed out while waiting for subscriber to enable twophase"; $node_C->poll_query_until('postgres', $twophase_query) or die "Timed out while waiting for subscriber to enable twophase"; is(1, 1, "Cascade setup is complete"); my $result; ############################### # check that 2PC gets replicated to subscriber(s) # then COMMIT PREPARED ############################### # 2PC PREPARE $node_A->safe_psql( 'postgres', " BEGIN; INSERT INTO tab_full VALUES (11); PREPARE TRANSACTION 'test_prepared_tab_full';"); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check the transaction state is prepared on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber C'); # 2PC COMMIT $node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab_full';"); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check that transaction was committed on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); is($result, qq(1), 'Row inserted via 2PC has committed on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 11;"); is($result, qq(1), 'Row inserted via 2PC has committed on subscriber C'); # check the transaction state is ended on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is committed on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is committed on subscriber C'); ############################### # check that 2PC gets replicated to subscriber(s) # then ROLLBACK PREPARED ############################### # 2PC PREPARE $node_A->safe_psql( 'postgres', " BEGIN; INSERT INTO tab_full VALUES (12); PREPARE TRANSACTION 'test_prepared_tab_full';"); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check the transaction state is prepared on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber C'); # 2PC ROLLBACK $node_A->safe_psql('postgres', "ROLLBACK PREPARED 'test_prepared_tab_full';"); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check that transaction is aborted on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); is($result, qq(0), 'Row inserted via 2PC is not present on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM tab_full where a = 12;"); is($result, qq(0), 'Row inserted via 2PC is not present on subscriber C'); # check the transaction state is ended on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is ended on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is ended on subscriber C'); ############################### # Test nested transactions with 2PC ############################### # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT $node_A->safe_psql( 'postgres', " BEGIN; INSERT INTO tab_full VALUES (21); SAVEPOINT sp_inner; INSERT INTO tab_full VALUES (22); ROLLBACK TO SAVEPOINT sp_inner; PREPARE TRANSACTION 'outer'; "); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check the transaction state prepared on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber C'); # 2PC COMMIT $node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check the transaction state is ended on subscriber $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is ended on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is ended on subscriber C'); # check inserts are visible at subscriber(s). # 22 should be rolled back. # 21 should be committed. $result = $node_B->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);"); is($result, qq(21), 'Rows committed are present on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT a FROM tab_full where a IN (21,22);"); is($result, qq(21), 'Rows committed are present on subscriber C'); # --------------------- # 2PC + STREAMING TESTS # --------------------- my $oldpid_B = $node_A->safe_psql( 'postgres', " SELECT pid FROM pg_stat_replication WHERE application_name = '$appname_B' AND state = 'streaming';"); my $oldpid_C = $node_B->safe_psql( 'postgres', " SELECT pid FROM pg_stat_replication WHERE application_name = '$appname_C' AND state = 'streaming';"); # Setup logical replication (streaming = on) $node_B->safe_psql( 'postgres', " ALTER SUBSCRIPTION tap_sub_B SET (streaming = on);"); $node_C->safe_psql( 'postgres', " ALTER SUBSCRIPTION tap_sub_C SET (streaming = on)"); # Wait for subscribers to finish initialization $node_A->poll_query_until( 'postgres', " SELECT pid != $oldpid_B FROM pg_stat_replication WHERE application_name = '$appname_B' AND state = 'streaming';" ) or die "Timed out while waiting for apply to restart"; $node_B->poll_query_until( 'postgres', " SELECT pid != $oldpid_C FROM pg_stat_replication WHERE application_name = '$appname_C' AND state = 'streaming';" ) or die "Timed out while waiting for apply to restart"; ############################### # Test 2PC PREPARE / COMMIT PREPARED. # 1. Data is streamed as a 2PC transaction. # 2. Then do commit prepared. # # Expect all data is replicated on subscriber(s) after the commit. ############################### # Insert, update and delete enough rows to exceed the 64kB limit. # Then 2PC PREPARE $node_A->safe_psql( 'postgres', q{ BEGIN; INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; DELETE FROM test_tab WHERE mod(a,3) = 0; PREPARE TRANSACTION 'test_prepared_tab';}); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check the transaction state is prepared on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber C'); # 2PC COMMIT $node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check that transaction was committed on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults' ); $result = $node_C->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults' ); # check the transaction state is ended on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is committed on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is committed on subscriber C'); ############################### # Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT. # 0. Cleanup from previous test leaving only 2 rows. # 1. Insert one more row. # 2. Record a SAVEPOINT. # 3. Data is streamed using 2PC. # 4. Do rollback to SAVEPOINT prior to the streamed inserts. # 5. Then COMMIT PREPARED. # # Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1). ############################### # First, delete the data except for 2 rows (delete will be replicated) $node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT $node_A->safe_psql( 'postgres', " BEGIN; INSERT INTO test_tab VALUES (9999, 'foobar'); SAVEPOINT sp_inner; INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; DELETE FROM test_tab WHERE mod(a,3) = 0; ROLLBACK TO SAVEPOINT sp_inner; PREPARE TRANSACTION 'outer'; "); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check the transaction state prepared on subscriber(s) $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(1), 'transaction is prepared on subscriber C'); # 2PC COMMIT $node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); $node_A->wait_for_catchup($appname_B); $node_B->wait_for_catchup($appname_C); # check the transaction state is ended on subscriber $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is ended on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is ended on subscriber C'); # check inserts are visible at subscriber(s). # All the streamed data (prior to the SAVEPOINT) should be rolled back. # (9999, 'foobar') should be committed. $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';"); is($result, qq(1), 'Rows committed are present on subscriber B'); $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); is($result, qq(3), 'Rows committed are present on subscriber B'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab where b = 'foobar';"); is($result, qq(1), 'Rows committed are present on subscriber C'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); is($result, qq(3), 'Rows committed are present on subscriber C'); ############################### # check all the cleanup ############################### # cleanup the node_B => node_C pub/sub $node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C"); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); is($result, qq(0), 'check subscription was dropped on subscriber node C'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); is($result, qq(0), 'check subscription relation status was dropped on subscriber node C'); $result = $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); is($result, qq(0), 'check replication origin was dropped on subscriber node C'); $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); is($result, qq(0), 'check replication slot was dropped on publisher node B'); # cleanup the node_A => node_B pub/sub $node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B"); $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); is($result, qq(0), 'check subscription was dropped on subscriber node B'); $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); is($result, qq(0), 'check subscription relation status was dropped on subscriber node B'); $result = $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); is($result, qq(0), 'check replication origin was dropped on subscriber node B'); $result = $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); is($result, qq(0), 'check replication slot was dropped on publisher node A'); # shutdown $node_C->stop('fast'); $node_B->stop('fast'); $node_A->stop('fast'); done_testing();