# Copyright (c) 2022, PostgreSQL Global Development Group # Test partial-column publication of tables use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; # create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); $node_publisher->start; # create subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->append_conf('postgresql.conf', qq(max_logical_replication_workers = 6)); $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; my $offset = 0; # setup tables on both nodes # tab1: simple 1:1 replication $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int) )); $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE tab1 (a int PRIMARY KEY, "B" int, c int) )); # tab2: replication from regular to table with fewer columns $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE tab2 (a int PRIMARY KEY, b varchar) )); # tab3: simple 1:1 replication with weird column names $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE tab3 ("a'" int PRIMARY KEY, "B" varchar, "c'" int) )); $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE tab3 ("a'" int PRIMARY KEY, "c'" int) )); # test_part: partitioned tables, with partitioning (including multi-level # partitioning, and fewer columns on the subscriber) $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a); CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6); CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a); CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a); CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3,4,5,6); CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (7,8,9,10,11,12) PARTITION BY LIST (a); CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (7,8,9,10); )); # tab4: table with user-defined enum types $node_publisher->safe_psql( 'postgres', qq( CREATE TYPE test_typ AS ENUM ('blue', 'red'); CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, c int, d text); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE TYPE test_typ AS ENUM ('blue', 'red'); CREATE TABLE tab4 (a INT PRIMARY KEY, b test_typ, d text); )); # TEST: create publication and subscription for some of the tables with # column lists $node_publisher->safe_psql( 'postgres', qq( CREATE PUBLICATION pub1 FOR TABLE tab1 (a, "B"), tab3 ("a'", "c'"), test_part (a, b), tab4 (a, b, d) WITH (publish_via_partition_root = 'true'); )); # check that we got the right prattrs values for the publication in the # pg_publication_rel catalog (order by relname, to get stable ordering) my $result = $node_publisher->safe_psql( 'postgres', qq( SELECT relname, prattrs FROM pg_publication_rel pb JOIN pg_class pc ON(pb.prrelid = pc.oid) ORDER BY relname )); is( $result, qq(tab1|1 2 tab3|1 3 tab4|1 2 4 test_part|1 2), 'publication relation updated'); # TEST: insert data into the tables, create subscription and see if sync # replicates the right columns $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab1 VALUES (1, 2, 3); INSERT INTO tab1 VALUES (4, 5, 6); )); $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab3 VALUES (1, 2, 3); INSERT INTO tab3 VALUES (4, 5, 6); )); $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab4 VALUES (1, 'red', 3, 'oh my'); INSERT INTO tab4 VALUES (2, 'blue', 4, 'hello'); )); # replication of partitioned table $node_publisher->safe_psql( 'postgres', qq( INSERT INTO test_part VALUES (1, 'abc', '2021-07-04 12:00:00'); INSERT INTO test_part VALUES (2, 'bcd', '2021-07-03 11:12:13'); INSERT INTO test_part VALUES (7, 'abc', '2021-07-04 12:00:00'); INSERT INTO test_part VALUES (8, 'bcd', '2021-07-03 11:12:13'); )); # create subscription for the publication, wait for sync to complete, # then check the sync results $node_subscriber->safe_psql( 'postgres', qq( CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1 )); $node_subscriber->wait_for_subscription_sync; # tab1: only (a,b) is replicated $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab1 ORDER BY a"); is( $result, qq(1|2| 4|5|), 'insert on column tab1.c is not replicated'); # tab3: only (a,c) is replicated $result = $node_subscriber->safe_psql('postgres', qq(SELECT * FROM tab3 ORDER BY "a'")); is( $result, qq(1|3 4|6), 'insert on column tab3.b is not replicated'); # tab4: only (a,b,d) is replicated $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab4 ORDER BY a"); is( $result, qq(1|red|oh my 2|blue|hello), 'insert on column tab4.c is not replicated'); # test_part: (a,b) is replicated $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM test_part ORDER BY a"); is( $result, qq(1|abc 2|bcd 7|abc 8|bcd), 'insert on column test_part.c columns is not replicated'); # TEST: now insert more data into the tables, and wait until we replicate # them (not by tablesync, but regular decoding and replication) $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab1 VALUES (2, 3, 4); INSERT INTO tab1 VALUES (5, 6, 7); )); $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab3 VALUES (2, 3, 4); INSERT INTO tab3 VALUES (5, 6, 7); )); $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab4 VALUES (3, 'red', 5, 'foo'); INSERT INTO tab4 VALUES (4, 'blue', 6, 'bar'); )); # replication of partitioned table $node_publisher->safe_psql( 'postgres', qq( INSERT INTO test_part VALUES (3, 'xxx', '2022-02-01 10:00:00'); INSERT INTO test_part VALUES (4, 'yyy', '2022-03-02 15:12:13'); INSERT INTO test_part VALUES (9, 'zzz', '2022-04-03 21:00:00'); INSERT INTO test_part VALUES (10, 'qqq', '2022-05-04 22:12:13'); )); # wait for catchup before checking the subscriber $node_publisher->wait_for_catchup('sub1'); # tab1: only (a,b) is replicated $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab1 ORDER BY a"); is( $result, qq(1|2| 2|3| 4|5| 5|6|), 'insert on column tab1.c is not replicated'); # tab3: only (a,c) is replicated $result = $node_subscriber->safe_psql('postgres', qq(SELECT * FROM tab3 ORDER BY "a'")); is( $result, qq(1|3 2|4 4|6 5|7), 'insert on column tab3.b is not replicated'); # tab4: only (a,b,d) is replicated $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab4 ORDER BY a"); is( $result, qq(1|red|oh my 2|blue|hello 3|red|foo 4|blue|bar), 'insert on column tab4.c is not replicated'); # test_part: (a,b) is replicated $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM test_part ORDER BY a"); is( $result, qq(1|abc 2|bcd 3|xxx 4|yyy 7|abc 8|bcd 9|zzz 10|qqq), 'insert on column test_part.c columns is not replicated'); # TEST: do some updates on some of the tables, both on columns included # in the column list and other # tab1: update of replicated column $node_publisher->safe_psql('postgres', qq(UPDATE tab1 SET "B" = 2 * "B" where a = 1)); # tab1: update of non-replicated column $node_publisher->safe_psql('postgres', qq(UPDATE tab1 SET c = 2*c where a = 4)); # tab3: update of non-replicated $node_publisher->safe_psql('postgres', qq(UPDATE tab3 SET "B" = "B" || ' updated' where "a'" = 4)); # tab3: update of replicated column $node_publisher->safe_psql('postgres', qq(UPDATE tab3 SET "c'" = 2 * "c'" where "a'" = 1)); # tab4 $node_publisher->safe_psql('postgres', qq(UPDATE tab4 SET b = 'blue', c = c * 2, d = d || ' updated' where a = 1) ); # tab4 $node_publisher->safe_psql('postgres', qq(UPDATE tab4 SET b = 'red', c = c * 2, d = d || ' updated' where a = 2) ); # wait for the replication to catch up, and check the UPDATE results got # replicated correctly, with the right column list $node_publisher->wait_for_catchup('sub1'); $result = $node_subscriber->safe_psql('postgres', qq(SELECT * FROM tab1 ORDER BY a)); is( $result, qq(1|4| 2|3| 4|5| 5|6|), 'only update on column tab1.b is replicated'); $result = $node_subscriber->safe_psql('postgres', qq(SELECT * FROM tab3 ORDER BY "a'")); is( $result, qq(1|6 2|4 4|6 5|7), 'only update on column tab3.c is replicated'); $result = $node_subscriber->safe_psql('postgres', qq(SELECT * FROM tab4 ORDER BY a)); is( $result, qq(1|blue|oh my updated 2|red|hello updated 3|red|foo 4|blue|bar), 'update on column tab4.c is not replicated'); # TEST: add table with a column list, insert data, replicate # insert some data before adding it to the publication $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab2 VALUES (1, 'abc', 3); )); $node_publisher->safe_psql('postgres', "ALTER PUBLICATION pub1 ADD TABLE tab2 (a, b)"); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"); # wait for the tablesync to complete, add a bit more data and then check # the results of the replication $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab2 VALUES (2, 'def', 6); )); $node_publisher->wait_for_catchup('sub1'); $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2 ORDER BY a"); is( $result, qq(1|abc 2|def), 'insert on column tab2.c is not replicated'); # do a couple updates, check the correct stuff gets replicated $node_publisher->safe_psql( 'postgres', qq( UPDATE tab2 SET c = 5 where a = 1; UPDATE tab2 SET b = 'xyz' where a = 2; )); $node_publisher->wait_for_catchup('sub1'); $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2 ORDER BY a"); is( $result, qq(1|abc 2|xyz), 'update on column tab2.c is not replicated'); # TEST: add a table to two publications with same column lists, and # create a single subscription replicating both publications $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int); CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b); CREATE PUBLICATION pub3 FOR TABLE tab5 (a, b); -- insert a couple initial records INSERT INTO tab5 VALUES (1, 11, 111, 1111); INSERT INTO tab5 VALUES (2, 22, 222, 2222); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int); )); $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3 )); $node_subscriber->wait_for_subscription_sync($node_publisher, 'sub1'); # insert data and make sure the columns in column list get fully replicated $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab5 VALUES (3, 33, 333, 3333); INSERT INTO tab5 VALUES (4, 44, 444, 4444); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab5 ORDER BY a"), qq(1|11| 2|22| 3|33| 4|44|), 'overlapping publications with overlapping column lists'); # TEST: create a table with a column list, then change the replica # identity by replacing a primary key (but use a different column in # the column list) $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int); CREATE PUBLICATION pub4 FOR TABLE tab6 (a, b); -- initial data INSERT INTO tab6 VALUES (1, 22, 333, 4444); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE tab6 (a int PRIMARY KEY, b int, c int, d int); )); $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub4 )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab6 VALUES (2, 33, 444, 5555); UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4; )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab6 ORDER BY a"), qq(1|44|| 2|66||), 'replication with the original primary key'); # now redefine the constraint - move the primary key to a different column # (which is still covered by the column list, though) $node_publisher->safe_psql( 'postgres', qq( ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey; ALTER TABLE tab6 ADD PRIMARY KEY (b); )); # we need to do the same thing on the subscriber # XXX What would happen if this happens before the publisher ALTER? Or # interleaved, somehow? But that seems unrelated to column lists. $node_subscriber->safe_psql( 'postgres', qq( ALTER TABLE tab6 DROP CONSTRAINT tab6_pkey; ALTER TABLE tab6 ADD PRIMARY KEY (b); )); $node_subscriber->safe_psql( 'postgres', qq( ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab6 VALUES (3, 55, 666, 8888); UPDATE tab6 SET b = b * 2, c = c * 3, d = d * 4; )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab6 ORDER BY a"), qq(1|88|| 2|132|| 3|110||), 'replication with the modified primary key'); # TEST: create a table with a column list, then change the replica # identity by replacing a primary key with a key on multiple columns # (all of them covered by the column list) $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int); CREATE PUBLICATION pub5 FOR TABLE tab7 (a, b); -- some initial data INSERT INTO tab7 VALUES (1, 22, 333, 4444); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE tab7 (a int PRIMARY KEY, b int, c int, d int); )); $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub5 )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab7 VALUES (2, 33, 444, 5555); UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4; )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab7 ORDER BY a"), qq(1|44|| 2|66||), 'replication with the original primary key'); # now redefine the constraint - move the primary key to a different column # (which is not covered by the column list) $node_publisher->safe_psql( 'postgres', qq( ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey; ALTER TABLE tab7 ADD PRIMARY KEY (a, b); )); $node_publisher->safe_psql( 'postgres', qq( INSERT INTO tab7 VALUES (3, 55, 666, 7777); UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4; )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab7 ORDER BY a"), qq(1|88|| 2|132|| 3|110||), 'replication with the modified primary key'); # now switch the primary key again to another columns not covered by the # column list, but also generate writes between the drop and creation # of the new constraint $node_publisher->safe_psql( 'postgres', qq( ALTER TABLE tab7 DROP CONSTRAINT tab7_pkey; INSERT INTO tab7 VALUES (4, 77, 888, 9999); -- update/delete is not allowed for tables without RI ALTER TABLE tab7 ADD PRIMARY KEY (b, a); UPDATE tab7 SET b = b * 2, c = c * 3, d = d * 4; DELETE FROM tab7 WHERE a = 1; )); $node_publisher->safe_psql( 'postgres', qq( )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM tab7 ORDER BY a"), qq(2|264|| 3|220|| 4|154||), 'replication with the modified primary key'); # TEST: partitioned tables (with publish_via_partition_root = false) # and replica identity. The (leaf) partitions may have different RI, so # we need to check the partition RI (with respect to the column list) # while attaching the partition. # First, let's create a partitioned table with two partitions, each with # a different RI, but a column list not covering all those RI. $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_part_a (a int, b int, c int) PARTITION BY LIST (a); CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5); ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a); ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey; CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10); ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b); ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey; -- initial data, one row in each partition INSERT INTO test_part_a VALUES (1, 3); INSERT INTO test_part_a VALUES (6, 4); )); # do the same thing on the subscriber (with the opposite column order) $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE test_part_a (b int, a int) PARTITION BY LIST (a); CREATE TABLE test_part_a_1 PARTITION OF test_part_a FOR VALUES IN (1,2,3,4,5); ALTER TABLE test_part_a_1 ADD PRIMARY KEY (a); ALTER TABLE test_part_a_1 REPLICA IDENTITY USING INDEX test_part_a_1_pkey; CREATE TABLE test_part_a_2 PARTITION OF test_part_a FOR VALUES IN (6,7,8,9,10); ALTER TABLE test_part_a_2 ADD PRIMARY KEY (b); ALTER TABLE test_part_a_2 REPLICA IDENTITY USING INDEX test_part_a_2_pkey; )); # create a publication replicating just the column "a", which is not enough # for the second partition $node_publisher->safe_psql( 'postgres', qq( CREATE PUBLICATION pub6 FOR TABLE test_part_a (b, a) WITH (publish_via_partition_root = true); ALTER PUBLICATION pub6 ADD TABLE test_part_a_1 (a); ALTER PUBLICATION pub6 ADD TABLE test_part_a_2 (b); )); # create the subscription for the above publication, wait for sync to # complete $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub6 )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO test_part_a VALUES (2, 5); INSERT INTO test_part_a VALUES (7, 6); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql( 'postgres', "SELECT a, b FROM test_part_a ORDER BY a, b"), qq(1|3 2|5 6|4 7|6), 'partitions with different replica identities not replicated correctly'); # This time start with a column list covering RI for all partitions, but # then update the column list to not cover column "b" (needed by the # second partition) $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a); CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5); ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a); ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey; CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10); ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b); ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey; -- initial data, one row in each partitions INSERT INTO test_part_b VALUES (1, 1); INSERT INTO test_part_b VALUES (6, 2); )); # do the same thing on the subscriber $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE test_part_b (a int, b int) PARTITION BY LIST (a); CREATE TABLE test_part_b_1 PARTITION OF test_part_b FOR VALUES IN (1,2,3,4,5); ALTER TABLE test_part_b_1 ADD PRIMARY KEY (a); ALTER TABLE test_part_b_1 REPLICA IDENTITY USING INDEX test_part_b_1_pkey; CREATE TABLE test_part_b_2 PARTITION OF test_part_b FOR VALUES IN (6,7,8,9,10); ALTER TABLE test_part_b_2 ADD PRIMARY KEY (b); ALTER TABLE test_part_b_2 REPLICA IDENTITY USING INDEX test_part_b_2_pkey; )); # create a publication replicating both columns, which is sufficient for # both partitions $node_publisher->safe_psql( 'postgres', qq( CREATE PUBLICATION pub7 FOR TABLE test_part_b (a, b) WITH (publish_via_partition_root = true); )); # create the subscription for the above publication, wait for sync to # complete $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub7 )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO test_part_b VALUES (2, 3); INSERT INTO test_part_b VALUES (7, 4); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM test_part_b ORDER BY a, b"), qq(1|1 2|3 6|2 7|4), 'partitions with different replica identities not replicated correctly'); # TEST: This time start with a column list covering RI for all partitions, # but then update RI for one of the partitions to not be covered by the # column list anymore. $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a); CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3); ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a); ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey; CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4); ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b); ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey; -- initial data, one row for each partition INSERT INTO test_part_c VALUES (1, 3, 5); INSERT INTO test_part_c VALUES (2, 4, 6); )); # do the same thing on the subscriber $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE test_part_c (a int, b int, c int) PARTITION BY LIST (a); CREATE TABLE test_part_c_1 PARTITION OF test_part_c FOR VALUES IN (1,3); ALTER TABLE test_part_c_1 ADD PRIMARY KEY (a); ALTER TABLE test_part_c_1 REPLICA IDENTITY USING INDEX test_part_c_1_pkey; CREATE TABLE test_part_c_2 PARTITION OF test_part_c FOR VALUES IN (2,4); ALTER TABLE test_part_c_2 ADD PRIMARY KEY (b); ALTER TABLE test_part_c_2 REPLICA IDENTITY USING INDEX test_part_c_2_pkey; )); # create a publication replicating data through partition root, with a column # list on the root, and then add the partitions one by one with separate # column lists (but those are not applied) $node_publisher->safe_psql( 'postgres', qq( CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false); ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a,c); ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b); )); # create the subscription for the above publication, wait for sync to # complete $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub8; )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO test_part_c VALUES (3, 7, 8); INSERT INTO test_part_c VALUES (4, 9, 10); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM test_part_c ORDER BY a, b"), qq(1||5 2|4| 3||8 4|9|), 'partitions with different replica identities not replicated correctly'); # create a publication not replicating data through partition root, without # a column list on the root, and then add the partitions one by one with # separate column lists $node_publisher->safe_psql( 'postgres', qq( DROP PUBLICATION pub8; CREATE PUBLICATION pub8 FOR TABLE test_part_c WITH (publish_via_partition_root = false); ALTER PUBLICATION pub8 ADD TABLE test_part_c_1 (a); ALTER PUBLICATION pub8 ADD TABLE test_part_c_2 (a,b); )); # add the publication to our subscription, wait for sync to complete $node_subscriber->safe_psql( 'postgres', qq( ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; TRUNCATE test_part_c; )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( TRUNCATE test_part_c; INSERT INTO test_part_c VALUES (1, 3, 5); INSERT INTO test_part_c VALUES (2, 4, 6); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM test_part_c ORDER BY a, b"), qq(1|| 2|4|), 'partitions with different replica identities not replicated correctly'); # TEST: Start with a single partition, with RI compatible with the column # list, and then attach a partition with incompatible RI. $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a); CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3); ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a); ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey; INSERT INTO test_part_d VALUES (1, 2); )); # do the same thing on the subscriber (in fact, create both partitions right # away, no need to delay that) $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE test_part_d (a int, b int) PARTITION BY LIST (a); CREATE TABLE test_part_d_1 PARTITION OF test_part_d FOR VALUES IN (1,3); ALTER TABLE test_part_d_1 ADD PRIMARY KEY (a); ALTER TABLE test_part_d_1 REPLICA IDENTITY USING INDEX test_part_d_1_pkey; CREATE TABLE test_part_d_2 PARTITION OF test_part_d FOR VALUES IN (2,4); ALTER TABLE test_part_d_2 ADD PRIMARY KEY (a); ALTER TABLE test_part_d_2 REPLICA IDENTITY USING INDEX test_part_d_2_pkey; )); # create a publication replicating both columns, which is sufficient for # both partitions $node_publisher->safe_psql( 'postgres', qq( CREATE PUBLICATION pub9 FOR TABLE test_part_d (a) WITH (publish_via_partition_root = true); )); # create the subscription for the above publication, wait for sync to # complete $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub9 )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO test_part_d VALUES (3, 4); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM test_part_d ORDER BY a, b"), qq(1| 3|), 'partitions with different replica identities not replicated correctly'); # TEST: With a table included in the publications which is FOR ALL TABLES, it # means replicate all columns. # drop unnecessary tables, so as not to interfere with the FOR ALL TABLES $node_publisher->safe_psql( 'postgres', qq( DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_part, test_part_a, test_part_b, test_part_c, test_part_d; )); $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int); CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b, c); CREATE PUBLICATION pub_mix_4 FOR ALL TABLES; -- initial data INSERT INTO test_mix_2 VALUES (1, 2, 3); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int); DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_3, pub_mix_4; )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO test_mix_2 VALUES (4, 5, 6); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_2"), qq(1|2|3 4|5|6), 'all columns should be replicated'); # TEST: With a table included in the publication which is FOR TABLES IN # SCHEMA, it means replicate all columns. $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int); )); $node_publisher->safe_psql( 'postgres', qq( DROP TABLE test_mix_2; CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int); CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b, c); CREATE PUBLICATION pub_mix_6 FOR TABLES IN SCHEMA public; -- initial data INSERT INTO test_mix_3 VALUES (1, 2, 3); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_5, pub_mix_6; )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO test_mix_3 VALUES (4, 5, 6); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM test_mix_3"), qq(1|2|3 4|5|6), 'all columns should be replicated'); # TEST: Check handling of publish_via_partition_root - if a partition is # published through partition root, we should only apply the column list # defined for the whole table (not the partitions) - both during the initial # sync and when replicating changes. This is what we do for row filters. $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a); CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10); CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20); )); $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_root (a int PRIMARY KEY, b int, c int) PARTITION BY RANGE (a); CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10); CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20); CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true); -- initial data INSERT INTO test_root VALUES (1, 2, 3); INSERT INTO test_root VALUES (10, 20, 30); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true; )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO test_root VALUES (2, 3, 4); INSERT INTO test_root VALUES (11, 21, 31); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM test_root ORDER BY a, b, c"), qq(1|| 2|| 10|| 11||), 'publication via partition root applies column list'); # TEST: Multiple publications which publish schema of parent table and # partition. The partition is published through two publications, once # through a schema (so no column list) containing the parent, and then # also directly (with all columns). The expected outcome is there is # no column list. $node_publisher->safe_psql( 'postgres', qq( DROP PUBLICATION pub1, pub2, pub3, pub4, pub5, pub6, pub7, pub8; CREATE SCHEMA s1; CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a); CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10); CREATE PUBLICATION pub1 FOR TABLES IN SCHEMA s1; CREATE PUBLICATION pub2 FOR TABLE t_1(a, b, c); -- initial data INSERT INTO s1.t VALUES (1, 2, 3); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE SCHEMA s1; CREATE TABLE s1.t (a int, b int, c int) PARTITION BY RANGE (a); CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10); DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1, pub2; )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO s1.t VALUES (4, 5, 6); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM s1.t ORDER BY a"), qq(1|2|3 4|5|6), 'two publications, publishing the same relation'); # Now resync the subcription, but with publications in the opposite order. # The result should be the same. $node_subscriber->safe_psql( 'postgres', qq( TRUNCATE s1.t; ALTER SUBSCRIPTION sub1 SET PUBLICATION pub2, pub1; )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO s1.t VALUES (7, 8, 9); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql('postgres', "SELECT * FROM s1.t ORDER BY a"), qq(7|8|9), 'two publications, publishing the same relation'); # TEST: One publication, containing both the parent and child relations. # The expected outcome is list "a", because that's the column list defined # for the top-most ancestor added to the publication. $node_publisher->safe_psql( 'postgres', qq( DROP SCHEMA s1 CASCADE; CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a); CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10) PARTITION BY RANGE (a); CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10); CREATE PUBLICATION pub3 FOR TABLE t_1 (a), t_2 WITH (PUBLISH_VIA_PARTITION_ROOT); -- initial data INSERT INTO t VALUES (1, 2, 3); )); $node_subscriber->safe_psql( 'postgres', qq( DROP SCHEMA s1 CASCADE; CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a); CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10) PARTITION BY RANGE (a); CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10); DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub3; )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO t VALUES (4, 5, 6); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM t ORDER BY a, b, c"), qq(1|| 4||), 'publication containing both parent and child relation'); # TEST: One publication, containing both the parent and child relations. # The expected outcome is list "a", because that's the column list defined # for the top-most ancestor added to the publication. # Note: The difference from the preceding test is that in this case both # relations have a column list defined. $node_publisher->safe_psql( 'postgres', qq( DROP TABLE t; CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a); CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10) PARTITION BY RANGE (a); CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10); CREATE PUBLICATION pub4 FOR TABLE t_1 (a), t_2 (b) WITH (PUBLISH_VIA_PARTITION_ROOT); -- initial data INSERT INTO t VALUES (1, 2, 3); )); $node_subscriber->safe_psql( 'postgres', qq( DROP TABLE t; CREATE TABLE t (a int, b int, c int) PARTITION BY RANGE (a); CREATE TABLE t_1 PARTITION OF t FOR VALUES FROM (1) TO (10) PARTITION BY RANGE (a); CREATE TABLE t_2 PARTITION OF t_1 FOR VALUES FROM (1) TO (10); DROP SUBSCRIPTION sub1; CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub4; )); $node_subscriber->wait_for_subscription_sync; $node_publisher->safe_psql( 'postgres', qq( INSERT INTO t VALUES (4, 5, 6); )); $node_publisher->wait_for_catchup('sub1'); is( $node_subscriber->safe_psql( 'postgres', "SELECT * FROM t ORDER BY a, b, c"), qq(1|| 4||), 'publication containing both parent and child relation'); # TEST: Only columns in the column list should exist in the old tuple of UPDATE # and DELETE. $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_oldtuple_col (a int PRIMARY KEY, b int, c int); CREATE PUBLICATION pub_check_oldtuple FOR TABLE test_oldtuple_col (a, b); INSERT INTO test_oldtuple_col VALUES(1, 2, 3); SELECT * FROM pg_create_logical_replication_slot('test_slot', 'pgoutput'); UPDATE test_oldtuple_col SET a = 2; DELETE FROM test_oldtuple_col; )); # Check at 7th byte of binary data for the number of columns in the old tuple. # # 7 = 1 (count from 1) + 1 byte (message type) + 4 byte (relid) + 1 byte (flag # for old key). # # The message type of UPDATE is 85('U'). # The message type of DELETE is 68('D'). $result = $node_publisher->safe_psql( 'postgres', qq( SELECT substr(data, 7, 2) = int2send(2::smallint) FROM pg_logical_slot_peek_binary_changes('test_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pub_check_oldtuple') WHERE get_byte(data, 0) = 85 OR get_byte(data, 0) = 68 )); is( $result, qq(t t), 'check the number of columns in the old tuple'); # TEST: With a table included in multiple publications with different column # lists, we should catch the error when creating the subscription. $node_publisher->safe_psql( 'postgres', qq( CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b); CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c); )); $node_subscriber->safe_psql( 'postgres', qq( DROP SUBSCRIPTION sub1; CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); )); my ($cmdret, $stdout, $stderr) = $node_subscriber->psql( 'postgres', qq( CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2; )); ok( $stderr =~ qr/cannot use different column lists for table "public.test_mix_1" in different publications/, 'different column lists detected'); # TEST: If the column list is changed after creating the subscription, we # should catch the error reported by walsender. $node_publisher->safe_psql( 'postgres', qq( ALTER PUBLICATION pub_mix_1 SET TABLE test_mix_1 (a, c); )); $node_subscriber->safe_psql( 'postgres', qq( CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_mix_1, pub_mix_2; )); $node_publisher->wait_for_catchup('sub1'); $node_publisher->safe_psql( 'postgres', qq( ALTER PUBLICATION pub_mix_1 SET TABLE test_mix_1 (a, b); INSERT INTO test_mix_1 VALUES(1, 1, 1); )); $offset = $node_publisher->wait_for_log( qr/cannot use different column lists for table "public.test_mix_1" in different publications/, $offset); $node_subscriber->stop('fast'); $node_publisher->stop('fast'); done_testing();