summaryrefslogtreecommitdiffstats
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/tablesync.c25
-rw-r--r--src/backend/replication/slot.c39
2 files changed, 54 insertions, 10 deletions
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 81fff19..7d4ee48 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -122,7 +122,14 @@
#include "utils/syscache.h"
#include "utils/usercontext.h"
-static bool table_states_valid = false;
+typedef enum
+{
+ SYNC_TABLE_STATE_NEEDS_REBUILD,
+ SYNC_TABLE_STATE_REBUILD_STARTED,
+ SYNC_TABLE_STATE_VALID,
+} SyncingTablesState;
+
+static SyncingTablesState table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
static List *table_states_not_ready = NIL;
static bool FetchTableStates(bool *started_tx);
@@ -272,7 +279,7 @@ wait_for_worker_state_change(char expected_state)
void
invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
{
- table_states_valid = false;
+ table_states_validity = SYNC_TABLE_STATE_NEEDS_REBUILD;
}
/*
@@ -1556,13 +1563,15 @@ FetchTableStates(bool *started_tx)
*started_tx = false;
- if (!table_states_valid)
+ if (table_states_validity != SYNC_TABLE_STATE_VALID)
{
MemoryContext oldctx;
List *rstates;
ListCell *lc;
SubscriptionRelState *rstate;
+ table_states_validity = SYNC_TABLE_STATE_REBUILD_STARTED;
+
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
table_states_not_ready = NIL;
@@ -1596,7 +1605,15 @@ FetchTableStates(bool *started_tx)
has_subrels = (table_states_not_ready != NIL) ||
HasSubscriptionRelations(MySubscription->oid);
- table_states_valid = true;
+ /*
+ * If the subscription relation cache has been invalidated since we
+ * entered this routine, we still use and return the relations we just
+ * finished constructing, to avoid infinite loops, but we leave the
+ * table states marked as stale so that we'll rebuild it again on next
+ * access. Otherwise, we mark the table states as valid.
+ */
+ if (table_states_validity == SYNC_TABLE_STATE_REBUILD_STARTED)
+ table_states_validity = SYNC_TABLE_STATE_VALID;
}
return has_subrels;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index bb09c40..1f9d0ed 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1321,6 +1321,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
{
int last_signaled_pid = 0;
bool released_lock = false;
+ bool terminated = false;
+ TransactionId initial_effective_xmin = InvalidTransactionId;
+ TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
+ XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
+ ReplicationSlotInvalidationCause conflict_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
for (;;)
{
@@ -1355,11 +1360,24 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
*/
if (s->data.invalidated == RS_INVAL_NONE)
{
+ /*
+ * The slot's mutex will be released soon, and it is possible that
+ * those values change since the process holding the slot has been
+ * terminated (if any), so record them here to ensure that we
+ * would report the correct conflict cause.
+ */
+ if (!terminated)
+ {
+ initial_restart_lsn = s->data.restart_lsn;
+ initial_effective_xmin = s->effective_xmin;
+ initial_catalog_effective_xmin = s->effective_catalog_xmin;
+ }
+
switch (cause)
{
case RS_INVAL_WAL_REMOVED:
- if (s->data.restart_lsn != InvalidXLogRecPtr &&
- s->data.restart_lsn < oldestLSN)
+ if (initial_restart_lsn != InvalidXLogRecPtr &&
+ initial_restart_lsn < oldestLSN)
conflict = cause;
break;
case RS_INVAL_HORIZON:
@@ -1368,12 +1386,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
/* invalid DB oid signals a shared relation */
if (dboid != InvalidOid && dboid != s->data.database)
break;
- if (TransactionIdIsValid(s->effective_xmin) &&
- TransactionIdPrecedesOrEquals(s->effective_xmin,
+ if (TransactionIdIsValid(initial_effective_xmin) &&
+ TransactionIdPrecedesOrEquals(initial_effective_xmin,
snapshotConflictHorizon))
conflict = cause;
- else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
- TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
+ else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
+ TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
snapshotConflictHorizon))
conflict = cause;
break;
@@ -1386,6 +1404,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
}
}
+ /*
+ * The conflict cause recorded previously should not change while the
+ * process owning the slot (if any) has been terminated.
+ */
+ Assert(!(conflict_prev != RS_INVAL_NONE && terminated &&
+ conflict_prev != conflict));
+
/* if there's no conflict, we're done */
if (conflict == RS_INVAL_NONE)
{
@@ -1460,6 +1485,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
(void) kill(active_pid, SIGTERM);
last_signaled_pid = active_pid;
+ terminated = true;
+ conflict_prev = conflict;
}
/* Wait until the slot is released. */