diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:19:15 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:19:15 +0000 |
commit | 6eb9c5a5657d1fe77b55cc261450f3538d35a94d (patch) | |
tree | 657d8194422a5daccecfd42d654b8a245ef7b4c8 /src/test/subscription | |
parent | Initial commit. (diff) | |
download | postgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.tar.xz postgresql-13-6eb9c5a5657d1fe77b55cc261450f3538d35a94d.zip |
Adding upstream version 13.4.upstream/13.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/subscription')
-rw-r--r-- | src/test/subscription/.gitignore | 2 | ||||
-rw-r--r-- | src/test/subscription/Makefile | 27 | ||||
-rw-r--r-- | src/test/subscription/README | 23 | ||||
-rw-r--r-- | src/test/subscription/t/001_rep_changes.pl | 415 | ||||
-rw-r--r-- | src/test/subscription/t/002_types.pl | 565 | ||||
-rw-r--r-- | src/test/subscription/t/003_constraints.pl | 136 | ||||
-rw-r--r-- | src/test/subscription/t/004_sync.pl | 155 | ||||
-rw-r--r-- | src/test/subscription/t/005_encoding.pl | 52 | ||||
-rw-r--r-- | src/test/subscription/t/006_rewrite.pl | 65 | ||||
-rw-r--r-- | src/test/subscription/t/007_ddl.pl | 42 | ||||
-rw-r--r-- | src/test/subscription/t/008_diff_schema.pl | 125 | ||||
-rw-r--r-- | src/test/subscription/t/009_matviews.pl | 49 | ||||
-rw-r--r-- | src/test/subscription/t/010_truncate.pl | 211 | ||||
-rw-r--r-- | src/test/subscription/t/011_generated.pl | 63 | ||||
-rw-r--r-- | src/test/subscription/t/012_collation.pl | 107 | ||||
-rw-r--r-- | src/test/subscription/t/013_partition.pl | 728 | ||||
-rw-r--r-- | src/test/subscription/t/100_bugs.pl | 155 |
17 files changed, 2920 insertions, 0 deletions
diff --git a/src/test/subscription/.gitignore b/src/test/subscription/.gitignore new file mode 100644 index 0000000..871e943 --- /dev/null +++ b/src/test/subscription/.gitignore @@ -0,0 +1,2 @@ +# Generated by test suite +/tmp_check/ diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile new file mode 100644 index 0000000..3ac06aa --- /dev/null +++ b/src/test/subscription/Makefile @@ -0,0 +1,27 @@ +#------------------------------------------------------------------------- +# +# Makefile for src/test/subscription +# +# Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group +# Portions Copyright (c) 1994, Regents of the University of California +# +# src/test/subscription/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/test/subscription +top_builddir = ../../.. +include $(top_builddir)/src/Makefile.global + +EXTRA_INSTALL = contrib/hstore + +export with_icu + +check: + $(prove_check) + +installcheck: + $(prove_installcheck) + +clean distclean maintainer-clean: + rm -rf tmp_check diff --git a/src/test/subscription/README b/src/test/subscription/README new file mode 100644 index 0000000..fb5382e --- /dev/null +++ b/src/test/subscription/README @@ -0,0 +1,23 @@ +src/test/subscription/README + +Regression tests for subscription/logical replication +===================================================== + +This directory contains a test suite for subscription/logical replication. + +Running the tests +================= + +NOTE: You must have given the --enable-tap-tests argument to configure. + +Run + make check +or + make installcheck +You can use "make installcheck" if you previously did "make install" +(including installing the hstore extension). In that case, the code +in the installation tree is tested. With "make check", a temporary +installation tree is built from the current sources and then tested. + +Either way, this test initializes, starts, and stops several test Postgres +clusters. diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl new file mode 100644 index 0000000..311cca8 --- /dev/null +++ b/src/test/subscription/t/001_rep_changes.pl @@ -0,0 +1,415 @@ +# Basic logical replication test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 27; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql( + 'postgres', + "CREATE FUNCTION public.pg_get_replica_identity_index(int) + RETURNS regclass LANGUAGE sql AS 'SELECT 1/0'"); # shall not call +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a"); +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_mixed (a int primary key, b text, c numeric)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))" +); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full_pk (a int primary key, b text)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL"); +# Let this table with REPLICA IDENTITY NOTHING, allowing only INSERT changes. +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_nothing (a int)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_nothing REPLICA IDENTITY NOTHING"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full_pk (a int primary key, b text)"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_nothing (a int)"); + +# different column count and order than on publisher +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)" +); + +# replication of the table with included index +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_nothing, tab_full_pk" +); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); +is($result, qq(0), 'check non-replicated table is empty on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); +is($result, qq(1002), 'check initial data was copied to subscriber'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_nothing VALUES (generate_series(1,20))"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_include SELECT generate_series(1,50)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab_include WHERE a > 20"); +$node_publisher->safe_psql('postgres', "UPDATE tab_include SET a = -a"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_rep"); +is($result, qq(20|-20|-1), 'check replicated changes on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_mixed"); +is( $result, qq(local|1.1|foo|1 +local|2.2|bar|2), 'check replicated changes with different column order'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_nothing"); +is($result, qq(20), 'check replicated changes with REPLICA IDENTITY NOTHING'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_include"); +is($result, qq(20|-20|-1), + 'check replicated changes with primary key index with included columns'); + +# insert some duplicate rows +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_full SELECT generate_series(1,10)"); + +# add REPLICA IDENTITY FULL so we can update +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full REPLICA IDENTITY FULL"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_ins REPLICA IDENTITY FULL"); +# tab_mixed can use DEFAULT, since it has a primary key + +# and do the updates +$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET b = 'baz' WHERE a = 1"); +$node_publisher->safe_psql('postgres', + "UPDATE tab_full_pk SET b = 'bar' WHERE a = 1"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(20|1|100), + 'update works with REPLICA IDENTITY FULL and duplicate tuples'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT x FROM tab_full2 ORDER BY 1"); +is( $result, qq(a +bb +bb), + 'update works with REPLICA IDENTITY FULL and text datums'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_mixed ORDER BY a"); +is( $result, qq(local|1.1|baz|1 +local|2.2|bar|2), + 'update works with different column order and subscriber local values'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_full_pk ORDER BY a"); +is( $result, qq(1|bar +2|baz), + 'update works with REPLICA IDENTITY FULL and a primary key'); + +# Check that subscriber handles cases where update/delete target tuple +# is missing. We have to look for the DEBUG1 log messages about that, +# so temporarily bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +$node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk"); + +# Note that the current location of the log file is not grabbed immediately +# after reloading the configuration, but after sending one SQL command to +# the node so as we are sure that the reloading has taken effect. +my $log_location = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "UPDATE tab_full_pk SET b = 'quux' WHERE a = 1"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_full_pk WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +my $logfile = slurp_file($node_subscriber->logfile, $log_location); +ok( $logfile =~ + qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/, + 'update target row is missing'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/, + 'delete target row is missing'); + +$node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber->reload; + +# check behavior with toasted values + +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET b = repeat('xyzzy', 100000) WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a"); +is( $result, qq(1|3|1.1|local +2|500000|2.2|local), + 'update transmits large column value'); + +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET c = 3.3 WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a"); +is( $result, qq(1|3|1.1|local +2|500000|3.3|local), + 'update with non-transmitted large column value'); + +# check behavior with dropped columns + +# this update should get transmitted before the column goes away +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET b = 'bar', c = 2.2 WHERE a = 2"); + +$node_publisher->safe_psql('postgres', "ALTER TABLE tab_mixed DROP COLUMN b"); + +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET c = 11.11 WHERE a = 1"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_mixed ORDER BY a"); +is( $result, qq(local|11.11|baz|1 +local|2.2|bar|2), + 'update works with dropped publisher column'); + +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab_mixed DROP COLUMN d"); + +$node_publisher->safe_psql('postgres', + "UPDATE tab_mixed SET c = 22.22 WHERE a = 2"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT * FROM tab_mixed ORDER BY a"); +is( $result, qq(11.11|baz|1 +22.22|bar|2), + 'update works with dropped subscriber column'); + +# check that change of connection string and/or publication list causes +# restart of subscription workers. Not all of these are registered as tests +# as we need to poll for a change but the test suite will fail none the less +# when something goes wrong. +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr sslmode=disable'" +); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub';" +) or die "Timed out while waiting for apply to restart"; + +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only WITH (copy_data = false)" +); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub';" +) or die "Timed out while waiting for apply to restart"; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT generate_series(1001,1100)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep"); + +# Restart the publisher and check the state of the subscriber which +# should be in a streaming state after catching up. +$node_publisher->stop('fast'); +$node_publisher->start; + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1152|1|1100), + 'check replicated inserts after subscription publication change'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_rep"); +is($result, qq(20|-20|-1), + 'check changes skipped after subscription publication change'); + +# check alter publication (relcache invalidation etc) +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = false)" +); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# note that data are different on provider and subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_ins"); +is($result, qq(1052|1|1002), + 'check replicated deletes after alter publication'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab_full"); +is($result, qq(21|0|100), 'check replicated insert after alter publication'); + +# check restart on rename +$oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub';" +); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed';" +) or die "Timed out while waiting for apply to restart"; + +# check all the cleanup +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'check subscription was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), + 'check subscription relation status was dropped on subscriber'); + +$result = $node_publisher->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin"); +is($result, qq(0), 'check replication origin was dropped on subscriber'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); + +# CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING +$node_publisher->append_conf( + 'postgresql.conf', qq( +wal_level=minimal +max_wal_senders=0 +)); +$node_publisher->start; +($result, my $retout, my $reterr) = $node_publisher->psql( + 'postgres', qq{ +BEGIN; +CREATE TABLE skip_wal(); +CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal; +ROLLBACK; +}); +ok( $reterr =~ + m/WARNING: wal_level is insufficient to publish logical changes/, + 'CREATE PUBLICATION while wal_level=minimal'); diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl new file mode 100644 index 0000000..aedcab2 --- /dev/null +++ b/src/test/subscription/t/002_types.pl @@ -0,0 +1,565 @@ +# This tests that more complex datatypes are replicated correctly +# by logical replication +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +my $ddl = qq( + CREATE EXTENSION hstore WITH SCHEMA public; + CREATE TABLE public.tst_one_array ( + a INTEGER PRIMARY KEY, + b INTEGER[] + ); + CREATE TABLE public.tst_arrays ( + a INTEGER[] PRIMARY KEY, + b TEXT[], + c FLOAT[], + d INTERVAL[] + ); + + CREATE TYPE public.tst_enum_t AS ENUM ('a', 'b', 'c', 'd', 'e'); + CREATE TABLE public.tst_one_enum ( + a INTEGER PRIMARY KEY, + b public.tst_enum_t + ); + CREATE TABLE public.tst_enums ( + a public.tst_enum_t PRIMARY KEY, + b public.tst_enum_t[] + ); + + CREATE TYPE public.tst_comp_basic_t AS (a FLOAT, b TEXT, c INTEGER); + CREATE TYPE public.tst_comp_enum_t AS (a FLOAT, b public.tst_enum_t, c INTEGER); + CREATE TYPE public.tst_comp_enum_array_t AS (a FLOAT, b public.tst_enum_t[], c INTEGER); + CREATE TABLE public.tst_one_comp ( + a INTEGER PRIMARY KEY, + b public.tst_comp_basic_t + ); + CREATE TABLE public.tst_comps ( + a public.tst_comp_basic_t PRIMARY KEY, + b public.tst_comp_basic_t[] + ); + CREATE TABLE public.tst_comp_enum ( + a INTEGER PRIMARY KEY, + b public.tst_comp_enum_t + ); + CREATE TABLE public.tst_comp_enum_array ( + a public.tst_comp_enum_t PRIMARY KEY, + b public.tst_comp_enum_t[] + ); + CREATE TABLE public.tst_comp_one_enum_array ( + a INTEGER PRIMARY KEY, + b public.tst_comp_enum_array_t + ); + CREATE TABLE public.tst_comp_enum_what ( + a public.tst_comp_enum_array_t PRIMARY KEY, + b public.tst_comp_enum_array_t[] + ); + + CREATE TYPE public.tst_comp_mix_t AS ( + a public.tst_comp_basic_t, + b public.tst_comp_basic_t[], + c public.tst_enum_t, + d public.tst_enum_t[] + ); + CREATE TABLE public.tst_comp_mix_array ( + a public.tst_comp_mix_t PRIMARY KEY, + b public.tst_comp_mix_t[] + ); + CREATE TABLE public.tst_range ( + a INTEGER PRIMARY KEY, + b int4range + ); + CREATE TABLE public.tst_range_array ( + a INTEGER PRIMARY KEY, + b TSTZRANGE, + c int8range[] + ); + CREATE TABLE public.tst_hstore ( + a INTEGER PRIMARY KEY, + b public.hstore + ); + + SET check_function_bodies=off; + CREATE FUNCTION public.monot_incr(int) RETURNS bool LANGUAGE sql + AS ' select \$1 > max(a) from public.tst_dom_constr; '; + CREATE DOMAIN monot_int AS int CHECK (monot_incr(VALUE)); + CREATE TABLE public.tst_dom_constr (a monot_int);); + +# Setup structure on both nodes +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Wait for initial sync to finish as well +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- test_tbl_one_array_col + INSERT INTO tst_one_array (a, b) VALUES + (1, '{1, 2, 3}'), + (2, '{2, 3, 1}'), + (3, '{3, 2, 1}'), + (4, '{4, 3, 2}'), + (5, '{5, NULL, 3}'); + + -- test_tbl_arrays + INSERT INTO tst_arrays (a, b, c, d) VALUES + ('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'), + ('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'), + ('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'), + ('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'), + ('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}'); + + -- test_tbl_single_enum + INSERT INTO tst_one_enum (a, b) VALUES + (1, 'a'), + (2, 'b'), + (3, 'c'), + (4, 'd'), + (5, NULL); + + -- test_tbl_enums + INSERT INTO tst_enums (a, b) VALUES + ('a', '{b, c}'), + ('b', '{c, a}'), + ('c', '{b, a}'), + ('d', '{c, b}'), + ('e', '{d, NULL}'); + + -- test_tbl_single_composites + INSERT INTO tst_one_comp (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)), + (3, ROW(3.0, 'c', 3)), + (4, ROW(4.0, 'd', 4)), + (5, ROW(NULL, NULL, 5)); + + -- test_tbl_composites + INSERT INTO tst_comps (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]), + (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]), + (ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]), + (ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]); + + -- test_tbl_composite_with_enums + INSERT INTO tst_comp_enum (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)), + (3, ROW(3.0, 'c', 3)), + (4, ROW(4.0, 'd', 4)), + (5, ROW(NULL, 'e', NULL)); + + -- test_tbl_composite_with_enums_array + INSERT INTO tst_comp_enum_array (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]), + (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]), + (ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]), + (ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]); + + -- test_tbl_composite_with_single_enums_array_in_composite + INSERT INTO tst_comp_one_enum_array (a, b) VALUES + (1, ROW(1.0, '{a, b, c}', 1)), + (2, ROW(2.0, '{a, b, c}', 2)), + (3, ROW(3.0, '{a, b, c}', 3)), + (4, ROW(4.0, '{c, b, d}', 4)), + (5, ROW(5.0, '{NULL, e, NULL}', 5)); + + -- test_tbl_composite_with_enums_array_in_composite + INSERT INTO tst_comp_enum_what (a, b) VALUES + (ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]), + (ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]), + (ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]), + (ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]), + (ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]); + + -- test_tbl_mixed_composites + INSERT INTO tst_comp_mix_array (a, b) VALUES + (ROW( + ROW(1,'a',1), + ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t], + 'a', + '{a,b,NULL,c}'), + ARRAY[ + ROW( + ROW(1,'a',1), + ARRAY[ + ROW(1,'a',1)::tst_comp_basic_t, + ROW(2,'b',2)::tst_comp_basic_t, + NULL + ], + 'a', + '{a,b,c}' + )::tst_comp_mix_t + ] + ); + + -- test_tbl_range + INSERT INTO tst_range (a, b) VALUES + (1, '[1, 10]'), + (2, '[2, 20]'), + (3, '[3, 30]'), + (4, '[4, 40]'), + (5, '[5, 50]'); + + -- test_tbl_range_array + INSERT INTO tst_range_array (a, b, c) VALUES + (1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'), + (2, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz - interval '2 days', 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'), + (3, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz - interval '3 days', 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'), + (4, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz - interval '4 days', 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'), + (5, NULL, NULL); + + -- tst_hstore + INSERT INTO tst_hstore (a, b) VALUES + (1, '"a"=>"1"'), + (2, '"zzz"=>"foo"'), + (3, '"123"=>"321"'), + (4, '"yellow horse"=>"moaned"'); + + -- tst_dom_constr + INSERT INTO tst_dom_constr VALUES (10); +)); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the data on subscriber +my $result = $node_subscriber->safe_psql( + 'postgres', qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +)); + +is( $result, '1|{1,2,3} +2|{2,3,1} +3|{3,2,1} +4|{4,3,2} +5|{5,NULL,3} +{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"} +{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} +{4,1,2}|{d,a,b}|{4.4,1.1,2.2}|{"4 years","1 year","2 years"} +{5,NULL,NULL}|{e,NULL,b}|{5.5,1.1,NULL}|{"5 years",NULL,NULL} +1|a +2|b +3|c +4|d +5| +a|{b,c} +b|{c,a} +c|{b,a} +d|{c,b} +e|{d,NULL} +1|(1,a,1) +2|(2,b,2) +3|(3,c,3) +4|(4,d,4) +5|(,,5) +(1,a,1)|{"(1,a,1)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,4)|{"(4,d,3)"} +(5,e,)|{NULL,"(5,,5)"} +1|(1,a,1) +2|(2,b,2) +3|(3,c,3) +4|(4,d,4) +5|(,e,) +(1,a,1)|{"(1,a,1)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,3)|{"(3,d,3)"} +(5,e,3)|{"(3,e,3)",NULL} +1|(1,"{a,b,c}",1) +2|(2,"{a,b,c}",2) +3|(3,"{a,b,c}",3) +4|(4,"{c,b,d}",4) +5|(5,"{NULL,e,NULL}",5) +(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"} +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"} +(4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"} +(5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"} +("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"} +1|[1,11) +2|[2,21) +3|[3,31) +4|[4,41) +5|[5,51) +1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} +4|["2014-07-31 00:00:00+02","2014-08-04 00:00:00+02")|{"[4,6)",NULL,"[40,51)"} +5|| +1|"a"=>"1" +2|"zzz"=>"foo" +3|"123"=>"321" +4|"yellow horse"=>"moaned"', + 'check replicated inserts on subscriber'); + +# Run batch of updates +$node_publisher->safe_psql( + 'postgres', qq( + UPDATE tst_one_array SET b = '{4, 5, 6}' WHERE a = 1; + UPDATE tst_one_array SET b = '{4, 5, 6, 1}' WHERE a > 3; + UPDATE tst_arrays SET b = '{"1a", "2b", "3c"}', c = '{1.0, 2.0, 3.0}', d = '{"1 day 1 second", "2 days 2 seconds", "3 days 3 second"}' WHERE a = '{1, 2, 3}'; + UPDATE tst_arrays SET b = '{"c", "d", "e"}', c = '{3.0, 4.0, 5.0}', d = '{"3 day 1 second", "4 days 2 seconds", "5 days 3 second"}' WHERE a[1] > 3; + UPDATE tst_one_enum SET b = 'c' WHERE a = 1; + UPDATE tst_one_enum SET b = NULL WHERE a > 3; + UPDATE tst_enums SET b = '{e, NULL}' WHERE a = 'a'; + UPDATE tst_enums SET b = '{e, d}' WHERE a > 'c'; + UPDATE tst_one_comp SET b = ROW(1.0, 'A', 1) WHERE a = 1; + UPDATE tst_one_comp SET b = ROW(NULL, 'x', -1) WHERE a > 3; + UPDATE tst_comps SET b = ARRAY[ROW(9, 'x', -1)::tst_comp_basic_t] WHERE (a).a = 1.0; + UPDATE tst_comps SET b = ARRAY[NULL, ROW(9, 'x', NULL)::tst_comp_basic_t] WHERE (a).a > 3.9; + UPDATE tst_comp_enum SET b = ROW(1.0, NULL, NULL) WHERE a = 1; + UPDATE tst_comp_enum SET b = ROW(4.0, 'd', 44) WHERE a > 3; + UPDATE tst_comp_enum_array SET b = ARRAY[NULL, ROW(3, 'd', 3)::tst_comp_enum_t] WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t; + UPDATE tst_comp_enum_array SET b = ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t, ROW(2, 'b', 2)::tst_comp_enum_t] WHERE (a).a > 3; + UPDATE tst_comp_one_enum_array SET b = ROW(1.0, '{a, e, c}', NULL) WHERE a = 1; + UPDATE tst_comp_one_enum_array SET b = ROW(4.0, '{c, b, d}', 4) WHERE a > 3; + UPDATE tst_comp_enum_what SET b = ARRAY[NULL, ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t, ROW(NULL, '{a, e, c}', 2)::tst_comp_enum_array_t] WHERE (a).a = 1; + UPDATE tst_comp_enum_what SET b = ARRAY[ROW(5, '{a, b, c}', 5)::tst_comp_enum_array_t] WHERE (a).a > 3; + UPDATE tst_comp_mix_array SET b[2] = NULL WHERE ((a).a).a = 1; + UPDATE tst_range SET b = '[100, 1000]' WHERE a = 1; + UPDATE tst_range SET b = '(1, 90)' WHERE a > 3; + UPDATE tst_range_array SET c = '{"[100, 1000]"}' WHERE a = 1; + UPDATE tst_range_array SET b = tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), c = '{NULL, "[11,9999999]"}' WHERE a > 3; + UPDATE tst_hstore SET b = '"updated"=>"value"' WHERE a < 3; + UPDATE tst_hstore SET b = '"also"=>"updated"' WHERE a = 3; +)); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +)); + +is( $result, '1|{4,5,6} +2|{2,3,1} +3|{3,2,1} +4|{4,5,6,1} +5|{4,5,6,1} +{1,2,3}|{1a,2b,3c}|{1,2,3}|{"1 day 00:00:01","2 days 00:00:02","3 days 00:00:03"} +{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} +{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +1|c +2|b +3|c +4| +5| +a|{e,NULL} +b|{c,a} +c|{b,a} +d|{e,d} +e|{e,d} +1|(1,A,1) +2|(2,b,2) +3|(3,c,3) +4|(,x,-1) +5|(,x,-1) +(1,a,1)|{"(9,x,-1)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,4)|{NULL,"(9,x,)"} +(5,e,)|{NULL,"(9,x,)"} +1|(1,,) +2|(2,b,2) +3|(3,c,3) +4|(4,d,44) +5|(4,d,44) +(1,a,1)|{NULL,"(3,d,3)"} +(2,b,2)|{"(2,b,2)"} +(3,c,3)|{"(3,c,3)"} +(4,d,3)|{"(1,a,1)","(2,b,2)"} +(5,e,3)|{"(1,a,1)","(2,b,2)"} +1|(1,"{a,e,c}",) +2|(2,"{a,b,c}",2) +3|(3,"{a,b,c}",3) +4|(4,"{c,b,d}",4) +5|(4,"{c,b,d}",4) +(1,"{a,b,c}",1)|{NULL,"(1,\"{a,b,c}\",1)","(,\"{a,e,c}\",2)"} +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"} +(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"} +(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"} +("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL} +1|[100,1001) +2|[2,21) +3|[3,31) +4|[2,90) +5|[2,90) +1|["2014-08-04 00:00:00+02",infinity)|{"[100,1001)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} +4|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"} +5|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"} +1|"updated"=>"value" +2|"updated"=>"value" +3|"also"=>"updated" +4|"yellow horse"=>"moaned"', + 'check replicated updates on subscriber'); + +# Run batch of deletes +$node_publisher->safe_psql( + 'postgres', qq( + DELETE FROM tst_one_array WHERE a = 1; + DELETE FROM tst_one_array WHERE b = '{2, 3, 1}'; + DELETE FROM tst_arrays WHERE a = '{1, 2, 3}'; + DELETE FROM tst_arrays WHERE a[1] = 2; + DELETE FROM tst_one_enum WHERE a = 1; + DELETE FROM tst_one_enum WHERE b = 'b'; + DELETE FROM tst_enums WHERE a = 'a'; + DELETE FROM tst_enums WHERE b[1] = 'b'; + DELETE FROM tst_one_comp WHERE a = 1; + DELETE FROM tst_one_comp WHERE (b).a = 2.0; + DELETE FROM tst_comps WHERE (a).b = 'a'; + DELETE FROM tst_comps WHERE ROW(3, 'c', 3)::tst_comp_basic_t = ANY(b); + DELETE FROM tst_comp_enum WHERE a = 1; + DELETE FROM tst_comp_enum WHERE (b).a = 2.0; + DELETE FROM tst_comp_enum_array WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t; + DELETE FROM tst_comp_enum_array WHERE ROW(3, 'c', 3)::tst_comp_enum_t = ANY(b); + DELETE FROM tst_comp_one_enum_array WHERE a = 1; + DELETE FROM tst_comp_one_enum_array WHERE 'a' = ANY((b).b); + DELETE FROM tst_comp_enum_what WHERE (a).a = 1; + DELETE FROM tst_comp_enum_what WHERE (b[1]).b = '{c, a, b}'; + DELETE FROM tst_comp_mix_array WHERE ((a).a).a = 1; + DELETE FROM tst_range WHERE a = 1; + DELETE FROM tst_range WHERE '[10,20]' && b; + DELETE FROM tst_range_array WHERE a = 1; + DELETE FROM tst_range_array WHERE tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 05 00:00:00 2014 CEST'::timestamptz) && b; + DELETE FROM tst_hstore WHERE a = 1; +)); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check the data on subscriber +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +)); + +is( $result, '3|{3,2,1} +4|{4,5,6,1} +5|{4,5,6,1} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} +{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"} +3|c +4| +5| +b|{c,a} +d|{e,d} +e|{e,d} +3|(3,c,3) +4|(,x,-1) +5|(,x,-1) +(2,b,2)|{"(2,b,2)"} +(4,d,4)|{NULL,"(9,x,)"} +(5,e,)|{NULL,"(9,x,)"} +3|(3,c,3) +4|(4,d,44) +5|(4,d,44) +(2,b,2)|{"(2,b,2)"} +(4,d,3)|{"(1,a,1)","(2,b,2)"} +(5,e,3)|{"(1,a,1)","(2,b,2)"} +4|(4,"{c,b,d}",4) +5|(4,"{c,b,d}",4) +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"} +(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} +2|"updated"=>"value" +3|"also"=>"updated" +4|"yellow horse"=>"moaned"', + 'check replicated deletes on subscriber'); + +# Test a domain with a constraint backed by a SQL-language function, +# which needs an active snapshot in order to operate. +$node_publisher->safe_psql('postgres', + "INSERT INTO tst_dom_constr VALUES (11)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT sum(a) FROM tst_dom_constr"); +is($result, '21', 'sql-function constraint on domain'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl new file mode 100644 index 0000000..9f140b5 --- /dev/null +++ b/src/test/subscription/t/003_constraints.pl @@ -0,0 +1,136 @@ +# This test checks that constraints work on subscriber +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 6; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, junk text, bid int REFERENCES tab_fk (bid));" +); + +# Setup structure on subscriber; column order intentionally different +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_fk (bid int PRIMARY KEY);"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid), junk text);" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES;"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk (bid) VALUES (1);"); +# "junk" value is meant to be large enough to force out-of-line storage +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid, junk) VALUES (1, 1, repeat(pi()::text,20000));" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Check data on subscriber +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk;"); +is($result, qq(1|1|1), 'check replicated tab_fk inserts on subscriber'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(1|1|1), 'check replicated tab_fk_ref inserts on subscriber'); + +# Drop the fk on publisher +$node_publisher->safe_psql('postgres', "DROP TABLE tab_fk CASCADE;"); + +# Insert data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# FK is not enforced on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check FK ignored on subscriber'); + +# Add replica trigger +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$ +BEGIN + IF (TG_OP = 'INSERT') THEN + IF (NEW.id < 10) THEN + RETURN NEW; + ELSE + RETURN NULL; + END IF; + ELSIF (TG_OP = 'UPDATE') THEN + RETURN NULL; + ELSE + RAISE WARNING 'Unknown action'; + RETURN NULL; + END IF; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER filter_basic_dml_trg + BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref + FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn(); +ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg; +}); + +# Insert data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# The trigger should cause the insert to be skipped on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber'); + +# Update data +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# The trigger should cause the update to be skipped on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;"); +is($result, qq(2|1|2), + 'check replica update column trigger applied on subscriber'); + +# Update on a column not specified in the trigger, but it will trigger +# anyway because logical replication ships all columns in an update. +$node_publisher->safe_psql('postgres', + "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(id), max(id) FROM tab_fk_ref;"); +is($result, qq(2|1|2), + 'check column trigger applied even on update for other column'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl new file mode 100644 index 0000000..e111ab9 --- /dev/null +++ b/src/test/subscription/t/004_sync.pl @@ -0,0 +1,155 @@ +# Tests for logical replication table syncing +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + "wal_retrieve_retry_interval = 1ms"); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(1,10)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(10), 'initial data synced for first sub'); + +# drop subscription so that there is unreplicated data +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(11,20)"); + +# recreate the subscription, it will try to do initial copy +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +# but it will be stuck on data copy as it will fail on constraint +my $started_query = "SELECT srsubstate = 'd' FROM pg_subscription_rel;"; +$node_subscriber->poll_query_until('postgres', $started_query) + or die "Timed out while waiting for subscriber to start sync"; + +# remove the conflicting data +$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;"); + +# wait for sync to finish this time +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that all data is synced +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(20), 'initial data synced for second sub'); + +# now check another subscription for the same node pair +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)" +); + +# wait for it to start +$node_subscriber->poll_query_until('postgres', + "SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL" +) or die "Timed out while waiting for subscriber to start"; + +# and drop both subscriptions +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2"); + +# check subscriptions are removed +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'second and third sub are dropped'); + +# remove the conflicting data +$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;"); + +# recreate the subscription again +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +# and wait for data sync to finish again +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that all data is synced +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(20), 'initial data synced for fourth sub'); + +# add new table on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rep_next (a int)"); + +# setup structure with existing data on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_rep_next"); +is($result, qq(0), 'no data for table added after subscription initialized'); + +# ask for data sync +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +# wait for sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_rep_next"); +is($result, qq(10), + 'data for table added after subscription initialized are now synced'); + +# Add some data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep_next SELECT generate_series(1,10)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_rep_next"); +is($result, qq(20), + 'changes for table added after subscription initialized replicated'); + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/test/subscription/t/005_encoding.pl b/src/test/subscription/t/005_encoding.pl new file mode 100644 index 0000000..aec7a17 --- /dev/null +++ b/src/test/subscription/t/005_encoding.pl @@ -0,0 +1,52 @@ +# Test replication between databases with different encodings +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 1; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init( + allows_streaming => 'logical', + extra => [ '--locale=C', '--encoding=UTF8' ]); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init( + allows_streaming => 'logical', + extra => [ '--locale=C', '--encoding=LATIN1' ]); +$node_subscriber->start; + +my $ddl = "CREATE TABLE test1 (a int, b text);"; +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); + +$node_publisher->wait_for_catchup('mysub'); + +# Wait for initial sync to finish as well +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->safe_psql('postgres', + q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8 + +$node_publisher->wait_for_catchup('mysub'); + +is( $node_subscriber->safe_psql( + 'postgres', q{SELECT a FROM test1 WHERE b = E'Mot\xf6rhead'} + ), # LATIN1 + qq(1), + 'data replicated to subscriber'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl new file mode 100644 index 0000000..c6cda10 --- /dev/null +++ b/src/test/subscription/t/006_rewrite.pl @@ -0,0 +1,65 @@ +# Test logical replication behavior with heap rewrites +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $ddl = "CREATE TABLE test1 (a int, b text);"; +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); + +$node_publisher->wait_for_catchup('mysub'); + +# Wait for initial sync to finish as well +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$node_publisher->safe_psql('postgres', + q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');}); + +$node_publisher->wait_for_catchup('mysub'); + +is( $node_subscriber->safe_psql('postgres', q{SELECT a, b FROM test1}), + qq(1|one +2|two), + 'initial data replicated to subscriber'); + +# DDL that causes a heap rewrite +my $ddl2 = "ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0;"; +$node_subscriber->safe_psql('postgres', $ddl2); +$node_publisher->safe_psql('postgres', $ddl2); + +$node_publisher->wait_for_catchup('mysub'); + +$node_publisher->safe_psql('postgres', + q{INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33);}); + +$node_publisher->wait_for_catchup('mysub'); + +is( $node_subscriber->safe_psql('postgres', q{SELECT a, b, c FROM test1}), + qq(1|one|0 +2|two|0 +3|three|33), + 'data replicated to subscriber'); + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl new file mode 100644 index 0000000..7fe6cc6 --- /dev/null +++ b/src/test/subscription/t/007_ddl.pl @@ -0,0 +1,42 @@ +# Test some logical replication DDL behavior +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 1; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $ddl = "CREATE TABLE test1 (a int, b text);"; +$node_publisher->safe_psql('postgres', $ddl); +$node_subscriber->safe_psql('postgres', $ddl); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); + +$node_publisher->wait_for_catchup('mysub'); + +$node_subscriber->safe_psql( + 'postgres', q{ +BEGIN; +ALTER SUBSCRIPTION mysub DISABLE; +ALTER SUBSCRIPTION mysub SET (slot_name = NONE); +DROP SUBSCRIPTION mysub; +COMMIT; +}); + +pass "subscription disable and drop in same transaction did not hang"; + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl new file mode 100644 index 0000000..963334e --- /dev/null +++ b/src/test/subscription/t/008_diff_schema.pl @@ -0,0 +1,125 @@ +# Test behavior with different schema on subscriber +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999, e int GENERATED BY DEFAULT AS IDENTITY)" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Update the rows on the publisher and check the additional columns on +# subscriber didn't change +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(b)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999), count(e) FROM test_tab"); +is($result, qq(2|2|2|2), + 'check extra columns contain local defaults after copy'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values +$node_subscriber->safe_psql('postgres', + "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" +); +$node_publisher->safe_psql('postgres', + "UPDATE test_tab SET b = md5(a::text)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" +); +is($result, qq(2|2|2), 'check extra columns contain locally changed data'); + +# Another insert +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (3, 'baz')"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999), count(e) FROM test_tab"); +is($result, qq(3|3|3|3), + 'check extra columns contain local defaults after apply'); + + +# Check a bug about adding a replica identity column on the subscriber +# that was not yet mapped to a column on the publisher. This would +# result in errors on the subscriber and replication thus not +# progressing. +# (https://www.postgresql.org/message-id/flat/a9139c29-7ddd-973b-aa7f-71fed9c38d75%40minerva.info) + +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)"); + +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)"); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Add replica identity column. (The serial is not necessary, but it's +# a convenient way to get a default on the new column so that rows +# from the publisher that don't have the column yet can be inserted.) +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab2 ADD COLUMN b serial PRIMARY KEY"); + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab2 VALUES (1)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +is( $node_subscriber->safe_psql( + 'postgres', "SELECT count(*), min(a), max(a) FROM test_tab2"), + qq(1|1|1), + 'check replicated inserts on subscriber'); + + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/009_matviews.pl b/src/test/subscription/t/009_matviews.pl new file mode 100644 index 0000000..7afc7bd --- /dev/null +++ b/src/test/subscription/t/009_matviews.pl @@ -0,0 +1,49 @@ +# Test materialized views behavior +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 1; + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION mypub FOR ALL TABLES;"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;" +); + +$node_publisher->safe_psql('postgres', + q{CREATE TABLE test1 (a int PRIMARY KEY, b text)}); +$node_publisher->safe_psql('postgres', + q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');}); + +$node_subscriber->safe_psql('postgres', + q{CREATE TABLE test1 (a int PRIMARY KEY, b text);}); + +$node_publisher->wait_for_catchup('mysub'); + +# Materialized views are not supported by logical replication, but +# logical decoding does produce change information for them, so we +# need to make sure they are properly ignored. (bug #15044) + +# create a MV with some data +$node_publisher->safe_psql('postgres', + q{CREATE MATERIALIZED VIEW testmv1 AS SELECT * FROM test1;}); +$node_publisher->wait_for_catchup('mysub'); + +# There is no equivalent relation on the subscriber, but MV data is +# not replicated, so this does not hang. + +pass "materialized view data not replicated"; + +$node_subscriber->stop; +$node_publisher->stop; diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl new file mode 100644 index 0000000..1f3719c --- /dev/null +++ b/src/test/subscription/t/010_truncate.pl @@ -0,0 +1,211 @@ +# Test TRUNCATE +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 12; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + qq(max_logical_replication_workers = 6)); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)"); + +$node_subscriber->safe_psql('postgres', + "CREATE SEQUENCE seq1 OWNED BY tab1.a"); +$node_subscriber->safe_psql('postgres', "ALTER SEQUENCE seq1 START 101"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3" +); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert data to truncate + +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (3)"); + +$node_publisher->wait_for_catchup('sub1'); + +# truncate and check + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); + +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate replicated'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')"); +is($result, qq(1), 'sequence not restarted'); + +# truncate with restart identity + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')"); +is($result, qq(101), 'truncate restarted identities'); + +# test publication that does not replicate truncate + +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(3|1|3), 'truncate not replicated'); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(0||), 'truncate replicated after publication change'); + +# test multiple tables connected by foreign keys + +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab3 VALUES (1), (2), (3)"); +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab4 VALUES (11, 1), (111, 1), (22, 2)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab3, tab4"); + +$node_publisher->wait_for_catchup('sub3'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab3"); +is($result, qq(0||), 'truncate of multiple tables replicated'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(x), max(x) FROM tab4"); +is($result, qq(0||), 'truncate of multiple tables replicated'); + +# test truncate of multiple tables, some of which are not published + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub2"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub2"); + +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (3)"); +$node_subscriber->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of multiple tables some not published'); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(3|1|3), 'truncate of multiple tables some not published'); + +# test that truncate works for logical replication when there are multiple +# subscriptions for a single table + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab5 (a int)"); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab5 (a int)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub5 FOR TABLE tab5"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub5_1 CONNECTION '$publisher_connstr' PUBLICATION pub5" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub5_2 CONNECTION '$publisher_connstr' PUBLICATION pub5" +); + +# wait for initial data sync +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert data to truncate + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab5 VALUES (1), (2), (3)"); + +$node_publisher->wait_for_catchup('sub5_1'); +$node_publisher->wait_for_catchup('sub5_2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab5"); +is($result, qq(6|1|3), 'insert replicated for multiple subscriptions'); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab5"); + +$node_publisher->wait_for_catchup('sub5_1'); +$node_publisher->wait_for_catchup('sub5_2'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab5"); +is($result, qq(0||), + 'truncate replicated for multiple subscriptions'); + +# check deadlocks +$result = $node_subscriber->safe_psql('postgres', + "SELECT deadlocks FROM pg_stat_database WHERE datname='postgres'"); +is($result, qq(0), 'no deadlocks detected'); diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl new file mode 100644 index 0000000..f35d1cb --- /dev/null +++ b/src/test/subscription/t/011_generated.pl @@ -0,0 +1,63 @@ +# Test generated columns +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)" +); + +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED)" +); + +# data for initial sync + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 (a) VALUES (1), (2), (3)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR ALL TABLES"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1"); +is( $result, qq(1|22 +2|44 +3|66), 'generated columns initial sync'); + +# data to replicate + +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (4), (5)"); + +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1"); +is( $result, qq(1|22 +2|44 +3|66 +4|88 +6|132), 'generated columns replicated'); diff --git a/src/test/subscription/t/012_collation.pl b/src/test/subscription/t/012_collation.pl new file mode 100644 index 0000000..4bfcef7 --- /dev/null +++ b/src/test/subscription/t/012_collation.pl @@ -0,0 +1,107 @@ +# Test collations, in particular nondeterministic ones +# (only works with ICU) +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More; + +if ($ENV{with_icu} eq 'yes') +{ + plan tests => 2; +} +else +{ + plan skip_all => 'ICU not supported by this build'; +} + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init( + allows_streaming => 'logical', + extra => [ '--locale=C', '--encoding=UTF8' ]); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init( + allows_streaming => 'logical', + extra => [ '--locale=C', '--encoding=UTF8' ]); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +# Test plan: Create a table with a nondeterministic collation in the +# primary key column. Pre-insert rows on the publisher and subscriber +# that are collation-wise equal but byte-wise different. (We use a +# string in different normal forms for that.) Set up publisher and +# subscriber. Update the row on the publisher, but don't change the +# primary key column. The subscriber needs to find the row to be +# updated using the nondeterministic collation semantics. We need to +# test for both a replica identity index and for replica identity +# full, since those have different code paths internally. + +$node_subscriber->safe_psql('postgres', + q{CREATE COLLATION ctest_nondet (provider = icu, locale = 'und', deterministic = false)} +); + +# table with replica identity index + +$node_publisher->safe_psql('postgres', + q{CREATE TABLE tab1 (a text PRIMARY KEY, b text)}); + +$node_publisher->safe_psql('postgres', + q{INSERT INTO tab1 VALUES (U&'\00E4bc', 'foo')}); + +$node_subscriber->safe_psql('postgres', + q{CREATE TABLE tab1 (a text COLLATE ctest_nondet PRIMARY KEY, b text)}); + +$node_subscriber->safe_psql('postgres', + q{INSERT INTO tab1 VALUES (U&'\0061\0308bc', 'foo')}); + +# table with replica identity full + +$node_publisher->safe_psql('postgres', q{CREATE TABLE tab2 (a text, b text)}); +$node_publisher->safe_psql('postgres', + q{ALTER TABLE tab2 REPLICA IDENTITY FULL}); + +$node_publisher->safe_psql('postgres', + q{INSERT INTO tab2 VALUES (U&'\00E4bc', 'foo')}); + +$node_subscriber->safe_psql('postgres', + q{CREATE TABLE tab2 (a text COLLATE ctest_nondet, b text)}); +$node_subscriber->safe_psql('postgres', + q{ALTER TABLE tab2 REPLICA IDENTITY FULL}); + +$node_subscriber->safe_psql('postgres', + q{INSERT INTO tab2 VALUES (U&'\0061\0308bc', 'foo')}); + +# set up publication, subscription + +$node_publisher->safe_psql('postgres', + q{CREATE PUBLICATION pub1 FOR ALL TABLES}); + +$node_subscriber->safe_psql('postgres', + qq{CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (copy_data = false)} +); + +$node_publisher->wait_for_catchup('sub1'); + +# test with replica identity index + +$node_publisher->safe_psql('postgres', + q{UPDATE tab1 SET b = 'bar' WHERE b = 'foo'}); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres', q{SELECT b FROM tab1}), + qq(bar), 'update with primary key with nondeterministic collation'); + +# test with replica identity full + +$node_publisher->safe_psql('postgres', + q{UPDATE tab2 SET b = 'bar' WHERE b = 'foo'}); + +$node_publisher->wait_for_catchup('sub1'); + +is($node_subscriber->safe_psql('postgres', q{SELECT b FROM tab2}), + qq(bar), + 'update with replica identity full with nondeterministic collation'); diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl new file mode 100644 index 0000000..7784fcf --- /dev/null +++ b/src/test/subscription/t/013_partition.pl @@ -0,0 +1,728 @@ +# Test logical replication with partitioned tables +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 62; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber1 = get_new_node('subscriber1'); +$node_subscriber1->init(allows_streaming => 'logical'); +$node_subscriber1->start; + +my $node_subscriber2 = get_new_node('subscriber2'); +$node_subscriber2->init(allows_streaming => 'logical'); +$node_subscriber2->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +# publisher +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_all FOR ALL TABLES"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, a int NOT NULL)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_def PARTITION OF tab1 DEFAULT"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1"); + +# subscriber1 +# +# This is partitioned differently from the publisher. tab1_2 is +# subpartitioned. This tests the tuple routing code on the +# subscriber. +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)" +); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)" +); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (4, 5, 6) PARTITION BY LIST (a)" +); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_1 (c text, b text, a int NOT NULL)"); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab1_2 ATTACH PARTITION tab1_2_1 FOR VALUES IN (5)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (4, 6)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_def PARTITION OF tab1 (c DEFAULT 'sub1_tab1') DEFAULT" +); +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); + +# Add set of AFTER replica triggers for testing that they are fired +# correctly. This uses a table that records details of all trigger +# activities. Triggers are marked as enabled for a subset of the +# partition tree. +$node_subscriber1->safe_psql( + 'postgres', qq{ +CREATE TABLE sub1_trigger_activity (tgtab text, tgop text, + tgwhen text, tglevel text, olda int, newa int); +CREATE FUNCTION sub1_trigger_activity_func() RETURNS TRIGGER AS \$\$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO public.sub1_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a; + ELSIF (TG_OP = 'UPDATE') THEN + INSERT INTO public.sub1_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a; + END IF; + RETURN NULL; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER sub1_tab1_log_op_trigger + AFTER INSERT OR UPDATE ON tab1 + FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func(); +ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub1_tab1_log_op_trigger; +CREATE TRIGGER sub1_tab1_2_log_op_trigger + AFTER INSERT OR UPDATE ON tab1_2 + FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func(); +ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub1_tab1_2_log_op_trigger; +CREATE TRIGGER sub1_tab1_2_2_log_op_trigger + AFTER INSERT OR UPDATE ON tab1_2_2 + FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func(); +ALTER TABLE ONLY tab1_2_2 ENABLE REPLICA TRIGGER sub1_tab1_2_2_log_op_trigger; +}); + +# subscriber 2 +# +# This does not use partitioning. The tables match the leaf tables on +# the publisher. +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_def (a int PRIMARY KEY, b text, c text DEFAULT 'sub2_tab1_def')" +); +$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all" +); + +# Add set of AFTER replica triggers for testing that they are fired +# correctly, using the same method as the first subscriber. +$node_subscriber2->safe_psql( + 'postgres', qq{ +CREATE TABLE sub2_trigger_activity (tgtab text, + tgop text, tgwhen text, tglevel text, olda int, newa int); +CREATE FUNCTION sub2_trigger_activity_func() RETURNS TRIGGER AS \$\$ +BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO public.sub2_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a; + ELSIF (TG_OP = 'UPDATE') THEN + INSERT INTO public.sub2_trigger_activity + SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a; + END IF; + RETURN NULL; +END; +\$\$ LANGUAGE plpgsql; +CREATE TRIGGER sub2_tab1_log_op_trigger + AFTER INSERT OR UPDATE ON tab1 + FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func(); +ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub2_tab1_log_op_trigger; +CREATE TRIGGER sub2_tab1_2_log_op_trigger + AFTER INSERT OR UPDATE ON tab1_2 + FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func(); +ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger; +}); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Tests for replication using leaf partition identity and schema + +# insert +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (0)"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +my $result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab1|0 +sub1_tab1|1 +sub1_tab1|3 +sub1_tab1|5), 'inserts into tab1 and its partitions replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_1 ORDER BY 1"); +is($result, qq(5), 'inserts into tab1_2 replicated into tab1_2_1 correctly'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_2 ORDER BY 1"); +is($result, qq(), 'inserts into tab1_2 replicated into tab1_2_2 correctly'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1_1|1 +sub2_tab1_1|3), 'inserts into tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_2|5), 'inserts into tab1_2 replicated'); + +# The AFTER trigger of tab1_2 should have recorded one INSERT. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;" +); +is( $result, + qq(tab1_2|INSERT|AFTER|ROW||5), + 'check replica insert after trigger applied on subscriber'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_def ORDER BY 1, 2"); +is($result, qq(sub2_tab1_def|0), 'inserts into tab1_def replicated'); + +# update (replicated as update) +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 1"); +# All of the following cause an update to be applied to a partitioned +# table on subscriber1: tab1_2 is leaf partition on publisher, whereas +# it's sub-partitioned on subscriber1. +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5"); +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 4 WHERE a = 6"); +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 4"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab1|0 +sub1_tab1|2 +sub1_tab1|3 +sub1_tab1|6), 'update of tab1_1, tab1_2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_1 ORDER BY 1"); +is($result, qq(), 'updates of tab1_2 replicated into tab1_2_1 correctly'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_2 ORDER BY 1"); +is($result, qq(6), 'updates of tab1_2 replicated into tab1_2_2 correctly'); + +# The AFTER trigger should have recorded the UPDATEs of tab1_2_2. +$result = $node_subscriber1->safe_psql('postgres', + "SELECT * FROM sub1_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;" +); +is( $result, qq(tab1_2_2|INSERT|AFTER|ROW||6 +tab1_2_2|UPDATE|AFTER|ROW|4|6 +tab1_2_2|UPDATE|AFTER|ROW|6|4), + 'check replica update after trigger applied on subscriber'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1_1|2 +sub2_tab1_1|3), 'update of tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); +is($result, qq(sub2_tab1_2|6), 'tab1_2 updated'); + +# The AFTER trigger should have recorded the updates of tab1_2. +$result = $node_subscriber2->safe_psql('postgres', + "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;" +); +is( $result, qq(tab1_2|INSERT|AFTER|ROW||5 +tab1_2|UPDATE|AFTER|ROW|4|6 +tab1_2|UPDATE|AFTER|ROW|5|6 +tab1_2|UPDATE|AFTER|ROW|6|4), + 'check replica update after trigger applied on subscriber'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_def ORDER BY 1"); +is($result, qq(sub2_tab1_def|0), 'tab1_def unchanged'); + +# update (replicated as delete+insert) +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 1 WHERE a = 0"); +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 4 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab1|2 +sub1_tab1|3 +sub1_tab1|4 +sub1_tab1|6), + 'update of tab1 (delete from tab1_def + insert into tab1_1) replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT a FROM tab1_2_2 ORDER BY 1"); +is( $result, qq(4 +6), 'updates of tab1 (delete + insert) replicated into tab1_2_2 correctly'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1_1|2 +sub2_tab1_1|3), 'tab1_1 unchanged'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1_2 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1_2|4 +sub2_tab1_2|6), 'insert into tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a FROM tab1_def ORDER BY 1"); +is($result, qq(), 'delete from tab1_def replicated'); + +# delete +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1 WHERE a IN (2, 3, 5)"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1"); +is($result, qq(), 'delete from tab1_1, tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1_1"); +is($result, qq(), 'delete from tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1_2"); +is($result, qq(), 'delete from tab1_2 replicated'); + +# truncate +$node_subscriber1->safe_psql('postgres', + "INSERT INTO tab1 (a) VALUES (1), (2), (5)"); +$node_subscriber2->safe_psql('postgres', "INSERT INTO tab1_2 (a) VALUES (2)"); +$node_publisher->safe_psql('postgres', "TRUNCATE tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = + $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); +is( $result, qq(1 +2), 'truncate of tab1_2 replicated'); + +$result = + $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1_2 ORDER BY 1"); +is($result, qq(), 'truncate of tab1_2 replicated'); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = + $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); +is($result, qq(), 'truncate of tab1_1 replicated'); +$result = + $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); +is($result, qq(), 'truncate of tab1 replicated'); + +# Check that subscriber handles cases where update/delete target tuple +# is missing. We have to look for the DEBUG1 log messages about that, +# so temporarily bump up the log verbosity. +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = debug1"); +$node_subscriber1->reload; + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$node_subscriber1->safe_psql('postgres', "DELETE FROM tab1"); + +# Note that the current location of the log file is not grabbed immediately +# after reloading the configuration, but after sending one SQL command to +# the node so as we are sure that the reloading has taken effect. +my $log_location = -s $node_subscriber1->logfile; + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET b = 'quux' WHERE a = 4"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +my $logfile = slurp_file($node_subscriber1->logfile(), $log_location); +ok( $logfile =~ + qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/, + 'update target row is missing in tab1_2_2'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/, + 'delete target row is missing in tab1_1'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/, + 'delete target row is missing in tab1_2_2'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/, + 'delete target row is missing in tab1_def'); + +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber1->reload; + +# Tests for replication using root table identity and schema + +# publisher +$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2_1 (b text, a int NOT NULL)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES IN (0, 1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2_2 PARTITION OF tab2 FOR VALUES IN (5, 6)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)"); +# Note: tab3_1's parent is not in the publication, in which case its +# changes are published using own identity. +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab3_1 WITH (publish_via_partition_root = true)" +); + +# subscriber 1 +$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION sub1"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub1_tab2', b text) PARTITION BY RANGE (a)" +); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab2_1 (c text DEFAULT 'sub1_tab2', b text, a int NOT NULL)" +); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES FROM (0) TO (10)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab3_1 (c text DEFAULT 'sub1_tab3_1', b text, a int NOT NULL PRIMARY KEY)" +); +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub_viaroot CONNECTION '$publisher_connstr' PUBLICATION pub_viaroot" +); + +# subscriber 2 +$node_subscriber2->safe_psql('postgres', "DROP TABLE tab1"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)" +); +# Note: tab1's partitions are named tab1_1 and tab1_2 on the publisher. +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)"); +$node_subscriber2->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab2', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3', b text)" +); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)" +); +# Publication that sub2 points to now publishes via root, so must update +# subscription target relations. +$node_subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION"); + +# Wait for initial sync of all subscriptions +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (0)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (0), (3), (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab3 VALUES (1), (0), (3), (5)"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub1_tab2|0 +sub1_tab2|1 +sub1_tab2|3 +sub1_tab2|5), 'inserts into tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab3_1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab3_1|0 +sub1_tab3_1|1 +sub1_tab3_1|3 +sub1_tab3_1|5), 'inserts into tab3_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1|0 +sub2_tab1|1 +sub2_tab1|3 +sub2_tab1|5), 'inserts into tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub2_tab2|0 +sub2_tab2|1 +sub2_tab2|3 +sub2_tab2|5), 'inserts into tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab3 ORDER BY 1, 2"); +is( $result, qq(sub2_tab3|0 +sub2_tab3|1 +sub2_tab3|3 +sub2_tab3|5), 'inserts into tab3 replicated'); + +# update (replicated as update) +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5"); +$node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5"); +$node_publisher->safe_psql('postgres', "UPDATE tab3 SET a = 6 WHERE a = 5"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub1_tab2|0 +sub1_tab2|1 +sub1_tab2|3 +sub1_tab2|6), 'update of tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab3_1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab3_1|0 +sub1_tab3_1|1 +sub1_tab3_1|3 +sub1_tab3_1|6), 'update of tab3_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1|0 +sub2_tab1|1 +sub2_tab1|3 +sub2_tab1|6), 'inserts into tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub2_tab2|0 +sub2_tab2|1 +sub2_tab2|3 +sub2_tab2|6), 'inserts into tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab3 ORDER BY 1, 2"); +is( $result, qq(sub2_tab3|0 +sub2_tab3|1 +sub2_tab3|3 +sub2_tab3|6), 'inserts into tab3 replicated'); + +# update (replicated as delete+insert) +$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 6"); +$node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 2 WHERE a = 6"); +$node_publisher->safe_psql('postgres', "UPDATE tab3 SET a = 2 WHERE a = 6"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub1_tab2|0 +sub1_tab2|1 +sub1_tab2|2 +sub1_tab2|3), 'update of tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a FROM tab3_1 ORDER BY 1, 2"); +is( $result, qq(sub1_tab3_1|0 +sub1_tab3_1|1 +sub1_tab3_1|2 +sub1_tab3_1|3), 'update of tab3_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab1 ORDER BY 1, 2"); +is( $result, qq(sub2_tab1|0 +sub2_tab1|1 +sub2_tab1|2 +sub2_tab1|3), 'update of tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab2 ORDER BY 1, 2"); +is( $result, qq(sub2_tab2|0 +sub2_tab2|1 +sub2_tab2|2 +sub2_tab2|3), 'update of tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a FROM tab3 ORDER BY 1, 2"); +is( $result, qq(sub2_tab3|0 +sub2_tab3|1 +sub2_tab3|2 +sub2_tab3|3), 'update of tab3 replicated'); + +# delete +$node_publisher->safe_psql('postgres', "DELETE FROM tab1"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab2"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab3"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab2"); +is($result, qq(), 'delete tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1"); +is($result, qq(), 'delete from tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab2"); +is($result, qq(), 'delete from tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3"); +is($result, qq(), 'delete from tab3 replicated'); + +# truncate +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (2), (5)"); +# these will NOT be replicated +$node_publisher->safe_psql('postgres', "TRUNCATE tab1_2, tab2_1, tab3_1"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = + $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab2 ORDER BY 1"); +is( $result, qq(1 +2 +5), 'truncate of tab2_1 NOT replicated'); + +$result = + $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1"); +is( $result, qq(1 +2 +5), 'truncate of tab1_2 NOT replicated'); + +$result = + $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab2 ORDER BY 1"); +is( $result, qq(1 +2 +5), 'truncate of tab2_1 NOT replicated'); + +$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2, tab3"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab2"); +is($result, qq(), 'truncate of tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1"); +is($result, qq(), 'truncate of tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab2"); +is($result, qq(), 'truncate of tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3"); +is($result, qq(), 'truncate of tab3 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3_1"); +is($result, qq(), 'truncate of tab3_1 replicated'); + +# check that the map to convert tuples from leaf partition to the root +# table is correctly rebuilt when a new column is added +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab2 DROP b, ADD COLUMN c text DEFAULT 'pub_tab2', ADD b text"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 (a, b) VALUES (1, 'xxx'), (3, 'yyy'), (5, 'zzz')"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 (a, b, c) VALUES (6, 'aaa', 'xxx_c')"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, a, b FROM tab2 ORDER BY 1, 2"); +is( $result, qq(pub_tab2|1|xxx +pub_tab2|3|yyy +pub_tab2|5|zzz +xxx_c|6|aaa), 'inserts into tab2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, a, b FROM tab2 ORDER BY 1, 2"); +is( $result, qq(pub_tab2|1|xxx +pub_tab2|3|yyy +pub_tab2|5|zzz +xxx_c|6|aaa), 'inserts into tab2 replicated'); + +# Check that subscriber handles cases where update/delete target tuple +# is missing. We have to look for the DEBUG1 log messages about that, +# so temporarily bump up the log verbosity. +$node_subscriber1->append_conf('postgresql.conf', + "log_min_messages = debug1"); +$node_subscriber1->reload; + +$node_subscriber1->safe_psql('postgres', "DELETE FROM tab2"); + +# Note that the current location of the log file is not grabbed immediately +# after reloading the configuration, but after sending one SQL command to +# the node so as we are sure that the reloading has taken effect. +$log_location = -s $node_subscriber1->logfile; + +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET b = 'quux' WHERE a = 5"); +$node_publisher->safe_psql('postgres', "DELETE FROM tab2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub_viaroot'); +$node_publisher->wait_for_catchup('sub2'); + +$logfile = slurp_file($node_subscriber1->logfile(), $log_location); +ok( $logfile =~ + qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/, + 'update target row is missing in tab2_1'); +ok( $logfile =~ + qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/, + 'delete target row is missing in tab2_1'); + +# No need for this until more tests are added. +# $node_subscriber1->append_conf('postgresql.conf', +# "log_min_messages = warning"); +# $node_subscriber1->reload; diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl new file mode 100644 index 0000000..d1e407a --- /dev/null +++ b/src/test/subscription/t/100_bugs.pl @@ -0,0 +1,155 @@ +# Tests for various bugs found over time +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Bug #15114 + +# The bug was that determining which columns are part of the replica +# identity index using RelationGetIndexAttrBitmap() would run +# eval_const_expressions() on index expressions and predicates across +# all indexes of the table, which in turn might require a snapshot, +# but there wasn't one set, so it crashes. There were actually two +# separate bugs, one on the publisher and one on the subscriber. The +# fix was to avoid the constant expressions simplification in +# RelationGetIndexAttrBitmap(), so it's safe to call in more contexts. + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b int)"); + +$node_publisher->safe_psql('postgres', + "CREATE FUNCTION double(x int) RETURNS int IMMUTABLE LANGUAGE SQL AS 'select x * 2'" +); + +# an index with a predicate that lends itself to constant expressions +# evaluation +$node_publisher->safe_psql('postgres', + "CREATE INDEX ON tab1 (b) WHERE a > double(1)"); + +# and the same setup on the subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b int)"); + +$node_subscriber->safe_psql('postgres', + "CREATE FUNCTION double(x int) RETURNS int IMMUTABLE LANGUAGE SQL AS 'select x * 2'" +); + +$node_subscriber->safe_psql('postgres', + "CREATE INDEX ON tab1 (b) WHERE a > double(1)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR ALL TABLES"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" +); + +$node_publisher->wait_for_catchup('sub1'); + +# This would crash, first on the publisher, and then (if the publisher +# is fixed) on the subscriber. +$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1, 2)"); + +$node_publisher->wait_for_catchup('sub1'); + +pass('index predicates do not cause crash'); + +$node_publisher->stop('fast'); +$node_subscriber->stop('fast'); + + +# Handling of temporary and unlogged tables with FOR ALL TABLES publications + +# If a FOR ALL TABLES publication exists, temporary and unlogged +# tables are ignored for publishing changes. The bug was that we +# would still check in that case that such a table has a replica +# identity set before accepting updates. If it did not it would cause +# an error when an update was attempted. + +$node_publisher = get_new_node('publisher2'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR ALL TABLES"); + +is( $node_publisher->psql( + 'postgres', + "CREATE TEMPORARY TABLE tt1 AS SELECT 1 AS a; UPDATE tt1 SET a = 2;"), + 0, + 'update to temporary table without replica identity with FOR ALL TABLES publication' +); + +is( $node_publisher->psql( + 'postgres', + "CREATE UNLOGGED TABLE tu1 AS SELECT 1 AS a; UPDATE tu1 SET a = 2;"), + 0, + 'update to unlogged table without replica identity with FOR ALL TABLES publication' +); + +$node_publisher->stop('fast'); + +# Bug #16643 - https://postgr.es/m/16643-eaadeb2a1a58d28c@postgresql.org +# +# Initial sync doesn't complete; the protocol was not being followed per +# expectations after commit 07082b08cc5d. +my $node_twoways = get_new_node('twoways'); +$node_twoways->init(allows_streaming => 'logical'); +$node_twoways->start; +for my $db (qw(d1 d2)) +{ + $node_twoways->safe_psql('postgres', "CREATE DATABASE $db"); + $node_twoways->safe_psql($db, "CREATE TABLE t (f int)"); + $node_twoways->safe_psql($db, "CREATE TABLE t2 (f int)"); +} + +my $rows = 3000; +$node_twoways->safe_psql( + 'd1', qq{ + INSERT INTO t SELECT * FROM generate_series(1, $rows); + INSERT INTO t2 SELECT * FROM generate_series(1, $rows); + CREATE PUBLICATION testpub FOR TABLE t; + SELECT pg_create_logical_replication_slot('testslot', 'pgoutput'); + }); + +$node_twoways->safe_psql('d2', + "CREATE SUBSCRIPTION testsub CONNECTION \$\$" + . $node_twoways->connstr('d1') + . "\$\$ PUBLICATION testpub WITH (create_slot=false, " + . "slot_name='testslot')"); +$node_twoways->safe_psql( + 'd1', qq{ + INSERT INTO t SELECT * FROM generate_series(1, $rows); + INSERT INTO t2 SELECT * FROM generate_series(1, $rows); + }); +$node_twoways->safe_psql( + 'd1', 'ALTER PUBLICATION testpub ADD TABLE t2'); +$node_twoways->safe_psql( + 'd2', 'ALTER SUBSCRIPTION testsub REFRESH PUBLICATION'); + +# We cannot rely solely on wait_for_catchup() here; it isn't sufficient +# when tablesync workers might still be running. So in addition to that, +# verify that tables are synced. +# XXX maybe this should be integrated in wait_for_catchup() itself. +$node_twoways->wait_for_catchup('testsub'); +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_twoways->poll_query_until('d2', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"), + $rows * 2, "2x$rows rows in t"); +is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"), + $rows * 2, "2x$rows rows in t2"); |