# Copyright (c) 2021-2023, PostgreSQL Global Development Group

# Test streaming of large transaction with DDL and subtransactions
#
# This file is mainly to test the DDL/DML interaction of the publisher side,
# so we didn't add a parallel apply version for the tests in this file.
#
# Outline:
#   1. Start a publisher and a subscriber and replicate test_tab with
#      streaming enabled on the subscription.
#   2. Run transactions on the publisher that mix INSERTs with
#      ALTER TABLE ... ADD COLUMN (some of them inside a savepoint).
#   3. After each batch, count the non-NULL values per column on the
#      subscriber and compare against the expected numbers.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Create publisher node
#
# logical_decoding_work_mem is set to a small value; presumably this is what
# makes the "large" transactions below get streamed rather than sent at
# commit time.
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
    'logical_decoding_work_mem = 64kB');
$node_publisher->start;

# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;

# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
    "CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
    "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");

# Setup structure on subscriber
#
# The subscriber table already has columns c..f, which the publisher only
# adds later through ALTER TABLE in the transactions below.
$node_subscriber->safe_psql('postgres',
    "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT, f INT)"
);

# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION tap_pub FOR TABLE test_tab");

my $appname = 'tap_sub';
$node_subscriber->safe_psql('postgres',
    "CREATE SUBSCRIPTION tap_sub"
      . " CONNECTION '$publisher_connstr application_name=$appname'"
      . " PUBLICATION tap_pub WITH (streaming = on)");

# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);

# Only the two preexisting rows exist, and they carry no values for c or d.
# Note that count(d = 999) counts rows for which the comparison is non-NULL,
# i.e. rows whose d is non-NULL.
my $result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|0|0), 'check initial data was copied to subscriber');

# a small (non-streamed) transaction with DDL and DML
$node_publisher->safe_psql(
    'postgres',
    q{ BEGIN;}
      . q{ INSERT INTO test_tab VALUES (3, md5(3::text));}
      . q{ ALTER TABLE test_tab ADD COLUMN c INT;}
      . q{ SAVEPOINT s1;}
      . q{ INSERT INTO test_tab VALUES (4, md5(4::text), -4);}
      . q{ COMMIT; });

# large (streamed) transaction with DDL and DML
$node_publisher->safe_psql(
    'postgres',
    q{ BEGIN;}
      . q{ INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i);}
      . q{ ALTER TABLE test_tab ADD COLUMN d INT;}
      . q{ SAVEPOINT s1;}
      . q{ INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i);}
      . q{ COMMIT; });

# a small (non-streamed) transaction with DDL and DML
$node_publisher->safe_psql(
    'postgres',
    q{ BEGIN;}
      . q{ INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001);}
      . q{ ALTER TABLE test_tab ADD COLUMN e INT;}
      . q{ SAVEPOINT s1;}
      . q{ INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002);}
      . q{ COMMIT; });

$node_publisher->wait_for_catchup($appname);

# Expected counts, derived from the inserts above:
#   count(*) = 2002  rows a = 1 .. 2002
#   count(c) = 1999  rows a = 4 .. 2002 were inserted with a value for c
#   count(d) = 1002  rows a = 1001 .. 2002 were inserted with a value for d
#   count(e) = 1     only row a = 2002 was inserted with a value for e
$result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c), count(d), count(e) FROM test_tab");
is($result, qq(2002|1999|1002|1),
    'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
);

# A large (streamed) transaction with DDL and DML. One of the DDL is performed
# after DML to ensure that we invalidate the schema sent for test_tab so that
# the next transaction has to send the schema again.
$node_publisher->safe_psql(
    'postgres',
    q{ BEGIN;}
      . q{ INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(2003,5000) s(i);}
      . q{ ALTER TABLE test_tab ADD COLUMN f INT;}
      . q{ COMMIT; });

# A small transaction that won't get streamed. This is just to ensure that we
# send the schema again to reflect the last column added in the previous test.
$node_publisher->safe_psql(
    'postgres',
    q{ BEGIN;}
      . q{ INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i, 4*i FROM generate_series(5001,5005) s(i);}
      . q{ COMMIT; });

$node_publisher->wait_for_catchup($appname);

# Expected counts, derived from all inserts so far:
#   count(*) = 5005  rows a = 1 .. 5005
#   count(c) = 5002  rows a = 4 .. 5005
#   count(d) = 4005  rows a = 1001 .. 5005
#   count(e) = 3004  rows a = 2002 .. 5005
#   count(f) = 5     rows a = 5001 .. 5005
$result = $node_subscriber->safe_psql('postgres',
    "SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab");
is($result, qq(5005|5002|4005|3004|5),
    'check data was copied to subscriber for both streaming and non-streaming transactions'
);

$node_subscriber->stop;
$node_publisher->stop;

done_testing();