summaryrefslogtreecommitdiffstats
path: root/contrib/test_decoding/specs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--contrib/test_decoding/specs/catalog_change_snapshot.spec62
-rw-r--r--contrib/test_decoding/specs/concurrent_ddl_dml.spec88
-rw-r--r--contrib/test_decoding/specs/concurrent_stream.spec43
-rw-r--r--contrib/test_decoding/specs/delayed_startup.spec24
-rw-r--r--contrib/test_decoding/specs/mxact.spec38
-rw-r--r--contrib/test_decoding/specs/oldest_xmin.spec42
-rw-r--r--contrib/test_decoding/specs/ondisk_startup.spec45
-rw-r--r--contrib/test_decoding/specs/slot_creation_error.spec41
-rw-r--r--contrib/test_decoding/specs/snapshot_transfer.spec43
-rw-r--r--contrib/test_decoding/specs/subxact_without_top.spec63
-rw-r--r--contrib/test_decoding/specs/twophase_snapshot.spec53
11 files changed, 542 insertions, 0 deletions
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
new file mode 100644
index 0000000..770dbd6
--- /dev/null
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -0,0 +1,62 @@
+# Test decoding only the commit record of the transaction that have
+# modified catalogs.
+setup
+{
+ DROP TABLE IF EXISTS tbl1;
+ CREATE TABLE tbl1 (val1 integer, val2 integer);
+ CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true);
+}
+
+teardown
+{
+ DROP TABLE tbl1;
+ DROP TABLE user_cat;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_insert2" { INSERT INTO user_cat VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
+# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
+# during the first checkpoint execution. This transaction must be marked as
+# containing catalog changes while decoding the COMMIT record and the decoding
+# of the INSERT record must read the pg_class with the correct historic snapshot.
+#
+# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
+# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
+# record written by bgwriter. One might think we can either stop the bgwriter or
+# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# Test that we can handle the case where there is no association between top-level
+# transaction and its subtransactions. The last decoding restarts from the first
+# checkpoint, decodes NEW_CID generated by "s0_insert2", and marks the subtransaction
+# as containing catalog changes while adding tuple cids to its top-level transaction.
+# During that, both transaction entries are created in ReorderBuffer as top-level
+# transactions and have the same LSN. We check if the assertion check for the order
+# of transaction LSNs in AssertTXNLsnOrder() is skipped since we are still before the
+# LSN at which we start replaying the contents of transactions. Besides, when decoding
+# the commit record of the top-level transaction, we must force the top-level
+# transaction to do timetravel since one of its subtransactions has been marked as
+# containing catalog changes.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# The last decoding restarts from the first checkpoint and adds invalidation
+# messages generated by "s0_truncate" to the subtransaction. While
+# processing the commit record for the top-level transaction, we decide
+# to skip this xact but ensure that corresponding invalidation messages
+# get processed.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/contrib/test_decoding/specs/concurrent_ddl_dml.spec b/contrib/test_decoding/specs/concurrent_ddl_dml.spec
new file mode 100644
index 0000000..d16515e
--- /dev/null
+++ b/contrib/test_decoding/specs/concurrent_ddl_dml.spec
@@ -0,0 +1,88 @@
+setup
+{
+ DROP TABLE IF EXISTS tbl1;
+ DROP TABLE IF EXISTS tbl2;
+ CREATE TABLE tbl1(val1 integer, val2 integer);
+ CREATE TABLE tbl2(val1 integer, val2 integer);
+}
+
+teardown
+{
+ DROP TABLE tbl1;
+ DROP TABLE tbl2;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); }
+step "s1_insert_tbl2" { INSERT INTO tbl2 (val1, val2) VALUES (1, 1); }
+step "s1_insert_tbl2_3col" { INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2_alter_tbl1_float" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; }
+step "s2_alter_tbl1_char" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; }
+step "s2_alter_tbl1_boolean" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE boolean; }
+
+step "s2_alter_tbl2_float" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE float; }
+step "s2_alter_tbl2_char" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE character varying; }
+step "s2_alter_tbl2_text" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE text; }
+step "s2_alter_tbl2_boolean" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE boolean; }
+
+step "s2_alter_tbl2_add_int" { ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; }
+step "s2_alter_tbl2_add_float" { ALTER TABLE tbl2 ADD COLUMN val3 FLOAT; }
+step "s2_alter_tbl2_add_char" { ALTER TABLE tbl2 ADD COLUMN val3 character varying; }
+step "s2_alter_tbl2_add_text" { ALTER TABLE tbl2 ADD COLUMN val3 TEXT; }
+step "s2_alter_tbl2_drop_3rd_col" { ALTER TABLE tbl2 DROP COLUMN val3; }
+step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE character varying; }
+step "s2_alter_tbl2_3rd_text" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; }
+step "s2_alter_tbl2_3rd_int" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; }
+
+step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+
+
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_float" "s1_insert_tbl2" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl1_float" "s1_insert_tbl2" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_char" "s1_insert_tbl2" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl1_char" "s1_insert_tbl2" "s1_commit" "s2_get_changes"
+
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s2_alter_tbl1_float" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s2_alter_tbl1_char" "s1_commit" "s2_get_changes"
+
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_float" "s1_insert_tbl2" "s2_alter_tbl1_float" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_char" "s1_insert_tbl2" "s2_alter_tbl1_char" "s1_commit" "s2_get_changes"
+
+permutation "s1_init" "s2_alter_tbl2_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_text" "s1_insert_tbl2" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s2_alter_tbl2_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_text" "s1_insert_tbl2" "s2_alter_tbl1_char" "s1_commit" "s2_get_changes"
+
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_boolean" "s1_insert_tbl2" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_boolean" "s1_insert_tbl2" "s2_alter_tbl1_boolean" "s1_commit" "s2_get_changes"
+
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_add_int" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s1_commit" "s1_begin" "s2_alter_tbl2_add_int" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes"
+
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_add_float" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s1_commit" "s1_begin" "s2_alter_tbl2_add_float" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes"
+
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_add_char" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s1_commit" "s1_begin" "s2_alter_tbl2_add_char" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes"
+
+permutation "s1_init" "s2_alter_tbl2_add_int" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_drop_3rd_col" "s1_commit" "s2_get_changes"
+permutation "s1_init" "s2_alter_tbl2_add_int" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl2" "s1_commit" "s1_insert_tbl2" "s2_get_changes"
+
+permutation "s1_init" "s2_alter_tbl2_add_int" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_drop_3rd_col" "s1_commit" "s2_get_changes" "s2_alter_tbl2_add_text" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_3rd_char" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" "s2_alter_tbl2_3rd_int" "s1_insert_tbl2_3col" "s2_get_changes"
+
+permutation "s1_init" "s2_alter_tbl2_add_char" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2_3col" "s2_alter_tbl2_3rd_text" "s1_insert_tbl2_3col" "s1_commit" "s1_insert_tbl2_3col" "s2_get_changes"
+permutation "s1_init" "s2_alter_tbl2_add_text" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2_3col" "s2_alter_tbl2_3rd_char" "s1_insert_tbl2_3col" "s1_commit" "s1_insert_tbl2_3col" "s2_get_changes"
+
+permutation "s1_init" "s2_alter_tbl2_add_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_3rd_text" "s1_insert_tbl2_3col" "s1_commit" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl2" "s2_get_changes"
+permutation "s1_init" "s2_alter_tbl2_add_text" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_3rd_char" "s1_insert_tbl2_3col" "s1_commit" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl2" "s2_get_changes"
+
+permutation "s1_init" "s2_alter_tbl2_add_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl1" "s1_commit" "s2_get_changes"
diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec
new file mode 100644
index 0000000..54218a4
--- /dev/null
+++ b/contrib/test_decoding/specs/concurrent_stream.spec
@@ -0,0 +1,43 @@
+# Test decoding of in-progress transaction containing dml and a concurrent
+# transaction with ddl operation. The transaction containing ddl operation
+# should not get streamed as it doesn't have any changes.
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+
+ -- consume DDL
+ SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g';
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS stream_test;
+ DROP TABLE IF EXISTS stream_test1;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_begin" { BEGIN; }
+step "s0_ddl" {CREATE TABLE stream_test1(data text);}
+
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_ddl" {CREATE TABLE stream_test2(data text);}
+
+# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to
+# the currently running s0_ddl and we want to test that s0_ddl should not get
+# streamed when user asked to skip-empty-xacts. Similarly, the
+# INTERNAL_SNAPSHOT change added by s2_ddl should not change the results for
+# what gets streamed.
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_ddl" { CREATE TABLE stream_test(data text); }
+step "s1_begin" { BEGIN; }
+step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();}
+step "s1_commit" { COMMIT; }
+step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');}
+
+permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes"
diff --git a/contrib/test_decoding/specs/delayed_startup.spec b/contrib/test_decoding/specs/delayed_startup.spec
new file mode 100644
index 0000000..b7fe814
--- /dev/null
+++ b/contrib/test_decoding/specs/delayed_startup.spec
@@ -0,0 +1,24 @@
+setup
+{
+ DROP TABLE IF EXISTS do_write;
+ CREATE TABLE do_write(id serial primary key);
+}
+
+teardown
+{
+ DROP TABLE do_write;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1b" { BEGIN ISOLATION LEVEL SERIALIZABLE; }
+step "s1w" { INSERT INTO do_write DEFAULT VALUES; }
+step "s1c" { COMMIT; }
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');}
+step "s2start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');}
+
+
+permutation "s1b" "s1w" "s2init" "s1c" "s2start" "s1b" "s1w" "s1c" "s2start" "s1b" "s1w" "s2start" "s1c" "s2start"
diff --git a/contrib/test_decoding/specs/mxact.spec b/contrib/test_decoding/specs/mxact.spec
new file mode 100644
index 0000000..ea5b1aa
--- /dev/null
+++ b/contrib/test_decoding/specs/mxact.spec
@@ -0,0 +1,38 @@
+setup
+{
+ DROP TABLE IF EXISTS do_write;
+ CREATE TABLE do_write(id serial primary key);
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS do_write;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');}
+step "s0start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');}
+step "s0alter" {ALTER TABLE do_write ADD column ts timestamptz; }
+step "s0w" { INSERT INTO do_write DEFAULT VALUES; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1begin" {BEGIN;}
+step "s1sharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR SHARE) s; }
+step "s1keysharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR KEY SHARE) s; }
+step "s1commit" {COMMIT;}
+
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2begin" {BEGIN;}
+step "s2sharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR SHARE) s; }
+step "s2keysharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR KEY SHARE) s; }
+step "s2commit" {COMMIT;}
+
+# test that we're handling an update-only mxact xmax correctly
+permutation "s0init" "s0start" "s1begin" "s1sharepgclass" "s2begin" "s2sharepgclass" "s0w" "s0start" "s2commit" "s1commit"
+
+# test that we're handling an update-only mxact xmax correctly
+permutation "s0init" "s0start" "s1begin" "s1keysharepgclass" "s2begin" "s2keysharepgclass" "s0alter" "s0w" "s0start" "s2commit" "s1commit"
diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec
new file mode 100644
index 0000000..88bd30f
--- /dev/null
+++ b/contrib/test_decoding/specs/oldest_xmin.spec
@@ -0,0 +1,42 @@
+# Test advancement of the slot's oldest xmin
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+ DROP TYPE IF EXISTS basket;
+ CREATE TYPE basket AS (apples integer, pears integer, mangos integer);
+ DROP TABLE IF EXISTS harvest;
+ CREATE TABLE harvest(fruits basket);
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS harvest;
+ DROP TYPE IF EXISTS basket;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_begin" { BEGIN; }
+step "s0_getxid" { SELECT pg_current_xact_id() IS NULL; }
+step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; }
+step "s0_commit" { COMMIT; }
+step "s0_checkpoint" { CHECKPOINT; }
+step "s0_vacuum" { VACUUM pg_attribute; }
+step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_begin" { BEGIN; }
+step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); }
+step "s1_commit" { COMMIT; }
+
+# Checkpoint with following get_changes forces xmin advancement. We do
+# get_changes twice because if one more xl_running_xacts record had slipped
+# before our CHECKPOINT, xmin will be advanced only on this record, thus not
+# reaching value needed for vacuuming corresponding pg_attribute entry. ALTER of
+# composite type is a rare form of DDL which allows T1 to see the tuple which
+# will be removed (xmax set) before T1 commits. That is, interlocking doesn't
+# forbid modifying catalog after someone read it (and didn't commit yet).
+permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes"
diff --git a/contrib/test_decoding/specs/ondisk_startup.spec b/contrib/test_decoding/specs/ondisk_startup.spec
new file mode 100644
index 0000000..96ce87f
--- /dev/null
+++ b/contrib/test_decoding/specs/ondisk_startup.spec
@@ -0,0 +1,45 @@
+# Force usage of ondisk decoding snapshots to test that code path.
+setup
+{
+ DROP TABLE IF EXISTS do_write;
+ CREATE TABLE do_write(id serial primary key);
+}
+
+teardown
+{
+ DROP TABLE do_write;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');}
+step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');}
+step "s1insert" { INSERT INTO do_write DEFAULT VALUES; }
+step "s1checkpoint" { CHECKPOINT; }
+step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2b" { BEGIN; }
+step "s2txid" { SELECT pg_current_xact_id() IS NULL; }
+step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; }
+step "s2c" { COMMIT; }
+
+
+session "s3"
+setup { SET synchronous_commit=on; }
+
+step "s3b" { BEGIN; }
+step "s3txid" { SELECT pg_current_xact_id() IS NULL; }
+step "s3c" { COMMIT; }
+
+# Force usage of ondisk snapshot by starting and not finishing a
+# transaction with an assigned xid after consistency has been
+# reached. In combination with a checkpoint forcing a snapshot to be
+# written and a new restart point computed that'll lead to the usage
+# of the snapshot.
+permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2alter" "s2c" "s2b" "s2txid" "s3c" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start"
diff --git a/contrib/test_decoding/specs/slot_creation_error.spec b/contrib/test_decoding/specs/slot_creation_error.spec
new file mode 100644
index 0000000..d1e35bf
--- /dev/null
+++ b/contrib/test_decoding/specs/slot_creation_error.spec
@@ -0,0 +1,41 @@
+# Test that erroring out during logical slot creation is handled properly
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step s1_b { BEGIN; }
+step s1_xid { SELECT 'xid' FROM txid_current(); }
+step s1_c { COMMIT; }
+step s1_cancel_s2 {
+ SELECT pg_cancel_backend(pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/slot_creation_error/s2';
+}
+
+step s1_terminate_s2 {
+ SELECT pg_terminate_backend(pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/slot_creation_error/s2';
+}
+
+step s1_view_slot {
+ SELECT slot_name, slot_type, active FROM pg_replication_slots WHERE slot_name = 'slot_creation_error'
+}
+
+step s1_drop_slot {
+ SELECT pg_drop_replication_slot('slot_creation_error');
+}
+
+session s2
+setup { SET synchronous_commit=on; }
+step s2_init {
+ SELECT 'init' FROM pg_create_logical_replication_slot('slot_creation_error', 'test_decoding');
+}
+
+# The tests first start a transaction with an xid assigned in s1, then create
+# a slot in s2. The slot creation waits for s1's transaction to end. Instead
+# we cancel / terminate s2.
+permutation s1_b s1_xid s2_init s1_view_slot s1_cancel_s2(s2_init) s1_view_slot s1_c
+permutation s1_b s1_xid s2_init s1_c s1_view_slot s1_drop_slot # check slot creation still works
+permutation s1_b s1_xid s2_init s1_terminate_s2(s2_init) s1_c s1_view_slot
+# can't run tests after this, due to s2's connection failure
diff --git a/contrib/test_decoding/specs/snapshot_transfer.spec b/contrib/test_decoding/specs/snapshot_transfer.spec
new file mode 100644
index 0000000..152f2fd
--- /dev/null
+++ b/contrib/test_decoding/specs/snapshot_transfer.spec
@@ -0,0 +1,43 @@
+# Test snapshot transfer from subxact to top-level and receival of later snaps.
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+ DROP TABLE IF EXISTS dummy;
+ CREATE TABLE dummy(i int);
+ DROP TABLE IF EXISTS harvest;
+ CREATE TABLE harvest(apples int, pears int);
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS harvest;
+ DROP TABLE IF EXISTS dummy;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_begin" { BEGIN; }
+step "s0_begin_sub0" { SAVEPOINT s0; }
+step "s0_log_assignment" { SELECT pg_current_xact_id() IS NULL; }
+step "s0_begin_sub1" { SAVEPOINT s1; }
+step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); }
+step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); }
+step "s0_end_sub0" { RELEASE SAVEPOINT s0; }
+step "s0_end_sub1" { RELEASE SAVEPOINT s1; }
+step "s0_commit" { COMMIT; }
+step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; }
+
+# start top-level without base snap, get base snap in subxact, then create new
+# snap and make sure it is queued.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0" "s0_commit" "s0_get_changes"
+
+# In previous test, we firstly associated subxact with xact and only then got
+# base snap; now nest one more subxact to get snap first and only then (at
+# commit) associate it with toplevel.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub1" "s0_end_sub0" "s0_commit" "s0_get_changes"
diff --git a/contrib/test_decoding/specs/subxact_without_top.spec b/contrib/test_decoding/specs/subxact_without_top.spec
new file mode 100644
index 0000000..76688c7
--- /dev/null
+++ b/contrib/test_decoding/specs/subxact_without_top.spec
@@ -0,0 +1,63 @@
+# Test decoding of subtransactions whose top-transaction is before restart
+# point. Such transactions won't be streamed as we stream only complete
+# transactions, but it is good to test that they don't cause any problem.
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+ CREATE TABLE harvest(apples integer);
+ CREATE OR REPLACE FUNCTION subxacts() returns void as $$
+ BEGIN
+ FOR i in 1 .. 128 LOOP
+ BEGIN
+ INSERT INTO harvest VALUES (42);
+ EXCEPTION
+ WHEN OTHERS THEN
+ RAISE;
+ END;
+ END LOOP;
+ END; $$LANGUAGE 'plpgsql';
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS harvest;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_begin" { BEGIN; }
+step "s0_first_subxact" {
+ DO LANGUAGE plpgsql $$
+ BEGIN
+ BEGIN
+ INSERT INTO harvest VALUES (41);
+ EXCEPTION WHEN OTHERS THEN RAISE;
+ END;
+ END $$;
+}
+step "s0_many_subxacts" { select subxacts(); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_begin" { BEGIN; }
+step "s1_dml" { INSERT INTO harvest VALUES (43); }
+step "s1_commit" { COMMIT; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_checkpoint" { CHECKPOINT; }
+step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+step "s2_get_changes_suppress_output" { SELECT null n FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY n; }
+
+# The first checkpoint establishes the potential restart point (aka
+# restart_lsn) for the slot after the initial subxact. The second checkpoint
+# followed by get_changes will ensure that the potential restart point will
+# become the actual restart point. We do get_changes twice because if one
+# more xl_running_xacts record had slipped before our s0_commit, then the
+# potential restart point won't become actual restart point. The s1's open
+# transaction till get_changes holds the potential restart point to our first
+# checkpoint location.
+permutation "s0_begin" "s0_first_subxact" "s2_checkpoint" "s1_begin" "s1_dml" "s0_many_subxacts" "s0_commit" "s2_checkpoint" "s2_get_changes_suppress_output" "s2_get_changes_suppress_output" "s1_commit" "s2_get_changes"
diff --git a/contrib/test_decoding/specs/twophase_snapshot.spec b/contrib/test_decoding/specs/twophase_snapshot.spec
new file mode 100644
index 0000000..e8d9567
--- /dev/null
+++ b/contrib/test_decoding/specs/twophase_snapshot.spec
@@ -0,0 +1,53 @@
+# Test decoding of two-phase transactions during the build of a consistent snapshot.
+setup
+{
+ DROP TABLE IF EXISTS do_write;
+ CREATE TABLE do_write(id serial primary key);
+}
+
+teardown
+{
+ DROP TABLE do_write;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+
+session "s1"
+setup { SET synchronous_commit=on; }
+
+step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);}
+step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');}
+step "s1insert" { INSERT INTO do_write DEFAULT VALUES; }
+
+session "s2"
+setup { SET synchronous_commit=on; }
+
+step "s2b" { BEGIN; }
+step "s2txid" { SELECT pg_current_xact_id() IS NULL; }
+step "s2c" { COMMIT; }
+step "s2insert" { INSERT INTO do_write DEFAULT VALUES; }
+step "s2p" { PREPARE TRANSACTION 'test1'; }
+step "s2cp" { COMMIT PREPARED 'test1'; }
+
+
+session "s3"
+setup { SET synchronous_commit=on; }
+
+step "s3b" { BEGIN; }
+step "s3txid" { SELECT pg_current_xact_id() IS NULL; }
+step "s3c" { COMMIT; }
+
+# Force building of a consistent snapshot between a PREPARE and COMMIT PREPARED
+# and ensure that the whole transaction is decoded at the time of COMMIT
+# PREPARED.
+#
+# 's1init' step will initialize the replication slot and cause logical decoding
+# to wait in initial starting point till the in-progress transaction in s2 is
+# committed. 's2c' step will cause logical decoding to go to initial consistent
+# point and wait for in-progress transaction s3 to commit. 's3c' step will cause
+# logical decoding to find a consistent point while the transaction s2 is
+# prepared and not yet committed. This will cause the first s1start to skip
+# prepared transaction s2 as that will be before consistent point. The second
+# s1start will allow decoding of skipped prepare along with commit prepared done
+# as part of s2cp.
+permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2c" "s2b" "s2insert" "s2p" "s3c" "s1insert" "s1start" "s2cp" "s1start"