diff options
Diffstat (limited to 'src/test/subscription/t')
22 files changed, 4331 insertions, 0 deletions
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl new file mode 100644 index 0000000..7dd69ca --- /dev/null +++ b/src/test/subscription/t/001_rep_changes.pl @@ -0,0 +1,539 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Basic logical replication test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 32; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql( + 'postgres', + "CREATE FUNCTION public.pg_get_replica_identity_index(int) + RETURNS regclass LANGUAGE sql AS 'SELECT 1/0'"); # shall not call +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_mixed (a int primary key, b text, c numeric)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))" +); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full_pk (a int primary key, b text)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL"); +# Let this table with REPLICA IDENTITY NOTHING, allowing only INSERT changes. +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_nothing (a int)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_nothing REPLICA IDENTITY NOTHING"); + +# Replicate the changes without replica identity index +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_no_replidentity_index(c1 int)"); +$node_publisher->safe_psql('postgres', + "CREATE INDEX idx_no_replidentity_index ON tab_no_replidentity_index(c1)" +); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full_pk (a int primary key, b text)"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_nothing (a int)"); + +# different column count and order than on publisher +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)" +); + +# replication of the table with included index +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))" +); + +# replication of the table without replica identity index +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_no_replidentity_index(c1 int)"); +$node_subscriber->safe_psql('postgres', + "CREATE INDEX idx_no_replidentity_index ON tab_no_replidentity_index(c1)" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_nothing, tab_full_pk, tab_no_replidentity_index" +); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); +is($result, qq(0), 'check non-replicated table is empty on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); +is($result, qq(1002), 'check initial data was copied to subscriber'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_nothing VALUES (generate_series(1,20))"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_include SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_include WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_include SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_no_replidentity_index VALUES(1)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_rep"); +is($result, qq(20|-20|-1), 'check replicated changes on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_mixed"); +is( $result, qq(local|1.1|foo|1 +local|2.2|bar|2), 'check replicated changes with different column order'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_nothing"); +is($result, qq(20), 'check replicated changes with REPLICA IDENTITY NOTHING'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_include"); +is($result, qq(20|-20|-1), + 'check replicated changes with primary key index with included columns'); + +is( $node_subscriber->safe_psql( + 'postgres', q(SELECT c1 FROM tab_no_replidentity_index)), + 1, + "value replicated to subscriber without replica identity index"); + +# insert some duplicate rows +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full SELECT generate_series(1,10)"); + +# Test behaviour of ALTER PUBLICATION ... DROP TABLE +# +# When a publisher drops a table from publication, it should also stop sending +# its changes to subscribers. We look at the subscriber whether it receives +# the row that is inserted to the table in the publisher after it is dropped +# from the publication. +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1052|1|1002), + 'check rows on subscriber before table drop from publication'); + +# Drop the table from publication +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only DROP TABLE tab_ins"); + +# Insert a row in publisher, but publisher will not send this row to subscriber +$node_publisher->safe_psql('postgres', "INSERT INTO tab_ins VALUES(8888)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Subscriber will not receive the inserted row, after table is dropped from +# publication, so row count should remain the same. +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1052|1|1002), + 'check rows on subscriber after table drop from publication'); + +# Delete the inserted row in publisher +$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a = 8888"); + +# Add the table to publication again +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"); + +# Refresh publication after table is added to publication +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +# Test replication with multiple publications for a subscription such that the +# operations are performed on the table from the first publication in the list. + +# Create tables on publisher +$node_publisher->safe_psql('postgres', "CREATE TABLE temp1 (a int)"); +$node_publisher->safe_psql('postgres', "CREATE TABLE temp2 (a int)"); + +# Create tables on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE temp1 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE temp2 (a int)"); + +# Setup logical replication that will only be used for this test +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_temp1 FOR TABLE temp1 WITH (publish = insert)" +); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_temp2 FOR TABLE temp2"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2" +); + +$node_publisher->wait_for_catchup('tap_sub_temp1'); + +# Also wait for initial table sync to finish +$synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Subscriber table will have no rows initially +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM temp1"); +is($result, qq(0), + 'check initial rows on subscriber with multiple publications'); + +# Insert a row into the table that's part of first publication in subscriber +# list of publications. +$node_publisher->safe_psql('postgres', "INSERT INTO temp1 VALUES (1)"); + +$node_publisher->wait_for_catchup('tap_sub_temp1'); + +# Subscriber should receive the inserted row +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM temp1"); +is($result, qq(1), 'check rows on subscriber with multiple publications'); + +# Drop subscription as we don't need it anymore +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_temp1"); + +# Drop publications as we don't need them anymore +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_temp1"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_temp2"); + +# Clean up the tables on both publisher and subscriber as we don't need them +$node_publisher->safe_psql('postgres', "DROP TABLE temp1"); +$node_publisher->safe_psql('postgres', "DROP TABLE temp2"); +$node_subscriber->safe_psql('postgres', "DROP TABLE temp1"); +$node_subscriber->safe_psql('postgres', "DROP TABLE temp2"); + +# add REPLICA IDENTITY FULL so we can update +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); +# tab_mixed can use DEFAULT, since it has a primary key + +# and do the updates +$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET b = 'baz' WHERE a = 1"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_full_pk SET b = 'bar' WHERE a = 1"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(20|1|100), + 'update works with REPLICA IDENTITY FULL and duplicate tuples'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT x FROM tab_full2 ORDER BY 1"); +is( $result, qq(a +bb +bb), + 'update works with REPLICA IDENTITY FULL and text datums'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_mixed ORDER BY a"); +is( $result, qq(local|1.1|baz|1 +local|2.2|bar|2), + 'update works with different column order and subscriber local values'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_full_pk ORDER BY a"); +is( $result, qq(1|bar +2|baz), + 'update works with REPLICA IDENTITY FULL and a primary key'); + +# Check that subscriber handles cases where update/delete target tuple +# is missing. We have to look for the DEBUG1 log messages about that, +# so temporarily bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +$node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk"); + +# Note that the current location of the log file is not grabbed immediately +# after reloading the configuration, but after sending one SQL command to +# the node so as we are sure that the reloading has taken effect. +my $log_location = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "UPDATE tab_full_pk SET b = 'quux' WHERE a = 1"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_full_pk WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +my $logfile = slurp_file($node_subscriber->logfile, $log_location); +ok( $logfile =~ + qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/, + 'update target row is missing'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/, + 'delete target row is missing'); + +$node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber->reload; + +# check behavior with toasted values + +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET b = repeat('xyzzy', 100000) WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a"); +is( $result, qq(1|3|1.1|local +2|500000|2.2|local), + 'update transmits large column value'); + +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET c = 3.3 WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a"); +is( $result, qq(1|3|1.1|local +2|500000|3.3|local), + 'update with non-transmitted large column value'); + +# check behavior with dropped columns + +# this update should get transmitted before the column goes away +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET b = 'bar', c = 2.2 WHERE a = 2"); + +$node_publisher->safe_psql('postgres', "ALTER TABLE tab_mixed DROP COLUMN b"); + +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET c = 11.11 WHERE a = 1"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_mixed ORDER BY a"); +is( $result, qq(local|11.11|baz|1 +local|2.2|bar|2), + 'update works with dropped publisher column'); + +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_mixed DROP COLUMN d"); + +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET c = 22.22 WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_mixed ORDER BY a"); +is( $result, qq(11.11|baz|1 +22.22|bar|2), + 'update works with dropped subscriber column'); + +# check that change of connection string and/or publication list causes +# restart of subscription workers. We check the state along with +# application_name to ensure that the walsender is (re)started. +# +# Not all of these are registered as tests as we need to poll for a change +# but the test suite will fail none the less when something goes wrong. +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr sslmode=disable'" +); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +) or die "Timed out while waiting for apply to restart after changing CONNECTION"; + +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only WITH (copy_data = false)" +); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +) or die "Timed out while waiting for apply to restart after changing PUBLICATION"; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1001,1100)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep"); + +# Restart the publisher and check the state of the subscriber which +# should be in a streaming state after catching up. +$node_publisher->stop('fast'); +$node_publisher->start; + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1152|1|1100), + 'check replicated inserts after subscription publication change'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_rep"); +is($result, qq(20|-20|-1), + 'check changes skipped after subscription publication change'); + +# check alter publication (relcache invalidation etc) +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = false)" +); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# note that data are different on provider and subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1052|1|1002), + 'check replicated deletes after alter publication'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(21|0|100), 'check replicated insert after alter publication'); + +# check restart on rename +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed' AND state = 'streaming';" +) or die "Timed out while waiting for apply to restart after renaming SUBSCRIPTION"; + +# check all the cleanup +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); + +# CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING +$node_publisher->append_conf( + 'postgresql.conf', qq( +wal_level=minimal +max_wal_senders=0 +)); +$node_publisher->start; +($result, my $retout, my $reterr) = $node_publisher->psql( + 'postgres', qq{ +BEGIN; +CREATE TABLE skip_wal(); +CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal; +ROLLBACK; +}); +ok( $reterr =~ + m/WARNING: wal_level is insufficient to publish logical changes/, + 'CREATE PUBLICATION while wal_level=minimal'); diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl new file mode 100644 index 0000000..f915fad --- /dev/null +++ b/src/test/subscription/t/002_types.pl @@ -0,0 +1,568 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# This tests that more complex datatypes are replicated correctly +# by logical replication +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +my $ddl = qq( + CREATE EXTENSION hstore WITH SCHEMA public; + CREATE TABLE public.tst_one_array ( + a INTEGER PRIMARY KEY, + b INTEGER[] + ); + CREATE TABLE public.tst_arrays ( + a INTEGER[] PRIMARY KEY, + b TEXT[], + c FLOAT[], + d INTERVAL[] + ); + + CREATE TYPE public.tst_enum_t AS ENUM ('a', 'b', 'c', 'd', 'e'); + CREATE TABLE public.tst_one_enum ( + a INTEGER PRIMARY KEY, + b public.tst_enum_t + ); + CREATE TABLE public.tst_enums ( + a public.tst_enum_t PRIMARY KEY, + b public.tst_enum_t[] + ); + + CREATE TYPE public.tst_comp_basic_t AS (a FLOAT, b TEXT, c INTEGER); + CREATE TYPE public.tst_comp_enum_t AS (a FLOAT, b public.tst_enum_t, c INTEGER); + CREATE TYPE public.tst_comp_enum_array_t AS (a FLOAT, b public.tst_enum_t[], c INTEGER); + CREATE TABLE public.tst_one_comp ( + a INTEGER PRIMARY KEY, + b public.tst_comp_basic_t + ); + CREATE TABLE public.tst_comps ( + a public.tst_comp_basic_t PRIMARY KEY, + b public.tst_comp_basic_t[] + ); + CREATE TABLE public.tst_comp_enum ( + a INTEGER PRIMARY KEY, + b public.tst_comp_enum_t + ); + CREATE TABLE public.tst_comp_enum_array ( + a public.tst_comp_enum_t PRIMARY KEY, + b public.tst_comp_enum_t[] + ); + CREATE TABLE public.tst_comp_one_enum_array ( + a INTEGER PRIMARY KEY, + b public.tst_comp_enum_array_t + ); + CREATE TABLE public.tst_comp_enum_what ( + a public.tst_comp_enum_array_t PRIMARY KEY, + b public.tst_comp_enum_array_t[] + ); + + CREATE TYPE public.tst_comp_mix_t AS ( + a public.tst_comp_basic_t, + b public.tst_comp_basic_t[], + c public.tst_enum_t, + d public.tst_enum_t[] + ); + CREATE TABLE public.tst_comp_mix_array ( + a public.tst_comp_mix_t PRIMARY KEY, + b public.tst_comp_mix_t[] + ); + CREATE TABLE public.tst_range ( + a INTEGER PRIMARY KEY, + b int4range + ); + CREATE TABLE public.tst_range_array ( + a INTEGER PRIMARY KEY, + b TSTZRANGE, + c int8range[] + ); + CREATE TABLE public.tst_hstore ( + a INTEGER PRIMARY KEY, + b public.hstore + ); + + SET check_function_bodies=off; + CREATE FUNCTION public.monot_incr(int) RETURNS bool LANGUAGE sql + AS ' select \$1 > max(a) from public.tst_dom_constr; '; + CREATE DOMAIN monot_int AS int CHECK (monot_incr(VALUE)); + CREATE TABLE public.tst_dom_constr (a monot_int);); + +# Setup structure on both nodes +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Wait for initial sync to finish as well +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- test_tbl_one_array_col + INSERT INTO tst_one_array (a, b) VALUES + (1, '{1, 2, 3}'), + (2, '{2, 3, 1}'), + (3, '{3, 2, 1}'), + (4, '{4, 3, 2}'), + (5, '{5, NULL, 3}'); + + -- test_tbl_arrays + INSERT INTO tst_arrays (a, b, c, d) VALUES + ('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'), + ('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'), + ('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'), + ('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'), + ('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}'); + + -- test_tbl_single_enum + INSERT INTO tst_one_enum (a, b) VALUES + (1, 'a'), + (2, 'b'), + (3, 'c'), + (4, 'd'), + (5, NULL); + + -- test_tbl_enums + INSERT INTO tst_enums (a, b) VALUES + ('a', '{b, c}'), + ('b', '{c, a}'), + ('c', '{b, a}'), + ('d', '{c, b}'), + ('e', '{d, NULL}'); + + -- test_tbl_single_composites + INSERT INTO tst_one_comp (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)), + (3, ROW(3.0, 'c', 3)), + (4, ROW(4.0, 'd', 4)), + (5, ROW(NULL, NULL, 5)); + + -- test_tbl_composites + INSERT INTO tst_comps (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]), + (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]), + (ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]), + (ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]); + + -- test_tbl_composite_with_enums + INSERT INTO tst_comp_enum (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)), + (3, ROW(3.0, 'c', 3)), + (4, ROW(4.0, 'd', 4)), + (5, ROW(NULL, 'e', NULL)); + + -- test_tbl_composite_with_enums_array + INSERT INTO tst_comp_enum_array (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]), + (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]), + (ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]), + (ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]); + + -- test_tbl_composite_with_single_enums_array_in_composite + INSERT INTO tst_comp_one_enum_array (a, b) VALUES + (1, ROW(1.0, '{a, b, c}', 1)), + (2, ROW(2.0, '{a, b, c}', 2)), + (3, ROW(3.0, '{a, b, c}', 3)), + (4, ROW(4.0, '{c, b, d}', 4)), + (5, ROW(5.0, '{NULL, e, NULL}', 5)); + + -- test_tbl_composite_with_enums_array_in_composite + INSERT INTO tst_comp_enum_what (a, b) VALUES + (ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]), + (ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]), + (ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]), + (ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]), + (ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]); + + -- test_tbl_mixed_composites + INSERT INTO tst_comp_mix_array (a, b) VALUES + (ROW( + ROW(1,'a',1), + ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t], + 'a', + '{a,b,NULL,c}'), + ARRAY[ + ROW( + ROW(1,'a',1), + ARRAY[ + ROW(1,'a',1)::tst_comp_basic_t, + ROW(2,'b',2)::tst_comp_basic_t, + NULL + ], + 'a', + '{a,b,c}' + )::tst_comp_mix_t + ] + ); + + -- test_tbl_range + INSERT INTO tst_range (a, b) VALUES + (1, '[1, 10]'), + (2, '[2, 20]'), + (3, '[3, 30]'), + (4, '[4, 40]'), + (5, '[5, 50]'); + + -- test_tbl_range_array + INSERT INTO tst_range_array (a, b, c) VALUES + (1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'), + (2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'), + (3, tstzrange('Fri Aug 01 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'), + (4, tstzrange('Thu Jul 31 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'), + (5, NULL, NULL); + + -- tst_hstore + INSERT INTO tst_hstore (a, b) VALUES + (1, '"a"=>"1"'), + (2, '"zzz"=>"foo"'), + (3, '"123"=>"321"'), + (4, '"yellow horse"=>"moaned"'); + + -- tst_dom_constr + INSERT INTO tst_dom_constr VALUES (10); +)); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +)); + +is( $result, '1|{1,2,3} +2|{2,3,1} +3|{3,2,1} +4|{4,3,2} +5|{5,NULL,3} +{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"} +{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} +{4,1,2}|{d,a,b}|{4.4,1.1,2.2}|{"4 years","1 year","2 years"} +{5,NULL,NULL}|{e,NULL,b}|{5.5,1.1,NULL}|{"5 years",NULL,NULL} +1|a +2|b +3|c +4|d +5| +a|{b,c} +b|{c,a} +c|{b,a} +d|{c,b} +e|{d,NULL} +1|(1,a,1) +2|(2,b,2) +3|(3,c,3) +4|(4,d,4) +5|(,,5) +(1,a,1)|{"(1,a,1)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,4)|{"(4,d,3)"} +(5,e,)|{NULL,"(5,,5)"} +1|(1,a,1) +2|(2,b,2) +3|(3,c,3) +4|(4,d,4) +5|(,e,) +(1,a,1)|{"(1,a,1)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,3)|{"(3,d,3)"} +(5,e,3)|{"(3,e,3)",NULL} +1|(1,"{a,b,c}",1) +2|(2,"{a,b,c}",2) +3|(3,"{a,b,c}",3) +4|(4,"{c,b,d}",4) +5|(5,"{NULL,e,NULL}",5) +(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"} +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"} +(4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"} +(5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"} +("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"} +1|[1,11) +2|[2,21) +3|[3,31) +4|[4,41) +5|[5,51) +1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} +4|["2014-07-31 00:00:00+02","2014-08-04 00:00:00+02")|{"[4,6)",NULL,"[40,51)"} +5|| +1|"a"=>"1" +2|"zzz"=>"foo" +3|"123"=>"321" +4|"yellow horse"=>"moaned"', + 'check replicated inserts on subscriber'); + +# Run batch of updates +$node_publisher->safe_psql( + 'postgres', qq( + UPDATE tst_one_array SET b = '{4, 5, 6}' WHERE a = 1; + UPDATE tst_one_array SET b = '{4, 5, 6, 1}' WHERE a > 3; + UPDATE tst_arrays SET b = '{"1a", "2b", "3c"}', c = '{1.0, 2.0, 3.0}', d = '{"1 day 1 second", "2 days 2 seconds", "3 days 3 second"}' WHERE a = '{1, 2, 3}'; + UPDATE tst_arrays SET b = '{"c", "d", "e"}', c = '{3.0, 4.0, 5.0}', d = '{"3 day 1 second", "4 days 2 seconds", "5 days 3 second"}' WHERE a[1] > 3; + UPDATE tst_one_enum SET b = 'c' WHERE a = 1; + UPDATE tst_one_enum SET b = NULL WHERE a > 3; + UPDATE tst_enums SET b = '{e, NULL}' WHERE a = 'a'; + UPDATE tst_enums SET b = '{e, d}' WHERE a > 'c'; + UPDATE tst_one_comp SET b = ROW(1.0, 'A', 1) WHERE a = 1; + UPDATE tst_one_comp SET b = ROW(NULL, 'x', -1) WHERE a > 3; + UPDATE tst_comps SET b = ARRAY[ROW(9, 'x', -1)::tst_comp_basic_t] WHERE (a).a = 1.0; + UPDATE tst_comps SET b = ARRAY[NULL, ROW(9, 'x', NULL)::tst_comp_basic_t] WHERE (a).a > 3.9; + UPDATE tst_comp_enum SET b = ROW(1.0, NULL, NULL) WHERE a = 1; + UPDATE tst_comp_enum SET b = ROW(4.0, 'd', 44) WHERE a > 3; + UPDATE tst_comp_enum_array SET b = ARRAY[NULL, ROW(3, 'd', 3)::tst_comp_enum_t] WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t; + UPDATE tst_comp_enum_array SET b = ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t, ROW(2, 'b', 2)::tst_comp_enum_t] WHERE (a).a > 3; + UPDATE tst_comp_one_enum_array SET b = ROW(1.0, '{a, e, c}', NULL) WHERE a = 1; + UPDATE tst_comp_one_enum_array SET b = ROW(4.0, '{c, b, d}', 4) WHERE a > 3; + UPDATE tst_comp_enum_what SET b = ARRAY[NULL, ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t, ROW(NULL, '{a, e, c}', 2)::tst_comp_enum_array_t] WHERE (a).a = 1; + UPDATE tst_comp_enum_what SET b = ARRAY[ROW(5, '{a, b, c}', 5)::tst_comp_enum_array_t] WHERE (a).a > 3; + UPDATE tst_comp_mix_array SET b[2] = NULL WHERE ((a).a).a = 1; + UPDATE tst_range SET b = '[100, 1000]' WHERE a = 1; + UPDATE tst_range SET b = '(1, 90)' WHERE a > 3; + UPDATE tst_range_array SET c = '{"[100, 1000]"}' WHERE a = 1; + UPDATE tst_range_array SET b = tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), c = '{NULL, "[11,9999999]"}' WHERE a > 3; + UPDATE tst_hstore SET b = '"updated"=>"value"' WHERE a < 3; + UPDATE tst_hstore SET b = '"also"=>"updated"' WHERE a = 3; +)); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +)); + +is( $result, '1|{4,5,6} +2|{2,3,1} +3|{3,2,1} +4|{4,5,6,1} +5|{4,5,6,1} +{1,2,3}|{1a,2b,3c}|{1,2,3}|{"1 day 00:00:01","2 days 00:00:02","3 days 00:00:03"} +{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} +{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +1|c +2|b +3|c +4| +5| +a|{e,NULL} +b|{c,a} +c|{b,a} +d|{e,d} +e|{e,d} +1|(1,A,1) +2|(2,b,2) +3|(3,c,3) +4|(,x,-1) +5|(,x,-1) +(1,a,1)|{"(9,x,-1)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,4)|{NULL,"(9,x,)"} +(5,e,)|{NULL,"(9,x,)"} +1|(1,,) +2|(2,b,2) +3|(3,c,3) +4|(4,d,44) +5|(4,d,44) +(1,a,1)|{NULL,"(3,d,3)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,3)|{"(1,a,1)","(2,b,2)"} +(5,e,3)|{"(1,a,1)","(2,b,2)"} +1|(1,"{a,e,c}",) +2|(2,"{a,b,c}",2) +3|(3,"{a,b,c}",3) +4|(4,"{c,b,d}",4) +5|(4,"{c,b,d}",4) +(1,"{a,b,c}",1)|{NULL,"(1,\"{a,b,c}\",1)","(,\"{a,e,c}\",2)"} +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"} +(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"} +(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"} +("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL} +1|[100,1001) +2|[2,21) +3|[3,31) +4|[2,90) +5|[2,90) +1|["2014-08-04 00:00:00+02",infinity)|{"[100,1001)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} +4|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"} +5|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"} +1|"updated"=>"value" +2|"updated"=>"value" +3|"also"=>"updated" +4|"yellow horse"=>"moaned"', + 'check replicated updates on subscriber'); + +# Run batch of deletes +$node_publisher->safe_psql( + 'postgres', qq( + DELETE FROM tst_one_array WHERE a = 1; + DELETE FROM tst_one_array WHERE b = '{2, 3, 1}'; + DELETE FROM tst_arrays WHERE a = '{1, 2, 3}'; + DELETE FROM tst_arrays WHERE a[1] = 2; + DELETE FROM tst_one_enum WHERE a = 1; + DELETE FROM tst_one_enum WHERE b = 'b'; + DELETE FROM tst_enums WHERE a = 'a'; + DELETE FROM tst_enums WHERE b[1] = 'b'; + DELETE FROM tst_one_comp WHERE a = 1; + DELETE FROM tst_one_comp WHERE (b).a = 2.0; + DELETE FROM tst_comps WHERE (a).b = 'a'; + DELETE FROM tst_comps WHERE ROW(3, 'c', 3)::tst_comp_basic_t = ANY(b); + DELETE FROM tst_comp_enum WHERE a = 1; + DELETE FROM tst_comp_enum WHERE (b).a = 2.0; + DELETE FROM tst_comp_enum_array WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t; + DELETE FROM tst_comp_enum_array WHERE ROW(3, 'c', 3)::tst_comp_enum_t = ANY(b); + DELETE FROM tst_comp_one_enum_array WHERE a = 1; + DELETE FROM tst_comp_one_enum_array WHERE 'a' = ANY((b).b); + DELETE FROM tst_comp_enum_what WHERE (a).a = 1; + DELETE FROM tst_comp_enum_what WHERE (b[1]).b = '{c, a, b}'; + DELETE FROM tst_comp_mix_array WHERE ((a).a).a = 1; + DELETE FROM tst_range WHERE a = 1; + DELETE FROM tst_range WHERE '[10,20]' && b; + DELETE FROM tst_range_array WHERE a = 1; + DELETE FROM tst_range_array WHERE tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 05 00:00:00 2014 CEST'::timestamptz) && b; + DELETE FROM tst_hstore WHERE a = 1; +)); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +)); + +is( $result, '3|{3,2,1} +4|{4,5,6,1} +5|{4,5,6,1} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} +{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +3|c +4| +5| +b|{c,a} +d|{e,d} +e|{e,d} +3|(3,c,3) +4|(,x,-1) +5|(,x,-1) +(2,b,2)|{"(2,b,2)"} +(4,d,4)|{NULL,"(9,x,)"} +(5,e,)|{NULL,"(9,x,)"} +3|(3,c,3) +4|(4,d,44) +5|(4,d,44) +(2,b,2)|{"(2,b,2)"} +(4,d,3)|{"(1,a,1)","(2,b,2)"} +(5,e,3)|{"(1,a,1)","(2,b,2)"} +4|(4,"{c,b,d}",4) +5|(4,"{c,b,d}",4) +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"} +(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} +2|"updated"=>"value" +3|"also"=>"updated" +4|"yellow horse"=>"moaned"', + 'check replicated deletes on subscriber'); + +# Test a domain with a constraint backed by a SQL-language function, +# which needs an active snapshot in order to operate. +$node_publisher->safe_psql('postgres', + "INSERT INTO tst_dom_constr VALUES (11)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT sum(a) FROM tst_dom_constr"); +is($result, '21', 'sql-function constraint on domain'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl new file mode 100644 index 0000000..1182a12 --- /dev/null +++ b/src/test/subscription/t/003_constraints.pl @@ -0,0 +1,139 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# This test checks that constraints work on subscriber +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 6; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, junk text, bid int REFERENCES tab_fk (bid));" +); + +# Setup structure on subscriber; column order intentionally different +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid), junk text);" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES;"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk (bid) VALUES (1);"); +# "junk" value is meant to be large enough to force out-of-line storage +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid, junk) VALUES (1, 1, repeat(pi()::text,20000));" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check data on subscriber +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk;"); +is($result, qq(1|1|1), 'check replicated tab_fk inserts on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(1|1|1), 'check replicated tab_fk_ref inserts on subscriber'); + +# Drop the fk on publisher +$node_publisher->safe_psql('postgres', "DROP TABLE tab_fk CASCADE;"); + +# Insert data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# FK is not enforced on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check FK ignored on subscriber'); + +# Add replica trigger +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$ +BEGIN + IF (TG_OP = 'INSERT') THEN + IF (NEW.id < 10) THEN + RETURN NEW; + ELSE + RETURN NULL; + END IF; + ELSIF (TG_OP = 'UPDATE') THEN + RETURN NULL; + ELSE + RAISE WARNING 'Unknown action'; + RETURN NULL; + END IF; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER filter_basic_dml_trg + BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref + FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn(); +ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg; +}); + +# Insert data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# The trigger should cause the insert to be skipped on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber'); + +# Update data +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# The trigger should cause the update to be skipped on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), + 'check replica update column trigger applied on subscriber'); + +# Update on a column not specified in the trigger, but it will trigger +# anyway because logical replication ships all columns in an update. +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(id), max(id) FROM tab_fk_ref;"); +is($result, qq(2|1|2), + 'check column trigger applied even on update for other column'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl new file mode 100644 index 0000000..b3c91af --- /dev/null +++ b/src/test/subscription/t/004_sync.pl @@ -0,0 +1,180 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Tests for logical replication table syncing +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 8; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + "wal_retrieve_retry_interval = 1ms"); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(1,10)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(10), 'initial data synced for first sub'); + +# drop subscription so that there is unreplicated data +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(11,20)"); + +# recreate the subscription, it will try to do initial copy +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +# but it will be stuck on data copy as it will fail on constraint +my $started_query = "SELECT srsubstate = 'd' FROM pg_subscription_rel;"; +$node_subscriber->poll_query_until('postgres', $started_query) + or die "Timed out while waiting for subscriber to start sync"; + +# remove the conflicting data +$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;"); + +# wait for sync to finish this time +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that all data is synced +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(20), 'initial data synced for second sub'); + +# now check another subscription for the same node pair +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)" +); + +# wait for it to start +$node_subscriber->poll_query_until('postgres', + "SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL" +) or die "Timed out while waiting for subscriber to start"; + +# and drop both subscriptions +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2"); + +# check subscriptions are removed +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'second and third sub are dropped'); + +# remove the conflicting data +$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;"); + +# recreate the subscription again +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +# and wait for data sync to finish again +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that all data is synced +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(20), 'initial data synced for fourth sub'); + +# add new table on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rep_next (a int)"); + +# setup structure with existing data on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_rep_next"); +is($result, qq(0), 'no data for table added after subscription initialized'); + +# ask for data sync +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +# wait for sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_rep_next"); +is($result, qq(10), + 'data for table added after subscription initialized are now synced'); + +# Add some data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep_next SELECT generate_series(1,10)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_rep_next"); +is($result, qq(20), + 'changes for table added after subscription initialized replicated'); + +# clean up +$node_publisher->safe_psql('postgres', "DROP TABLE tab_rep_next"); +$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rep_next"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +# Table tap_rep already has the same records on both publisher and subscriber +# at this time. Recreate the subscription which will do the initial copy of +# the table again and fails due to unique constraint violation. +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +$result = $node_subscriber->poll_query_until('postgres', $started_query) + or die "Timed out while waiting for subscriber to start sync"; + +# DROP SUBSCRIPTION must clean up slots on the publisher side when the +# subscriber is stuck on data copy for constraint violation. +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), + 'DROP SUBSCRIPTION during error can clean up the slots on the publisher'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/005_encoding.pl b/src/test/subscription/t/005_encoding.pl new file mode 100644 index 0000000..a3f56a4 --- /dev/null +++ b/src/test/subscription/t/005_encoding.pl @@ -0,0 +1,55 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test replication between databases with different encodings +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 1; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init( + allows_streaming => 'logical', + extra => [ '--locale=C', '--encoding=UTF8' ]); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init( + allows_streaming => 'logical', + extra => [ '--locale=C', '--encoding=LATIN1' ]); +$node_subscriber->start; + +my $ddl = "CREATE TABLE test1 (a int, b text);"; +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); + +$node_publisher->wait_for_catchup('mysub'); + +# Wait for initial sync to finish as well +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->safe_psql('postgres', + q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8 + +$node_publisher->wait_for_catchup('mysub'); + +is( $node_subscriber->safe_psql( + 'postgres', q{SELECT a FROM test1 WHERE b = E'Mot\xf6rhead'} + ), # LATIN1 + qq(1), + 'data replicated to subscriber'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl new file mode 100644 index 0000000..37e05a4 --- /dev/null +++ b/src/test/subscription/t/006_rewrite.pl @@ -0,0 +1,68 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test logical replication behavior with heap rewrites +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $ddl = "CREATE TABLE test1 (a int, b text);"; +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); + +$node_publisher->wait_for_catchup('mysub'); + +# Wait for initial sync to finish as well +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->safe_psql('postgres', + q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');}); + +$node_publisher->wait_for_catchup('mysub'); + +is( $node_subscriber->safe_psql('postgres', q{SELECT a, b FROM test1}), + qq(1|one +2|two), + 'initial data replicated to subscriber'); + +# DDL that causes a heap rewrite +my $ddl2 = "ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0;"; +$node_subscriber->safe_psql('postgres', $ddl2); +$node_publisher->safe_psql('postgres', $ddl2); + +$node_publisher->wait_for_catchup('mysub'); + +$node_publisher->safe_psql('postgres', + q{INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33);}); + +$node_publisher->wait_for_catchup('mysub'); + +is( $node_subscriber->safe_psql('postgres', q{SELECT a, b, c FROM test1}), + qq(1|one|0 +2|two|0 +3|three|33), + 'data replicated to subscriber'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl new file mode 100644 index 0000000..dd10d5c --- /dev/null +++ b/src/test/subscription/t/007_ddl.pl @@ -0,0 +1,45 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test some logical replication DDL behavior +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 1; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $ddl = "CREATE TABLE test1 (a int, b text);"; +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); + +$node_publisher->wait_for_catchup('mysub'); + +$node_subscriber->safe_psql( + 'postgres', q{ +BEGIN; +ALTER SUBSCRIPTION mysub DISABLE; +ALTER SUBSCRIPTION mysub SET (slot_name = NONE); +DROP SUBSCRIPTION mysub; +COMMIT; +}); + +pass "subscription disable and drop in same transaction did not hang"; + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl new file mode 100644 index 0000000..a04a798 --- /dev/null +++ b/src/test/subscription/t/008_diff_schema.pl @@ -0,0 +1,128 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test behavior with different schema on subscriber +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999, e int GENERATED BY DEFAULT AS IDENTITY)" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Update the rows on the publisher and check the additional columns on +# subscriber didn't change +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(b)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999), count(e) FROM test_tab"); +is($result, qq(2|2|2|2), + 'check extra columns contain local defaults after copy'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values +$node_subscriber->safe_psql('postgres', + "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" +); +$node_publisher->safe_psql('postgres', + "UPDATE test_tab SET b = md5(a::text)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" +); +is($result, qq(2|2|2), 'check extra columns contain locally changed data'); + +# Another insert +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (3, 'baz')"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999), count(e) FROM test_tab"); +is($result, qq(3|3|3|3), + 'check extra columns contain local defaults after apply'); + + +# Check a bug about adding a replica identity column on the subscriber +# that was not yet mapped to a column on the publisher. This would +# result in errors on the subscriber and replication thus not +# progressing. +# (https://www.postgresql.org/message-id/flat/a9139c29-7ddd-973b-aa7f-71fed9c38d75%40minerva.info) + +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)"); + +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Add replica identity column. (The serial is not necessary, but it's +# a convenient way to get a default on the new column so that rows +# from the publisher that don't have the column yet can be inserted.) +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab2 ADD COLUMN b serial PRIMARY KEY"); + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab2 VALUES (1)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +is( $node_subscriber->safe_psql( + 'postgres', "SELECT count(*), min(a), max(a) FROM test_tab2"), + qq(1|1|1), + 'check replicated inserts on subscriber'); + + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/009_matviews.pl b/src/test/subscription/t/009_matviews.pl new file mode 100644 index 0000000..92c7d18 --- /dev/null +++ b/src/test/subscription/t/009_matviews.pl @@ -0,0 +1,52 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test materialized views behavior +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 1; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); + +$node_publisher->safe_psql('postgres', + q{CREATE TABLE test1 (a int PRIMARY KEY, b text)}); +$node_publisher->safe_psql('postgres', + q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');}); + +$node_subscriber->safe_psql('postgres', + q{CREATE TABLE test1 (a int PRIMARY KEY, b text);}); + +$node_publisher->wait_for_catchup('mysub'); + +# Materialized views are not supported by logical replication, but +# logical decoding does produce change information for them, so we +# need to make sure they are properly ignored. (bug #15044) + +# create a MV with some data +$node_publisher->safe_psql('postgres', + q{CREATE MATERIALIZED VIEW testmv1 AS SELECT * FROM test1;}); +$node_publisher->wait_for_catchup('mysub'); + +# There is no equivalent relation on the subscriber, but MV data is +# not replicated, so this does not hang. + +pass "materialized view data not replicated"; + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl new file mode 100644 index 0000000..5617469 --- /dev/null +++ b/src/test/subscription/t/010_truncate.pl @@ -0,0 +1,241 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test TRUNCATE +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 14; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 6)); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)"); + +$node_subscriber->safe_psql('postgres', + "CREATE SEQUENCE seq1 OWNED BY tab1.a"); +$node_subscriber->safe_psql('postgres', "ALTER SEQUENCE seq1 START 101"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3" +); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert data to truncate + +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (3)"); + +$node_publisher->wait_for_catchup('sub1'); + +# truncate and check + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate replicated'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')"); +is($result, qq(1), 'sequence not restarted'); + +# truncate with restart identity + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')"); +is($result, qq(101), 'truncate restarted identities'); + +# test publication that does not replicate truncate + +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(3|1|3), 'truncate not replicated'); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(0||), 'truncate replicated after publication change'); + +# test multiple tables connected by foreign keys + +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab3 VALUES (1), (2), (3)"); +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab4 VALUES (11, 1), (111, 1), (22, 2)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab3, tab4"); + +$node_publisher->wait_for_catchup('sub3'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab3"); +is($result, qq(0||), 'truncate of multiple tables replicated'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(x), max(x) FROM tab4"); +is($result, qq(0||), 'truncate of multiple tables replicated'); + +# test truncate of multiple tables, some of which are not published + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub2"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub2"); + +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (3)"); +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of multiple tables some not published'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(3|1|3), 'truncate of multiple tables some not published'); + +# Test that truncate works for synchronous logical replication + +$node_publisher->safe_psql('postgres', + "ALTER SYSTEM SET synchronous_standby_names TO 'sub1'"); +$node_publisher->safe_psql('postgres', "SELECT pg_reload_conf()"); + +# insert data to truncate + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (3)"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(3|1|3), 'check synchronous logical replication'); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), + 'truncate replicated in synchronous logical replication'); + +$node_publisher->safe_psql('postgres', + "ALTER SYSTEM RESET synchronous_standby_names"); +$node_publisher->safe_psql('postgres', "SELECT pg_reload_conf()"); + +# test that truncate works for logical replication when there are multiple +# subscriptions for a single table + +$node_publisher->safe_psql('postgres', "CREATE TABLE tab5 (a int)"); + +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab5 (a int)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub5 FOR TABLE tab5"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub5_1 CONNECTION '$publisher_connstr' PUBLICATION pub5" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub5_2 CONNECTION '$publisher_connstr' PUBLICATION pub5" +); + +# wait for initial data sync +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert data to truncate + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab5 VALUES (1), (2), (3)"); + +$node_publisher->wait_for_catchup('sub5_1'); +$node_publisher->wait_for_catchup('sub5_2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab5"); +is($result, qq(6|1|3), 'insert replicated for multiple subscriptions'); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab5"); + +$node_publisher->wait_for_catchup('sub5_1'); +$node_publisher->wait_for_catchup('sub5_2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab5"); +is($result, qq(0||), 'truncate replicated for multiple subscriptions'); + +# check deadlocks +$result = $node_subscriber->safe_psql('postgres', + "SELECT deadlocks FROM pg_stat_database WHERE datname='postgres'"); +is($result, qq(0), 'no deadlocks detected'); diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl new file mode 100644 index 0000000..29108cb --- /dev/null +++ b/src/test/subscription/t/011_generated.pl @@ -0,0 +1,66 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test generated columns +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)" +); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED)" +); + +# data for initial sync + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 (a) VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR ALL TABLES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1"); +is( $result, qq(1|22 +2|44 +3|66), 'generated columns initial sync'); + +# data to replicate + +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (4), (5)"); + +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1"); +is( $result, qq(1|22 +2|44 +3|66 +4|88 +6|132), 'generated columns replicated'); diff --git a/src/test/subscription/t/012_collation.pl b/src/test/subscription/t/012_collation.pl new file mode 100644 index 0000000..8137a16 --- /dev/null +++ b/src/test/subscription/t/012_collation.pl @@ -0,0 +1,110 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test collations, in particular nondeterministic ones +# (only works with ICU) +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More; + +if ($ENV{with_icu} eq 'yes') +{ + plan tests => 2; +} +else +{ + plan skip_all => 'ICU not supported by this build'; +} + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init( + allows_streaming => 'logical', + extra => [ '--locale=C', '--encoding=UTF8' ]); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init( + allows_streaming => 'logical', + extra => [ '--locale=C', '--encoding=UTF8' ]); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +# Test plan: Create a table with a nondeterministic collation in the +# primary key column. Pre-insert rows on the publisher and subscriber +# that are collation-wise equal but byte-wise different. (We use a +# string in different normal forms for that.) Set up publisher and +# subscriber. Update the row on the publisher, but don't change the +# primary key column. The subscriber needs to find the row to be +# updated using the nondeterministic collation semantics. We need to +# test for both a replica identity index and for replica identity +# full, since those have different code paths internally. + +$node_subscriber->safe_psql('postgres', + q{CREATE COLLATION ctest_nondet (provider = icu, locale = 'und', deterministic = false)} +); + +# table with replica identity index + +$node_publisher->safe_psql('postgres', + q{CREATE TABLE tab1 (a text PRIMARY KEY, b text)}); + +$node_publisher->safe_psql('postgres', + q{INSERT INTO tab1 VALUES (U&'\00E4bc', 'foo')}); + +$node_subscriber->safe_psql('postgres', + q{CREATE TABLE tab1 (a text COLLATE ctest_nondet PRIMARY KEY, b text)}); + +$node_subscriber->safe_psql('postgres', + q{INSERT INTO tab1 VALUES (U&'\0061\0308bc', 'foo')}); + +# table with replica identity full + +$node_publisher->safe_psql('postgres', q{CREATE TABLE tab2 (a text, b text)}); +$node_publisher->safe_psql('postgres', + q{ALTER TABLE tab2 REPLICA IDENTITY FULL}); + +$node_publisher->safe_psql('postgres', + q{INSERT INTO tab2 VALUES (U&'\00E4bc', 'foo')}); + +$node_subscriber->safe_psql('postgres', + q{CREATE TABLE tab2 (a text COLLATE ctest_nondet, b text)}); +$node_subscriber->safe_psql('postgres', + q{ALTER TABLE tab2 REPLICA IDENTITY FULL}); + +$node_subscriber->safe_psql('postgres', + q{INSERT INTO tab2 VALUES (U&'\0061\0308bc', 'foo')}); + +# set up publication, subscription + +$node_publisher->safe_psql('postgres', + q{CREATE PUBLICATION pub1 FOR ALL TABLES}); + +$node_subscriber->safe_psql('postgres', + qq{CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (copy_data = false)} +); + +$node_publisher->wait_for_catchup('sub1'); + +# test with replica identity index + +$node_publisher->safe_psql('postgres', + q{UPDATE tab1 SET b = 'bar' WHERE b = 'foo'}); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres', q{SELECT b FROM tab1}), + qq(bar), 'update with primary key with nondeterministic collation'); + +# test with replica identity full + +$node_publisher->safe_psql('postgres', + q{UPDATE tab2 SET b = 'bar' WHERE b = 'foo'}); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres', q{SELECT b FROM tab2}), + qq(bar), + 'update with replica identity full with nondeterministic collation'); diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl new file mode 100644 index 0000000..dfe2cb6 --- /dev/null +++ b/src/test/subscription/t/013_partition.pl @@ -0,0 +1,872 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test logical replication with partitioned tables +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 71; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber1 = get_new_node('subscriber1'); +$node_subscriber1->init(allows_streaming => 'logical'); +$node_subscriber1->start; + +my $node_subscriber2 = get_new_node('subscriber2'); +$node_subscriber2->init(allows_streaming => 'logical'); +$node_subscriber2->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +# publisher +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_all FOR ALL TABLES"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, a int NOT NULL)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_def PARTITION OF tab1 DEFAULT"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1"); + +# subscriber1 +# +# This is partitioned differently from the publisher. tab1_2 is +# subpartitioned. This tests the tuple routing code on the +# subscriber. +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)" +); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)" +); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (4, 5, 6) PARTITION BY LIST (a)" +); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_1 (c text, b text, a int NOT NULL)"); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab1_2 ATTACH PARTITION tab1_2_1 FOR VALUES IN (5)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (4, 6)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_def PARTITION OF tab1 (c DEFAULT 'sub1_tab1') DEFAULT" +); +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); + +# Add set of AFTER replica triggers for testing that they are fired +# correctly. This uses a table that records details of all trigger +# activities. Triggers are marked as enabled for a subset of the +# partition tree. +$node_subscriber1->safe_psql( + 'postgres', qq{ +CREATE TABLE sub1_trigger_activity (tgtab text, tgop text, + tgwhen text, tglevel text, olda int, newa int); +CREATE FUNCTION sub1_trigger_activity_func() RETURNS TRIGGER AS \$\$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO public.sub1_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a; + ELSIF (TG_OP = 'UPDATE') THEN + INSERT INTO public.sub1_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a; + END IF; + RETURN NULL; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER sub1_tab1_log_op_trigger + AFTER INSERT OR UPDATE ON tab1 + FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func(); +ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub1_tab1_log_op_trigger; +CREATE TRIGGER sub1_tab1_2_log_op_trigger + AFTER INSERT OR UPDATE ON tab1_2 + FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func(); +ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub1_tab1_2_log_op_trigger; +CREATE TRIGGER sub1_tab1_2_2_log_op_trigger + AFTER INSERT OR UPDATE ON tab1_2_2 + FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func(); +ALTER TABLE ONLY tab1_2_2 ENABLE REPLICA TRIGGER sub1_tab1_2_2_log_op_trigger; +}); + +# subscriber 2 +# +# This does not use partitioning. The tables match the leaf tables on +# the publisher. +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_def (a int PRIMARY KEY, b text, c text DEFAULT 'sub2_tab1_def')" +); +$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all" +); + +# Add set of AFTER replica triggers for testing that they are fired +# correctly, using the same method as the first subscriber. +$node_subscriber2->safe_psql( + 'postgres', qq{ +CREATE TABLE sub2_trigger_activity (tgtab text, + tgop text, tgwhen text, tglevel text, olda int, newa int); +CREATE FUNCTION sub2_trigger_activity_func() RETURNS TRIGGER AS \$\$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO public.sub2_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a; + ELSIF (TG_OP = 'UPDATE') THEN + INSERT INTO public.sub2_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a; + END IF; + RETURN NULL; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER sub2_tab1_log_op_trigger + AFTER INSERT OR UPDATE ON tab1 + FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func(); +ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub2_tab1_log_op_trigger; +CREATE TRIGGER sub2_tab1_2_log_op_trigger + AFTER INSERT OR UPDATE ON tab1_2 + FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func(); +ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger; +}); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Tests for replication using leaf partition identity and schema + +# insert +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (0)"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +my $result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab1|0 +sub1_tab1|1 +sub1_tab1|3 +sub1_tab1|5), 'inserts into tab1 and its partitions replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_1 ORDER BY 1"); +is($result, qq(5), 'inserts into tab1_2 replicated into tab1_2_1 correctly'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_2 ORDER BY 1"); +is($result, qq(), 'inserts into tab1_2 replicated into tab1_2_2 correctly'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1_1|1 +sub2_tab1_1|3), 'inserts into tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_2|5), 'inserts into tab1_2 replicated'); + +# The AFTER trigger of tab1_2 should have recorded one INSERT. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;" +); +is( $result, + qq(tab1_2|INSERT|AFTER|ROW||5), + 'check replica insert after trigger applied on subscriber'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_def ORDER BY 1, 2"); +is($result, qq(sub2_tab1_def|0), 'inserts into tab1_def replicated'); + +# update (replicated as update) +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 1"); +# All of the following cause an update to be applied to a partitioned +# table on subscriber1: tab1_2 is leaf partition on publisher, whereas +# it's sub-partitioned on subscriber1. +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5"); +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 4 WHERE a = 6"); +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 4"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab1|0 +sub1_tab1|2 +sub1_tab1|3 +sub1_tab1|6), 'update of tab1_1, tab1_2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_1 ORDER BY 1"); +is($result, qq(), 'updates of tab1_2 replicated into tab1_2_1 correctly'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_2 ORDER BY 1"); +is($result, qq(6), 'updates of tab1_2 replicated into tab1_2_2 correctly'); + +# The AFTER trigger should have recorded the UPDATEs of tab1_2_2. +$result = $node_subscriber1->safe_psql('postgres', + "SELECT * FROM sub1_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;" +); +is( $result, qq(tab1_2_2|INSERT|AFTER|ROW||6 +tab1_2_2|UPDATE|AFTER|ROW|4|6 +tab1_2_2|UPDATE|AFTER|ROW|6|4), + 'check replica update after trigger applied on subscriber'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1_1|2 +sub2_tab1_1|3), 'update of tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_2|6), 'tab1_2 updated'); + +# The AFTER trigger should have recorded the updates of tab1_2. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;" +); +is( $result, qq(tab1_2|INSERT|AFTER|ROW||5 +tab1_2|UPDATE|AFTER|ROW|4|6 +tab1_2|UPDATE|AFTER|ROW|5|6 +tab1_2|UPDATE|AFTER|ROW|6|4), + 'check replica update after trigger applied on subscriber'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_def ORDER BY 1"); +is($result, qq(sub2_tab1_def|0), 'tab1_def unchanged'); + +# update (replicated as delete+insert) +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 1 WHERE a = 0"); +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 4 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab1|2 +sub1_tab1|3 +sub1_tab1|4 +sub1_tab1|6), + 'update of tab1 (delete from tab1_def + insert into tab1_1) replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_2 ORDER BY 1"); +is( $result, qq(4 +6), 'updates of tab1 (delete + insert) replicated into tab1_2_2 correctly'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1_1|2 +sub2_tab1_1|3), 'tab1_1 unchanged'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1_2|4 +sub2_tab1_2|6), 'insert into tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab1_def ORDER BY 1"); +is($result, qq(), 'delete from tab1_def replicated'); + +# delete +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1 WHERE a IN (2, 3, 5)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1"); +is($result, qq(), 'delete from tab1_1, tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1_1"); +is($result, qq(), 'delete from tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1_2"); +is($result, qq(), 'delete from tab1_2 replicated'); + +# truncate +$node_subscriber1->safe_psql('postgres', + "INSERT INTO tab1 (a) VALUES (1), (2), (5)"); +$node_subscriber2->safe_psql('postgres', "INSERT INTO tab1_2 (a) VALUES (2)"); +$node_publisher->safe_psql('postgres', "TRUNCATE tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = + $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); +is( $result, qq(1 +2), 'truncate of tab1_2 replicated'); + +$result = + $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1_2 ORDER BY 1"); +is($result, qq(), 'truncate of tab1_2 replicated'); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = + $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); +is($result, qq(), 'truncate of tab1_1 replicated'); +$result = + $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); +is($result, qq(), 'truncate of tab1 replicated'); + +# Check that subscriber handles cases where update/delete target tuple +# is missing. We have to look for the DEBUG1 log messages about that, +# so temporarily bump up the log verbosity. +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = debug1"); +$node_subscriber1->reload; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$node_subscriber1->safe_psql('postgres', "DELETE FROM tab1"); + +# Note that the current location of the log file is not grabbed immediately +# after reloading the configuration, but after sending one SQL command to +# the node so as we are sure that the reloading has taken effect. +my $log_location = -s $node_subscriber1->logfile; + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET b = 'quux' WHERE a = 4"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +my $logfile = slurp_file($node_subscriber1->logfile(), $log_location); +ok( $logfile =~ + qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/, + 'update target row is missing in tab1_2_2'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/, + 'delete target row is missing in tab1_1'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/, + 'delete target row is missing in tab1_2_2'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/, + 'delete target row is missing in tab1_def'); + +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber1->reload; + +# Tests for replication using root table identity and schema + +# publisher +$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2_1 (b text, a int NOT NULL)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES IN (0, 1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2_2 PARTITION OF tab2 FOR VALUES IN (5, 6)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4 (a int PRIMARY KEY) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4_1 PARTITION OF tab4 FOR VALUES IN (0, 1) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4_1_1 PARTITION OF tab4_1 FOR VALUES IN (0, 1)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)"); +# Note: tab3_1's parent is not in the publication, in which case its +# changes are published using own identity. For tab2, even though both parent +# and child tables are present but changes will be replicated via the parent's +# identity and only once. +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)" +); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)" +); + +# prepare data for the initial sync +$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)"); + +# subscriber 1 +$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION sub1"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub1_tab2', b text) PARTITION BY RANGE (a)" +); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab2_1 (c text DEFAULT 'sub1_tab2', b text, a int NOT NULL)" +); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES FROM (0) TO (10)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab3_1 (c text DEFAULT 'sub1_tab3_1', b text, a int NOT NULL PRIMARY KEY)" +); +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub_viaroot CONNECTION '$publisher_connstr' PUBLICATION pub_viaroot" +); + +# subscriber 2 +$node_subscriber2->safe_psql('postgres', "DROP TABLE tab1"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)" +); +# Note: tab1's partitions are named tab1_1 and tab1_2 on the publisher. +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)"); +$node_subscriber2->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab2', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab4 (a int PRIMARY KEY)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab4_1 (a int PRIMARY KEY)" +); +# Publication that sub2 points to now publishes via root, so must update +# subscription target relations. +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all"); + +# Wait for initial sync of all subscriptions +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that data is synced correctly +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2"); +is( $result, qq(sub1_tab2|1), 'initial data synced for pub_viaroot'); + +# insert +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (0)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (0), (3), (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab3 VALUES (1), (0), (3), (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab4 VALUES (0)"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub1_tab2|0 +sub1_tab2|1 +sub1_tab2|3 +sub1_tab2|5), 'inserts into tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab3_1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab3_1|0 +sub1_tab3_1|1 +sub1_tab3_1|3 +sub1_tab3_1|5), 'inserts into tab3_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1|0 +sub2_tab1|1 +sub2_tab1|3 +sub2_tab1|5), 'inserts into tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub2_tab2|0 +sub2_tab2|1 +sub2_tab2|3 +sub2_tab2|5), 'inserts into tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab3 ORDER BY 1, 2"); +is( $result, qq(sub2_tab3|0 +sub2_tab3|1 +sub2_tab3|3 +sub2_tab3|5), 'inserts into tab3 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4 ORDER BY 1"); +is( $result, qq(0), 'inserts into tab4 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4_1 ORDER BY 1"); +is( $result, qq(), 'inserts into tab4_1 replicated'); + +# now switch the order of publications in the list, try again, the result +# should be the same (no dependence on order of pulications) +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_all, pub_lower_level"); + +# make sure the subscription on the second subscriber is synced, before +# continuing +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert a change into the leaf partition, should be replicated through +# the partition root (thanks to the FOR ALL TABLES partition). +$node_publisher->safe_psql('postgres', + "INSERT INTO tab4 VALUES (1)"); + +$node_publisher->wait_for_catchup('sub2'); + +# tab4 change should be replicated through the root partition, which +# maps to the tab4 relation on subscriber. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4 ORDER BY 1"); +is( $result, qq(0 +1), 'inserts into tab4 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab4_1 ORDER BY 1"); +is( $result, qq(), 'inserts into tab4_1 replicated'); + +# update (replicated as update) +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5"); +$node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5"); +$node_publisher->safe_psql('postgres', "UPDATE tab3 SET a = 6 WHERE a = 5"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub1_tab2|0 +sub1_tab2|1 +sub1_tab2|3 +sub1_tab2|6), 'update of tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab3_1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab3_1|0 +sub1_tab3_1|1 +sub1_tab3_1|3 +sub1_tab3_1|6), 'update of tab3_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1|0 +sub2_tab1|1 +sub2_tab1|3 +sub2_tab1|6), 'inserts into tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub2_tab2|0 +sub2_tab2|1 +sub2_tab2|3 +sub2_tab2|6), 'inserts into tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab3 ORDER BY 1, 2"); +is( $result, qq(sub2_tab3|0 +sub2_tab3|1 +sub2_tab3|3 +sub2_tab3|6), 'inserts into tab3 replicated'); + +# update (replicated as delete+insert) +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 6"); +$node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 2 WHERE a = 6"); +$node_publisher->safe_psql('postgres', "UPDATE tab3 SET a = 2 WHERE a = 6"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub1_tab2|0 +sub1_tab2|1 +sub1_tab2|2 +sub1_tab2|3), 'update of tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab3_1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab3_1|0 +sub1_tab3_1|1 +sub1_tab3_1|2 +sub1_tab3_1|3), 'update of tab3_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1|0 +sub2_tab1|1 +sub2_tab1|2 +sub2_tab1|3), 'update of tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub2_tab2|0 +sub2_tab2|1 +sub2_tab2|2 +sub2_tab2|3), 'update of tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab3 ORDER BY 1, 2"); +is( $result, qq(sub2_tab3|0 +sub2_tab3|1 +sub2_tab3|2 +sub2_tab3|3), 'update of tab3 replicated'); + +# delete +$node_publisher->safe_psql('postgres', "DELETE FROM tab1"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab2"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab3"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab2"); +is($result, qq(), 'delete tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1"); +is($result, qq(), 'delete from tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab2"); +is($result, qq(), 'delete from tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3"); +is($result, qq(), 'delete from tab3 replicated'); + +# truncate +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (2), (5)"); +# these will NOT be replicated +$node_publisher->safe_psql('postgres', "TRUNCATE tab1_2, tab2_1, tab3_1"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = + $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab2 ORDER BY 1"); +is( $result, qq(1 +2 +5), 'truncate of tab2_1 NOT replicated'); + +$result = + $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); +is( $result, qq(1 +2 +5), 'truncate of tab1_2 NOT replicated'); + +$result = + $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab2 ORDER BY 1"); +is( $result, qq(1 +2 +5), 'truncate of tab2_1 NOT replicated'); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2, tab3"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab2"); +is($result, qq(), 'truncate of tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1"); +is($result, qq(), 'truncate of tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab2"); +is($result, qq(), 'truncate of tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3"); +is($result, qq(), 'truncate of tab3 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3_1"); +is($result, qq(), 'truncate of tab3_1 replicated'); + +# check that the map to convert tuples from leaf partition to the root +# table is correctly rebuilt when a new column is added +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab2 DROP b, ADD COLUMN c text DEFAULT 'pub_tab2', ADD b text" +); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 (a, b) VALUES (1, 'xxx'), (3, 'yyy'), (5, 'zzz')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 (a, b, c) VALUES (6, 'aaa', 'xxx_c')"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a, b FROM tab2 ORDER BY 1, 2"); +is( $result, qq(pub_tab2|1|xxx +pub_tab2|3|yyy +pub_tab2|5|zzz +xxx_c|6|aaa), 'inserts into tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a, b FROM tab2 ORDER BY 1, 2"); +is( $result, qq(pub_tab2|1|xxx +pub_tab2|3|yyy +pub_tab2|5|zzz +xxx_c|6|aaa), 'inserts into tab2 replicated'); + +# Check that subscriber handles cases where update/delete target tuple +# is missing. We have to look for the DEBUG1 log messages about that, +# so temporarily bump up the log verbosity. +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = debug1"); +$node_subscriber1->reload; + +$node_subscriber1->safe_psql('postgres', "DELETE FROM tab2"); + +# Note that the current location of the log file is not grabbed immediately +# after reloading the configuration, but after sending one SQL command to +# the node so as we are sure that the reloading has taken effect. +$log_location = -s $node_subscriber1->logfile; + +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET b = 'quux' WHERE a = 5"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$logfile = slurp_file($node_subscriber1->logfile(), $log_location); +ok( $logfile =~ + qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/, + 'update target row is missing in tab2_1'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, + 'delete target row is missing in tab2_1'); + +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber1->reload; + +# Test that replication continues to work correctly after altering the +# partition of a partitioned target table. + +$node_publisher->safe_psql( + 'postgres', q{ + CREATE TABLE tab5 (a int NOT NULL, b int); + CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a); + ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;}); + +$node_subscriber2->safe_psql( + 'postgres', q{ + CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a); + CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT; + CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a); + ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx; + ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;}); + +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); + +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Make partition map cache +$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)"); +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b FROM tab5 ORDER BY 1"); +is($result, qq(2|1), 'updates of tab5 replicated correctly'); + +# Change the column order of partition on subscriber +$node_subscriber2->safe_psql( + 'postgres', q{ + ALTER TABLE tab5 DETACH PARTITION tab5_1; + ALTER TABLE tab5_1 DROP COLUMN b; + ALTER TABLE tab5_1 ADD COLUMN b int; + ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT}); + +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b, c FROM tab5 ORDER BY 1"); +is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber'); + +# Test that replication into the partitioned target table continues to +# work correctly when the published table is altered. +$node_publisher->safe_psql( + 'postgres', q{ + ALTER TABLE tab5 DROP COLUMN b, ADD COLUMN c INT; + ALTER TABLE tab5 ADD COLUMN b INT;}); + +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 3"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b, c FROM tab5 ORDER BY 1"); +is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher'); + +# Test that replication works correctly as long as the leaf partition +# has the necessary REPLICA IDENTITY, even though the actual target +# partitioned table does not. +$node_subscriber2->safe_psql('postgres', + "ALTER TABLE tab5 REPLICA IDENTITY NOTHING"); + +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b, c FROM tab5_1 ORDER BY 1"); +is($result, qq(4||1), 'updates of tab5 replicated correctly'); diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl new file mode 100644 index 0000000..7260378 --- /dev/null +++ b/src/test/subscription/t/014_binary.pl @@ -0,0 +1,137 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Binary mode logical replication test + +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Create and initialize a publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create and initialize subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create tables on both sides of the replication +my $ddl = qq( + CREATE TABLE public.test_numerical ( + a INTEGER PRIMARY KEY, + b NUMERIC, + c FLOAT, + d BIGINT + ); + CREATE TABLE public.test_arrays ( + a INTEGER[] PRIMARY KEY, + b NUMERIC[], + c TEXT[] + );); + +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Configure logical replication +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tpub FOR ALL TABLES"); + +my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' " + . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)"); + +# Ensure nodes are in sync with each other +$node_publisher->wait_for_catchup('tsub'); +$node_subscriber->poll_query_until('postgres', + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');" +) or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert some content and make sure it's replicated across +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), + ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); + + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (1, 1.2, 1.3, 10), + (2, 2.2, 2.3, 20), + (3, 3.2, 3.3, 30); + )); + +$node_publisher->wait_for_catchup('tsub'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is( $result, '1|1.2|1.3|10 +2|2.2|2.3|20 +3|3.2|3.3|30', 'check replicated data on subscriber'); + +# Test updates as well +$node_publisher->safe_psql( + 'postgres', qq( + UPDATE public.test_arrays SET b[1] = 42, c = NULL; + UPDATE public.test_numerical SET b = 42, c = NULL; + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c FROM test_arrays ORDER BY a"); + +is( $result, '{1,2,3}|{42,1.2,1.3}| +{3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is( $result, '1|42||10 +2|42||20 +3|42||30', 'check updated replicated data on subscriber'); + +# Test to reset back to text formatting, and then to binary again +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tsub SET (binary = false);"); + +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (4, 4.2, 4.3, 40); + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a"); + +is( $result, '1|42||10 +2|42||20 +3|42||30 +4|4.2|4.3|40', 'check replicated data on subscriber'); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tsub SET (binary = true);"); + +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}'); + )); + +$node_publisher->wait_for_catchup('tsub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c FROM test_arrays ORDER BY a"); + +is( $result, '{1,2,3}|{42,1.2,1.3}| +{2,3,1}|{1.2,1.3,1.1}|{two,three,one} +{3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl new file mode 100644 index 0000000..998650a --- /dev/null +++ b/src/test/subscription/t/015_stream.pl @@ -0,0 +1,135 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test streaming of simple large transaction +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Interleave a pair of transactions, each exceeding the 64kB limit. +my $in = ''; +my $out = ''; + +my $timer = IPC::Run::timeout($TestLib::timeout_default); + +my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, + on_error_stop => 0); + +$in .= q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +}; +$h->pump_nb; + +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i); +DELETE FROM test_tab WHERE a > 5000; +COMMIT; +}); + +$in .= q{ +COMMIT; +\q +}; +$h->finish; # errors make the next test fail, so ignore them here + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + +# Test the streaming in binary mode +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); + +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(6667|6667|6667), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values. This is to ensure that non-streaming transactions behave +# properly after a streaming transaction. +$node_subscriber->safe_psql('postgres', + "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" +); +$node_publisher->safe_psql('postgres', + "UPDATE test_tab SET b = md5(a::text)"); + +$node_publisher->wait_for_catchup($appname); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" +); +is($result, qq(6667|6667|6667), + 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl new file mode 100644 index 0000000..0245b06 --- /dev/null +++ b/src/test/subscription/t/016_stream_subxact.pl @@ -0,0 +1,93 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test streaming of large transaction containing large subtransactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rows to exceed 64kB limit. +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +SAVEPOINT s4; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(1667|1667|1667), + 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' +); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl new file mode 100644 index 0000000..35b1468 --- /dev/null +++ b/src/test/subscription/t/017_stream_ddl.pl @@ -0,0 +1,129 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test streaming of large transaction with DDL and subtransactions +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT, f INT)" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|0|0), 'check initial data was copied to subscriber'); + +# a small (non-streamed) transaction with DDL and DML +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab VALUES (3, md5(3::text)); +ALTER TABLE test_tab ADD COLUMN c INT; +SAVEPOINT s1; +INSERT INTO test_tab VALUES (4, md5(4::text), -4); +COMMIT; +}); + +# large (streamed) transaction with DDL and DML +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i); +ALTER TABLE test_tab ADD COLUMN d INT; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i); +COMMIT; +}); + +# a small (non-streamed) transaction with DDL and DML +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001); +ALTER TABLE test_tab ADD COLUMN e INT; +SAVEPOINT s1; +INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002); +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d), count(e) FROM test_tab"); +is($result, qq(2002|1999|1002|1), + 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' +); + +# A large (streamed) transaction with DDL and DML. One of the DDL is performed +# after DML to ensure that we invalidate the schema sent for test_tab so that +# the next transaction has to send the schema again. +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(2003,5000) s(i); +ALTER TABLE test_tab ADD COLUMN f INT; +COMMIT; +}); + +# A small transaction that won't get streamed. This is just to ensure that we +# send the schema again to reflect the last column added in the previous test. +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i, 4*i FROM generate_series(5001,5005) s(i); +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab"); +is($result, qq(5005|5002|4005|3004|5), + 'check data was copied to subscriber for both streaming and non-streaming transactions' +); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl new file mode 100644 index 0000000..7fc60b5 --- /dev/null +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -0,0 +1,133 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test streaming of large transaction containing multiple subtransactions and rollbacks +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2|0), 'check initial data was copied to subscriber'); + +# large (streamed) transaction with DDL, DML and ROLLBACKs +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i); +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i); +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i); +ROLLBACK TO s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i); +ROLLBACK TO s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i); +SAVEPOINT s4; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i); +SAVEPOINT s5; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i); +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2000|0), + 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' +); + +# large (streamed) transaction with subscriber receiving out of order +# subtransaction ROLLBACKs +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i); +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i); +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i); +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i); +RELEASE s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i); +ROLLBACK TO s1; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2500|0), + 'check rollback to savepoint was reflected on subscriber'); + +# large (streamed) transaction with subscriber receiving rollback +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i); +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i); +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i); +ROLLBACK; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2500|0), 'check rollback was reflected on subscriber'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl new file mode 100644 index 0000000..81149b8 --- /dev/null +++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl @@ -0,0 +1,87 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Test streaming of large transaction with subtransactions, DDLs, DMLs, and +# rollbacks +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(2|0), 'check initial data was copied to subscriber'); + +# large (streamed) transaction with DDL, DML and ROLLBACKs +$node_publisher->safe_psql( + 'postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); +ALTER TABLE test_tab ADD COLUMN c INT; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i); +ALTER TABLE test_tab ADD COLUMN d INT; +SAVEPOINT s2; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i); +ALTER TABLE test_tab ADD COLUMN e INT; +SAVEPOINT s3; +INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i); +ALTER TABLE test_tab DROP COLUMN c; +ROLLBACK TO s1; +INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i); +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); +is($result, qq(1000|500), + 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' +); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl new file mode 100644 index 0000000..0e218e0 --- /dev/null +++ b/src/test/subscription/t/020_messages.pl @@ -0,0 +1,148 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Tests that logical decoding messages +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'autovacuum = off'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE tab_test"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Ensure a transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +# wait for the replication slot to become inactive in the publisher +$node_publisher->poll_query_until( + 'postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'", + 1); + +$node_publisher->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')" +); + +my $result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') +)); + +# 66 77 67 == B M C == BEGIN MESSAGE COMMIT +is( $result, qq(66 +77 +67), + 'messages on slot are B M C with message option'); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape') + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') + OFFSET 1 LIMIT 1 +)); + +is($result, qq(1|pgoutput), + "flag transactional is set to 1 and prefix is pgoutput"); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub') +)); + +# 66 67 == B C == BEGIN COMMIT +is( $result, qq(66 +67), + 'option messages defaults to false so message (M) is not available on slot' +); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)"); + +my $message_lsn = $node_publisher->safe_psql('postgres', + "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')" +); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)"); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') + WHERE lsn = '$message_lsn' AND xid = 0 +)); + +is($result, qq(77|0), 'non-transactional message on slot is M'); + +# Ensure a non-transactional logical decoding message shows up on the slot when +# it is emitted within an aborted transaction. The message won't emit until +# something advances the LSN, hence, we intentionally forces the server to +# switch to a new WAL file. +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 1'); + INSERT INTO tab_test VALUES (3); + SELECT pg_logical_emit_message(true, 'pgoutput', + 'a transactional message is not available if the transaction is aborted'); + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 2'); + ROLLBACK; + SELECT pg_switch_wal(); +)); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') +)); + +is( $result, qq(77|0 +77|0), + 'non-transactional message on slot from aborted transaction is M'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/021_alter_sub_pub.pl b/src/test/subscription/t/021_alter_sub_pub.pl new file mode 100644 index 0000000..104eddb --- /dev/null +++ b/src/test/subscription/t/021_alter_sub_pub.pl @@ -0,0 +1,98 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create table on publisher +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_1 (a int)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_1 SELECT generate_series(1,10)"); + +# Create table on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_1 (a int)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_1 FOR TABLE tab_1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_2"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub_1, tap_pub_2" +); + +# Wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; + +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the initial data of tab_1 is copied to subscriber +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_1"); +is($result, qq(10|1|10), 'check initial data is copied to subscriber'); + +# Create a new table on publisher +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_2 (a int)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_2 SELECT generate_series(1,10)"); + +# Create a new table on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_2 (a int)"); + +# Add the table to publication +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_2 ADD TABLE tab_2"); + +# Dropping tap_pub_1 will refresh the entire publication list +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1"); + +# Wait for initial table sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the initial data of tab_drop_refresh was copied to subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_2"); +is($result, qq(10|1|10), 'check initial data is copied to subscriber'); + +# Re-adding tap_pub_1 will refresh the entire publication list +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1"); + +# Wait for initial table sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the initial data of tab_1 was copied to subscriber again +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_1"); +is($result, qq(20|1|10), 'check initial data is copied to subscriber'); + +# shutdown +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl new file mode 100644 index 0000000..424ffb7 --- /dev/null +++ b/src/test/subscription/t/100_bugs.pl @@ -0,0 +1,308 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Tests for various bugs found over time +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +# Bug #15114 + +# The bug was that determining which columns are part of the replica +# identity index using RelationGetIndexAttrBitmap() would run +# eval_const_expressions() on index expressions and predicates across +# all indexes of the table, which in turn might require a snapshot, +# but there wasn't one set, so it crashes. There were actually two +# separate bugs, one on the publisher and one on the subscriber. The +# fix was to avoid the constant expressions simplification in +# RelationGetIndexAttrBitmap(), so it's safe to call in more contexts. + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b int)"); + +$node_publisher->safe_psql('postgres', + "CREATE FUNCTION double(x int) RETURNS int IMMUTABLE LANGUAGE SQL AS 'select x * 2'" +); + +# an index with a predicate that lends itself to constant expressions +# evaluation +$node_publisher->safe_psql('postgres', + "CREATE INDEX ON tab1 (b) WHERE a > double(1)"); + +# and the same setup on the subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b int)"); + +$node_subscriber->safe_psql('postgres', + "CREATE FUNCTION double(x int) RETURNS int IMMUTABLE LANGUAGE SQL AS 'select x * 2'" +); + +$node_subscriber->safe_psql('postgres', + "CREATE INDEX ON tab1 (b) WHERE a > double(1)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR ALL TABLES"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); + +$node_publisher->wait_for_catchup('sub1'); + +# This would crash, first on the publisher, and then (if the publisher +# is fixed) on the subscriber. +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1, 2)"); + +$node_publisher->wait_for_catchup('sub1'); + +pass('index predicates do not cause crash'); + +$node_publisher->stop('fast'); +$node_subscriber->stop('fast'); + + +# Handling of temporary and unlogged tables with FOR ALL TABLES publications + +# If a FOR ALL TABLES publication exists, temporary and unlogged +# tables are ignored for publishing changes. The bug was that we +# would still check in that case that such a table has a replica +# identity set before accepting updates. If it did not it would cause +# an error when an update was attempted. + +$node_publisher = get_new_node('publisher2'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR ALL TABLES"); + +is( $node_publisher->psql( + 'postgres', + "CREATE TEMPORARY TABLE tt1 AS SELECT 1 AS a; UPDATE tt1 SET a = 2;"), + 0, + 'update to temporary table without replica identity with FOR ALL TABLES publication' +); + +is( $node_publisher->psql( + 'postgres', + "CREATE UNLOGGED TABLE tu1 AS SELECT 1 AS a; UPDATE tu1 SET a = 2;"), + 0, + 'update to unlogged table without replica identity with FOR ALL TABLES publication' +); + +$node_publisher->stop('fast'); + +# Bug #16643 - https://postgr.es/m/16643-eaadeb2a1a58d28c@postgresql.org +# +# Initial sync doesn't complete; the protocol was not being followed per +# expectations after commit 07082b08cc5d. +my $node_twoways = get_new_node('twoways'); +$node_twoways->init(allows_streaming => 'logical'); +$node_twoways->start; +for my $db (qw(d1 d2)) +{ + $node_twoways->safe_psql('postgres', "CREATE DATABASE $db"); + $node_twoways->safe_psql($db, "CREATE TABLE t (f int)"); + $node_twoways->safe_psql($db, "CREATE TABLE t2 (f int)"); +} + +my $rows = 3000; +$node_twoways->safe_psql( + 'd1', qq{ + INSERT INTO t SELECT * FROM generate_series(1, $rows); + INSERT INTO t2 SELECT * FROM generate_series(1, $rows); + CREATE PUBLICATION testpub FOR TABLE t; + SELECT pg_create_logical_replication_slot('testslot', 'pgoutput'); + }); + +$node_twoways->safe_psql('d2', + "CREATE SUBSCRIPTION testsub CONNECTION \$\$" + . $node_twoways->connstr('d1') + . "\$\$ PUBLICATION testpub WITH (create_slot=false, " + . "slot_name='testslot')"); +$node_twoways->safe_psql( + 'd1', qq{ + INSERT INTO t SELECT * FROM generate_series(1, $rows); + INSERT INTO t2 SELECT * FROM generate_series(1, $rows); + }); +$node_twoways->safe_psql('d1', 'ALTER PUBLICATION testpub ADD TABLE t2'); +$node_twoways->safe_psql('d2', + 'ALTER SUBSCRIPTION testsub REFRESH PUBLICATION'); + +# We cannot rely solely on wait_for_catchup() here; it isn't sufficient +# when tablesync workers might still be running. So in addition to that, +# verify that tables are synced. +# XXX maybe this should be integrated in wait_for_catchup() itself. +$node_twoways->wait_for_catchup('testsub'); +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_twoways->poll_query_until('d2', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"), + $rows * 2, "2x$rows rows in t"); +is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"), + $rows * 2, "2x$rows rows in t2"); + +# Verify table data is synced with cascaded replication setup. This is mainly +# to test whether the data written by tablesync worker gets replicated. +my $node_pub = get_new_node('testpublisher1'); +$node_pub->init(allows_streaming => 'logical'); +$node_pub->start; + +my $node_pub_sub = get_new_node('testpublisher_subscriber'); +$node_pub_sub->init(allows_streaming => 'logical'); +$node_pub_sub->start; + +my $node_sub = get_new_node('testsubscriber1'); +$node_sub->init(allows_streaming => 'logical'); +$node_sub->start; + +# Create the tables in all nodes. +$node_pub->safe_psql('postgres', "CREATE TABLE tab1 (a int)"); +$node_pub_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)"); +$node_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)"); + +# Create a cascaded replication setup like: +# N1 - Create publication testpub1. +# N2 - Create publication testpub2 and also include subscriber which subscribes +# to testpub1. +# N3 - Create subscription testsub2 subscribes to testpub2. +# +# Note that subscription on N3 needs to be created before subscription on N2 to +# test whether the data written by tablesync worker of N2 gets replicated. +$node_pub->safe_psql('postgres', + "CREATE PUBLICATION testpub1 FOR TABLE tab1"); + +$node_pub_sub->safe_psql('postgres', + "CREATE PUBLICATION testpub2 FOR TABLE tab1"); + +my $publisher1_connstr = $node_pub->connstr . ' dbname=postgres'; +my $publisher2_connstr = $node_pub_sub->connstr . ' dbname=postgres'; + +$node_sub->safe_psql('postgres', + "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher2_connstr' PUBLICATION testpub2" +); + +$node_pub_sub->safe_psql('postgres', + "CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher1_connstr' PUBLICATION testpub1" +); + +$node_pub->safe_psql('postgres', + "INSERT INTO tab1 values(generate_series(1,10))"); + +# Verify that the data is cascaded from testpub1 to testsub1 and further from +# testpub2 (which had testsub1) to testsub2. +$node_pub->wait_for_catchup('testsub1'); +$node_pub_sub->wait_for_catchup('testsub2'); + +# Drop subscriptions as we don't need them anymore +$node_pub_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub1"); +$node_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub2"); + +# Drop publications as we don't need them anymore +$node_pub->safe_psql('postgres', "DROP PUBLICATION testpub1"); +$node_pub_sub->safe_psql('postgres', "DROP PUBLICATION testpub2"); + +# Clean up the tables on both publisher and subscriber as we don't need them +$node_pub->safe_psql('postgres', "DROP TABLE tab1"); +$node_pub_sub->safe_psql('postgres', "DROP TABLE tab1"); +$node_sub->safe_psql('postgres', "DROP TABLE tab1"); + +$node_pub->stop('fast'); +$node_pub_sub->stop('fast'); +$node_sub->stop('fast'); + +# https://postgr.es/m/OS0PR01MB61133CA11630DAE45BC6AD95FB939%40OS0PR01MB6113.jpnprd01.prod.outlook.com + +# The bug was that when changing the REPLICA IDENTITY INDEX to another one, the +# target table's relcache was not being invalidated. This leads to skipping +# UPDATE/DELETE operations during apply on the subscriber side as the columns +# required to search corresponding rows won't get logged. +$node_publisher = get_new_node('publisher3'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +$node_subscriber = get_new_node('subscriber3'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_replidentity_index(a int not null, b int not null)"); +$node_publisher->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_replidentity_index_a ON tab_replidentity_index(a)" +); +$node_publisher->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_replidentity_index_b ON tab_replidentity_index(b)" +); + +# use index idx_replidentity_index_a as REPLICA IDENTITY on publisher. +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_replidentity_index REPLICA IDENTITY USING INDEX idx_replidentity_index_a" +); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_replidentity_index VALUES(1, 1),(2, 2)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_replidentity_index(a int not null, b int not null)"); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_replidentity_index_a ON tab_replidentity_index(a)" +); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_replidentity_index_b ON tab_replidentity_index(b)" +); +# use index idx_replidentity_index_b as REPLICA IDENTITY on subscriber because +# it reflects the future scenario we are testing: changing REPLICA IDENTITY +# INDEX. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_replidentity_index REPLICA IDENTITY USING INDEX idx_replidentity_index_b" +); + +$publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE tab_replidentity_index"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Also wait for initial table sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +is( $node_subscriber->safe_psql( + 'postgres', "SELECT * FROM tab_replidentity_index"), + qq(1|1 +2|2), + "check initial data on subscriber"); + +# Set REPLICA IDENTITY to idx_replidentity_index_b on publisher, then run UPDATE and DELETE. +$node_publisher->safe_psql( + 'postgres', qq[ + ALTER TABLE tab_replidentity_index REPLICA IDENTITY USING INDEX idx_replidentity_index_b; + UPDATE tab_replidentity_index SET a = -a WHERE a = 1; + DELETE FROM tab_replidentity_index WHERE a = 2; +]); + +$node_publisher->wait_for_catchup('tap_sub'); +is( $node_subscriber->safe_psql( + 'postgres', "SELECT * FROM tab_replidentity_index"), + qq(-1|1), + "update works with REPLICA IDENTITY"); + +$node_publisher->stop('fast'); +$node_subscriber->stop('fast'); |