diff options
Diffstat (limited to '')
-rw-r--r-- | contrib/test_decoding/specs/catalog_change_snapshot.spec | 62 | ||||
-rw-r--r-- | contrib/test_decoding/specs/concurrent_ddl_dml.spec | 88 | ||||
-rw-r--r-- | contrib/test_decoding/specs/concurrent_stream.spec | 43 | ||||
-rw-r--r-- | contrib/test_decoding/specs/delayed_startup.spec | 24 | ||||
-rw-r--r-- | contrib/test_decoding/specs/mxact.spec | 38 | ||||
-rw-r--r-- | contrib/test_decoding/specs/oldest_xmin.spec | 42 | ||||
-rw-r--r-- | contrib/test_decoding/specs/ondisk_startup.spec | 45 | ||||
-rw-r--r-- | contrib/test_decoding/specs/slot_creation_error.spec | 41 | ||||
-rw-r--r-- | contrib/test_decoding/specs/snapshot_transfer.spec | 43 | ||||
-rw-r--r-- | contrib/test_decoding/specs/subxact_without_top.spec | 63 | ||||
-rw-r--r-- | contrib/test_decoding/specs/twophase_snapshot.spec | 53 |
11 files changed, 542 insertions, 0 deletions
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec new file mode 100644 index 0000000..770dbd6 --- /dev/null +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -0,0 +1,62 @@ +# Test decoding only the commit record of the transaction that have +# modified catalogs. +setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (val1 integer, val2 integer); + CREATE TABLE user_cat (val1 integer) WITH (user_catalog_table = true); +} + +teardown +{ + DROP TABLE tbl1; + DROP TABLE user_cat; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_truncate" { TRUNCATE tbl1; } +step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_insert2" { INSERT INTO user_cat VALUES (1); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_checkpoint" { CHECKPOINT; } +step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes +# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted +# during the first checkpoint execution. This transaction must be marked as +# containing catalog changes while decoding the COMMIT record and the decoding +# of the INSERT record must read the pg_class with the correct historic snapshot. +# +# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit" +# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS +# record written by bgwriter. One might think we can either stop the bgwriter or +# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" + +# Test that we can handle the case where there is no association between top-level +# transaction and its subtransactions. The last decoding restarts from the first +# checkpoint, decodes NEW_CID generated by "s0_insert2", and marks the subtransaction +# as containing catalog changes while adding tuple cids to its top-level transaction. +# During that, both transaction entries are created in ReorderBuffer as top-level +# transactions and have the same LSN. We check if the assertion check for the order +# of transaction LSNs in AssertTXNLsnOrder() is skipped since we are still before the +# LSN at which we start replaying the contents of transactions. Besides, when decoding +# the commit record of the top-level transaction, we must force the top-level +# transaction to do timetravel since one of its subtransactions has been marked as +# containing catalog changes. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_insert2" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" + +# The last decoding restarts from the first checkpoint and adds invalidation +# messages generated by "s0_truncate" to the subtransaction. While +# processing the commit record for the top-level transaction, we decide +# to skip this xact but ensure that corresponding invalidation messages +# get processed. +permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/contrib/test_decoding/specs/concurrent_ddl_dml.spec b/contrib/test_decoding/specs/concurrent_ddl_dml.spec new file mode 100644 index 0000000..d16515e --- /dev/null +++ b/contrib/test_decoding/specs/concurrent_ddl_dml.spec @@ -0,0 +1,88 @@ +setup +{ + DROP TABLE IF EXISTS tbl1; + DROP TABLE IF EXISTS tbl2; + CREATE TABLE tbl1(val1 integer, val2 integer); + CREATE TABLE tbl2(val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + DROP TABLE tbl2; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s1" +setup { SET synchronous_commit=on; } + +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s1_begin" { BEGIN; } +step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); } +step "s1_insert_tbl2" { INSERT INTO tbl2 (val1, val2) VALUES (1, 1); } +step "s1_insert_tbl2_3col" { INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup { SET synchronous_commit=on; } + +step "s2_alter_tbl1_float" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; } +step "s2_alter_tbl1_char" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; } +step "s2_alter_tbl1_boolean" { ALTER TABLE tbl1 ALTER COLUMN val2 TYPE boolean; } + +step "s2_alter_tbl2_float" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE float; } +step "s2_alter_tbl2_char" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE character varying; } +step "s2_alter_tbl2_text" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE text; } +step "s2_alter_tbl2_boolean" { ALTER TABLE tbl2 ALTER COLUMN val2 TYPE boolean; } + +step "s2_alter_tbl2_add_int" { ALTER TABLE tbl2 ADD COLUMN val3 INTEGER; } +step "s2_alter_tbl2_add_float" { ALTER TABLE tbl2 ADD COLUMN val3 FLOAT; } +step "s2_alter_tbl2_add_char" { ALTER TABLE tbl2 ADD COLUMN val3 character varying; } +step "s2_alter_tbl2_add_text" { ALTER TABLE tbl2 ADD COLUMN val3 TEXT; } +step "s2_alter_tbl2_drop_3rd_col" { ALTER TABLE tbl2 DROP COLUMN val3; } +step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE character varying; } +step "s2_alter_tbl2_3rd_text" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; } +step "s2_alter_tbl2_3rd_int" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; } + +step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + + + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_float" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl1_float" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_char" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl1_char" "s1_insert_tbl2" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s2_alter_tbl1_float" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s2_alter_tbl1_char" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_float" "s1_insert_tbl2" "s2_alter_tbl1_float" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_char" "s1_insert_tbl2" "s2_alter_tbl1_char" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_text" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s2_alter_tbl2_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_text" "s1_insert_tbl2" "s2_alter_tbl1_char" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_boolean" "s1_insert_tbl2" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_boolean" "s1_insert_tbl2" "s2_alter_tbl1_boolean" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_add_int" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s1_commit" "s1_begin" "s2_alter_tbl2_add_int" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_add_float" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s1_commit" "s1_begin" "s2_alter_tbl2_add_float" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_add_char" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" +permutation "s1_init" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2" "s1_commit" "s1_begin" "s2_alter_tbl2_add_char" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_int" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_drop_3rd_col" "s1_commit" "s2_get_changes" +permutation "s1_init" "s2_alter_tbl2_add_int" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl2" "s1_commit" "s1_insert_tbl2" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_int" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_drop_3rd_col" "s1_commit" "s2_get_changes" "s2_alter_tbl2_add_text" "s1_begin" "s1_insert_tbl2_3col" "s2_alter_tbl2_3rd_char" "s1_insert_tbl2_3col" "s1_commit" "s2_get_changes" "s2_alter_tbl2_3rd_int" "s1_insert_tbl2_3col" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_char" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2_3col" "s2_alter_tbl2_3rd_text" "s1_insert_tbl2_3col" "s1_commit" "s1_insert_tbl2_3col" "s2_get_changes" +permutation "s1_init" "s2_alter_tbl2_add_text" "s1_begin" "s1_insert_tbl1" "s1_insert_tbl2_3col" "s2_alter_tbl2_3rd_char" "s1_insert_tbl2_3col" "s1_commit" "s1_insert_tbl2_3col" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_3rd_text" "s1_insert_tbl2_3col" "s1_commit" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl2" "s2_get_changes" +permutation "s1_init" "s2_alter_tbl2_add_text" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_3rd_char" "s1_insert_tbl2_3col" "s1_commit" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl2" "s2_get_changes" + +permutation "s1_init" "s2_alter_tbl2_add_char" "s1_begin" "s1_insert_tbl1" "s2_alter_tbl2_drop_3rd_col" "s1_insert_tbl1" "s1_commit" "s2_get_changes" diff --git a/contrib/test_decoding/specs/concurrent_stream.spec b/contrib/test_decoding/specs/concurrent_stream.spec new file mode 100644 index 0000000..54218a4 --- /dev/null +++ b/contrib/test_decoding/specs/concurrent_stream.spec @@ -0,0 +1,43 @@ +# Test decoding of in-progress transaction containing dml and a concurrent +# transaction with ddl operation. The transaction containing ddl operation +# should not get streamed as it doesn't have any changes. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); + + -- consume DDL + SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + CREATE OR REPLACE FUNCTION large_val() RETURNS TEXT LANGUAGE SQL AS 'select array_agg(md5(g::text))::text from generate_series(1, 80000) g'; +} + +teardown +{ + DROP TABLE IF EXISTS stream_test; + DROP TABLE IF EXISTS stream_test1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_begin" { BEGIN; } +step "s0_ddl" {CREATE TABLE stream_test1(data text);} + +session "s2" +setup { SET synchronous_commit=on; } +step "s2_ddl" {CREATE TABLE stream_test2(data text);} + +# The transaction commit for s1_ddl will add the INTERNAL_SNAPSHOT change to +# the currently running s0_ddl and we want to test that s0_ddl should not get +# streamed when user asked to skip-empty-xacts. Similarly, the +# INTERNAL_SNAPSHOT change added by s2_ddl should not change the results for +# what gets streamed. +session "s1" +setup { SET synchronous_commit=on; } +step "s1_ddl" { CREATE TABLE stream_test(data text); } +step "s1_begin" { BEGIN; } +step "s1_toast_insert" {INSERT INTO stream_test SELECT large_val();} +step "s1_commit" { COMMIT; } +step "s1_get_stream_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');} + +permutation "s0_begin" "s0_ddl" "s1_ddl" "s1_begin" "s1_toast_insert" "s2_ddl" "s1_commit" "s1_get_stream_changes" diff --git a/contrib/test_decoding/specs/delayed_startup.spec b/contrib/test_decoding/specs/delayed_startup.spec new file mode 100644 index 0000000..b7fe814 --- /dev/null +++ b/contrib/test_decoding/specs/delayed_startup.spec @@ -0,0 +1,24 @@ +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); +} + +teardown +{ + DROP TABLE do_write; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s1" +setup { SET synchronous_commit=on; } +step "s1b" { BEGIN ISOLATION LEVEL SERIALIZABLE; } +step "s1w" { INSERT INTO do_write DEFAULT VALUES; } +step "s1c" { COMMIT; } +session "s2" +setup { SET synchronous_commit=on; } +step "s2init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');} +step "s2start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');} + + +permutation "s1b" "s1w" "s2init" "s1c" "s2start" "s1b" "s1w" "s1c" "s2start" "s1b" "s1w" "s2start" "s1c" "s2start" diff --git a/contrib/test_decoding/specs/mxact.spec b/contrib/test_decoding/specs/mxact.spec new file mode 100644 index 0000000..ea5b1aa --- /dev/null +++ b/contrib/test_decoding/specs/mxact.spec @@ -0,0 +1,38 @@ +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); +} + +teardown +{ + DROP TABLE IF EXISTS do_write; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');} +step "s0start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');} +step "s0alter" {ALTER TABLE do_write ADD column ts timestamptz; } +step "s0w" { INSERT INTO do_write DEFAULT VALUES; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1begin" {BEGIN;} +step "s1sharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR SHARE) s; } +step "s1keysharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR KEY SHARE) s; } +step "s1commit" {COMMIT;} + +session "s2" +setup { SET synchronous_commit=on; } +step "s2begin" {BEGIN;} +step "s2sharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR SHARE) s; } +step "s2keysharepgclass" { SELECT count(*) > 1 FROM (SELECT * FROM pg_class FOR KEY SHARE) s; } +step "s2commit" {COMMIT;} + +# test that we're handling an update-only mxact xmax correctly +permutation "s0init" "s0start" "s1begin" "s1sharepgclass" "s2begin" "s2sharepgclass" "s0w" "s0start" "s2commit" "s1commit" + +# test that we're handling an update-only mxact xmax correctly +permutation "s0init" "s0start" "s1begin" "s1keysharepgclass" "s2begin" "s2keysharepgclass" "s0alter" "s0w" "s0start" "s2commit" "s1commit" diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec new file mode 100644 index 0000000..88bd30f --- /dev/null +++ b/contrib/test_decoding/specs/oldest_xmin.spec @@ -0,0 +1,42 @@ +# Test advancement of the slot's oldest xmin + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact + DROP TYPE IF EXISTS basket; + CREATE TYPE basket AS (apples integer, pears integer, mangos integer); + DROP TABLE IF EXISTS harvest; + CREATE TABLE harvest(fruits basket); +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + DROP TYPE IF EXISTS basket; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_begin" { BEGIN; } +step "s0_getxid" { SELECT pg_current_xact_id() IS NULL; } +step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; } +step "s0_commit" { COMMIT; } +step "s0_checkpoint" { CHECKPOINT; } +step "s0_vacuum" { VACUUM pg_attribute; } +step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_begin" { BEGIN; } +step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); } +step "s1_commit" { COMMIT; } + +# Checkpoint with following get_changes forces xmin advancement. We do +# get_changes twice because if one more xl_running_xacts record had slipped +# before our CHECKPOINT, xmin will be advanced only on this record, thus not +# reaching value needed for vacuuming corresponding pg_attribute entry. ALTER of +# composite type is a rare form of DDL which allows T1 to see the tuple which +# will be removed (xmax set) before T1 commits. That is, interlocking doesn't +# forbid modifying catalog after someone read it (and didn't commit yet). +permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes" diff --git a/contrib/test_decoding/specs/ondisk_startup.spec b/contrib/test_decoding/specs/ondisk_startup.spec new file mode 100644 index 0000000..96ce87f --- /dev/null +++ b/contrib/test_decoding/specs/ondisk_startup.spec @@ -0,0 +1,45 @@ +# Force usage of ondisk decoding snapshots to test that code path. +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); +} + +teardown +{ + DROP TABLE do_write; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + + +session "s1" +setup { SET synchronous_commit=on; } + +step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');} +step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false');} +step "s1insert" { INSERT INTO do_write DEFAULT VALUES; } +step "s1checkpoint" { CHECKPOINT; } +step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; } + +session "s2" +setup { SET synchronous_commit=on; } + +step "s2b" { BEGIN; } +step "s2txid" { SELECT pg_current_xact_id() IS NULL; } +step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; } +step "s2c" { COMMIT; } + + +session "s3" +setup { SET synchronous_commit=on; } + +step "s3b" { BEGIN; } +step "s3txid" { SELECT pg_current_xact_id() IS NULL; } +step "s3c" { COMMIT; } + +# Force usage of ondisk snapshot by starting and not finishing a +# transaction with an assigned xid after consistency has been +# reached. In combination with a checkpoint forcing a snapshot to be +# written and a new restart point computed that'll lead to the usage +# of the snapshot. +permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2alter" "s2c" "s2b" "s2txid" "s3c" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start" diff --git a/contrib/test_decoding/specs/slot_creation_error.spec b/contrib/test_decoding/specs/slot_creation_error.spec new file mode 100644 index 0000000..d1e35bf --- /dev/null +++ b/contrib/test_decoding/specs/slot_creation_error.spec @@ -0,0 +1,41 @@ +# Test that erroring out during logical slot creation is handled properly + +session "s1" +setup { SET synchronous_commit=on; } + +step s1_b { BEGIN; } +step s1_xid { SELECT 'xid' FROM txid_current(); } +step s1_c { COMMIT; } +step s1_cancel_s2 { + SELECT pg_cancel_backend(pid) + FROM pg_stat_activity + WHERE application_name = 'isolation/slot_creation_error/s2'; +} + +step s1_terminate_s2 { + SELECT pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE application_name = 'isolation/slot_creation_error/s2'; +} + +step s1_view_slot { + SELECT slot_name, slot_type, active FROM pg_replication_slots WHERE slot_name = 'slot_creation_error' +} + +step s1_drop_slot { + SELECT pg_drop_replication_slot('slot_creation_error'); +} + +session s2 +setup { SET synchronous_commit=on; } +step s2_init { + SELECT 'init' FROM pg_create_logical_replication_slot('slot_creation_error', 'test_decoding'); +} + +# The tests first start a transaction with an xid assigned in s1, then create +# a slot in s2. The slot creation waits for s1's transaction to end. Instead +# we cancel / terminate s2. +permutation s1_b s1_xid s2_init s1_view_slot s1_cancel_s2(s2_init) s1_view_slot s1_c +permutation s1_b s1_xid s2_init s1_c s1_view_slot s1_drop_slot # check slot creation still works +permutation s1_b s1_xid s2_init s1_terminate_s2(s2_init) s1_c s1_view_slot +# can't run tests after this, due to s2's connection failure diff --git a/contrib/test_decoding/specs/snapshot_transfer.spec b/contrib/test_decoding/specs/snapshot_transfer.spec new file mode 100644 index 0000000..152f2fd --- /dev/null +++ b/contrib/test_decoding/specs/snapshot_transfer.spec @@ -0,0 +1,43 @@ +# Test snapshot transfer from subxact to top-level and receival of later snaps. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact + DROP TABLE IF EXISTS dummy; + CREATE TABLE dummy(i int); + DROP TABLE IF EXISTS harvest; + CREATE TABLE harvest(apples int, pears int); +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + DROP TABLE IF EXISTS dummy; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_begin" { BEGIN; } +step "s0_begin_sub0" { SAVEPOINT s0; } +step "s0_log_assignment" { SELECT pg_current_xact_id() IS NULL; } +step "s0_begin_sub1" { SAVEPOINT s1; } +step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); } +step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); } +step "s0_end_sub0" { RELEASE SAVEPOINT s0; } +step "s0_end_sub1" { RELEASE SAVEPOINT s1; } +step "s0_commit" { COMMIT; } +step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; } + +# start top-level without base snap, get base snap in subxact, then create new +# snap and make sure it is queued. +permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0" "s0_commit" "s0_get_changes" + +# In previous test, we firstly associated subxact with xact and only then got +# base snap; now nest one more subxact to get snap first and only then (at +# commit) associate it with toplevel. +permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub1" "s0_end_sub0" "s0_commit" "s0_get_changes" diff --git a/contrib/test_decoding/specs/subxact_without_top.spec b/contrib/test_decoding/specs/subxact_without_top.spec new file mode 100644 index 0000000..76688c7 --- /dev/null +++ b/contrib/test_decoding/specs/subxact_without_top.spec @@ -0,0 +1,63 @@ +# Test decoding of subtransactions whose top-transaction is before restart +# point. Such transactions won't be streamed as we stream only complete +# transactions, but it is good to test that they don't cause any problem. + +setup +{ + SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact + CREATE TABLE harvest(apples integer); + CREATE OR REPLACE FUNCTION subxacts() returns void as $$ + BEGIN + FOR i in 1 .. 128 LOOP + BEGIN + INSERT INTO harvest VALUES (42); + EXCEPTION + WHEN OTHERS THEN + RAISE; + END; + END LOOP; + END; $$LANGUAGE 'plpgsql'; +} + +teardown +{ + DROP TABLE IF EXISTS harvest; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_begin" { BEGIN; } +step "s0_first_subxact" { + DO LANGUAGE plpgsql $$ + BEGIN + BEGIN + INSERT INTO harvest VALUES (41); + EXCEPTION WHEN OTHERS THEN RAISE; + END; + END $$; +} +step "s0_many_subxacts" { select subxacts(); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_begin" { BEGIN; } +step "s1_dml" { INSERT INTO harvest VALUES (43); } +step "s1_commit" { COMMIT; } + +session "s2" +setup { SET synchronous_commit=on; } +step "s2_checkpoint" { CHECKPOINT; } +step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); } +step "s2_get_changes_suppress_output" { SELECT null n FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1') GROUP BY n; } + +# The first checkpoint establishes the potential restart point (aka +# restart_lsn) for the slot after the initial subxact. The second checkpoint +# followed by get_changes will ensure that the potential restart point will +# become the actual restart point. We do get_changes twice because if one +# more xl_running_xacts record had slipped before our s0_commit, then the +# potential restart point won't become actual restart point. The s1's open +# transaction till get_changes holds the potential restart point to our first +# checkpoint location. +permutation "s0_begin" "s0_first_subxact" "s2_checkpoint" "s1_begin" "s1_dml" "s0_many_subxacts" "s0_commit" "s2_checkpoint" "s2_get_changes_suppress_output" "s2_get_changes_suppress_output" "s1_commit" "s2_get_changes" diff --git a/contrib/test_decoding/specs/twophase_snapshot.spec b/contrib/test_decoding/specs/twophase_snapshot.spec new file mode 100644 index 0000000..e8d9567 --- /dev/null +++ b/contrib/test_decoding/specs/twophase_snapshot.spec @@ -0,0 +1,53 @@ +# Test decoding of two-phase transactions during the build of a consistent snapshot. +setup +{ + DROP TABLE IF EXISTS do_write; + CREATE TABLE do_write(id serial primary key); +} + +teardown +{ + DROP TABLE do_write; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + + +session "s1" +setup { SET synchronous_commit=on; } + +step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);} +step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');} +step "s1insert" { INSERT INTO do_write DEFAULT VALUES; } + +session "s2" +setup { SET synchronous_commit=on; } + +step "s2b" { BEGIN; } +step "s2txid" { SELECT pg_current_xact_id() IS NULL; } +step "s2c" { COMMIT; } +step "s2insert" { INSERT INTO do_write DEFAULT VALUES; } +step "s2p" { PREPARE TRANSACTION 'test1'; } +step "s2cp" { COMMIT PREPARED 'test1'; } + + +session "s3" +setup { SET synchronous_commit=on; } + +step "s3b" { BEGIN; } +step "s3txid" { SELECT pg_current_xact_id() IS NULL; } +step "s3c" { COMMIT; } + +# Force building of a consistent snapshot between a PREPARE and COMMIT PREPARED +# and ensure that the whole transaction is decoded at the time of COMMIT +# PREPARED. +# +# 's1init' step will initialize the replication slot and cause logical decoding +# to wait in initial starting point till the in-progress transaction in s2 is +# committed. 's2c' step will cause logical decoding to go to initial consistent +# point and wait for in-progress transaction s3 to commit. 's3c' step will cause +# logical decoding to find a consistent point while the transaction s2 is +# prepared and not yet committed. This will cause the first s1start to skip +# prepared transaction s2 as that will be before consistent point. The second +# s1start will allow decoding of skipped prepare along with commit prepared done +# as part of s2cp. +permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2c" "s2b" "s2insert" "s2p" "s3c" "s1insert" "s1start" "s2cp" "s1start" |