summaryrefslogtreecommitdiffstats
path: root/contrib/test_decoding/sql
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding/sql')
-rw-r--r--contrib/test_decoding/sql/binary.sql14
-rw-r--r--contrib/test_decoding/sql/ddl.sql445
-rw-r--r--contrib/test_decoding/sql/decoding_in_xact.sql41
-rw-r--r--contrib/test_decoding/sql/decoding_into_rel.sql42
-rw-r--r--contrib/test_decoding/sql/messages.sql34
-rw-r--r--contrib/test_decoding/sql/permissions.sql69
-rw-r--r--contrib/test_decoding/sql/prepared.sql50
-rw-r--r--contrib/test_decoding/sql/replorigin.sql121
-rw-r--r--contrib/test_decoding/sql/rewrite.sql107
-rw-r--r--contrib/test_decoding/sql/slot.sql178
-rw-r--r--contrib/test_decoding/sql/spill.sql179
-rw-r--r--contrib/test_decoding/sql/stats.sql56
-rw-r--r--contrib/test_decoding/sql/stream.sql48
-rw-r--r--contrib/test_decoding/sql/time.sql22
-rw-r--r--contrib/test_decoding/sql/toast.sql327
-rw-r--r--contrib/test_decoding/sql/truncate.sql14
-rw-r--r--contrib/test_decoding/sql/twophase.sql114
-rw-r--r--contrib/test_decoding/sql/twophase_stream.sql45
-rw-r--r--contrib/test_decoding/sql/xact.sql33
19 files changed, 1939 insertions, 0 deletions
diff --git a/contrib/test_decoding/sql/binary.sql b/contrib/test_decoding/sql/binary.sql
new file mode 100644
index 0000000..df1c5fb
--- /dev/null
+++ b/contrib/test_decoding/sql/binary.sql
@@ -0,0 +1,14 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+-- succeeds, textual plugin, textual consumer
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+-- fails, binary plugin, textual consumer
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '1', 'skip-empty-xacts', '1');
+-- succeeds, textual plugin, binary consumer
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+-- succeeds, binary plugin, binary consumer
+SELECT data FROM pg_logical_slot_get_binary_changes('regression_slot', NULL, NULL, 'force-binary', '1', 'skip-empty-xacts', '1');
+
+SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/ddl.sql b/contrib/test_decoding/sql/ddl.sql
new file mode 100644
index 0000000..e8b6444
--- /dev/null
+++ b/contrib/test_decoding/sql/ddl.sql
@@ -0,0 +1,445 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+-- fail because of an already existing slot
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+-- fail because of an invalid name
+SELECT 'init' FROM pg_create_logical_replication_slot('Invalid Name', 'test_decoding');
+
+-- fail twice because of an invalid parameter values
+SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar');
+SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'nonexistent-option', 'frakbar');
+SELECT 'init' FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', 'frakbar');
+
+-- succeed once
+SELECT pg_drop_replication_slot('regression_slot');
+-- fail
+SELECT pg_drop_replication_slot('regression_slot');
+
+-- check that we're detecting a streaming rep slot used for logical decoding
+SELECT 'init' FROM pg_create_physical_replication_slot('repl');
+SELECT data FROM pg_logical_slot_get_changes('repl', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('repl');
+
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+/* check whether status function reports us, only reproduceable columns */
+SELECT slot_name, plugin, slot_type, active,
+ NOT catalog_xmin IS NULL AS catalog_xmin_set,
+ xmin IS NULl AS data_xmin_not_set,
+ pg_wal_lsn_diff(restart_lsn, '0/01000000') > 0 AS some_wal
+FROM pg_replication_slots;
+
+/*
+ * Check that changes are handled correctly when interleaved with ddl
+ */
+CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
+BEGIN;
+INSERT INTO replication_example(somedata, text) VALUES (1, 1);
+INSERT INTO replication_example(somedata, text) VALUES (1, 2);
+COMMIT;
+
+ALTER TABLE replication_example ADD COLUMN bar int;
+
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 1, 4);
+
+BEGIN;
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 2, 4);
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 3, 4);
+INSERT INTO replication_example(somedata, text, bar) VALUES (2, 4, NULL);
+COMMIT;
+
+ALTER TABLE replication_example DROP COLUMN bar;
+INSERT INTO replication_example(somedata, text) VALUES (3, 1);
+
+BEGIN;
+INSERT INTO replication_example(somedata, text) VALUES (3, 2);
+INSERT INTO replication_example(somedata, text) VALUES (3, 3);
+COMMIT;
+
+ALTER TABLE replication_example RENAME COLUMN text TO somenum;
+
+INSERT INTO replication_example(somedata, somenum) VALUES (4, 1);
+
+-- collect all changes
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
+-- check that this doesn't produce any changes from the heap rewrite
+SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
+
+BEGIN;
+INSERT INTO replication_example(somedata, somenum) VALUES (6, 1);
+ALTER TABLE replication_example ADD COLUMN zaphod1 int;
+INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 2, 1);
+ALTER TABLE replication_example ADD COLUMN zaphod2 int;
+INSERT INTO replication_example(somedata, somenum, zaphod2) VALUES (6, 3, 1);
+INSERT INTO replication_example(somedata, somenum, zaphod1) VALUES (6, 4, 2);
+COMMIT;
+
+-- show changes
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- ON CONFLICT DO UPDATE support
+BEGIN;
+INSERT INTO replication_example(id, somedata, somenum) SELECT i, i, i FROM generate_series(-15, 15) i
+ ON CONFLICT (id) DO UPDATE SET somenum = excluded.somenum + 1;
+COMMIT;
+
+/* display results */
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- MERGE support
+BEGIN;
+MERGE INTO replication_example t
+ USING (SELECT i as id, i as data, i as num FROM generate_series(-20, 5) i) s
+ ON t.id = s.id
+ WHEN MATCHED AND t.id < 0 THEN
+ UPDATE SET somenum = somenum + 1
+ WHEN MATCHED AND t.id >= 0 THEN
+ DELETE
+ WHEN NOT MATCHED THEN
+ INSERT VALUES (s.*);
+COMMIT;
+
+/* display results */
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
+INSERT INTO tr_unique(data) VALUES(10);
+ALTER TABLE tr_unique RENAME TO tr_pkey;
+ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1');
+
+INSERT INTO tr_pkey(data) VALUES(1);
+--show deletion with primary key
+DELETE FROM tr_pkey;
+
+/* display results */
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+/*
+ * check that disk spooling works (also for logical messages)
+ */
+BEGIN;
+CREATE TABLE tr_etoomuch (id serial primary key, data int);
+INSERT INTO tr_etoomuch(data) SELECT g.i FROM generate_series(1, 10234) g(i);
+SELECT 'tx logical msg' FROM pg_logical_emit_message(true, 'test', 'tx logical msg');
+DELETE FROM tr_etoomuch WHERE id < 5000;
+UPDATE tr_etoomuch SET data = - data WHERE id > 5000;
+CREATE TABLE tr_oddlength (id text primary key, data text);
+INSERT INTO tr_oddlength VALUES('ab', 'foo');
+COMMIT;
+
+/* display results, but hide most of the output */
+SELECT count(*), min(data), max(data)
+FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
+GROUP BY substring(data, 1, 24)
+ORDER BY 1,2;
+
+-- check updates of primary keys work correctly
+BEGIN;
+CREATE TABLE spoolme AS SELECT g.i FROM generate_series(1, 5000) g(i);
+UPDATE tr_etoomuch SET id = -id WHERE id = 5000;
+UPDATE tr_oddlength SET id = 'x', data = 'quux';
+UPDATE tr_oddlength SET id = 'yy', data = 'a';
+DELETE FROM spoolme;
+DROP TABLE spoolme;
+COMMIT;
+
+SELECT data
+FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
+WHERE data ~ 'UPDATE';
+
+-- check that a large, spooled, upsert works
+INSERT INTO tr_etoomuch (id, data)
+SELECT g.i, -g.i FROM generate_series(8000, 12000) g(i)
+ON CONFLICT(id) DO UPDATE SET data = EXCLUDED.data;
+
+SELECT substring(data, 1, 29), count(*)
+FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') WITH ORDINALITY
+GROUP BY 1
+ORDER BY min(ordinality);
+
+/*
+ * check whether we decode subtransactions correctly in relation with each
+ * other
+ */
+CREATE TABLE tr_sub (id serial primary key, path text);
+
+-- toplevel, subtxn, toplevel, subtxn, subtxn
+BEGIN;
+INSERT INTO tr_sub(path) VALUES ('1-top-#1');
+
+SAVEPOINT a;
+INSERT INTO tr_sub(path) VALUES ('1-top-1-#1');
+INSERT INTO tr_sub(path) VALUES ('1-top-1-#2');
+RELEASE SAVEPOINT a;
+
+SAVEPOINT b;
+SAVEPOINT c;
+INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#1');
+INSERT INTO tr_sub(path) VALUES ('1-top-2-1-#2');
+RELEASE SAVEPOINT c;
+INSERT INTO tr_sub(path) VALUES ('1-top-2-#1');
+RELEASE SAVEPOINT b;
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- check that we handle xlog assignments correctly
+BEGIN;
+-- nest 80 subtxns
+SAVEPOINT subtop;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;SAVEPOINT a;
+-- assign xid by inserting
+INSERT INTO tr_sub(path) VALUES ('2-top-1...--#1');
+INSERT INTO tr_sub(path) VALUES ('2-top-1...--#2');
+INSERT INTO tr_sub(path) VALUES ('2-top-1...--#3');
+RELEASE SAVEPOINT subtop;
+INSERT INTO tr_sub(path) VALUES ('2-top-#1');
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- make sure rollbacked subtransactions aren't decoded
+BEGIN;
+INSERT INTO tr_sub(path) VALUES ('3-top-2-#1');
+SAVEPOINT a;
+INSERT INTO tr_sub(path) VALUES ('3-top-2-1-#1');
+SAVEPOINT b;
+INSERT INTO tr_sub(path) VALUES ('3-top-2-2-#1');
+ROLLBACK TO SAVEPOINT b;
+INSERT INTO tr_sub(path) VALUES ('3-top-2-#2');
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test whether a known, but not yet logged toplevel xact, followed by a
+-- subxact commit is handled correctly
+BEGIN;
+SELECT pg_current_xact_id() != '0'; -- so no fixed xid apears in the outfile
+SAVEPOINT a;
+INSERT INTO tr_sub(path) VALUES ('4-top-1-#1');
+RELEASE SAVEPOINT a;
+COMMIT;
+
+-- test whether a change in a subtransaction, in an unknown toplevel
+-- xact is handled correctly.
+BEGIN;
+SAVEPOINT a;
+INSERT INTO tr_sub(path) VALUES ('5-top-1-#1');
+COMMIT;
+
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- check that DDL in aborted subtransactions handled correctly
+CREATE TABLE tr_sub_ddl(data int);
+BEGIN;
+SAVEPOINT a;
+ALTER TABLE tr_sub_ddl ALTER COLUMN data TYPE text;
+INSERT INTO tr_sub_ddl VALUES ('blah-blah');
+ROLLBACK TO SAVEPOINT a;
+ALTER TABLE tr_sub_ddl ALTER COLUMN data TYPE bigint;
+INSERT INTO tr_sub_ddl VALUES(43);
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+
+/*
+ * Check whether treating a table as a catalog table works somewhat
+ */
+CREATE TABLE replication_metadata (
+ id serial primary key,
+ relation name NOT NULL,
+ options text[]
+)
+WITH (user_catalog_table = true)
+;
+\d+ replication_metadata
+
+INSERT INTO replication_metadata(relation, options)
+VALUES ('foo', ARRAY['a', 'b']);
+
+ALTER TABLE replication_metadata RESET (user_catalog_table);
+\d+ replication_metadata
+
+INSERT INTO replication_metadata(relation, options)
+VALUES ('bar', ARRAY['a', 'b']);
+
+ALTER TABLE replication_metadata SET (user_catalog_table = true);
+\d+ replication_metadata
+
+INSERT INTO replication_metadata(relation, options)
+VALUES ('blub', NULL);
+
+-- make sure rewrites don't work
+ALTER TABLE replication_metadata ADD COLUMN rewritemeornot int;
+ALTER TABLE replication_metadata ALTER COLUMN rewritemeornot TYPE text;
+
+ALTER TABLE replication_metadata SET (user_catalog_table = false);
+\d+ replication_metadata
+
+INSERT INTO replication_metadata(relation, options)
+VALUES ('zaphod', NULL);
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+/*
+ * check whether we handle updates/deletes correct with & without a pkey
+ */
+
+/* we should handle the case without a key at all more gracefully */
+CREATE TABLE table_without_key(id serial, data int);
+INSERT INTO table_without_key(data) VALUES(1),(2);
+DELETE FROM table_without_key WHERE data = 1;
+-- won't log old keys
+UPDATE table_without_key SET data = 3 WHERE data = 2;
+UPDATE table_without_key SET id = -id;
+UPDATE table_without_key SET id = -id;
+-- should log the full old row now
+ALTER TABLE table_without_key REPLICA IDENTITY FULL;
+UPDATE table_without_key SET data = 3 WHERE data = 2;
+UPDATE table_without_key SET id = -id;
+UPDATE table_without_key SET id = -id;
+-- ensure that FULL correctly deals with new columns
+ALTER TABLE table_without_key ADD COLUMN new_column text;
+UPDATE table_without_key SET id = -id;
+UPDATE table_without_key SET id = -id, new_column = 'someval';
+DELETE FROM table_without_key WHERE data = 3;
+
+CREATE TABLE table_with_pkey(id serial primary key, data int);
+INSERT INTO table_with_pkey(data) VALUES(1), (2);
+DELETE FROM table_with_pkey WHERE data = 1;
+-- should log the old pkey
+UPDATE table_with_pkey SET data = 3 WHERE data = 2;
+UPDATE table_with_pkey SET id = -id;
+UPDATE table_with_pkey SET id = -id;
+-- check that we log nothing despite having a pkey
+ALTER TABLE table_without_key REPLICA IDENTITY NOTHING;
+UPDATE table_with_pkey SET id = -id;
+-- check that we log everything despite having a pkey
+ALTER TABLE table_without_key REPLICA IDENTITY FULL;
+UPDATE table_with_pkey SET id = -id;
+DELETE FROM table_with_pkey WHERE data = 3;
+
+CREATE TABLE table_with_unique_not_null(id serial unique, data int);
+ALTER TABLE table_with_unique_not_null ALTER COLUMN id SET NOT NULL; --already set
+-- won't log anything, replica identity not setup
+INSERT INTO table_with_unique_not_null(data) VALUES(1), (2);
+DELETE FROM table_with_unique_not_null WHERE data = 1;
+UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2;
+UPDATE table_with_unique_not_null SET id = -id;
+UPDATE table_with_unique_not_null SET id = -id;
+DELETE FROM table_with_unique_not_null WHERE data = 3;
+-- should log old key
+ALTER TABLE table_with_unique_not_null REPLICA IDENTITY USING INDEX table_with_unique_not_null_id_key;
+INSERT INTO table_with_unique_not_null(data) VALUES(1), (2);
+DELETE FROM table_with_unique_not_null WHERE data = 1;
+UPDATE table_with_unique_not_null SET data = 3 WHERE data = 2;
+UPDATE table_with_unique_not_null SET id = -id;
+UPDATE table_with_unique_not_null SET id = -id;
+DELETE FROM table_with_unique_not_null WHERE data = 3;
+
+-- check tables with dropped indexes used in REPLICA IDENTITY
+-- table with primary key
+CREATE TABLE table_dropped_index_with_pk (a int PRIMARY KEY, b int, c int);
+CREATE UNIQUE INDEX table_dropped_index_with_pk_idx
+ ON table_dropped_index_with_pk(a);
+ALTER TABLE table_dropped_index_with_pk REPLICA IDENTITY
+ USING INDEX table_dropped_index_with_pk_idx;
+DROP INDEX table_dropped_index_with_pk_idx;
+INSERT INTO table_dropped_index_with_pk VALUES (1,1,1), (2,2,2), (3,3,3);
+UPDATE table_dropped_index_with_pk SET a = 4 WHERE a = 1;
+UPDATE table_dropped_index_with_pk SET b = 5 WHERE a = 2;
+UPDATE table_dropped_index_with_pk SET b = 6, c = 7 WHERE a = 3;
+DELETE FROM table_dropped_index_with_pk WHERE b = 1;
+DELETE FROM table_dropped_index_with_pk WHERE a = 3;
+DROP TABLE table_dropped_index_with_pk;
+
+-- table without primary key
+CREATE TABLE table_dropped_index_no_pk (a int NOT NULL, b int, c int);
+CREATE UNIQUE INDEX table_dropped_index_no_pk_idx
+ ON table_dropped_index_no_pk(a);
+ALTER TABLE table_dropped_index_no_pk REPLICA IDENTITY
+ USING INDEX table_dropped_index_no_pk_idx;
+DROP INDEX table_dropped_index_no_pk_idx;
+INSERT INTO table_dropped_index_no_pk VALUES (1,1,1), (2,2,2), (3,3,3);
+UPDATE table_dropped_index_no_pk SET a = 4 WHERE a = 1;
+UPDATE table_dropped_index_no_pk SET b = 5 WHERE a = 2;
+UPDATE table_dropped_index_no_pk SET b = 6, c = 7 WHERE a = 3;
+DELETE FROM table_dropped_index_no_pk WHERE b = 1;
+DELETE FROM table_dropped_index_no_pk WHERE a = 3;
+DROP TABLE table_dropped_index_no_pk;
+
+-- check toast support
+BEGIN;
+CREATE SEQUENCE toasttable_rand_seq START 79 INCREMENT 1499; -- portable "random"
+CREATE TABLE toasttable(
+ id serial primary key,
+ toasted_col1 text,
+ rand1 float8 DEFAULT nextval('toasttable_rand_seq'),
+ toasted_col2 text,
+ rand2 float8 DEFAULT nextval('toasttable_rand_seq')
+ );
+COMMIT;
+-- uncompressed external toast data
+INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i);
+
+-- compressed external toast data
+INSERT INTO toasttable(toasted_col2) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+
+-- update of existing column
+UPDATE toasttable
+ SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i))
+WHERE id = 1;
+
+-- This output is extremely wide, and using aligned mode causes psql to
+-- produce 200kB of useless dashes. Turn that off temporarily to avoid it.
+\pset format unaligned
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+\pset format aligned
+
+INSERT INTO toasttable(toasted_col1) SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i);
+
+-- update of second column, first column unchanged
+UPDATE toasttable
+ SET toasted_col2 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i))
+WHERE id = 1;
+
+-- make sure we decode correctly even if the toast table is gone
+DROP TABLE toasttable;
+
+\pset format unaligned
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- done, free logical replication slot
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+\pset format aligned
+
+SELECT pg_drop_replication_slot('regression_slot');
+
+/* check that the slot is gone */
+\x
+SELECT * FROM pg_replication_slots;
+\x
diff --git a/contrib/test_decoding/sql/decoding_in_xact.sql b/contrib/test_decoding/sql/decoding_in_xact.sql
new file mode 100644
index 0000000..108782d
--- /dev/null
+++ b/contrib/test_decoding/sql/decoding_in_xact.sql
@@ -0,0 +1,41 @@
+-- predictability
+SET synchronous_commit = on;
+
+-- fail because we're creating a slot while in an xact with xid
+BEGIN;
+SELECT pg_current_xact_id() = '0';
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ROLLBACK;
+
+-- fail because we're creating a slot while in a subxact whose topxact has an xid
+BEGIN;
+SELECT pg_current_xact_id() = '0';
+SAVEPOINT barf;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ROLLBACK TO SAVEPOINT barf;
+ROLLBACK;
+
+-- succeed, outside tx.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
+
+-- succeed, in tx without xid.
+BEGIN;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+COMMIT;
+
+CREATE TABLE nobarf(id serial primary key, data text);
+INSERT INTO nobarf(data) VALUES('1');
+
+-- decoding works in transaction with xid
+BEGIN;
+SELECT pg_current_xact_id() = '0';
+-- don't show yet, haven't committed
+INSERT INTO nobarf(data) VALUES('2');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT;
+
+INSERT INTO nobarf(data) VALUES('3');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/decoding_into_rel.sql b/contrib/test_decoding/sql/decoding_into_rel.sql
new file mode 100644
index 0000000..1068cec
--- /dev/null
+++ b/contrib/test_decoding/sql/decoding_into_rel.sql
@@ -0,0 +1,42 @@
+-- test that we can insert the result of a get_changes call into a
+-- logged relation. That's really not a good idea in practical terms,
+-- but provides a nice test.
+
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- create some changes
+CREATE TABLE somechange(id serial primary key);
+INSERT INTO somechange DEFAULT VALUES;
+
+CREATE TABLE changeresult AS
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT * FROM changeresult;
+
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+INSERT INTO changeresult
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT * FROM changeresult;
+DROP TABLE changeresult;
+DROP TABLE somechange;
+
+-- check calling logical decoding from pl/pgsql
+CREATE FUNCTION slot_changes_wrapper(slot_name name) RETURNS SETOF TEXT AS $$
+BEGIN
+ RETURN QUERY
+ SELECT data FROM pg_logical_slot_peek_changes(slot_name, NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+END$$ LANGUAGE plpgsql;
+
+SELECT * FROM slot_changes_wrapper('regression_slot');
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
new file mode 100644
index 0000000..cf3f773
--- /dev/null
+++ b/contrib/test_decoding/sql/messages.sql
@@ -0,0 +1,34 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
+SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
+
+BEGIN;
+SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');
+SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4');
+ROLLBACK;
+
+BEGIN;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5');
+SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6');
+SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7');
+COMMIT;
+
+SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+-- test db filtering
+\set prevdb :DBNAME
+\c template1
+
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/permissions.sql b/contrib/test_decoding/sql/permissions.sql
new file mode 100644
index 0000000..312b514
--- /dev/null
+++ b/contrib/test_decoding/sql/permissions.sql
@@ -0,0 +1,69 @@
+-- predictability
+SET synchronous_commit = on;
+
+-- setup
+CREATE ROLE regress_lr_normal;
+CREATE ROLE regress_lr_superuser SUPERUSER;
+CREATE ROLE regress_lr_replication REPLICATION;
+CREATE TABLE lr_test(data text);
+
+-- superuser can control replication
+SET ROLE regress_lr_superuser;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+INSERT INTO lr_test VALUES('lr_superuser_init');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
+RESET ROLE;
+
+-- replication user can control replication
+SET ROLE regress_lr_replication;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+INSERT INTO lr_test VALUES('lr_superuser_init');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
+RESET ROLE;
+
+-- plain user *can't* can control replication
+SET ROLE regress_lr_normal;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+INSERT INTO lr_test VALUES('lr_superuser_init');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
+RESET ROLE;
+
+-- replication users can drop superuser created slots
+SET ROLE regress_lr_superuser;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+RESET ROLE;
+SET ROLE regress_lr_replication;
+SELECT pg_drop_replication_slot('regression_slot');
+RESET ROLE;
+
+-- normal users can't drop existing slots
+SET ROLE regress_lr_superuser;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+RESET ROLE;
+SET ROLE regress_lr_normal;
+SELECT pg_drop_replication_slot('regression_slot');
+RESET ROLE;
+
+-- all users can see existing slots
+SET ROLE regress_lr_superuser;
+SELECT slot_name, plugin FROM pg_replication_slots;
+RESET ROLE;
+
+SET ROLE regress_lr_replication;
+SELECT slot_name, plugin FROM pg_replication_slots;
+RESET ROLE;
+
+SET ROLE regress_lr_normal;
+SELECT slot_name, plugin FROM pg_replication_slots;
+RESET ROLE;
+
+-- cleanup
+SELECT pg_drop_replication_slot('regression_slot');
+
+DROP ROLE regress_lr_normal;
+DROP ROLE regress_lr_superuser;
+DROP ROLE regress_lr_replication;
+DROP TABLE lr_test;
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
new file mode 100644
index 0000000..e726397
--- /dev/null
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -0,0 +1,50 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE test_prepared1(id int);
+CREATE TABLE test_prepared2(id int);
+
+-- test simple successful use of a prepared xact
+BEGIN;
+INSERT INTO test_prepared1 VALUES (1);
+PREPARE TRANSACTION 'test_prepared#1';
+COMMIT PREPARED 'test_prepared#1';
+INSERT INTO test_prepared1 VALUES (2);
+
+-- test abort of a prepared xact
+BEGIN;
+INSERT INTO test_prepared1 VALUES (3);
+PREPARE TRANSACTION 'test_prepared#2';
+ROLLBACK PREPARED 'test_prepared#2';
+
+INSERT INTO test_prepared1 VALUES (4);
+
+-- test prepared xact containing ddl
+BEGIN;
+INSERT INTO test_prepared1 VALUES (5);
+ALTER TABLE test_prepared1 ADD COLUMN data text;
+INSERT INTO test_prepared1 VALUES (6, 'frakbar');
+PREPARE TRANSACTION 'test_prepared#3';
+
+-- test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+
+-- separate table because of the lock from the ALTER
+-- this will come before the '5' row above, as this commits before it.
+INSERT INTO test_prepared2 VALUES (7);
+
+COMMIT PREPARED 'test_prepared#3';
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (8);
+INSERT INTO test_prepared2 VALUES (9);
+
+-- cleanup
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+
+-- show results
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
new file mode 100644
index 0000000..2e28a48
--- /dev/null
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -0,0 +1,121 @@
+-- predictability
+SET synchronous_commit = on;
+
+-- superuser required by default
+CREATE ROLE regress_origin_replication REPLICATION;
+SET ROLE regress_origin_replication;
+SELECT pg_replication_origin_advance('regress_test_decoding: perm', '0/1');
+SELECT pg_replication_origin_create('regress_test_decoding: perm');
+SELECT pg_replication_origin_drop('regress_test_decoding: perm');
+SELECT pg_replication_origin_oid('regress_test_decoding: perm');
+SELECT pg_replication_origin_progress('regress_test_decoding: perm', false);
+SELECT pg_replication_origin_session_is_setup();
+SELECT pg_replication_origin_session_progress(false);
+SELECT pg_replication_origin_session_reset();
+SELECT pg_replication_origin_session_setup('regress_test_decoding: perm');
+SELECT pg_replication_origin_xact_reset();
+SELECT pg_replication_origin_xact_setup('0/1', '2013-01-01 00:00');
+SELECT pg_show_replication_origin_status();
+RESET ROLE;
+DROP ROLE regress_origin_replication;
+
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+-- ensure duplicate creations fail
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+
+--ensure deletions work (once)
+SELECT pg_replication_origin_create('regress_test_decoding: temp');
+SELECT pg_replication_origin_drop('regress_test_decoding: temp');
+SELECT pg_replication_origin_drop('regress_test_decoding: temp');
+
+-- various failure checks for undefined slots
+select pg_replication_origin_advance('regress_test_decoding: temp', '0/1');
+select pg_replication_origin_session_setup('regress_test_decoding: temp');
+select pg_replication_origin_progress('regress_test_decoding: temp', true);
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+
+-- ensure we prevent duplicate setup
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot');
+
+SELECT '' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+
+BEGIN;
+-- setup transaction origin
+SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+
+-- check replication progress for the session is correct
+SELECT pg_replication_origin_session_progress(false);
+SELECT pg_replication_origin_session_progress(true);
+
+SELECT pg_replication_origin_session_reset();
+
+SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
+
+-- check replication progress identified by name is correct
+SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', false);
+SELECT pg_replication_origin_progress('regress_test_decoding: regression_slot', true);
+
+-- ensure reset requires previously setup state
+SELECT pg_replication_origin_session_reset();
+
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot');
+
+-- Set of transactions with no origin LSNs and commit timestamps set for
+-- this session.
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_no_lsn', 'test_decoding');
+SELECT pg_replication_origin_create('regress_test_decoding: regression_slot_no_lsn');
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot_no_lsn');
+-- Simple transactions
+BEGIN;
+INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit');
+COMMIT;
+BEGIN;
+INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback');
+ROLLBACK;
+-- 2PC transactions
+BEGIN;
+INSERT INTO origin_tbl(data) VALUES ('no_lsn, commit prepared');
+PREPARE TRANSACTION 'replorigin_prepared';
+COMMIT PREPARED 'replorigin_prepared';
+BEGIN;
+INSERT INTO origin_tbl(data) VALUES ('no_lsn, rollback prepared');
+PREPARE TRANSACTION 'replorigin_prepared';
+ROLLBACK PREPARED 'replorigin_prepared';
+SELECT local_id, external_id,
+ remote_lsn <> '0/0' AS valid_remote_lsn,
+ local_lsn <> '0/0' AS valid_local_lsn
+ FROM pg_replication_origin_status;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+-- Clean up
+SELECT pg_replication_origin_session_reset();
+SELECT pg_drop_replication_slot('regression_slot_no_lsn');
+SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn');
diff --git a/contrib/test_decoding/sql/rewrite.sql b/contrib/test_decoding/sql/rewrite.sql
new file mode 100644
index 0000000..62dead3
--- /dev/null
+++ b/contrib/test_decoding/sql/rewrite.sql
@@ -0,0 +1,107 @@
+-- predictability
+SET synchronous_commit = on;
+
+DROP TABLE IF EXISTS replication_example;
+
+-- Ensure there's tables with toast datums. To do so, we dynamically
+-- create a function returning a large textblob. We want tables of
+-- different kinds: mapped catalog table, unmapped catalog table,
+-- shared catalog table and usertable.
+CREATE FUNCTION exec(text) returns void language plpgsql volatile
+ AS $f$
+ BEGIN
+ EXECUTE $1;
+ END;
+$f$;
+CREATE ROLE regress_justforcomments NOLOGIN;
+
+SELECT exec(
+ format($outer$CREATE FUNCTION iamalongfunction() RETURNS TEXT IMMUTABLE LANGUAGE SQL AS $f$SELECT text %L$f$$outer$,
+ (SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i))));
+SELECT exec(
+ format($outer$COMMENT ON FUNCTION iamalongfunction() IS %L$outer$,
+ iamalongfunction()));
+SELECT exec(
+ format($outer$COMMENT ON ROLE REGRESS_JUSTFORCOMMENTS IS %L$outer$,
+ iamalongfunction()));
+CREATE TABLE iamalargetable AS SELECT iamalongfunction() longfunctionoutput;
+
+-- verify toast usage
+SELECT pg_relation_size((SELECT reltoastrelid FROM pg_class WHERE oid = 'pg_proc'::regclass)) > 0;
+SELECT pg_relation_size((SELECT reltoastrelid FROM pg_class WHERE oid = 'pg_description'::regclass)) > 0;
+SELECT pg_relation_size((SELECT reltoastrelid FROM pg_class WHERE oid = 'pg_shdescription'::regclass)) > 0;
+
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
+INSERT INTO replication_example(somedata) VALUES (1);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+BEGIN;
+INSERT INTO replication_example(somedata) VALUES (2);
+ALTER TABLE replication_example ADD COLUMN testcolumn1 int;
+INSERT INTO replication_example(somedata, testcolumn1) VALUES (3, 1);
+COMMIT;
+
+BEGIN;
+INSERT INTO replication_example(somedata) VALUES (3);
+ALTER TABLE replication_example ADD COLUMN testcolumn2 int;
+INSERT INTO replication_example(somedata, testcolumn1, testcolumn2) VALUES (4, 2, 1);
+COMMIT;
+
+VACUUM FULL pg_am;
+VACUUM FULL pg_amop;
+VACUUM FULL pg_proc;
+VACUUM FULL pg_opclass;
+VACUUM FULL pg_type;
+VACUUM FULL pg_index;
+VACUUM FULL pg_database;
+
+-- repeated rewrites that fail
+BEGIN;
+CLUSTER pg_class USING pg_class_oid_index;
+CLUSTER pg_class USING pg_class_oid_index;
+ROLLBACK;
+
+-- repeated rewrites that succeed
+BEGIN;
+CLUSTER pg_class USING pg_class_oid_index;
+CLUSTER pg_class USING pg_class_oid_index;
+CLUSTER pg_class USING pg_class_oid_index;
+COMMIT;
+
+ -- repeated rewrites in different transactions
+VACUUM FULL pg_class;
+VACUUM FULL pg_class;
+
+-- reindexing of important relations / indexes
+REINDEX TABLE pg_class;
+REINDEX INDEX pg_class_oid_index;
+REINDEX INDEX pg_class_tblspc_relfilenode_index;
+
+INSERT INTO replication_example(somedata, testcolumn1) VALUES (5, 3);
+
+BEGIN;
+INSERT INTO replication_example(somedata, testcolumn1) VALUES (6, 4);
+ALTER TABLE replication_example ADD COLUMN testcolumn3 int;
+INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (7, 5, 1);
+COMMIT;
+
+-- make old files go away
+CHECKPOINT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- trigger repeated rewrites of a system catalog with a toast table,
+-- that previously was buggy: 20180914021046.oi7dm4ra3ot2g2kt@alap3.anarazel.de
+VACUUM FULL pg_proc; VACUUM FULL pg_description; VACUUM FULL pg_shdescription; VACUUM FULL iamalargetable;
+INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (8, 6, 1);
+VACUUM FULL pg_proc; VACUUM FULL pg_description; VACUUM FULL pg_shdescription; VACUUM FULL iamalargetable;
+INSERT INTO replication_example(somedata, testcolumn1, testcolumn3) VALUES (9, 7, 1);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
+DROP TABLE IF EXISTS replication_example;
+DROP FUNCTION iamalongfunction();
+DROP FUNCTION exec(text);
+DROP ROLE regress_justforcomments;
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
new file mode 100644
index 0000000..1aa27c5
--- /dev/null
+++ b/contrib/test_decoding/sql/slot.sql
@@ -0,0 +1,178 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t', 'test_decoding', true);
+
+SELECT pg_drop_replication_slot('regression_slot_p');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_p', 'test_decoding', false);
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_t2', 'test_decoding', true);
+
+SELECT pg_create_logical_replication_slot('foo', 'nonexistent');
+
+-- here we want to start a new session and wait till old one is gone
+select pg_backend_pid() as oldpid \gset
+\c -
+SET synchronous_commit = on;
+
+do 'declare c int = 0;
+begin
+ while (select count(*) from pg_replication_slots where active_pid = '
+ :'oldpid'
+ ') > 0 loop c := c + 1; perform pg_sleep(0.01); end loop;
+ raise log ''slot test looped % times'', c;
+end';
+
+-- should fail because the temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('regression_slot_t');
+SELECT pg_drop_replication_slot('regression_slot_t2');
+
+-- monitoring functions for slot directories
+SELECT count(*) >= 0 AS ok FROM pg_ls_logicalmapdir();
+SELECT count(*) >= 0 AS ok FROM pg_ls_logicalsnapdir();
+SELECT count(*) >= 0 AS ok FROM pg_ls_replslotdir('regression_slot_p');
+SELECT count(*) >= 0 AS ok FROM pg_ls_replslotdir('not_existing_slot'); -- fails
+
+-- permanent slot has survived
+SELECT pg_drop_replication_slot('regression_slot_p');
+
+-- test switching between slots in a session
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);
+
+CREATE TABLE replication_example(id SERIAL PRIMARY KEY, somedata int, text varchar(120));
+BEGIN;
+INSERT INTO replication_example(somedata, text) VALUES (1, 1);
+INSERT INTO replication_example(somedata, text) VALUES (1, 2);
+COMMIT;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot2', 'test_decoding', true);
+
+INSERT INTO replication_example(somedata, text) VALUES (1, 3);
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO replication_example(somedata, text) VALUES (1, 4);
+INSERT INTO replication_example(somedata, text) VALUES (1, 5);
+
+SELECT pg_current_wal_lsn() AS wal_lsn \gset
+
+INSERT INTO replication_example(somedata, text) VALUES (1, 6);
+
+SELECT end_lsn FROM pg_replication_slot_advance('regression_slot1', :'wal_lsn') \gset
+SELECT slot_name FROM pg_replication_slot_advance('regression_slot2', pg_current_wal_lsn());
+
+SELECT :'wal_lsn' = :'end_lsn';
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+DROP TABLE replication_example;
+
+-- error
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot1', 'test_decoding', true);
+
+-- both should error as they should be dropped on error
+SELECT pg_drop_replication_slot('regression_slot1');
+SELECT pg_drop_replication_slot('regression_slot2');
+
+-- slot advance with physical slot, error with non-reserved slot
+SELECT slot_name FROM pg_create_physical_replication_slot('regression_slot3');
+SELECT pg_replication_slot_advance('regression_slot3', '0/0'); -- invalid LSN
+SELECT pg_replication_slot_advance('regression_slot3', '0/1'); -- error
+SELECT pg_drop_replication_slot('regression_slot3');
+
+--
+-- Test copy functions for logical replication slots
+--
+
+-- Create and copy logical slots
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput');
+
+-- Check all copied slots status
+SELECT
+ o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+ (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+ LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+ o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Now we have maximum 4 replication slots. Check slots are properly
+-- released even when raise error during creating the target slot.
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+SELECT pg_drop_replication_slot('copied_slot1_change_plugin');
+
+-- Test based on the temporary logical slot
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot2', 'test_decoding', true);
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin', true, 'pgoutput');
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_change_plugin_temp', false, 'pgoutput');
+
+-- Check all copied slots status
+SELECT
+ o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+FROM
+ (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+ LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
+WHERE
+ o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+-- Cannot copy a logical slot to a physical slot
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('copied_slot2_change_plugin_temp');
+
+--
+-- Test copy functions for physical replication slots
+--
+
+-- Create and copy physical slots
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot1', true);
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', false);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot1', 'copied_slot1_temp', true);
+
+-- Check all copied slots status. Since all slots don't reserve WAL we check only other fields.
+SELECT slot_name, slot_type, temporary FROM pg_replication_slots;
+
+-- Cannot copy a physical slot to a logical slot
+SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'failed'); -- error
+
+-- Cannot copy a physical slot that doesn't reserve WAL
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'failed'); -- error
+
+-- temporary slots were dropped automatically
+SELECT pg_drop_replication_slot('orig_slot1');
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot1_no_change');
+
+-- Test based on the temporary physical slot
+SELECT 'init' FROM pg_create_physical_replication_slot('orig_slot2', true, true);
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_no_change');
+SELECT 'copy' FROM pg_copy_physical_replication_slot('orig_slot2', 'copied_slot2_notemp', false);
+
+-- Check all copied slots status
+SELECT
+ o.slot_name, o.temporary, c.slot_name, c.temporary
+FROM
+ (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
+ LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn
+WHERE
+ o.slot_name != c.slot_name
+ORDER BY o.slot_name, c.slot_name;
+
+SELECT pg_drop_replication_slot('orig_slot2');
+SELECT pg_drop_replication_slot('copied_slot2_no_change');
+SELECT pg_drop_replication_slot('copied_slot2_notemp');
diff --git a/contrib/test_decoding/sql/spill.sql b/contrib/test_decoding/sql/spill.sql
new file mode 100644
index 0000000..e638cac
--- /dev/null
+++ b/contrib/test_decoding/sql/spill.sql
@@ -0,0 +1,179 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE spill_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- spilling main xact
+BEGIN;
+INSERT INTO spill_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, nothing in main
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, spilling main xact
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, non-spilling main xact
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- not-spilling subxact, spilling main xact
+BEGIN;
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-subbig-topbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling main xact, spilling subxact
+BEGIN;
+INSERT INTO spill_test SELECT 'serialize-topbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-topbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling main xact, not spilling subxact
+BEGIN;
+INSERT INTO spill_test SELECT 'serialize-topbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s;
+INSERT INTO spill_test SELECT 'serialize-topbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+RELEASE SAVEPOINT s;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, followed by another spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-subbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s1;
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-subbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s2;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, followed by not spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-subbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+RELEASE SAVEPOINT s1;
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-subbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+RELEASE SAVEPOINT s2;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4], COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- not spilling subxact, followed by spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-subsmall-subbig--1:'||g.i FROM generate_series(1, 1) g(i);
+RELEASE SAVEPOINT s1;
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-subsmall-subbig--2:'||g.i FROM generate_series(2, 5001) g(i);
+RELEASE SAVEPOINT s2;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4] COLLATE "C", COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, containing another spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbig--2:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s2;
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4] COLLATE "C", COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- spilling subxact, containing a not spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subsmall--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subsmall--2:'||g.i FROM generate_series(5001, 5001) g(i);
+RELEASE SAVEPOINT s2;
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4] COLLATE "C", COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- not spilling subxact, containing a spilling subxact
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subsmall-subbig--1:'||g.i FROM generate_series(1, 1) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subsmall-subbig--2:'||g.i FROM generate_series(2, 5001) g(i);
+RELEASE SAVEPOINT s2;
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4] COLLATE "C", COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+-- not spilling subxact, containing a spilling subxact that aborts and one that commits
+BEGIN;
+SAVEPOINT s1;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort--1:'||g.i FROM generate_series(1, 5000) g(i);
+SAVEPOINT s2;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort--2:'||g.i FROM generate_series(5001, 10000) g(i);
+ROLLBACK TO SAVEPOINT s2;
+SAVEPOINT s3;
+INSERT INTO spill_test SELECT 'serialize-nested-subbig-subbigabort-subbig-3:'||g.i FROM generate_series(5001, 10000) g(i);
+RELEASE SAVEPOINT s1;
+COMMIT;
+SELECT (regexp_split_to_array(data, ':'))[4] COLLATE "C", COUNT(*), (array_agg(data))[1], (array_agg(data))[count(*)]
+FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL) WHERE data ~ 'INSERT'
+GROUP BY 1 ORDER BY 1;
+
+DROP TABLE spill_test;
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
new file mode 100644
index 0000000..630371f
--- /dev/null
+++ b/contrib/test_decoding/sql/stats.sql
@@ -0,0 +1,56 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM
+ pg_create_logical_replication_slot('regression_slot_stats1', 'test_decoding') s1,
+ pg_create_logical_replication_slot('regression_slot_stats2', 'test_decoding') s2,
+ pg_create_logical_replication_slot('regression_slot_stats3', 'test_decoding') s3;
+
+CREATE TABLE stats_test(data text);
+
+-- non-spilled xact
+SET logical_decoding_work_mem to '64MB';
+INSERT INTO stats_test values(1);
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL, NULL, 'skip-empty-xacts', '1');
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
+SELECT pg_stat_force_next_flush();
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+RESET logical_decoding_work_mem;
+
+-- reset stats for one slot, others should be unaffected
+SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+
+-- reset stats for all slots
+SELECT pg_stat_reset_replication_slot(NULL);
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+
+-- verify accessing/resetting stats for non-existent slot does something reasonable
+SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
+SELECT pg_stat_reset_replication_slot('do-not-exist');
+SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
+
+-- spilling the xact
+BEGIN;
+INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot_stats1', NULL, NULL, 'skip-empty-xacts', '1');
+
+-- Check stats. We can't test the exact stats count as that can vary if any
+-- background transaction (say by autovacuum) happens in parallel to the main
+-- transaction.
+SELECT pg_stat_force_next_flush();
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+
+-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
+-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
+BEGIN;
+SELECT slot_name FROM pg_stat_replication_slots;
+SELECT slot_name FROM pg_stat_replication_slots;
+COMMIT;
+
+DROP TABLE stats_test;
+SELECT pg_drop_replication_slot('regression_slot_stats1'),
+ pg_drop_replication_slot('regression_slot_stats2'),
+ pg_drop_replication_slot('regression_slot_stats3');
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
new file mode 100644
index 0000000..4feec62
--- /dev/null
+++ b/contrib/test_decoding/sql/stream.sql
@@ -0,0 +1,48 @@
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE stream_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- streaming test with sub-transaction
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+-- streaming test for toast changes
+ALTER TABLE stream_test ALTER COLUMN data set storage external;
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+-- streaming test for toast with multi-insert
+\COPY stream_test FROM STDIN
+toasted
+toasted
+toasted
+toasted
+toasted
+toasted
+toasted-123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
+toasted
+toasted
+toasted
+toasted
+toasted
+\.
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/time.sql b/contrib/test_decoding/sql/time.sql
new file mode 100644
index 0000000..a47c973
--- /dev/null
+++ b/contrib/test_decoding/sql/time.sql
@@ -0,0 +1,22 @@
+SET synchronous_commit = on;
+
+CREATE TABLE test_time(data text);
+
+-- remember the current time
+SELECT set_config('test.time_before', NOW()::text, false) IS NOT NULL;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- a single transaction, to get the commit time
+INSERT INTO test_time(data) VALUES ('');
+
+-- parse the commit time from the changeset
+SELECT set_config('test.time_after', regexp_replace(data, '^COMMIT \(at (.*)\)$', '\1'), false) IS NOT NULL
+FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '1')
+WHERE data ~ 'COMMIT' LIMIT 1;
+
+-- ensure commit time is sane in relation to the previous time
+SELECT (time_after - time_before) <= '10 minutes'::interval, time_after >= time_before
+FROM (SELECT current_setting('test.time_after')::timestamptz AS time_after, (SELECT current_setting('test.time_before')::timestamptz) AS time_before) AS d;
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/toast.sql b/contrib/test_decoding/sql/toast.sql
new file mode 100644
index 0000000..d1c560a
--- /dev/null
+++ b/contrib/test_decoding/sql/toast.sql
@@ -0,0 +1,327 @@
+-- predictability
+SET synchronous_commit = on;
+
+DROP TABLE IF EXISTS xpto;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE xpto_rand_seq START 79 INCREMENT 1499; -- portable "random"
+CREATE TABLE xpto (
+ id serial primary key,
+ toasted_col1 text,
+ rand1 float8 DEFAULT nextval('xpto_rand_seq'),
+ toasted_col2 text,
+ rand2 float8 DEFAULT nextval('xpto_rand_seq')
+);
+
+-- uncompressed external toast data
+INSERT INTO xpto (toasted_col1, toasted_col2) SELECT string_agg(g.i::text, ''), string_agg((g.i*2)::text, '') FROM generate_series(1, 2000) g(i);
+
+-- compressed external toast data
+INSERT INTO xpto (toasted_col2) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+
+-- update of existing column
+UPDATE xpto SET toasted_col1 = (SELECT string_agg(g.i::text, '') FROM generate_series(1, 2000) g(i)) WHERE id = 1;
+
+UPDATE xpto SET rand1 = 123.456 WHERE id = 1;
+
+-- updating external via INSERT ... ON CONFLICT DO UPDATE
+INSERT INTO xpto(id, toasted_col2) VALUES (2, 'toasted2-upsert')
+ON CONFLICT (id)
+DO UPDATE SET toasted_col2 = EXCLUDED.toasted_col2 || xpto.toasted_col2;
+
+DELETE FROM xpto WHERE id = 1;
+
+DROP TABLE IF EXISTS toasted_key;
+CREATE TABLE toasted_key (
+ id serial,
+ toasted_key text PRIMARY KEY,
+ toasted_col1 text,
+ toasted_col2 text
+);
+
+ALTER TABLE toasted_key ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
+ALTER TABLE toasted_key ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
+
+INSERT INTO toasted_key(toasted_key, toasted_col1) VALUES(repeat('1234567890', 200), repeat('9876543210', 200));
+
+-- test update of a toasted key without changing it
+UPDATE toasted_key SET toasted_col2 = toasted_col1;
+-- test update of a toasted key, changing it
+UPDATE toasted_key SET toasted_key = toasted_key || '1';
+
+DELETE FROM toasted_key;
+
+-- Test that HEAP2_MULTI_INSERT insertions with and without toasted
+-- columns are handled correctly
+CREATE TABLE toasted_copy (
+ id int primary key, -- no default, copy didn't use to handle that with multi inserts
+ data text
+);
+ALTER TABLE toasted_copy ALTER COLUMN data SET STORAGE EXTERNAL;
+\copy toasted_copy FROM STDIN
+1 untoasted1
+2 toasted
+3 untoasted2
+4 toasted
+5 untoasted3
+6 untoasted4
+7 untoasted5
+8 untoasted6
+9 untoasted7
+10 untoasted8
+11 untoasted9
+12 untoasted10
+13 untoasted11
+14 untoasted12
+15 untoasted13
+16 untoasted14
+17 untoasted15
+18 untoasted16
+19 untoasted17
+20 untoasted18
+21 untoasted19
+22 untoasted20
+23 untoasted21
+24 untoasted22
+25 untoasted23
+26 untoasted24
+27 untoasted25
+28 untoasted26
+29 untoasted27
+30 untoasted28
+31 untoasted29
+32 untoasted30
+33 untoasted31
+34 untoasted32
+35 untoasted33
+36 untoasted34
+37 untoasted35
+38 untoasted36
+39 untoasted37
+40 untoasted38
+41 untoasted39
+42 untoasted40
+43 untoasted41
+44 untoasted42
+45 untoasted43
+46 untoasted44
+47 untoasted45
+48 untoasted46
+49 untoasted47
+50 untoasted48
+51 untoasted49
+52 untoasted50
+53 untoasted51
+54 untoasted52
+55 untoasted53
+56 untoasted54
+57 untoasted55
+58 untoasted56
+59 untoasted57
+60 untoasted58
+61 untoasted59
+62 untoasted60
+63 untoasted61
+64 untoasted62
+65 untoasted63
+66 untoasted64
+67 untoasted65
+68 untoasted66
+69 untoasted67
+70 untoasted68
+71 untoasted69
+72 untoasted70
+73 untoasted71
+74 untoasted72
+75 untoasted73
+76 untoasted74
+77 untoasted75
+78 untoasted76
+79 untoasted77
+80 untoasted78
+81 untoasted79
+82 untoasted80
+83 untoasted81
+84 untoasted82
+85 untoasted83
+86 untoasted84
+87 untoasted85
+88 untoasted86
+89 untoasted87
+90 untoasted88
+91 untoasted89
+92 untoasted90
+93 untoasted91
+94 untoasted92
+95 untoasted93
+96 untoasted94
+97 untoasted95
+98 untoasted96
+99 untoasted97
+100 untoasted98
+101 untoasted99
+102 untoasted100
+103 untoasted101
+104 untoasted102
+105 untoasted103
+106 untoasted104
+107 untoasted105
+108 untoasted106
+109 untoasted107
+110 untoasted108
+111 untoasted109
+112 untoasted110
+113 untoasted111
+114 untoasted112
+115 untoasted113
+116 untoasted114
+117 untoasted115
+118 untoasted116
+119 untoasted117
+120 untoasted118
+121 untoasted119
+122 untoasted120
+123 untoasted121
+124 untoasted122
+125 untoasted123
+126 untoasted124
+127 untoasted125
+128 untoasted126
+129 untoasted127
+130 untoasted128
+131 untoasted129
+132 untoasted130
+133 untoasted131
+134 untoasted132
+135 untoasted133
+136 untoasted134
+137 untoasted135
+138 untoasted136
+139 untoasted137
+140 untoasted138
+141 untoasted139
+142 untoasted140
+143 untoasted141
+144 untoasted142
+145 untoasted143
+146 untoasted144
+147 untoasted145
+148 untoasted146
+149 untoasted147
+150 untoasted148
+151 untoasted149
+152 untoasted150
+153 untoasted151
+154 untoasted152
+155 untoasted153
+156 untoasted154
+157 untoasted155
+158 untoasted156
+159 untoasted157
+160 untoasted158
+161 untoasted159
+162 untoasted160
+163 untoasted161
+164 untoasted162
+165 untoasted163
+166 untoasted164
+167 untoasted165
+168 untoasted166
+169 untoasted167
+170 untoasted168
+171 untoasted169
+172 untoasted170
+173 untoasted171
+174 untoasted172
+175 untoasted173
+176 untoasted174
+177 untoasted175
+178 untoasted176
+179 untoasted177
+180 untoasted178
+181 untoasted179
+182 untoasted180
+183 untoasted181
+184 untoasted182
+185 untoasted183
+186 untoasted184
+187 untoasted185
+188 untoasted186
+189 untoasted187
+190 untoasted188
+191 untoasted189
+192 untoasted190
+193 untoasted191
+194 untoasted192
+195 untoasted193
+196 untoasted194
+197 untoasted195
+198 untoasted196
+199 untoasted197
+200 untoasted198
+201 toasted
+202 untoasted199
+203 untoasted200
+\.
+SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test we can decode "old" tuples bigger than the max heap tuple size correctly
+DROP TABLE IF EXISTS toasted_several;
+CREATE TABLE toasted_several (
+ id serial unique not null,
+ toasted_key text primary key,
+ toasted_col1 text,
+ toasted_col2 text
+);
+ALTER TABLE toasted_several REPLICA IDENTITY FULL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_key SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col1 SET STORAGE EXTERNAL;
+ALTER TABLE toasted_several ALTER COLUMN toasted_col2 SET STORAGE EXTERNAL;
+
+-- Change the storage of the index back to EXTENDED, separately from
+-- the table. This is currently not doable via DDL, but it is
+-- supported internally.
+UPDATE pg_attribute SET attstorage = 'x' WHERE attrelid = 'toasted_several_pkey'::regclass AND attname = 'toasted_key';
+
+INSERT INTO toasted_several(toasted_key) VALUES(repeat('9876543210', 10000));
+SELECT pg_column_size(toasted_key) > 2^16 FROM toasted_several;
+
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test update of a toasted key without changing it
+UPDATE toasted_several SET toasted_col1 = toasted_key;
+UPDATE toasted_several SET toasted_col2 = toasted_col1;
+
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+/*
+ * update with large tuplebuf, in a transaction large enough to force to spool to disk
+ */
+BEGIN;
+INSERT INTO toasted_several(toasted_key) SELECT * FROM generate_series(1, 10234);
+UPDATE toasted_several SET toasted_col1 = toasted_col2 WHERE id = 1;
+DELETE FROM toasted_several WHERE id = 1;
+COMMIT;
+
+DROP TABLE toasted_several;
+
+SELECT regexp_replace(data, '^(.{100}).*(.{100})$', '\1..\2') FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1')
+WHERE data NOT LIKE '%INSERT: %';
+
+/*
+ * Test decoding relation rewrite with toast. The insert into tbl2 within the
+ * same transaction is there to check that there is no remaining toast_hash not
+ * being reset.
+ */
+CREATE TABLE tbl1 (a INT, b TEXT);
+CREATE TABLE tbl2 (a INT);
+ALTER TABLE tbl1 ALTER COLUMN b SET STORAGE EXTERNAL;
+BEGIN;
+INSERT INTO tbl1 VALUES(1, repeat('a', 4000)) ;
+ALTER TABLE tbl1 ADD COLUMN id serial primary key;
+INSERT INTO tbl2 VALUES(1);
+commit;
+SELECT substr(data, 1, 200) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/truncate.sql b/contrib/test_decoding/sql/truncate.sql
new file mode 100644
index 0000000..5633854
--- /dev/null
+++ b/contrib/test_decoding/sql/truncate.sql
@@ -0,0 +1,14 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE tab1 (id serial unique, data int);
+CREATE TABLE tab2 (a int primary key, b int);
+
+TRUNCATE tab1;
+TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
+TRUNCATE tab1, tab2;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/twophase.sql b/contrib/test_decoding/sql/twophase.sql
new file mode 100644
index 0000000..aff5114
--- /dev/null
+++ b/contrib/test_decoding/sql/twophase.sql
@@ -0,0 +1,114 @@
+-- Test prepared transactions. When two-phase-commit is enabled, transactions are
+-- decoded at PREPARE time rather than at COMMIT PREPARED time.
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+
+-- Test that decoding happens at PREPARE time when two-phase-commit is enabled.
+-- Decoding after COMMIT PREPARED must have all the commands in the transaction.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (1);
+INSERT INTO test_prepared1 VALUES (2);
+-- should show nothing because the xact has not been prepared yet.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+PREPARE TRANSACTION 'test_prepared#1';
+-- should show both the above inserts and the PREPARE TRANSACTION.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that rollback of a prepared xact is decoded.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (3);
+PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
+BEGIN;
+ALTER TABLE test_prepared1 ADD COLUMN data text;
+INSERT INTO test_prepared1 VALUES (4, 'frakbar');
+PREPARE TRANSACTION 'test_prepared#3';
+-- confirm that exclusive lock from the ALTER command is held on test_prepared1 table
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+-- The insert should show the newly altered column but not the DDL.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+INSERT INTO test_prepared2 VALUES (5);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (6);
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (8, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (9, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+SELECT 'test_prepared1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+ AND relation = 'test_prepared1'::regclass;
+-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding.
+\set env_timeout ''
+\getenv env_timeout PG_TEST_TIMEOUT_DEFAULT
+SELECT COALESCE(NULLIF(:'env_timeout', ''), '180') || 's' AS timeout \gset
+SET statement_timeout = :'timeout';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test savepoints and sub-xacts. Creating savepoints will create
+-- sub-xacts implicitly.
+BEGIN;
+CREATE TABLE test_prepared_savepoint (a int);
+INSERT INTO test_prepared_savepoint VALUES (1);
+SAVEPOINT test_savepoint;
+INSERT INTO test_prepared_savepoint VALUES (2);
+ROLLBACK TO SAVEPOINT test_savepoint;
+PREPARE TRANSACTION 'test_prepared_savepoint';
+-- should show only 1, not 2
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_savepoint';
+-- consume the commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Test 8:
+-- cleanup and make sure results are also empty
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql
new file mode 100644
index 0000000..646076d
--- /dev/null
+++ b/contrib/test_decoding/sql/twophase_stream.sql
@@ -0,0 +1,45 @@
+-- Test streaming of two-phase commits
+
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
+
+CREATE TABLE stream_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK TO s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1';
+-- should show the inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1';
+--should show the COMMIT PREPARED and the other changes in the transaction
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
+-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
+BEGIN;
+SAVEPOINT s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+ROLLBACK to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+PREPARE TRANSACTION 'test1_nodecode';
+-- should NOT show inserts after a ROLLBACK
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+COMMIT PREPARED 'test1_nodecode';
+-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/xact.sql b/contrib/test_decoding/sql/xact.sql
new file mode 100644
index 0000000..aa55591
--- /dev/null
+++ b/contrib/test_decoding/sql/xact.sql
@@ -0,0 +1,33 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE xact_test(data text);
+INSERT INTO xact_test VALUES ('before-test');
+
+-- bug #13844, xids in non-decoded records need to be inspected
+BEGIN;
+-- perform operation in xact that creates and logs xid, but isn't decoded
+SELECT * FROM xact_test FOR UPDATE;
+SAVEPOINT foo;
+-- and now actually insert in subxact, xid is expected to be known
+INSERT INTO xact_test VALUES ('after-assignment');
+COMMIT;
+-- and now show those changes
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- bug #14279, do not propagate null snapshot from subtransaction
+BEGIN;
+-- first insert
+INSERT INTO xact_test VALUES ('main-txn');
+SAVEPOINT foo;
+-- now perform operation in subxact that creates and logs xid, but isn't decoded
+SELECT 1 FROM xact_test FOR UPDATE LIMIT 1;
+COMMIT;
+-- and now show those changes
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+DROP TABLE xact_test;
+
+SELECT pg_drop_replication_slot('regression_slot');