summaryrefslogtreecommitdiffstats
path: root/src/test/subscription
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/test/subscription/.gitignore2
-rw-r--r--src/test/subscription/Makefile27
-rw-r--r--src/test/subscription/README23
-rw-r--r--src/test/subscription/t/001_rep_changes.pl539
-rw-r--r--src/test/subscription/t/002_types.pl568
-rw-r--r--src/test/subscription/t/003_constraints.pl139
-rw-r--r--src/test/subscription/t/004_sync.pl180
-rw-r--r--src/test/subscription/t/005_encoding.pl55
-rw-r--r--src/test/subscription/t/006_rewrite.pl68
-rw-r--r--src/test/subscription/t/007_ddl.pl45
-rw-r--r--src/test/subscription/t/008_diff_schema.pl128
-rw-r--r--src/test/subscription/t/009_matviews.pl52
-rw-r--r--src/test/subscription/t/010_truncate.pl241
-rw-r--r--src/test/subscription/t/011_generated.pl66
-rw-r--r--src/test/subscription/t/012_collation.pl110
-rw-r--r--src/test/subscription/t/013_partition.pl872
-rw-r--r--src/test/subscription/t/014_binary.pl137
-rw-r--r--src/test/subscription/t/015_stream.pl135
-rw-r--r--src/test/subscription/t/016_stream_subxact.pl93
-rw-r--r--src/test/subscription/t/017_stream_ddl.pl129
-rw-r--r--src/test/subscription/t/018_stream_subxact_abort.pl133
-rw-r--r--src/test/subscription/t/019_stream_subxact_ddl_abort.pl87
-rw-r--r--src/test/subscription/t/020_messages.pl148
-rw-r--r--src/test/subscription/t/021_alter_sub_pub.pl98
-rw-r--r--src/test/subscription/t/100_bugs.pl308
25 files changed, 4383 insertions, 0 deletions
diff --git a/src/test/subscription/.gitignore b/src/test/subscription/.gitignore
new file mode 100644
index 0000000..871e943
--- /dev/null
+++ b/src/test/subscription/.gitignore
@@ -0,0 +1,2 @@
+# Generated by test suite
+/tmp_check/
diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile
new file mode 100644
index 0000000..28fd6f6
--- /dev/null
+++ b/src/test/subscription/Makefile
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/test/subscription
+#
+# Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/test/subscription/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/test/subscription
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+EXTRA_INSTALL = contrib/hstore
+
+export with_icu
+
+check:
+ $(prove_check)
+
+installcheck:
+ $(prove_installcheck)
+
+clean distclean maintainer-clean:
+ rm -rf tmp_check
diff --git a/src/test/subscription/README b/src/test/subscription/README
new file mode 100644
index 0000000..fb5382e
--- /dev/null
+++ b/src/test/subscription/README
@@ -0,0 +1,23 @@
+src/test/subscription/README
+
+Regression tests for subscription/logical replication
+=====================================================
+
+This directory contains a test suite for subscription/logical replication.
+
+Running the tests
+=================
+
+NOTE: You must have given the --enable-tap-tests argument to configure.
+
+Run
+ make check
+or
+ make installcheck
+You can use "make installcheck" if you previously did "make install"
+(including installing the hstore extension). In that case, the code
+in the installation tree is tested. With "make check", a temporary
+installation tree is built from the current sources and then tested.
+
+Either way, this test initializes, starts, and stops several test Postgres
+clusters.
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
new file mode 100644
index 0000000..7dd69ca
--- /dev/null
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -0,0 +1,539 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 32;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql(
+ 'postgres',
+ "CREATE FUNCTION public.pg_get_replica_identity_index(int)
+ RETURNS regclass LANGUAGE sql AS 'SELECT 1/0'"); # shall not call
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_mixed (a int primary key, b text, c numeric)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"
+);
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_full_pk (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL");
+# Let this table with REPLICA IDENTITY NOTHING, allowing only INSERT changes.
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_nothing (a int)");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_nothing REPLICA IDENTITY NOTHING");
+
+# Replicate the changes without replica identity index
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_no_replidentity_index(c1 int)");
+$node_publisher->safe_psql('postgres',
+ "CREATE INDEX idx_no_replidentity_index ON tab_no_replidentity_index(c1)"
+);
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_full_pk (a int primary key, b text)");
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_nothing (a int)");
+
+# different column count and order than on publisher
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)"
+);
+
+# replication of the table with included index
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_include (a int, b text, CONSTRAINT covering PRIMARY KEY(a) INCLUDE(b))"
+);
+
+# replication of the table without replica identity index
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_no_replidentity_index(c1 int)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE INDEX idx_no_replidentity_index ON tab_no_replidentity_index(c1)"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_nothing, tab_full_pk, tab_no_replidentity_index"
+);
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
+is($result, qq(0), 'check non-replicated table is empty on subscriber');
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_ins SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
+$node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_rep SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20");
+$node_publisher->safe_psql('postgres', "UPDATE tab_rep SET a = -a");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_nothing VALUES (generate_series(1,20))");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_include SELECT generate_series(1,50)");
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab_include WHERE a > 20");
+$node_publisher->safe_psql('postgres', "UPDATE tab_include SET a = -a");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_no_replidentity_index VALUES(1)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_rep");
+is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_mixed");
+is( $result, qq(local|1.1|foo|1
+local|2.2|bar|2), 'check replicated changes with different column order');
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_nothing");
+is($result, qq(20), 'check replicated changes with REPLICA IDENTITY NOTHING');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_include");
+is($result, qq(20|-20|-1),
+ 'check replicated changes with primary key index with included columns');
+
+is( $node_subscriber->safe_psql(
+ 'postgres', q(SELECT c1 FROM tab_no_replidentity_index)),
+ 1,
+ "value replicated to subscriber without replica identity index");
+
+# insert some duplicate rows
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_full SELECT generate_series(1,10)");
+
+# Test behaviour of ALTER PUBLICATION ... DROP TABLE
+#
+# When a publisher drops a table from publication, it should also stop sending
+# its changes to subscribers. We look at the subscriber whether it receives
+# the row that is inserted to the table in the publisher after it is dropped
+# from the publication.
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1052|1|1002),
+ 'check rows on subscriber before table drop from publication');
+
+# Drop the table from publication
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only DROP TABLE tab_ins");
+
+# Insert a row in publisher, but publisher will not send this row to subscriber
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_ins VALUES(8888)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Subscriber will not receive the inserted row, after table is dropped from
+# publication, so row count should remain the same.
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1052|1|1002),
+ 'check rows on subscriber after table drop from publication');
+
+# Delete the inserted row in publisher
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a = 8888");
+
+# Add the table to publication again
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins");
+
+# Refresh publication after table is added to publication
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
+
+# Test replication with multiple publications for a subscription such that the
+# operations are performed on the table from the first publication in the list.
+
+# Create tables on publisher
+$node_publisher->safe_psql('postgres', "CREATE TABLE temp1 (a int)");
+$node_publisher->safe_psql('postgres', "CREATE TABLE temp2 (a int)");
+
+# Create tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE temp1 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE temp2 (a int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_temp1 FOR TABLE temp1 WITH (publish = insert)"
+);
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_temp2 FOR TABLE temp2");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2"
+);
+
+$node_publisher->wait_for_catchup('tap_sub_temp1');
+
+# Also wait for initial table sync to finish
+$synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscriber table will have no rows initially
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM temp1");
+is($result, qq(0),
+ 'check initial rows on subscriber with multiple publications');
+
+# Insert a row into the table that's part of first publication in subscriber
+# list of publications.
+$node_publisher->safe_psql('postgres', "INSERT INTO temp1 VALUES (1)");
+
+$node_publisher->wait_for_catchup('tap_sub_temp1');
+
+# Subscriber should receive the inserted row
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM temp1");
+is($result, qq(1), 'check rows on subscriber with multiple publications');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_temp1");
+
+# Drop publications as we don't need them anymore
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_temp1");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_temp2");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher->safe_psql('postgres', "DROP TABLE temp1");
+$node_publisher->safe_psql('postgres', "DROP TABLE temp2");
+$node_subscriber->safe_psql('postgres', "DROP TABLE temp1");
+$node_subscriber->safe_psql('postgres', "DROP TABLE temp2");
+
+# add REPLICA IDENTITY FULL so we can update
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_full REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_full REPLICA IDENTITY FULL");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_full2 REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_full2 REPLICA IDENTITY FULL");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_ins REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_ins REPLICA IDENTITY FULL");
+# tab_mixed can use DEFAULT, since it has a primary key
+
+# and do the updates
+$node_publisher->safe_psql('postgres', "UPDATE tab_full SET a = a * a");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_mixed SET b = 'baz' WHERE a = 1");
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_full_pk SET b = 'bar' WHERE a = 1");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(20|1|100),
+ 'update works with REPLICA IDENTITY FULL and duplicate tuples');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT x FROM tab_full2 ORDER BY 1");
+is( $result, qq(a
+bb
+bb),
+ 'update works with REPLICA IDENTITY FULL and text datums');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab_mixed ORDER BY a");
+is( $result, qq(local|1.1|baz|1
+local|2.2|bar|2),
+ 'update works with different column order and subscriber local values');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab_full_pk ORDER BY a");
+is( $result, qq(1|bar
+2|baz),
+ 'update works with REPLICA IDENTITY FULL and a primary key');
+
+# Check that subscriber handles cases where update/delete target tuple
+# is missing. We have to look for the DEBUG1 log messages about that,
+# so temporarily bump up the log verbosity.
+$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
+$node_subscriber->reload;
+
+$node_subscriber->safe_psql('postgres', "DELETE FROM tab_full_pk");
+
+# Note that the current location of the log file is not grabbed immediately
+# after reloading the configuration, but after sending one SQL command to
+# the node so as we are sure that the reloading has taken effect.
+my $log_location = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_full_pk SET b = 'quux' WHERE a = 1");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_full_pk WHERE a = 2");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+my $logfile = slurp_file($node_subscriber->logfile, $log_location);
+ok( $logfile =~
+ qr/logical replication did not find row to be updated in replication target relation "tab_full_pk"/,
+ 'update target row is missing');
+ok( $logfile =~
+ qr/logical replication did not find row to be deleted in replication target relation "tab_full_pk"/,
+ 'delete target row is missing');
+
+$node_subscriber->append_conf('postgresql.conf',
+ "log_min_messages = warning");
+$node_subscriber->reload;
+
+# check behavior with toasted values
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_mixed SET b = repeat('xyzzy', 100000) WHERE a = 2");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
+is( $result, qq(1|3|1.1|local
+2|500000|2.2|local),
+ 'update transmits large column value');
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_mixed SET c = 3.3 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a");
+is( $result, qq(1|3|1.1|local
+2|500000|3.3|local),
+ 'update with non-transmitted large column value');
+
+# check behavior with dropped columns
+
+# this update should get transmitted before the column goes away
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_mixed SET b = 'bar', c = 2.2 WHERE a = 2");
+
+$node_publisher->safe_psql('postgres', "ALTER TABLE tab_mixed DROP COLUMN b");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_mixed SET c = 11.11 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab_mixed ORDER BY a");
+is( $result, qq(local|11.11|baz|1
+local|2.2|bar|2),
+ 'update works with dropped publisher column');
+
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_mixed DROP COLUMN d");
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_mixed SET c = 22.22 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT * FROM tab_mixed ORDER BY a");
+is( $result, qq(11.11|baz|1
+22.22|bar|2),
+ 'update works with dropped subscriber column');
+
+# check that change of connection string and/or publication list causes
+# restart of subscription workers. We check the state along with
+# application_name to ensure that the walsender is (re)started.
+#
+# Not all of these are registered as tests as we need to poll for a change
+# but the test suite will fail none the less when something goes wrong.
+my $oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';"
+);
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr sslmode=disable'"
+);
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';"
+) or die "Timed out while waiting for apply to restart after changing CONNECTION";
+
+$oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';"
+);
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only WITH (copy_data = false)"
+);
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';"
+) or die "Timed out while waiting for apply to restart after changing PUBLICATION";
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_ins SELECT generate_series(1001,1100)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
+
+# Restart the publisher and check the state of the subscriber which
+# should be in a streaming state after catching up.
+$node_publisher->stop('fast');
+$node_publisher->start;
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1152|1|1100),
+ 'check replicated inserts after subscription publication change');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_rep");
+is($result, qq(20|-20|-1),
+ 'check changes skipped after subscription publication change');
+
+# check alter publication (relcache invalidation etc)
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0");
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = false)"
+);
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# note that data are different on provider and subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_ins");
+is($result, qq(1052|1|1002),
+ 'check replicated deletes after alter publication');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_full");
+is($result, qq(21|0|100), 'check replicated insert after alter publication');
+
+# check restart on rename
+$oldpid = $node_publisher->safe_psql('postgres',
+ "SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub' AND state = 'streaming';"
+);
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed");
+$node_publisher->poll_query_until('postgres',
+ "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed' AND state = 'streaming';"
+) or die "Timed out while waiting for apply to restart after renaming SUBSCRIPTION";
+
+# check all the cleanup
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'check subscription was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(0),
+ 'check subscription relation status was dropped on subscriber');
+
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'check replication slot was dropped on publisher');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_origin");
+is($result, qq(0), 'check replication origin was dropped on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+# CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
+$node_publisher->append_conf(
+ 'postgresql.conf', qq(
+wal_level=minimal
+max_wal_senders=0
+));
+$node_publisher->start;
+($result, my $retout, my $reterr) = $node_publisher->psql(
+ 'postgres', qq{
+BEGIN;
+CREATE TABLE skip_wal();
+CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal;
+ROLLBACK;
+});
+ok( $reterr =~
+ m/WARNING: wal_level is insufficient to publish logical changes/,
+ 'CREATE PUBLICATION while wal_level=minimal');
diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl
new file mode 100644
index 0000000..f915fad
--- /dev/null
+++ b/src/test/subscription/t/002_types.pl
@@ -0,0 +1,568 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# This tests that more complex datatypes are replicated correctly
+# by logical replication
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+my $ddl = qq(
+ CREATE EXTENSION hstore WITH SCHEMA public;
+ CREATE TABLE public.tst_one_array (
+ a INTEGER PRIMARY KEY,
+ b INTEGER[]
+ );
+ CREATE TABLE public.tst_arrays (
+ a INTEGER[] PRIMARY KEY,
+ b TEXT[],
+ c FLOAT[],
+ d INTERVAL[]
+ );
+
+ CREATE TYPE public.tst_enum_t AS ENUM ('a', 'b', 'c', 'd', 'e');
+ CREATE TABLE public.tst_one_enum (
+ a INTEGER PRIMARY KEY,
+ b public.tst_enum_t
+ );
+ CREATE TABLE public.tst_enums (
+ a public.tst_enum_t PRIMARY KEY,
+ b public.tst_enum_t[]
+ );
+
+ CREATE TYPE public.tst_comp_basic_t AS (a FLOAT, b TEXT, c INTEGER);
+ CREATE TYPE public.tst_comp_enum_t AS (a FLOAT, b public.tst_enum_t, c INTEGER);
+ CREATE TYPE public.tst_comp_enum_array_t AS (a FLOAT, b public.tst_enum_t[], c INTEGER);
+ CREATE TABLE public.tst_one_comp (
+ a INTEGER PRIMARY KEY,
+ b public.tst_comp_basic_t
+ );
+ CREATE TABLE public.tst_comps (
+ a public.tst_comp_basic_t PRIMARY KEY,
+ b public.tst_comp_basic_t[]
+ );
+ CREATE TABLE public.tst_comp_enum (
+ a INTEGER PRIMARY KEY,
+ b public.tst_comp_enum_t
+ );
+ CREATE TABLE public.tst_comp_enum_array (
+ a public.tst_comp_enum_t PRIMARY KEY,
+ b public.tst_comp_enum_t[]
+ );
+ CREATE TABLE public.tst_comp_one_enum_array (
+ a INTEGER PRIMARY KEY,
+ b public.tst_comp_enum_array_t
+ );
+ CREATE TABLE public.tst_comp_enum_what (
+ a public.tst_comp_enum_array_t PRIMARY KEY,
+ b public.tst_comp_enum_array_t[]
+ );
+
+ CREATE TYPE public.tst_comp_mix_t AS (
+ a public.tst_comp_basic_t,
+ b public.tst_comp_basic_t[],
+ c public.tst_enum_t,
+ d public.tst_enum_t[]
+ );
+ CREATE TABLE public.tst_comp_mix_array (
+ a public.tst_comp_mix_t PRIMARY KEY,
+ b public.tst_comp_mix_t[]
+ );
+ CREATE TABLE public.tst_range (
+ a INTEGER PRIMARY KEY,
+ b int4range
+ );
+ CREATE TABLE public.tst_range_array (
+ a INTEGER PRIMARY KEY,
+ b TSTZRANGE,
+ c int8range[]
+ );
+ CREATE TABLE public.tst_hstore (
+ a INTEGER PRIMARY KEY,
+ b public.hstore
+ );
+
+ SET check_function_bodies=off;
+ CREATE FUNCTION public.monot_incr(int) RETURNS bool LANGUAGE sql
+ AS ' select \$1 > max(a) from public.tst_dom_constr; ';
+ CREATE DOMAIN monot_int AS int CHECK (monot_incr(VALUE));
+ CREATE TABLE public.tst_dom_constr (a monot_int););
+
+# Setup structure on both nodes
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR ALL TABLES");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Wait for initial sync to finish as well
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Insert initial test data
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ -- test_tbl_one_array_col
+ INSERT INTO tst_one_array (a, b) VALUES
+ (1, '{1, 2, 3}'),
+ (2, '{2, 3, 1}'),
+ (3, '{3, 2, 1}'),
+ (4, '{4, 3, 2}'),
+ (5, '{5, NULL, 3}');
+
+ -- test_tbl_arrays
+ INSERT INTO tst_arrays (a, b, c, d) VALUES
+ ('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'),
+ ('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'),
+ ('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'),
+ ('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'),
+ ('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}');
+
+ -- test_tbl_single_enum
+ INSERT INTO tst_one_enum (a, b) VALUES
+ (1, 'a'),
+ (2, 'b'),
+ (3, 'c'),
+ (4, 'd'),
+ (5, NULL);
+
+ -- test_tbl_enums
+ INSERT INTO tst_enums (a, b) VALUES
+ ('a', '{b, c}'),
+ ('b', '{c, a}'),
+ ('c', '{b, a}'),
+ ('d', '{c, b}'),
+ ('e', '{d, NULL}');
+
+ -- test_tbl_single_composites
+ INSERT INTO tst_one_comp (a, b) VALUES
+ (1, ROW(1.0, 'a', 1)),
+ (2, ROW(2.0, 'b', 2)),
+ (3, ROW(3.0, 'c', 3)),
+ (4, ROW(4.0, 'd', 4)),
+ (5, ROW(NULL, NULL, 5));
+
+ -- test_tbl_composites
+ INSERT INTO tst_comps (a, b) VALUES
+ (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
+ (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]),
+ (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]),
+ (ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]),
+ (ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]);
+
+ -- test_tbl_composite_with_enums
+ INSERT INTO tst_comp_enum (a, b) VALUES
+ (1, ROW(1.0, 'a', 1)),
+ (2, ROW(2.0, 'b', 2)),
+ (3, ROW(3.0, 'c', 3)),
+ (4, ROW(4.0, 'd', 4)),
+ (5, ROW(NULL, 'e', NULL));
+
+ -- test_tbl_composite_with_enums_array
+ INSERT INTO tst_comp_enum_array (a, b) VALUES
+ (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
+ (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]),
+ (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]),
+ (ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]),
+ (ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]);
+
+ -- test_tbl_composite_with_single_enums_array_in_composite
+ INSERT INTO tst_comp_one_enum_array (a, b) VALUES
+ (1, ROW(1.0, '{a, b, c}', 1)),
+ (2, ROW(2.0, '{a, b, c}', 2)),
+ (3, ROW(3.0, '{a, b, c}', 3)),
+ (4, ROW(4.0, '{c, b, d}', 4)),
+ (5, ROW(5.0, '{NULL, e, NULL}', 5));
+
+ -- test_tbl_composite_with_enums_array_in_composite
+ INSERT INTO tst_comp_enum_what (a, b) VALUES
+ (ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
+ (ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]),
+ (ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]),
+ (ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]),
+ (ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]);
+
+ -- test_tbl_mixed_composites
+ INSERT INTO tst_comp_mix_array (a, b) VALUES
+ (ROW(
+ ROW(1,'a',1),
+ ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
+ 'a',
+ '{a,b,NULL,c}'),
+ ARRAY[
+ ROW(
+ ROW(1,'a',1),
+ ARRAY[
+ ROW(1,'a',1)::tst_comp_basic_t,
+ ROW(2,'b',2)::tst_comp_basic_t,
+ NULL
+ ],
+ 'a',
+ '{a,b,c}'
+ )::tst_comp_mix_t
+ ]
+ );
+
+ -- test_tbl_range
+ INSERT INTO tst_range (a, b) VALUES
+ (1, '[1, 10]'),
+ (2, '[2, 20]'),
+ (3, '[3, 30]'),
+ (4, '[4, 40]'),
+ (5, '[5, 50]');
+
+ -- test_tbl_range_array
+ INSERT INTO tst_range_array (a, b, c) VALUES
+ (1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'),
+ (2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'),
+ (3, tstzrange('Fri Aug 01 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'),
+ (4, tstzrange('Thu Jul 31 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'),
+ (5, NULL, NULL);
+
+ -- tst_hstore
+ INSERT INTO tst_hstore (a, b) VALUES
+ (1, '"a"=>"1"'),
+ (2, '"zzz"=>"foo"'),
+ (3, '"123"=>"321"'),
+ (4, '"yellow horse"=>"moaned"');
+
+ -- tst_dom_constr
+ INSERT INTO tst_dom_constr VALUES (10);
+));
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Check the data on subscriber
+my $result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SET timezone = '+2';
+ SELECT a, b FROM tst_one_array ORDER BY a;
+ SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+ SELECT a, b FROM tst_one_enum ORDER BY a;
+ SELECT a, b FROM tst_enums ORDER BY a;
+ SELECT a, b FROM tst_one_comp ORDER BY a;
+ SELECT a, b FROM tst_comps ORDER BY a;
+ SELECT a, b FROM tst_comp_enum ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+ SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+ SELECT a, b FROM tst_range ORDER BY a;
+ SELECT a, b, c FROM tst_range_array ORDER BY a;
+ SELECT a, b FROM tst_hstore ORDER BY a;
+));
+
+is( $result, '1|{1,2,3}
+2|{2,3,1}
+3|{3,2,1}
+4|{4,3,2}
+5|{5,NULL,3}
+{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"}
+{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
+{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
+{4,1,2}|{d,a,b}|{4.4,1.1,2.2}|{"4 years","1 year","2 years"}
+{5,NULL,NULL}|{e,NULL,b}|{5.5,1.1,NULL}|{"5 years",NULL,NULL}
+1|a
+2|b
+3|c
+4|d
+5|
+a|{b,c}
+b|{c,a}
+c|{b,a}
+d|{c,b}
+e|{d,NULL}
+1|(1,a,1)
+2|(2,b,2)
+3|(3,c,3)
+4|(4,d,4)
+5|(,,5)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+(3,c,3)|{"(3,c,3)"}
+(4,d,4)|{"(4,d,3)"}
+(5,e,)|{NULL,"(5,,5)"}
+1|(1,a,1)
+2|(2,b,2)
+3|(3,c,3)
+4|(4,d,4)
+5|(,e,)
+(1,a,1)|{"(1,a,1)"}
+(2,b,2)|{"(2,b,2)"}
+(3,c,3)|{"(3,c,3)"}
+(4,d,3)|{"(3,d,3)"}
+(5,e,3)|{"(3,e,3)",NULL}
+1|(1,"{a,b,c}",1)
+2|(2,"{a,b,c}",2)
+3|(3,"{a,b,c}",3)
+4|(4,"{c,b,d}",4)
+5|(5,"{NULL,e,NULL}",5)
+(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"}
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"}
+(4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"}
+(5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"}
+("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"}
+1|[1,11)
+2|[2,21)
+3|[3,31)
+4|[4,41)
+5|[5,51)
+1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
+4|["2014-07-31 00:00:00+02","2014-08-04 00:00:00+02")|{"[4,6)",NULL,"[40,51)"}
+5||
+1|"a"=>"1"
+2|"zzz"=>"foo"
+3|"123"=>"321"
+4|"yellow horse"=>"moaned"',
+ 'check replicated inserts on subscriber');
+
+# Run batch of updates
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ UPDATE tst_one_array SET b = '{4, 5, 6}' WHERE a = 1;
+ UPDATE tst_one_array SET b = '{4, 5, 6, 1}' WHERE a > 3;
+ UPDATE tst_arrays SET b = '{"1a", "2b", "3c"}', c = '{1.0, 2.0, 3.0}', d = '{"1 day 1 second", "2 days 2 seconds", "3 days 3 second"}' WHERE a = '{1, 2, 3}';
+ UPDATE tst_arrays SET b = '{"c", "d", "e"}', c = '{3.0, 4.0, 5.0}', d = '{"3 day 1 second", "4 days 2 seconds", "5 days 3 second"}' WHERE a[1] > 3;
+ UPDATE tst_one_enum SET b = 'c' WHERE a = 1;
+ UPDATE tst_one_enum SET b = NULL WHERE a > 3;
+ UPDATE tst_enums SET b = '{e, NULL}' WHERE a = 'a';
+ UPDATE tst_enums SET b = '{e, d}' WHERE a > 'c';
+ UPDATE tst_one_comp SET b = ROW(1.0, 'A', 1) WHERE a = 1;
+ UPDATE tst_one_comp SET b = ROW(NULL, 'x', -1) WHERE a > 3;
+ UPDATE tst_comps SET b = ARRAY[ROW(9, 'x', -1)::tst_comp_basic_t] WHERE (a).a = 1.0;
+ UPDATE tst_comps SET b = ARRAY[NULL, ROW(9, 'x', NULL)::tst_comp_basic_t] WHERE (a).a > 3.9;
+ UPDATE tst_comp_enum SET b = ROW(1.0, NULL, NULL) WHERE a = 1;
+ UPDATE tst_comp_enum SET b = ROW(4.0, 'd', 44) WHERE a > 3;
+ UPDATE tst_comp_enum_array SET b = ARRAY[NULL, ROW(3, 'd', 3)::tst_comp_enum_t] WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t;
+ UPDATE tst_comp_enum_array SET b = ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t, ROW(2, 'b', 2)::tst_comp_enum_t] WHERE (a).a > 3;
+ UPDATE tst_comp_one_enum_array SET b = ROW(1.0, '{a, e, c}', NULL) WHERE a = 1;
+ UPDATE tst_comp_one_enum_array SET b = ROW(4.0, '{c, b, d}', 4) WHERE a > 3;
+ UPDATE tst_comp_enum_what SET b = ARRAY[NULL, ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t, ROW(NULL, '{a, e, c}', 2)::tst_comp_enum_array_t] WHERE (a).a = 1;
+ UPDATE tst_comp_enum_what SET b = ARRAY[ROW(5, '{a, b, c}', 5)::tst_comp_enum_array_t] WHERE (a).a > 3;
+ UPDATE tst_comp_mix_array SET b[2] = NULL WHERE ((a).a).a = 1;
+ UPDATE tst_range SET b = '[100, 1000]' WHERE a = 1;
+ UPDATE tst_range SET b = '(1, 90)' WHERE a > 3;
+ UPDATE tst_range_array SET c = '{"[100, 1000]"}' WHERE a = 1;
+ UPDATE tst_range_array SET b = tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), c = '{NULL, "[11,9999999]"}' WHERE a > 3;
+ UPDATE tst_hstore SET b = '"updated"=>"value"' WHERE a < 3;
+ UPDATE tst_hstore SET b = '"also"=>"updated"' WHERE a = 3;
+));
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SET timezone = '+2';
+ SELECT a, b FROM tst_one_array ORDER BY a;
+ SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+ SELECT a, b FROM tst_one_enum ORDER BY a;
+ SELECT a, b FROM tst_enums ORDER BY a;
+ SELECT a, b FROM tst_one_comp ORDER BY a;
+ SELECT a, b FROM tst_comps ORDER BY a;
+ SELECT a, b FROM tst_comp_enum ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+ SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+ SELECT a, b FROM tst_range ORDER BY a;
+ SELECT a, b, c FROM tst_range_array ORDER BY a;
+ SELECT a, b FROM tst_hstore ORDER BY a;
+));
+
+is( $result, '1|{4,5,6}
+2|{2,3,1}
+3|{3,2,1}
+4|{4,5,6,1}
+5|{4,5,6,1}
+{1,2,3}|{1a,2b,3c}|{1,2,3}|{"1 day 00:00:01","2 days 00:00:02","3 days 00:00:03"}
+{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
+{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
+{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"}
+{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"}
+1|c
+2|b
+3|c
+4|
+5|
+a|{e,NULL}
+b|{c,a}
+c|{b,a}
+d|{e,d}
+e|{e,d}
+1|(1,A,1)
+2|(2,b,2)
+3|(3,c,3)
+4|(,x,-1)
+5|(,x,-1)
+(1,a,1)|{"(9,x,-1)"}
+(2,b,2)|{"(2,b,2)"}
+(3,c,3)|{"(3,c,3)"}
+(4,d,4)|{NULL,"(9,x,)"}
+(5,e,)|{NULL,"(9,x,)"}
+1|(1,,)
+2|(2,b,2)
+3|(3,c,3)
+4|(4,d,44)
+5|(4,d,44)
+(1,a,1)|{NULL,"(3,d,3)"}
+(2,b,2)|{"(2,b,2)"}
+(3,c,3)|{"(3,c,3)"}
+(4,d,3)|{"(1,a,1)","(2,b,2)"}
+(5,e,3)|{"(1,a,1)","(2,b,2)"}
+1|(1,"{a,e,c}",)
+2|(2,"{a,b,c}",2)
+3|(3,"{a,b,c}",3)
+4|(4,"{c,b,d}",4)
+5|(4,"{c,b,d}",4)
+(1,"{a,b,c}",1)|{NULL,"(1,\"{a,b,c}\",1)","(,\"{a,e,c}\",2)"}
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+(3,"{c,a,b}",1)|{"(3,\"{c,a,b}\",1)"}
+(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
+(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
+("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL}
+1|[100,1001)
+2|[2,21)
+3|[3,31)
+4|[2,90)
+5|[2,90)
+1|["2014-08-04 00:00:00+02",infinity)|{"[100,1001)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
+4|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"}
+5|["2014-08-04 00:00:00+02",infinity)|{NULL,"[11,10000000)"}
+1|"updated"=>"value"
+2|"updated"=>"value"
+3|"also"=>"updated"
+4|"yellow horse"=>"moaned"',
+ 'check replicated updates on subscriber');
+
+# Run batch of deletes
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ DELETE FROM tst_one_array WHERE a = 1;
+ DELETE FROM tst_one_array WHERE b = '{2, 3, 1}';
+ DELETE FROM tst_arrays WHERE a = '{1, 2, 3}';
+ DELETE FROM tst_arrays WHERE a[1] = 2;
+ DELETE FROM tst_one_enum WHERE a = 1;
+ DELETE FROM tst_one_enum WHERE b = 'b';
+ DELETE FROM tst_enums WHERE a = 'a';
+ DELETE FROM tst_enums WHERE b[1] = 'b';
+ DELETE FROM tst_one_comp WHERE a = 1;
+ DELETE FROM tst_one_comp WHERE (b).a = 2.0;
+ DELETE FROM tst_comps WHERE (a).b = 'a';
+ DELETE FROM tst_comps WHERE ROW(3, 'c', 3)::tst_comp_basic_t = ANY(b);
+ DELETE FROM tst_comp_enum WHERE a = 1;
+ DELETE FROM tst_comp_enum WHERE (b).a = 2.0;
+ DELETE FROM tst_comp_enum_array WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t;
+ DELETE FROM tst_comp_enum_array WHERE ROW(3, 'c', 3)::tst_comp_enum_t = ANY(b);
+ DELETE FROM tst_comp_one_enum_array WHERE a = 1;
+ DELETE FROM tst_comp_one_enum_array WHERE 'a' = ANY((b).b);
+ DELETE FROM tst_comp_enum_what WHERE (a).a = 1;
+ DELETE FROM tst_comp_enum_what WHERE (b[1]).b = '{c, a, b}';
+ DELETE FROM tst_comp_mix_array WHERE ((a).a).a = 1;
+ DELETE FROM tst_range WHERE a = 1;
+ DELETE FROM tst_range WHERE '[10,20]' && b;
+ DELETE FROM tst_range_array WHERE a = 1;
+ DELETE FROM tst_range_array WHERE tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 05 00:00:00 2014 CEST'::timestamptz) && b;
+ DELETE FROM tst_hstore WHERE a = 1;
+));
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SET timezone = '+2';
+ SELECT a, b FROM tst_one_array ORDER BY a;
+ SELECT a, b, c, d FROM tst_arrays ORDER BY a;
+ SELECT a, b FROM tst_one_enum ORDER BY a;
+ SELECT a, b FROM tst_enums ORDER BY a;
+ SELECT a, b FROM tst_one_comp ORDER BY a;
+ SELECT a, b FROM tst_comps ORDER BY a;
+ SELECT a, b FROM tst_comp_enum ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
+ SELECT a, b FROM tst_comp_enum_what ORDER BY a;
+ SELECT a, b FROM tst_comp_mix_array ORDER BY a;
+ SELECT a, b FROM tst_range ORDER BY a;
+ SELECT a, b, c FROM tst_range_array ORDER BY a;
+ SELECT a, b FROM tst_hstore ORDER BY a;
+));
+
+is( $result, '3|{3,2,1}
+4|{4,5,6,1}
+5|{4,5,6,1}
+{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"}
+{4,1,2}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"}
+{5,NULL,NULL}|{c,d,e}|{3,4,5}|{"3 days 00:00:01","4 days 00:00:02","5 days 00:00:03"}
+3|c
+4|
+5|
+b|{c,a}
+d|{e,d}
+e|{e,d}
+3|(3,c,3)
+4|(,x,-1)
+5|(,x,-1)
+(2,b,2)|{"(2,b,2)"}
+(4,d,4)|{NULL,"(9,x,)"}
+(5,e,)|{NULL,"(9,x,)"}
+3|(3,c,3)
+4|(4,d,44)
+5|(4,d,44)
+(2,b,2)|{"(2,b,2)"}
+(4,d,3)|{"(1,a,1)","(2,b,2)"}
+(5,e,3)|{"(1,a,1)","(2,b,2)"}
+4|(4,"{c,b,d}",4)
+5|(4,"{c,b,d}",4)
+(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"}
+(4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"}
+(5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"}
+2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"}
+3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"}
+2|"updated"=>"value"
+3|"also"=>"updated"
+4|"yellow horse"=>"moaned"',
+ 'check replicated deletes on subscriber');
+
+# Test a domain with a constraint backed by a SQL-language function,
+# which needs an active snapshot in order to operate.
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tst_dom_constr VALUES (11)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT sum(a) FROM tst_dom_constr");
+is($result, '21', 'sql-function constraint on domain');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl
new file mode 100644
index 0000000..1182a12
--- /dev/null
+++ b/src/test/subscription/t/003_constraints.pl
@@ -0,0 +1,139 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# This test checks that constraints work on subscriber
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Setup structure on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_fk (bid int PRIMARY KEY);");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, junk text, bid int REFERENCES tab_fk (bid));"
+);
+
+# Setup structure on subscriber; column order intentionally different
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_fk (bid int PRIMARY KEY);");
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid), junk text);"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR ALL TABLES;");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_fk (bid) VALUES (1);");
+# "junk" value is meant to be large enough to force out-of-line storage
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_fk_ref (id, bid, junk) VALUES (1, 1, repeat(pi()::text,20000));"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Check data on subscriber
+my $result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(bid), max(bid) FROM tab_fk;");
+is($result, qq(1|1|1), 'check replicated tab_fk inserts on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
+is($result, qq(1|1|1), 'check replicated tab_fk_ref inserts on subscriber');
+
+# Drop the fk on publisher
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_fk CASCADE;");
+
+# Insert data
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# FK is not enforced on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
+is($result, qq(2|1|2), 'check FK ignored on subscriber');
+
+# Add replica trigger
+$node_subscriber->safe_psql(
+ 'postgres', qq{
+CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$
+BEGIN
+ IF (TG_OP = 'INSERT') THEN
+ IF (NEW.id < 10) THEN
+ RETURN NEW;
+ ELSE
+ RETURN NULL;
+ END IF;
+ ELSIF (TG_OP = 'UPDATE') THEN
+ RETURN NULL;
+ ELSE
+ RAISE WARNING 'Unknown action';
+ RETURN NULL;
+ END IF;
+END;
+\$\$ LANGUAGE plpgsql;
+CREATE TRIGGER filter_basic_dml_trg
+ BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref
+ FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
+ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
+});
+
+# Insert data
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# The trigger should cause the insert to be skipped on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
+is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber');
+
+# Update data
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# The trigger should cause the update to be skipped on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
+is($result, qq(2|1|2),
+ 'check replica update column trigger applied on subscriber');
+
+# Update on a column not specified in the trigger, but it will trigger
+# anyway because logical replication ships all columns in an update.
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(id), max(id) FROM tab_fk_ref;");
+is($result, qq(2|1|2),
+ 'check column trigger applied even on update for other column');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
new file mode 100644
index 0000000..b3c91af
--- /dev/null
+++ b/src/test/subscription/t/004_sync.pl
@@ -0,0 +1,180 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for logical replication table syncing
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 8;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+ "wal_retrieve_retry_interval = 1ms");
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_rep SELECT generate_series(1,10)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_rep (a int primary key)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR ALL TABLES");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(10), 'initial data synced for first sub');
+
+# drop subscription so that there is unreplicated data
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_rep SELECT generate_series(11,20)");
+
+# recreate the subscription, it will try to do initial copy
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+# but it will be stuck on data copy as it will fail on constraint
+my $started_query = "SELECT srsubstate = 'd' FROM pg_subscription_rel;";
+$node_subscriber->poll_query_until('postgres', $started_query)
+ or die "Timed out while waiting for subscriber to start sync";
+
+# remove the conflicting data
+$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
+
+# wait for sync to finish this time
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# check that all data is synced
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(20), 'initial data synced for second sub');
+
+# now check another subscription for the same node pair
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)"
+);
+
+# wait for it to start
+$node_subscriber->poll_query_until('postgres',
+ "SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL"
+) or die "Timed out while waiting for subscriber to start";
+
+# and drop both subscriptions
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2");
+
+# check subscriptions are removed
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM pg_subscription");
+is($result, qq(0), 'second and third sub are dropped');
+
+# remove the conflicting data
+$node_subscriber->safe_psql('postgres', "DELETE FROM tab_rep;");
+
+# recreate the subscription again
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+# and wait for data sync to finish again
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# check that all data is synced
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(20), 'initial data synced for fourth sub');
+
+# add new table on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rep_next (a int)");
+
+# setup structure with existing data on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM tab_rep_next");
+is($result, qq(0), 'no data for table added after subscription initialized');
+
+# ask for data sync
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
+
+# wait for sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM tab_rep_next");
+is($result, qq(10),
+ 'data for table added after subscription initialized are now synced');
+
+# Add some data
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_rep_next SELECT generate_series(1,10)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*) FROM tab_rep_next");
+is($result, qq(20),
+ 'changes for table added after subscription initialized replicated');
+
+# clean up
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_rep_next");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rep_next");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+# Table tap_rep already has the same records on both publisher and subscriber
+# at this time. Recreate the subscription which will do the initial copy of
+# the table again and fails due to unique constraint violation.
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+$result = $node_subscriber->poll_query_until('postgres', $started_query)
+ or die "Timed out while waiting for subscriber to start sync";
+
+# DROP SUBSCRIPTION must clean up slots on the publisher side when the
+# subscriber is stuck on data copy for constraint violation.
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_publisher->safe_psql('postgres',
+ "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0),
+ 'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/005_encoding.pl b/src/test/subscription/t/005_encoding.pl
new file mode 100644
index 0000000..a3f56a4
--- /dev/null
+++ b/src/test/subscription/t/005_encoding.pl
@@ -0,0 +1,55 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test replication between databases with different encodings
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(
+ allows_streaming => 'logical',
+ extra => [ '--locale=C', '--encoding=UTF8' ]);
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(
+ allows_streaming => 'logical',
+ extra => [ '--locale=C', '--encoding=LATIN1' ]);
+$node_subscriber->start;
+
+my $ddl = "CREATE TABLE test1 (a int, b text);";
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION mypub FOR ALL TABLES;");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+
+$node_publisher->wait_for_catchup('mysub');
+
+# Wait for initial sync to finish as well
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher->safe_psql('postgres',
+ q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
+
+$node_publisher->wait_for_catchup('mysub');
+
+is( $node_subscriber->safe_psql(
+ 'postgres', q{SELECT a FROM test1 WHERE b = E'Mot\xf6rhead'}
+ ), # LATIN1
+ qq(1),
+ 'data replicated to subscriber');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/006_rewrite.pl b/src/test/subscription/t/006_rewrite.pl
new file mode 100644
index 0000000..37e05a4
--- /dev/null
+++ b/src/test/subscription/t/006_rewrite.pl
@@ -0,0 +1,68 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test logical replication behavior with heap rewrites
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $ddl = "CREATE TABLE test1 (a int, b text);";
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION mypub FOR ALL TABLES;");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+
+$node_publisher->wait_for_catchup('mysub');
+
+# Wait for initial sync to finish as well
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher->safe_psql('postgres',
+ q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
+
+$node_publisher->wait_for_catchup('mysub');
+
+is( $node_subscriber->safe_psql('postgres', q{SELECT a, b FROM test1}),
+ qq(1|one
+2|two),
+ 'initial data replicated to subscriber');
+
+# DDL that causes a heap rewrite
+my $ddl2 = "ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0;";
+$node_subscriber->safe_psql('postgres', $ddl2);
+$node_publisher->safe_psql('postgres', $ddl2);
+
+$node_publisher->wait_for_catchup('mysub');
+
+$node_publisher->safe_psql('postgres',
+ q{INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33);});
+
+$node_publisher->wait_for_catchup('mysub');
+
+is( $node_subscriber->safe_psql('postgres', q{SELECT a, b, c FROM test1}),
+ qq(1|one|0
+2|two|0
+3|three|33),
+ 'data replicated to subscriber');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/007_ddl.pl b/src/test/subscription/t/007_ddl.pl
new file mode 100644
index 0000000..dd10d5c
--- /dev/null
+++ b/src/test/subscription/t/007_ddl.pl
@@ -0,0 +1,45 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test some logical replication DDL behavior
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $ddl = "CREATE TABLE test1 (a int, b text);";
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION mypub FOR ALL TABLES;");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+
+$node_publisher->wait_for_catchup('mysub');
+
+$node_subscriber->safe_psql(
+ 'postgres', q{
+BEGIN;
+ALTER SUBSCRIPTION mysub DISABLE;
+ALTER SUBSCRIPTION mysub SET (slot_name = NONE);
+DROP SUBSCRIPTION mysub;
+COMMIT;
+});
+
+pass "subscription disable and drop in same transaction did not hang";
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/008_diff_schema.pl b/src/test/subscription/t/008_diff_schema.pl
new file mode 100644
index 0000000..a04a798
--- /dev/null
+++ b/src/test/subscription/t/008_diff_schema.pl
@@ -0,0 +1,128 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test behavior with different schema on subscriber
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999, e int GENERATED BY DEFAULT AS IDENTITY)"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR ALL TABLES");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+# Update the rows on the publisher and check the additional columns on
+# subscriber didn't change
+$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(b)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999), count(e) FROM test_tab");
+is($result, qq(2|2|2|2),
+ 'check extra columns contain local defaults after copy');
+
+# Change the local values of the extra columns on the subscriber,
+# update publisher, and check that subscriber retains the expected
+# values
+$node_subscriber->safe_psql('postgres',
+ "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"
+);
+$node_publisher->safe_psql('postgres',
+ "UPDATE test_tab SET b = md5(a::text)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"
+);
+is($result, qq(2|2|2), 'check extra columns contain locally changed data');
+
+# Another insert
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (3, 'baz')");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999), count(e) FROM test_tab");
+is($result, qq(3|3|3|3),
+ 'check extra columns contain local defaults after apply');
+
+
+# Check a bug about adding a replica identity column on the subscriber
+# that was not yet mapped to a column on the publisher. This would
+# result in errors on the subscriber and replication thus not
+# progressing.
+# (https://www.postgresql.org/message-id/flat/a9139c29-7ddd-973b-aa7f-71fed9c38d75%40minerva.info)
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)");
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab2 (a int)");
+
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION");
+
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Add replica identity column. (The serial is not necessary, but it's
+# a convenient way to get a default on the new column so that rows
+# from the publisher that don't have the column yet can be inserted.)
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE test_tab2 ADD COLUMN b serial PRIMARY KEY");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab2 VALUES (1)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+is( $node_subscriber->safe_psql(
+ 'postgres', "SELECT count(*), min(a), max(a) FROM test_tab2"),
+ qq(1|1|1),
+ 'check replicated inserts on subscriber');
+
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/009_matviews.pl b/src/test/subscription/t/009_matviews.pl
new file mode 100644
index 0000000..92c7d18
--- /dev/null
+++ b/src/test/subscription/t/009_matviews.pl
@@ -0,0 +1,52 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test materialized views behavior
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION mypub FOR ALL TABLES;");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;"
+);
+
+$node_publisher->safe_psql('postgres',
+ q{CREATE TABLE test1 (a int PRIMARY KEY, b text)});
+$node_publisher->safe_psql('postgres',
+ q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
+
+$node_subscriber->safe_psql('postgres',
+ q{CREATE TABLE test1 (a int PRIMARY KEY, b text);});
+
+$node_publisher->wait_for_catchup('mysub');
+
+# Materialized views are not supported by logical replication, but
+# logical decoding does produce change information for them, so we
+# need to make sure they are properly ignored. (bug #15044)
+
+# create a MV with some data
+$node_publisher->safe_psql('postgres',
+ q{CREATE MATERIALIZED VIEW testmv1 AS SELECT * FROM test1;});
+$node_publisher->wait_for_catchup('mysub');
+
+# There is no equivalent relation on the subscriber, but MV data is
+# not replicated, so this does not hang.
+
+pass "materialized view data not replicated";
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
new file mode 100644
index 0000000..5617469
--- /dev/null
+++ b/src/test/subscription/t/010_truncate.pl
@@ -0,0 +1,241 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test TRUNCATE
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 14;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab4 (x int PRIMARY KEY, y int REFERENCES tab3)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SEQUENCE seq1 OWNED BY tab1.a");
+$node_subscriber->safe_psql('postgres', "ALTER SEQUENCE seq1 START 101");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub1 FOR TABLE tab1");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub2 FOR TABLE tab2 WITH (publish = insert)");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub3 FOR TABLE tab3, tab4");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2"
+);
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3"
+);
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert data to truncate
+
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (1), (2), (3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+# truncate and check
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+my $result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||), 'truncate replicated');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')");
+is($result, qq(1), 'sequence not restarted');
+
+# truncate with restart identity
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1 RESTART IDENTITY");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')");
+is($result, qq(101), 'truncate restarted identities');
+
+# test publication that does not replicate truncate
+
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO tab2 VALUES (1), (2), (3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3), 'truncate not replicated');
+
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub2 SET (publish = 'insert, truncate')");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(0||), 'truncate replicated after publication change');
+
+# test multiple tables connected by foreign keys
+
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO tab3 VALUES (1), (2), (3)");
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO tab4 VALUES (11, 1), (111, 1), (22, 2)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab3, tab4");
+
+$node_publisher->wait_for_catchup('sub3');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab3");
+is($result, qq(0||), 'truncate of multiple tables replicated');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(x), max(x) FROM tab4");
+is($result, qq(0||), 'truncate of multiple tables replicated');
+
+# test truncate of multiple tables, some of which are not published
+
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub2");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub2");
+
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (1), (2), (3)");
+$node_subscriber->safe_psql('postgres',
+ "INSERT INTO tab2 VALUES (1), (2), (3)");
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||), 'truncate of multiple tables some not published');
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab2");
+is($result, qq(3|1|3), 'truncate of multiple tables some not published');
+
+# Test that truncate works for synchronous logical replication
+
+$node_publisher->safe_psql('postgres',
+ "ALTER SYSTEM SET synchronous_standby_names TO 'sub1'");
+$node_publisher->safe_psql('postgres', "SELECT pg_reload_conf()");
+
+# insert data to truncate
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (1), (2), (3)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(3|1|3), 'check synchronous logical replication');
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||),
+ 'truncate replicated in synchronous logical replication');
+
+$node_publisher->safe_psql('postgres',
+ "ALTER SYSTEM RESET synchronous_standby_names");
+$node_publisher->safe_psql('postgres', "SELECT pg_reload_conf()");
+
+# test that truncate works for logical replication when there are multiple
+# subscriptions for a single table
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab5 (a int)");
+
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab5 (a int)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub5 FOR TABLE tab5");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub5_1 CONNECTION '$publisher_connstr' PUBLICATION pub5"
+);
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub5_2 CONNECTION '$publisher_connstr' PUBLICATION pub5"
+);
+
+# wait for initial data sync
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert data to truncate
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab5 VALUES (1), (2), (3)");
+
+$node_publisher->wait_for_catchup('sub5_1');
+$node_publisher->wait_for_catchup('sub5_2');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab5");
+is($result, qq(6|1|3), 'insert replicated for multiple subscriptions');
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab5");
+
+$node_publisher->wait_for_catchup('sub5_1');
+$node_publisher->wait_for_catchup('sub5_2');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab5");
+is($result, qq(0||), 'truncate replicated for multiple subscriptions');
+
+# check deadlocks
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT deadlocks FROM pg_stat_database WHERE datname='postgres'");
+is($result, qq(0), 'no deadlocks detected');
diff --git a/src/test/subscription/t/011_generated.pl b/src/test/subscription/t/011_generated.pl
new file mode 100644
index 0000000..29108cb
--- /dev/null
+++ b/src/test/subscription/t/011_generated.pl
@@ -0,0 +1,66 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test generated columns
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)"
+);
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED)"
+);
+
+# data for initial sync
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1 (a) VALUES (1), (2), (3)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub1 FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
+is( $result, qq(1|22
+2|44
+3|66), 'generated columns initial sync');
+
+# data to replicate
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (4), (5)");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres', "SELECT a, b FROM tab1");
+is( $result, qq(1|22
+2|44
+3|66
+4|88
+6|132), 'generated columns replicated');
diff --git a/src/test/subscription/t/012_collation.pl b/src/test/subscription/t/012_collation.pl
new file mode 100644
index 0000000..8137a16
--- /dev/null
+++ b/src/test/subscription/t/012_collation.pl
@@ -0,0 +1,110 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test collations, in particular nondeterministic ones
+# (only works with ICU)
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More;
+
+if ($ENV{with_icu} eq 'yes')
+{
+ plan tests => 2;
+}
+else
+{
+ plan skip_all => 'ICU not supported by this build';
+}
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(
+ allows_streaming => 'logical',
+ extra => [ '--locale=C', '--encoding=UTF8' ]);
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(
+ allows_streaming => 'logical',
+ extra => [ '--locale=C', '--encoding=UTF8' ]);
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# Test plan: Create a table with a nondeterministic collation in the
+# primary key column. Pre-insert rows on the publisher and subscriber
+# that are collation-wise equal but byte-wise different. (We use a
+# string in different normal forms for that.) Set up publisher and
+# subscriber. Update the row on the publisher, but don't change the
+# primary key column. The subscriber needs to find the row to be
+# updated using the nondeterministic collation semantics. We need to
+# test for both a replica identity index and for replica identity
+# full, since those have different code paths internally.
+
+$node_subscriber->safe_psql('postgres',
+ q{CREATE COLLATION ctest_nondet (provider = icu, locale = 'und', deterministic = false)}
+);
+
+# table with replica identity index
+
+$node_publisher->safe_psql('postgres',
+ q{CREATE TABLE tab1 (a text PRIMARY KEY, b text)});
+
+$node_publisher->safe_psql('postgres',
+ q{INSERT INTO tab1 VALUES (U&'\00E4bc', 'foo')});
+
+$node_subscriber->safe_psql('postgres',
+ q{CREATE TABLE tab1 (a text COLLATE ctest_nondet PRIMARY KEY, b text)});
+
+$node_subscriber->safe_psql('postgres',
+ q{INSERT INTO tab1 VALUES (U&'\0061\0308bc', 'foo')});
+
+# table with replica identity full
+
+$node_publisher->safe_psql('postgres', q{CREATE TABLE tab2 (a text, b text)});
+$node_publisher->safe_psql('postgres',
+ q{ALTER TABLE tab2 REPLICA IDENTITY FULL});
+
+$node_publisher->safe_psql('postgres',
+ q{INSERT INTO tab2 VALUES (U&'\00E4bc', 'foo')});
+
+$node_subscriber->safe_psql('postgres',
+ q{CREATE TABLE tab2 (a text COLLATE ctest_nondet, b text)});
+$node_subscriber->safe_psql('postgres',
+ q{ALTER TABLE tab2 REPLICA IDENTITY FULL});
+
+$node_subscriber->safe_psql('postgres',
+ q{INSERT INTO tab2 VALUES (U&'\0061\0308bc', 'foo')});
+
+# set up publication, subscription
+
+$node_publisher->safe_psql('postgres',
+ q{CREATE PUBLICATION pub1 FOR ALL TABLES});
+
+$node_subscriber->safe_psql('postgres',
+ qq{CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 WITH (copy_data = false)}
+);
+
+$node_publisher->wait_for_catchup('sub1');
+
+# test with replica identity index
+
+$node_publisher->safe_psql('postgres',
+ q{UPDATE tab1 SET b = 'bar' WHERE b = 'foo'});
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres', q{SELECT b FROM tab1}),
+ qq(bar), 'update with primary key with nondeterministic collation');
+
+# test with replica identity full
+
+$node_publisher->safe_psql('postgres',
+ q{UPDATE tab2 SET b = 'bar' WHERE b = 'foo'});
+
+$node_publisher->wait_for_catchup('sub1');
+
+is($node_subscriber->safe_psql('postgres', q{SELECT b FROM tab2}),
+ qq(bar),
+ 'update with replica identity full with nondeterministic collation');
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
new file mode 100644
index 0000000..dfe2cb6
--- /dev/null
+++ b/src/test/subscription/t/013_partition.pl
@@ -0,0 +1,872 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test logical replication with partitioned tables
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 71;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber1 = get_new_node('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $node_subscriber2 = get_new_node('subscriber2');
+$node_subscriber2->init(allows_streaming => 'logical');
+$node_subscriber2->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# publisher
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_all FOR ALL TABLES");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1_1 (b text, a int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1_def PARTITION OF tab1 DEFAULT");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1");
+
+# subscriber1
+#
+# This is partitioned differently from the publisher. tab1_2 is
+# subpartitioned. This tests the tuple routing code on the
+# subscriber.
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)"
+);
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)"
+);
+$node_subscriber1->safe_psql('postgres',
+ "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (4, 5, 6) PARTITION BY LIST (a)"
+);
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab1_2_1 (c text, b text, a int NOT NULL)");
+$node_subscriber1->safe_psql('postgres',
+ "ALTER TABLE tab1_2 ATTACH PARTITION tab1_2_1 FOR VALUES IN (5)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (4, 6)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab1_def PARTITION OF tab1 (c DEFAULT 'sub1_tab1') DEFAULT"
+);
+$node_subscriber1->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+
+# Add set of AFTER replica triggers for testing that they are fired
+# correctly. This uses a table that records details of all trigger
+# activities. Triggers are marked as enabled for a subset of the
+# partition tree.
+$node_subscriber1->safe_psql(
+ 'postgres', qq{
+CREATE TABLE sub1_trigger_activity (tgtab text, tgop text,
+ tgwhen text, tglevel text, olda int, newa int);
+CREATE FUNCTION sub1_trigger_activity_func() RETURNS TRIGGER AS \$\$
+BEGIN
+ IF (TG_OP = 'INSERT') THEN
+ INSERT INTO public.sub1_trigger_activity
+ SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a;
+ ELSIF (TG_OP = 'UPDATE') THEN
+ INSERT INTO public.sub1_trigger_activity
+ SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a;
+ END IF;
+ RETURN NULL;
+END;
+\$\$ LANGUAGE plpgsql;
+CREATE TRIGGER sub1_tab1_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1
+ FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func();
+ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub1_tab1_log_op_trigger;
+CREATE TRIGGER sub1_tab1_2_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1_2
+ FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func();
+ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub1_tab1_2_log_op_trigger;
+CREATE TRIGGER sub1_tab1_2_2_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1_2_2
+ FOR EACH ROW EXECUTE PROCEDURE sub1_trigger_activity_func();
+ALTER TABLE ONLY tab1_2_2 ENABLE REPLICA TRIGGER sub1_tab1_2_2_log_op_trigger;
+});
+
+# subscriber 2
+#
+# This does not use partitioning. The tables match the leaf tables on
+# the publisher.
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1_def (a int PRIMARY KEY, b text, c text DEFAULT 'sub2_tab1_def')"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all"
+);
+
+# Add set of AFTER replica triggers for testing that they are fired
+# correctly, using the same method as the first subscriber.
+$node_subscriber2->safe_psql(
+ 'postgres', qq{
+CREATE TABLE sub2_trigger_activity (tgtab text,
+ tgop text, tgwhen text, tglevel text, olda int, newa int);
+CREATE FUNCTION sub2_trigger_activity_func() RETURNS TRIGGER AS \$\$
+BEGIN
+ IF (TG_OP = 'INSERT') THEN
+ INSERT INTO public.sub2_trigger_activity
+ SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, NULL, NEW.a;
+ ELSIF (TG_OP = 'UPDATE') THEN
+ INSERT INTO public.sub2_trigger_activity
+ SELECT TG_RELNAME, TG_OP, TG_WHEN, TG_LEVEL, OLD.a, NEW.a;
+ END IF;
+ RETURN NULL;
+END;
+\$\$ LANGUAGE plpgsql;
+CREATE TRIGGER sub2_tab1_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1
+ FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func();
+ALTER TABLE ONLY tab1 ENABLE REPLICA TRIGGER sub2_tab1_log_op_trigger;
+CREATE TRIGGER sub2_tab1_2_log_op_trigger
+ AFTER INSERT OR UPDATE ON tab1_2
+ FOR EACH ROW EXECUTE PROCEDURE sub2_trigger_activity_func();
+ALTER TABLE ONLY tab1_2 ENABLE REPLICA TRIGGER sub2_tab1_2_log_op_trigger;
+});
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Tests for replication using leaf partition identity and schema
+
+# insert
+$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (0)");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+my $result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is( $result, qq(sub1_tab1|0
+sub1_tab1|1
+sub1_tab1|3
+sub1_tab1|5), 'inserts into tab1 and its partitions replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_1 ORDER BY 1");
+is($result, qq(5), 'inserts into tab1_2 replicated into tab1_2_1 correctly');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_2 ORDER BY 1");
+is($result, qq(), 'inserts into tab1_2 replicated into tab1_2_2 correctly');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
+is( $result, qq(sub2_tab1_1|1
+sub2_tab1_1|3), 'inserts into tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_2|5), 'inserts into tab1_2 replicated');
+
+# The AFTER trigger of tab1_2 should have recorded one INSERT.
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;"
+);
+is( $result,
+ qq(tab1_2|INSERT|AFTER|ROW||5),
+ 'check replica insert after trigger applied on subscriber');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_def ORDER BY 1, 2");
+is($result, qq(sub2_tab1_def|0), 'inserts into tab1_def replicated');
+
+# update (replicated as update)
+$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 1");
+# All of the following cause an update to be applied to a partitioned
+# table on subscriber1: tab1_2 is leaf partition on publisher, whereas
+# it's sub-partitioned on subscriber1.
+$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 4 WHERE a = 6");
+$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 4");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is( $result, qq(sub1_tab1|0
+sub1_tab1|2
+sub1_tab1|3
+sub1_tab1|6), 'update of tab1_1, tab1_2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_1 ORDER BY 1");
+is($result, qq(), 'updates of tab1_2 replicated into tab1_2_1 correctly');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_2 ORDER BY 1");
+is($result, qq(6), 'updates of tab1_2 replicated into tab1_2_2 correctly');
+
+# The AFTER trigger should have recorded the UPDATEs of tab1_2_2.
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT * FROM sub1_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;"
+);
+is( $result, qq(tab1_2_2|INSERT|AFTER|ROW||6
+tab1_2_2|UPDATE|AFTER|ROW|4|6
+tab1_2_2|UPDATE|AFTER|ROW|6|4),
+ 'check replica update after trigger applied on subscriber');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
+is( $result, qq(sub2_tab1_1|2
+sub2_tab1_1|3), 'update of tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_2|6), 'tab1_2 updated');
+
+# The AFTER trigger should have recorded the updates of tab1_2.
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT * FROM sub2_trigger_activity ORDER BY tgtab, tgop, tgwhen, olda, newa;"
+);
+is( $result, qq(tab1_2|INSERT|AFTER|ROW||5
+tab1_2|UPDATE|AFTER|ROW|4|6
+tab1_2|UPDATE|AFTER|ROW|5|6
+tab1_2|UPDATE|AFTER|ROW|6|4),
+ 'check replica update after trigger applied on subscriber');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_def ORDER BY 1");
+is($result, qq(sub2_tab1_def|0), 'tab1_def unchanged');
+
+# update (replicated as delete+insert)
+$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 1 WHERE a = 0");
+$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 4 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is( $result, qq(sub1_tab1|2
+sub1_tab1|3
+sub1_tab1|4
+sub1_tab1|6),
+ 'update of tab1 (delete from tab1_def + insert into tab1_1) replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT a FROM tab1_2_2 ORDER BY 1");
+is( $result, qq(4
+6), 'updates of tab1 (delete + insert) replicated into tab1_2_2 correctly');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
+is( $result, qq(sub2_tab1_1|2
+sub2_tab1_1|3), 'tab1_1 unchanged');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
+is( $result, qq(sub2_tab1_2|4
+sub2_tab1_2|6), 'insert into tab1_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab1_def ORDER BY 1");
+is($result, qq(), 'delete from tab1_def replicated');
+
+# delete
+$node_publisher->safe_psql('postgres',
+ "DELETE FROM tab1 WHERE a IN (2, 3, 5)");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab1_2");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1");
+is($result, qq(), 'delete from tab1_1, tab1_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1_1");
+is($result, qq(), 'delete from tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1_2");
+is($result, qq(), 'delete from tab1_2 replicated');
+
+# truncate
+$node_subscriber1->safe_psql('postgres',
+ "INSERT INTO tab1 (a) VALUES (1), (2), (5)");
+$node_subscriber2->safe_psql('postgres', "INSERT INTO tab1_2 (a) VALUES (2)");
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1_2");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result =
+ $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1");
+is( $result, qq(1
+2), 'truncate of tab1_2 replicated');
+
+$result =
+ $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1_2 ORDER BY 1");
+is($result, qq(), 'truncate of tab1_2 replicated');
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result =
+ $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(), 'truncate of tab1_1 replicated');
+$result =
+ $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(), 'truncate of tab1 replicated');
+
+# Check that subscriber handles cases where update/delete target tuple
+# is missing. We have to look for the DEBUG1 log messages about that,
+# so temporarily bump up the log verbosity.
+$node_subscriber1->append_conf('postgresql.conf',
+ "log_min_messages = debug1");
+$node_subscriber1->reload;
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (1, 'foo'), (4, 'bar'), (10, 'baz')");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$node_subscriber1->safe_psql('postgres', "DELETE FROM tab1");
+
+# Note that the current location of the log file is not grabbed immediately
+# after reloading the configuration, but after sending one SQL command to
+# the node so as we are sure that the reloading has taken effect.
+my $log_location = -s $node_subscriber1->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab1 SET b = 'quux' WHERE a = 4");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+my $logfile = slurp_file($node_subscriber1->logfile(), $log_location);
+ok( $logfile =~
+ qr/logical replication did not find row to be updated in replication target relation's partition "tab1_2_2"/,
+ 'update target row is missing in tab1_2_2');
+ok( $logfile =~
+ qr/logical replication did not find row to be deleted in replication target relation "tab1_1"/,
+ 'delete target row is missing in tab1_1');
+ok( $logfile =~
+ qr/logical replication did not find row to be deleted in replication target relation "tab1_2_2"/,
+ 'delete target row is missing in tab1_2_2');
+ok( $logfile =~
+ qr/logical replication did not find row to be deleted in replication target relation "tab1_def"/,
+ 'delete target row is missing in tab1_def');
+
+$node_subscriber1->append_conf('postgresql.conf',
+ "log_min_messages = warning");
+$node_subscriber1->reload;
+
+# Tests for replication using root table identity and schema
+
+# publisher
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2_1 (b text, a int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES IN (0, 1, 2, 3)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab2_2 PARTITION OF tab2 FOR VALUES IN (5, 6)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab4 (a int PRIMARY KEY) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab4_1 PARTITION OF tab4 FOR VALUES IN (0, 1) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab4_1_1 PARTITION OF tab4_1 FOR VALUES IN (0, 1)");
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
+# Note: tab3_1's parent is not in the publication, in which case its
+# changes are published using own identity. For tab2, even though both parent
+# and child tables are present but changes will be replicated via the parent's
+# identity and only once.
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)"
+);
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)"
+);
+
+# prepare data for the initial sync
+$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)");
+
+# subscriber 1
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub1_tab2', b text) PARTITION BY RANGE (a)"
+);
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab2_1 (c text DEFAULT 'sub1_tab2', b text, a int NOT NULL)"
+);
+$node_subscriber1->safe_psql('postgres',
+ "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES FROM (0) TO (10)");
+$node_subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab3_1 (c text DEFAULT 'sub1_tab3_1', b text, a int NOT NULL PRIMARY KEY)"
+);
+$node_subscriber1->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub_viaroot CONNECTION '$publisher_connstr' PUBLICATION pub_viaroot"
+);
+
+# subscriber 2
+$node_subscriber2->safe_psql('postgres', "DROP TABLE tab1");
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)"
+);
+# Note: tab1's partitions are named tab1_1 and tab1_2 on the publisher.
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)");
+$node_subscriber2->safe_psql('postgres',
+ "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab2', b text)"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab3 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3', b text)"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab4 (a int PRIMARY KEY)"
+);
+$node_subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab4_1 (a int PRIMARY KEY)"
+);
+# Publication that sub2 points to now publishes via root, so must update
+# subscription target relations.
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_lower_level, pub_all");
+
+# Wait for initial sync of all subscriptions
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# check that data is synced correctly
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab2");
+is( $result, qq(sub1_tab2|1), 'initial data synced for pub_viaroot');
+
+# insert
+$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (0)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab2 VALUES (0), (3), (5)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab3 VALUES (1), (0), (3), (5)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab4 VALUES (0)");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is( $result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|3
+sub1_tab2|5), 'inserts into tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is( $result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|3
+sub1_tab3_1|5), 'inserts into tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is( $result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|3
+sub2_tab1|5), 'inserts into tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is( $result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|3
+sub2_tab2|5), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is( $result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|3
+sub2_tab3|5), 'inserts into tab3 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab4 ORDER BY 1");
+is( $result, qq(0), 'inserts into tab4 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab4_1 ORDER BY 1");
+is( $result, qq(), 'inserts into tab4_1 replicated');
+
+# now switch the order of publications in the list, try again, the result
+# should be the same (no dependence on order of pulications)
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 SET PUBLICATION pub_all, pub_lower_level");
+
+# make sure the subscription on the second subscriber is synced, before
+# continuing
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Insert a change into the leaf partition, should be replicated through
+# the partition root (thanks to the FOR ALL TABLES partition).
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab4 VALUES (1)");
+
+$node_publisher->wait_for_catchup('sub2');
+
+# tab4 change should be replicated through the root partition, which
+# maps to the tab4 relation on subscriber.
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab4 ORDER BY 1");
+is( $result, qq(0
+1), 'inserts into tab4 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a FROM tab4_1 ORDER BY 1");
+is( $result, qq(), 'inserts into tab4_1 replicated');
+
+# update (replicated as update)
+$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres', "UPDATE tab3 SET a = 6 WHERE a = 5");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is( $result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|3
+sub1_tab2|6), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is( $result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|3
+sub1_tab3_1|6), 'update of tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is( $result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|3
+sub2_tab1|6), 'inserts into tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is( $result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|3
+sub2_tab2|6), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is( $result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|3
+sub2_tab3|6), 'inserts into tab3 replicated');
+
+# update (replicated as delete+insert)
+$node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 6");
+$node_publisher->safe_psql('postgres', "UPDATE tab2 SET a = 2 WHERE a = 6");
+$node_publisher->safe_psql('postgres', "UPDATE tab3 SET a = 2 WHERE a = 6");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is( $result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|2
+sub1_tab2|3), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is( $result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|2
+sub1_tab3_1|3), 'update of tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is( $result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|2
+sub2_tab1|3), 'update of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is( $result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|2
+sub2_tab2|3), 'update of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is( $result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|2
+sub2_tab3|3), 'update of tab3 replicated');
+
+# delete
+$node_publisher->safe_psql('postgres', "DELETE FROM tab1");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab2");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab2");
+is($result, qq(), 'delete tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1");
+is($result, qq(), 'delete from tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab2");
+is($result, qq(), 'delete from tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3");
+is($result, qq(), 'delete from tab3 replicated');
+
+# truncate
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab1 VALUES (1), (2), (5)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab2 VALUES (1), (2), (5)");
+# these will NOT be replicated
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1_2, tab2_1, tab3_1");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result =
+ $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab2 ORDER BY 1");
+is( $result, qq(1
+2
+5), 'truncate of tab2_1 NOT replicated');
+
+$result =
+ $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1 ORDER BY 1");
+is( $result, qq(1
+2
+5), 'truncate of tab1_2 NOT replicated');
+
+$result =
+ $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab2 ORDER BY 1");
+is( $result, qq(1
+2
+5), 'truncate of tab2_1 NOT replicated');
+
+$node_publisher->safe_psql('postgres', "TRUNCATE tab1, tab2, tab3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres', "SELECT a FROM tab2");
+is($result, qq(), 'truncate of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab1");
+is($result, qq(), 'truncate of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab2");
+is($result, qq(), 'truncate of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3");
+is($result, qq(), 'truncate of tab3 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3_1");
+is($result, qq(), 'truncate of tab3_1 replicated');
+
+# check that the map to convert tuples from leaf partition to the root
+# table is correctly rebuilt when a new column is added
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab2 DROP b, ADD COLUMN c text DEFAULT 'pub_tab2', ADD b text"
+);
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab2 (a, b) VALUES (1, 'xxx'), (3, 'yyy'), (5, 'zzz')");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab2 (a, b, c) VALUES (6, 'aaa', 'xxx_c')");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+ "SELECT c, a, b FROM tab2 ORDER BY 1, 2");
+is( $result, qq(pub_tab2|1|xxx
+pub_tab2|3|yyy
+pub_tab2|5|zzz
+xxx_c|6|aaa), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT c, a, b FROM tab2 ORDER BY 1, 2");
+is( $result, qq(pub_tab2|1|xxx
+pub_tab2|3|yyy
+pub_tab2|5|zzz
+xxx_c|6|aaa), 'inserts into tab2 replicated');
+
+# Check that subscriber handles cases where update/delete target tuple
+# is missing. We have to look for the DEBUG1 log messages about that,
+# so temporarily bump up the log verbosity.
+$node_subscriber1->append_conf('postgresql.conf',
+ "log_min_messages = debug1");
+$node_subscriber1->reload;
+
+$node_subscriber1->safe_psql('postgres', "DELETE FROM tab2");
+
+# Note that the current location of the log file is not grabbed immediately
+# after reloading the configuration, but after sending one SQL command to
+# the node so as we are sure that the reloading has taken effect.
+$log_location = -s $node_subscriber1->logfile;
+
+$node_publisher->safe_psql('postgres',
+ "UPDATE tab2 SET b = 'quux' WHERE a = 5");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab2 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$logfile = slurp_file($node_subscriber1->logfile(), $log_location);
+ok( $logfile =~
+ qr/logical replication did not find row to be updated in replication target relation's partition "tab2_1"/,
+ 'update target row is missing in tab2_1');
+ok( $logfile =~
+ qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
+ 'delete target row is missing in tab2_1');
+
+$node_subscriber1->append_conf('postgresql.conf',
+ "log_min_messages = warning");
+$node_subscriber1->reload;
+
+# Test that replication continues to work correctly after altering the
+# partition of a partitioned target table.
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ CREATE TABLE tab5 (a int NOT NULL, b int);
+ CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+ ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;});
+
+$node_subscriber2->safe_psql(
+ 'postgres', q{
+ CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a);
+ CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT;
+ CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+ ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;
+ ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;});
+
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Make partition map cache
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM tab5 ORDER BY 1");
+is($result, qq(2|1), 'updates of tab5 replicated correctly');
+
+# Change the column order of partition on subscriber
+$node_subscriber2->safe_psql(
+ 'postgres', q{
+ ALTER TABLE tab5 DETACH PARTITION tab5_1;
+ ALTER TABLE tab5_1 DROP COLUMN b;
+ ALTER TABLE tab5_1 ADD COLUMN b int;
+ ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');
+
+# Test that replication into the partitioned target table continues to
+# work correctly when the published table is altered.
+$node_publisher->safe_psql(
+ 'postgres', q{
+ ALTER TABLE tab5 DROP COLUMN b, ADD COLUMN c INT;
+ ALTER TABLE tab5 ADD COLUMN b INT;});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
+
+# Test that replication works correctly as long as the leaf partition
+# has the necessary REPLICA IDENTITY, even though the actual target
+# partitioned table does not.
+$node_subscriber2->safe_psql('postgres',
+ "ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b, c FROM tab5_1 ORDER BY 1");
+is($result, qq(4||1), 'updates of tab5 replicated correctly');
diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl
new file mode 100644
index 0000000..7260378
--- /dev/null
+++ b/src/test/subscription/t/014_binary.pl
@@ -0,0 +1,137 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Binary mode logical replication test
+
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Create and initialize a publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create and initialize subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create tables on both sides of the replication
+my $ddl = qq(
+ CREATE TABLE public.test_numerical (
+ a INTEGER PRIMARY KEY,
+ b NUMERIC,
+ c FLOAT,
+ d BIGINT
+ );
+ CREATE TABLE public.test_arrays (
+ a INTEGER[] PRIMARY KEY,
+ b NUMERIC[],
+ c TEXT[]
+ ););
+
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Configure logical replication
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tpub FOR ALL TABLES");
+
+my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' "
+ . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
+
+# Ensure nodes are in sync with each other
+$node_publisher->wait_for_catchup('tsub');
+$node_subscriber->poll_query_until('postgres',
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"
+) or die "Timed out while waiting for subscriber to synchronize data";
+
+# Insert some content and make sure it's replicated across
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ INSERT INTO public.test_arrays (a, b, c) VALUES
+ ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
+ ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
+
+ INSERT INTO public.test_numerical (a, b, c, d) VALUES
+ (1, 1.2, 1.3, 10),
+ (2, 2.2, 2.3, 20),
+ (3, 3.2, 3.3, 30);
+ ));
+
+$node_publisher->wait_for_catchup('tsub');
+
+my $result = $node_subscriber->safe_psql('postgres',
+ "SELECT a, b, c, d FROM test_numerical ORDER BY a");
+
+is( $result, '1|1.2|1.3|10
+2|2.2|2.3|20
+3|3.2|3.3|30', 'check replicated data on subscriber');
+
+# Test updates as well
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ UPDATE public.test_arrays SET b[1] = 42, c = NULL;
+ UPDATE public.test_numerical SET b = 42, c = NULL;
+ ));
+
+$node_publisher->wait_for_catchup('tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT a, b, c FROM test_arrays ORDER BY a");
+
+is( $result, '{1,2,3}|{42,1.2,1.3}|
+{3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT a, b, c, d FROM test_numerical ORDER BY a");
+
+is( $result, '1|42||10
+2|42||20
+3|42||30', 'check updated replicated data on subscriber');
+
+# Test to reset back to text formatting, and then to binary again
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tsub SET (binary = false);");
+
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ INSERT INTO public.test_numerical (a, b, c, d) VALUES
+ (4, 4.2, 4.3, 40);
+ ));
+
+$node_publisher->wait_for_catchup('tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT a, b, c, d FROM test_numerical ORDER BY a");
+
+is( $result, '1|42||10
+2|42||20
+3|42||30
+4|4.2|4.3|40', 'check replicated data on subscriber');
+
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tsub SET (binary = true);");
+
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ INSERT INTO public.test_arrays (a, b, c) VALUES
+ ('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}');
+ ));
+
+$node_publisher->wait_for_catchup('tsub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT a, b, c FROM test_arrays ORDER BY a");
+
+is( $result, '{1,2,3}|{42,1.2,1.3}|
+{2,3,1}|{1.2,1.3,1.1}|{two,three,one}
+{3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
new file mode 100644
index 0000000..998650a
--- /dev/null
+++ b/src/test/subscription/t/015_stream.pl
@@ -0,0 +1,135 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test streaming of simple large transaction
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+# Interleave a pair of transactions, each exceeding the 64kB limit.
+my $in = '';
+my $out = '';
+
+my $timer = IPC::Run::timeout($TestLib::timeout_default);
+
+my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
+ on_error_stop => 0);
+
+$in .= q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+};
+$h->pump_nb;
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i);
+DELETE FROM test_tab WHERE a > 5000;
+COMMIT;
+});
+
+$in .= q{
+COMMIT;
+\q
+};
+$h->finish; # errors make the next test fail, so ignore them here
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'check extra columns contain local defaults');
+
+# Test the streaming in binary mode
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub SET (binary = on)");
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(6667|6667|6667), 'check extra columns contain local defaults');
+
+# Change the local values of the extra columns on the subscriber,
+# update publisher, and check that subscriber retains the expected
+# values. This is to ensure that non-streaming transactions behave
+# properly after a streaming transaction.
+$node_subscriber->safe_psql('postgres',
+ "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"
+);
+$node_publisher->safe_psql('postgres',
+ "UPDATE test_tab SET b = md5(a::text)");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"
+);
+is($result, qq(6667|6667|6667),
+ 'check extra columns contain locally changed data');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl
new file mode 100644
index 0000000..0245b06
--- /dev/null
+++ b/src/test/subscription/t/016_stream_subxact.pl
@@ -0,0 +1,93 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test streaming of large transaction containing large subtransactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+# Insert, update and delete enough rows to exceed 64kB limit.
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+SAVEPOINT s4;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(1667|1667|1667),
+ 'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
+);
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl
new file mode 100644
index 0000000..35b1468
--- /dev/null
+++ b/src/test/subscription/t/017_stream_ddl.pl
@@ -0,0 +1,129 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test streaming of large transaction with DDL and subtransactions
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT, f INT)"
+);
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|0|0), 'check initial data was copied to subscriber');
+
+# a small (non-streamed) transaction with DDL and DML
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab VALUES (3, md5(3::text));
+ALTER TABLE test_tab ADD COLUMN c INT;
+SAVEPOINT s1;
+INSERT INTO test_tab VALUES (4, md5(4::text), -4);
+COMMIT;
+});
+
+# large (streamed) transaction with DDL and DML
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i);
+ALTER TABLE test_tab ADD COLUMN d INT;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i);
+COMMIT;
+});
+
+# a small (non-streamed) transaction with DDL and DML
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001);
+ALTER TABLE test_tab ADD COLUMN e INT;
+SAVEPOINT s1;
+INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002);
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d), count(e) FROM test_tab");
+is($result, qq(2002|1999|1002|1),
+ 'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
+);
+
+# A large (streamed) transaction with DDL and DML. One of the DDL is performed
+# after DML to ensure that we invalidate the schema sent for test_tab so that
+# the next transaction has to send the schema again.
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(2003,5000) s(i);
+ALTER TABLE test_tab ADD COLUMN f INT;
+COMMIT;
+});
+
+# A small transaction that won't get streamed. This is just to ensure that we
+# send the schema again to reflect the last column added in the previous test.
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i, 4*i FROM generate_series(5001,5005) s(i);
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab");
+is($result, qq(5005|5002|4005|3004|5),
+ 'check data was copied to subscriber for both streaming and non-streaming transactions'
+);
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl
new file mode 100644
index 0000000..7fc60b5
--- /dev/null
+++ b/src/test/subscription/t/018_stream_subxact_abort.pl
@@ -0,0 +1,133 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test streaming of large transaction containing multiple subtransactions and rollbacks
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2|0), 'check initial data was copied to subscriber');
+
+# large (streamed) transaction with DDL, DML and ROLLBACKs
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i);
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i);
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i);
+ROLLBACK TO s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i);
+ROLLBACK TO s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i);
+SAVEPOINT s4;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i);
+SAVEPOINT s5;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i);
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2000|0),
+ 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
+);
+
+# large (streamed) transaction with subscriber receiving out of order
+# subtransaction ROLLBACKs
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i);
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i);
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i);
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i);
+RELEASE s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i);
+ROLLBACK TO s1;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2500|0),
+ 'check rollback to savepoint was reflected on subscriber');
+
+# large (streamed) transaction with subscriber receiving rollback
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i);
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i);
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i);
+ROLLBACK;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2500|0), 'check rollback was reflected on subscriber');
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
new file mode 100644
index 0000000..81149b8
--- /dev/null
+++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl
@@ -0,0 +1,87 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test streaming of large transaction with subtransactions, DDLs, DMLs, and
+# rollbacks
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 2;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(2|0), 'check initial data was copied to subscriber');
+
+# large (streamed) transaction with DDL, DML and ROLLBACKs
+$node_publisher->safe_psql(
+ 'postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i);
+ALTER TABLE test_tab ADD COLUMN c INT;
+SAVEPOINT s1;
+INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i);
+ALTER TABLE test_tab ADD COLUMN d INT;
+SAVEPOINT s2;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i);
+ALTER TABLE test_tab ADD COLUMN e INT;
+SAVEPOINT s3;
+INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i);
+ALTER TABLE test_tab DROP COLUMN c;
+ROLLBACK TO s1;
+INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i);
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), count(c) FROM test_tab");
+is($result, qq(1000|500),
+ 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
+);
+
+$node_subscriber->stop;
+$node_publisher->stop;
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
new file mode 100644
index 0000000..0e218e0
--- /dev/null
+++ b/src/test/subscription/t/020_messages.pl
@@ -0,0 +1,148 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests that logical decoding messages
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'autovacuum = off');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_test (a int primary key)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_test (a int primary key)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE tab_test");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Ensure a transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+# wait for the replication slot to become inactive in the publisher
+$node_publisher->poll_query_until(
+ 'postgres',
+ "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'",
+ 1);
+
+$node_publisher->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
+);
+
+my $result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0)
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+));
+
+# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
+is( $result, qq(66
+77
+67),
+ 'messages on slot are B M C with message option');
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+ OFFSET 1 LIMIT 1
+));
+
+is($result, qq(1|pgoutput),
+ "flag transactional is set to 1 and prefix is pgoutput");
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0)
+ FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub')
+));
+
+# 66 67 == B C == BEGIN COMMIT
+is( $result, qq(66
+67),
+ 'option messages defaults to false so message (M) is not available on slot'
+);
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
+
+my $message_lsn = $node_publisher->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')"
+);
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0), get_byte(data, 1)
+ FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+ WHERE lsn = '$message_lsn' AND xid = 0
+));
+
+is($result, qq(77|0), 'non-transactional message on slot is M');
+
+# Ensure a non-transactional logical decoding message shows up on the slot when
+# it is emitted within an aborted transaction. The message won't emit until
+# something advances the LSN, hence, we intentionally forces the server to
+# switch to a new WAL file.
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ BEGIN;
+ SELECT pg_logical_emit_message(false, 'pgoutput',
+ 'a non-transactional message is available even if the transaction is aborted 1');
+ INSERT INTO tab_test VALUES (3);
+ SELECT pg_logical_emit_message(true, 'pgoutput',
+ 'a transactional message is not available if the transaction is aborted');
+ SELECT pg_logical_emit_message(false, 'pgoutput',
+ 'a non-transactional message is available even if the transaction is aborted 2');
+ ROLLBACK;
+ SELECT pg_switch_wal();
+));
+
+$result = $node_publisher->safe_psql(
+ 'postgres', qq(
+ SELECT get_byte(data, 0), get_byte(data, 1)
+ FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ 'proto_version', '1',
+ 'publication_names', 'tap_pub',
+ 'messages', 'true')
+));
+
+is( $result, qq(77|0
+77|0),
+ 'non-transactional message on slot from aborted transaction is M');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/021_alter_sub_pub.pl b/src/test/subscription/t/021_alter_sub_pub.pl
new file mode 100644
index 0000000..104eddb
--- /dev/null
+++ b/src/test/subscription/t/021_alter_sub_pub.pl
@@ -0,0 +1,98 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 3;
+
+# Initialize publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create table on publisher
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_1 (a int)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_1 SELECT generate_series(1,10)");
+
+# Create table on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_1 (a int)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_1 FOR TABLE tab_1");
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub_2");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub_1, tap_pub_2"
+);
+
+# Wait for initial table sync to finish
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Check the initial data of tab_1 is copied to subscriber
+my $result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_1");
+is($result, qq(10|1|10), 'check initial data is copied to subscriber');
+
+# Create a new table on publisher
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_2 (a int)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_2 SELECT generate_series(1,10)");
+
+# Create a new table on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_2 (a int)");
+
+# Add the table to publication
+$node_publisher->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_2 ADD TABLE tab_2");
+
+# Dropping tap_pub_1 will refresh the entire publication list
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub DROP PUBLICATION tap_pub_1");
+
+# Wait for initial table sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Check the initial data of tab_drop_refresh was copied to subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_2");
+is($result, qq(10|1|10), 'check initial data is copied to subscriber');
+
+# Re-adding tap_pub_1 will refresh the entire publication list
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tap_sub ADD PUBLICATION tap_pub_1");
+
+# Wait for initial table sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Check the initial data of tab_1 was copied to subscriber again
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT count(*), min(a), max(a) FROM tab_1");
+is($result, qq(20|1|10), 'check initial data is copied to subscriber');
+
+# shutdown
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
new file mode 100644
index 0000000..424ffb7
--- /dev/null
+++ b/src/test/subscription/t/100_bugs.pl
@@ -0,0 +1,308 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for various bugs found over time
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 7;
+
+# Bug #15114
+
+# The bug was that determining which columns are part of the replica
+# identity index using RelationGetIndexAttrBitmap() would run
+# eval_const_expressions() on index expressions and predicates across
+# all indexes of the table, which in turn might require a snapshot,
+# but there wasn't one set, so it crashes. There were actually two
+# separate bugs, one on the publisher and one on the subscriber. The
+# fix was to avoid the constant expressions simplification in
+# RelationGetIndexAttrBitmap(), so it's safe to call in more contexts.
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, b int)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE FUNCTION double(x int) RETURNS int IMMUTABLE LANGUAGE SQL AS 'select x * 2'"
+);
+
+# an index with a predicate that lends itself to constant expressions
+# evaluation
+$node_publisher->safe_psql('postgres',
+ "CREATE INDEX ON tab1 (b) WHERE a > double(1)");
+
+# and the same setup on the subscriber
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab1 (a int PRIMARY KEY, b int)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE FUNCTION double(x int) RETURNS int IMMUTABLE LANGUAGE SQL AS 'select x * 2'"
+);
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE INDEX ON tab1 (b) WHERE a > double(1)");
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub1 FOR ALL TABLES");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+
+$node_publisher->wait_for_catchup('sub1');
+
+# This would crash, first on the publisher, and then (if the publisher
+# is fixed) on the subscriber.
+$node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1, 2)");
+
+$node_publisher->wait_for_catchup('sub1');
+
+pass('index predicates do not cause crash');
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');
+
+
+# Handling of temporary and unlogged tables with FOR ALL TABLES publications
+
+# If a FOR ALL TABLES publication exists, temporary and unlogged
+# tables are ignored for publishing changes. The bug was that we
+# would still check in that case that such a table has a replica
+# identity set before accepting updates. If it did not it would cause
+# an error when an update was attempted.
+
+$node_publisher = get_new_node('publisher2');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub FOR ALL TABLES");
+
+is( $node_publisher->psql(
+ 'postgres',
+ "CREATE TEMPORARY TABLE tt1 AS SELECT 1 AS a; UPDATE tt1 SET a = 2;"),
+ 0,
+ 'update to temporary table without replica identity with FOR ALL TABLES publication'
+);
+
+is( $node_publisher->psql(
+ 'postgres',
+ "CREATE UNLOGGED TABLE tu1 AS SELECT 1 AS a; UPDATE tu1 SET a = 2;"),
+ 0,
+ 'update to unlogged table without replica identity with FOR ALL TABLES publication'
+);
+
+$node_publisher->stop('fast');
+
+# Bug #16643 - https://postgr.es/m/16643-eaadeb2a1a58d28c@postgresql.org
+#
+# Initial sync doesn't complete; the protocol was not being followed per
+# expectations after commit 07082b08cc5d.
+my $node_twoways = get_new_node('twoways');
+$node_twoways->init(allows_streaming => 'logical');
+$node_twoways->start;
+for my $db (qw(d1 d2))
+{
+ $node_twoways->safe_psql('postgres', "CREATE DATABASE $db");
+ $node_twoways->safe_psql($db, "CREATE TABLE t (f int)");
+ $node_twoways->safe_psql($db, "CREATE TABLE t2 (f int)");
+}
+
+my $rows = 3000;
+$node_twoways->safe_psql(
+ 'd1', qq{
+ INSERT INTO t SELECT * FROM generate_series(1, $rows);
+ INSERT INTO t2 SELECT * FROM generate_series(1, $rows);
+ CREATE PUBLICATION testpub FOR TABLE t;
+ SELECT pg_create_logical_replication_slot('testslot', 'pgoutput');
+ });
+
+$node_twoways->safe_psql('d2',
+ "CREATE SUBSCRIPTION testsub CONNECTION \$\$"
+ . $node_twoways->connstr('d1')
+ . "\$\$ PUBLICATION testpub WITH (create_slot=false, "
+ . "slot_name='testslot')");
+$node_twoways->safe_psql(
+ 'd1', qq{
+ INSERT INTO t SELECT * FROM generate_series(1, $rows);
+ INSERT INTO t2 SELECT * FROM generate_series(1, $rows);
+ });
+$node_twoways->safe_psql('d1', 'ALTER PUBLICATION testpub ADD TABLE t2');
+$node_twoways->safe_psql('d2',
+ 'ALTER SUBSCRIPTION testsub REFRESH PUBLICATION');
+
+# We cannot rely solely on wait_for_catchup() here; it isn't sufficient
+# when tablesync workers might still be running. So in addition to that,
+# verify that tables are synced.
+# XXX maybe this should be integrated in wait_for_catchup() itself.
+$node_twoways->wait_for_catchup('testsub');
+my $synced_query =
+ "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_twoways->poll_query_until('d2', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
+ $rows * 2, "2x$rows rows in t");
+is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"),
+ $rows * 2, "2x$rows rows in t2");
+
+# Verify table data is synced with cascaded replication setup. This is mainly
+# to test whether the data written by tablesync worker gets replicated.
+my $node_pub = get_new_node('testpublisher1');
+$node_pub->init(allows_streaming => 'logical');
+$node_pub->start;
+
+my $node_pub_sub = get_new_node('testpublisher_subscriber');
+$node_pub_sub->init(allows_streaming => 'logical');
+$node_pub_sub->start;
+
+my $node_sub = get_new_node('testsubscriber1');
+$node_sub->init(allows_streaming => 'logical');
+$node_sub->start;
+
+# Create the tables in all nodes.
+$node_pub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
+$node_pub_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
+$node_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
+
+# Create a cascaded replication setup like:
+# N1 - Create publication testpub1.
+# N2 - Create publication testpub2 and also include subscriber which subscribes
+# to testpub1.
+# N3 - Create subscription testsub2 subscribes to testpub2.
+#
+# Note that subscription on N3 needs to be created before subscription on N2 to
+# test whether the data written by tablesync worker of N2 gets replicated.
+$node_pub->safe_psql('postgres',
+ "CREATE PUBLICATION testpub1 FOR TABLE tab1");
+
+$node_pub_sub->safe_psql('postgres',
+ "CREATE PUBLICATION testpub2 FOR TABLE tab1");
+
+my $publisher1_connstr = $node_pub->connstr . ' dbname=postgres';
+my $publisher2_connstr = $node_pub_sub->connstr . ' dbname=postgres';
+
+$node_sub->safe_psql('postgres',
+ "CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher2_connstr' PUBLICATION testpub2"
+);
+
+$node_pub_sub->safe_psql('postgres',
+ "CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher1_connstr' PUBLICATION testpub1"
+);
+
+$node_pub->safe_psql('postgres',
+ "INSERT INTO tab1 values(generate_series(1,10))");
+
+# Verify that the data is cascaded from testpub1 to testsub1 and further from
+# testpub2 (which had testsub1) to testsub2.
+$node_pub->wait_for_catchup('testsub1');
+$node_pub_sub->wait_for_catchup('testsub2');
+
+# Drop subscriptions as we don't need them anymore
+$node_pub_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub1");
+$node_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub2");
+
+# Drop publications as we don't need them anymore
+$node_pub->safe_psql('postgres', "DROP PUBLICATION testpub1");
+$node_pub_sub->safe_psql('postgres', "DROP PUBLICATION testpub2");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_pub->safe_psql('postgres', "DROP TABLE tab1");
+$node_pub_sub->safe_psql('postgres', "DROP TABLE tab1");
+$node_sub->safe_psql('postgres', "DROP TABLE tab1");
+
+$node_pub->stop('fast');
+$node_pub_sub->stop('fast');
+$node_sub->stop('fast');
+
+# https://postgr.es/m/OS0PR01MB61133CA11630DAE45BC6AD95FB939%40OS0PR01MB6113.jpnprd01.prod.outlook.com
+
+# The bug was that when changing the REPLICA IDENTITY INDEX to another one, the
+# target table's relcache was not being invalidated. This leads to skipping
+# UPDATE/DELETE operations during apply on the subscriber side as the columns
+# required to search corresponding rows won't get logged.
+$node_publisher = get_new_node('publisher3');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+$node_subscriber = get_new_node('subscriber3');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE tab_replidentity_index(a int not null, b int not null)");
+$node_publisher->safe_psql('postgres',
+ "CREATE UNIQUE INDEX idx_replidentity_index_a ON tab_replidentity_index(a)"
+);
+$node_publisher->safe_psql('postgres',
+ "CREATE UNIQUE INDEX idx_replidentity_index_b ON tab_replidentity_index(b)"
+);
+
+# use index idx_replidentity_index_a as REPLICA IDENTITY on publisher.
+$node_publisher->safe_psql('postgres',
+ "ALTER TABLE tab_replidentity_index REPLICA IDENTITY USING INDEX idx_replidentity_index_a"
+);
+
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tab_replidentity_index VALUES(1, 1),(2, 2)");
+
+$node_subscriber->safe_psql('postgres',
+ "CREATE TABLE tab_replidentity_index(a int not null, b int not null)");
+$node_subscriber->safe_psql('postgres',
+ "CREATE UNIQUE INDEX idx_replidentity_index_a ON tab_replidentity_index(a)"
+);
+$node_subscriber->safe_psql('postgres',
+ "CREATE UNIQUE INDEX idx_replidentity_index_b ON tab_replidentity_index(b)"
+);
+# use index idx_replidentity_index_b as REPLICA IDENTITY on subscriber because
+# it reflects the future scenario we are testing: changing REPLICA IDENTITY
+# INDEX.
+$node_subscriber->safe_psql('postgres',
+ "ALTER TABLE tab_replidentity_index REPLICA IDENTITY USING INDEX idx_replidentity_index_b"
+);
+
+$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION tap_pub FOR TABLE tab_replidentity_index");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# Also wait for initial table sync to finish
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+is( $node_subscriber->safe_psql(
+ 'postgres', "SELECT * FROM tab_replidentity_index"),
+ qq(1|1
+2|2),
+ "check initial data on subscriber");
+
+# Set REPLICA IDENTITY to idx_replidentity_index_b on publisher, then run UPDATE and DELETE.
+$node_publisher->safe_psql(
+ 'postgres', qq[
+ ALTER TABLE tab_replidentity_index REPLICA IDENTITY USING INDEX idx_replidentity_index_b;
+ UPDATE tab_replidentity_index SET a = -a WHERE a = 1;
+ DELETE FROM tab_replidentity_index WHERE a = 2;
+]);
+
+$node_publisher->wait_for_catchup('tap_sub');
+is( $node_subscriber->safe_psql(
+ 'postgres', "SELECT * FROM tab_replidentity_index"),
+ qq(-1|1),
+ "update works with REPLICA IDENTITY");
+
+$node_publisher->stop('fast');
+$node_subscriber->stop('fast');