diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 25 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 39 |
2 files changed, 54 insertions, 10 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 81fff19..7d4ee48 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -122,7 +122,14 @@ #include "utils/syscache.h" #include "utils/usercontext.h" -static bool table_states_valid = false; +typedef enum +{ + SYNC_TABLE_STATE_NEEDS_REBUILD, + SYNC_TABLE_STATE_REBUILD_STARTED, + SYNC_TABLE_STATE_VALID, +} SyncingTablesState; + +static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; static List *table_states_not_ready = NIL; static bool FetchTableStates(bool *started_tx); @@ -272,7 +279,7 @@ wait_for_worker_state_change(char expected_state) void invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) { - table_states_valid = false; + table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD; } /* @@ -1556,13 +1563,15 @@ FetchTableStates(bool *started_tx) *started_tx = false; - if (!table_states_valid) + if (table_states_validity != SYNC_TABLE_STATE_VALID) { MemoryContext oldctx; List *rstates; ListCell *lc; SubscriptionRelState *rstate; + table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED; + /* Clean the old lists. */ list_free_deep(table_states_not_ready); table_states_not_ready = NIL; @@ -1596,7 +1605,15 @@ FetchTableStates(bool *started_tx) has_subrels = (table_states_not_ready != NIL) || HasSubscriptionRelations(MySubscription->oid); - table_states_valid = true; + /* + * If the subscription relation cache has been invalidated since we + * entered this routine, we still use and return the relations we just + * finished constructing, to avoid infinite loops, but we leave the + * table states marked as stale so that we'll rebuild it again on next + * access. Otherwise, we mark the table states as valid. + */ + if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED) + table_states_validity = SYNC_TABLE_STATE_VALID; } return has_subrels; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index bb09c40..1f9d0ed 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1321,6 +1321,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, { int last_signaled_pid = 0; bool released_lock = false; + bool terminated = false; + TransactionId initial_effective_xmin = InvalidTransactionId; + TransactionId initial_catalog_effective_xmin = InvalidTransactionId; + XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr; + ReplicationSlotInvalidationCause conflict_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; for (;;) { @@ -1355,11 +1360,24 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, */ if (s->data.invalidated == RS_INVAL_NONE) { + /* + * The slot's mutex will be released soon, and it is possible that + * those values change since the process holding the slot has been + * terminated (if any), so record them here to ensure that we + * would report the correct conflict cause. + */ + if (!terminated) + { + initial_restart_lsn = s->data.restart_lsn; + initial_effective_xmin = s->effective_xmin; + initial_catalog_effective_xmin = s->effective_catalog_xmin; + } + switch (cause) { case RS_INVAL_WAL_REMOVED: - if (s->data.restart_lsn != InvalidXLogRecPtr && - s->data.restart_lsn < oldestLSN) + if (initial_restart_lsn != InvalidXLogRecPtr && + initial_restart_lsn < oldestLSN) conflict = cause; break; case RS_INVAL_HORIZON: @@ -1368,12 +1386,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, /* invalid DB oid signals a shared relation */ if (dboid != InvalidOid && dboid != s->data.database) break; - if (TransactionIdIsValid(s->effective_xmin) && - TransactionIdPrecedesOrEquals(s->effective_xmin, + if (TransactionIdIsValid(initial_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_effective_xmin, snapshotConflictHorizon)) conflict = cause; - else if (TransactionIdIsValid(s->effective_catalog_xmin) && - TransactionIdPrecedesOrEquals(s->effective_catalog_xmin, + else if (TransactionIdIsValid(initial_catalog_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, snapshotConflictHorizon)) conflict = cause; break; @@ -1386,6 +1404,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, } } + /* + * The conflict cause recorded previously should not change while the + * process owning the slot (if any) has been terminated. + */ + Assert(!(conflict_prev != RS_INVAL_NONE && terminated && + conflict_prev != conflict)); + /* if there's no conflict, we're done */ if (conflict == RS_INVAL_NONE) { @@ -1460,6 +1485,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, (void) kill(active_pid, SIGTERM); last_signaled_pid = active_pid; + terminated = true; + conflict_prev = conflict; } /* Wait until the slot is released. */ |