summaryrefslogtreecommitdiffstats
path: root/src/backend/access
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/backend/access/brin/brin_bloom.c3
-rw-r--r--src/backend/access/gin/ginbtree.c74
-rw-r--r--src/backend/access/heap/heapam.c41
-rw-r--r--src/backend/access/heap/hio.c2
-rw-r--r--src/backend/access/index/indexam.c49
-rw-r--r--src/backend/access/spgist/spgutils.c29
-rw-r--r--src/backend/access/transam/twophase.c2
-rw-r--r--src/backend/access/transam/xact.c1
-rw-r--r--src/backend/access/transam/xlog.c6
-rw-r--r--src/backend/access/transam/xlogreader.c46
-rw-r--r--src/backend/access/transam/xlogrecovery.c34
11 files changed, 219 insertions, 68 deletions
diff --git a/src/backend/access/brin/brin_bloom.c b/src/backend/access/brin/brin_bloom.c
index 6812ca9..23de868 100644
--- a/src/backend/access/brin/brin_bloom.c
+++ b/src/backend/access/brin/brin_bloom.c
@@ -279,8 +279,7 @@ bloom_init(int ndistinct, double false_positive_rate)
double k; /* number of hash functions */
Assert(ndistinct > 0);
- Assert((false_positive_rate >= BLOOM_MIN_FALSE_POSITIVE_RATE) &&
- (false_positive_rate < BLOOM_MAX_FALSE_POSITIVE_RATE));
+ Assert(false_positive_rate > 0 && false_positive_rate < 1);
/* sizing bloom filter: -(n * ln(p)) / (ln(2))^2 */
nbits = ceil(-(ndistinct * log(false_positive_rate)) / pow(log(2.0), 2));
diff --git a/src/backend/access/gin/ginbtree.c b/src/backend/access/gin/ginbtree.c
index cc6d4e6..8520e37 100644
--- a/src/backend/access/gin/ginbtree.c
+++ b/src/backend/access/gin/ginbtree.c
@@ -28,6 +28,8 @@ static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
Buffer childbuf, GinStatsData *buildStats);
static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
bool freestack, GinStatsData *buildStats);
+static void ginFinishOldSplit(GinBtree btree, GinBtreeStack *stack,
+ GinStatsData *buildStats, int access);
/*
* Lock buffer by needed method for search.
@@ -109,7 +111,7 @@ ginFindLeafPage(GinBtree btree, bool searchMode,
* encounter on the way.
*/
if (!searchMode && GinPageIsIncompleteSplit(page))
- ginFinishSplit(btree, stack, false, NULL);
+ ginFinishOldSplit(btree, stack, NULL, access);
/*
* ok, page is correctly locked, we should check to move right ..,
@@ -130,7 +132,7 @@ ginFindLeafPage(GinBtree btree, bool searchMode,
TestForOldSnapshot(snapshot, btree->index, page);
if (!searchMode && GinPageIsIncompleteSplit(page))
- ginFinishSplit(btree, stack, false, NULL);
+ ginFinishOldSplit(btree, stack, NULL, access);
}
if (GinPageIsLeaf(page)) /* we found, return locked page */
@@ -166,8 +168,11 @@ ginFindLeafPage(GinBtree btree, bool searchMode,
* Step right from current page.
*
* The next page is locked first, before releasing the current page. This is
- * crucial to protect from concurrent page deletion (see comment in
- * ginDeletePage).
+ * crucial to prevent concurrent VACUUM from deleting a page that we are about
+ * to step to. (The lock-coupling isn't strictly necessary when we are
+ * traversing the tree to find an insert location, because page deletion grabs
+ * a cleanup lock on the root to prevent any concurrent inserts. See Page
+ * deletion section in the README. But there's no harm in doing it always.)
*/
Buffer
ginStepRight(Buffer buffer, Relation index, int lockmode)
@@ -264,7 +269,7 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
ptr->parent = root;
ptr->off = InvalidOffsetNumber;
- ginFinishSplit(btree, ptr, false, NULL);
+ ginFinishOldSplit(btree, ptr, NULL, GIN_EXCLUSIVE);
}
leftmostBlkno = btree->getLeftMostChild(btree, page);
@@ -274,7 +279,11 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
blkno = GinPageGetOpaque(page)->rightlink;
if (blkno == InvalidBlockNumber)
{
- UnlockReleaseBuffer(buffer);
+ /* Link not present in this level */
+ LockBuffer(buffer, GIN_UNLOCK);
+ /* Do not release pin on the root buffer */
+ if (buffer != root->buffer)
+ ReleaseBuffer(buffer);
break;
}
buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
@@ -289,7 +298,7 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
ptr->parent = root;
ptr->off = InvalidOffsetNumber;
- ginFinishSplit(btree, ptr, false, NULL);
+ ginFinishOldSplit(btree, ptr, NULL, GIN_EXCLUSIVE);
}
}
@@ -670,15 +679,6 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
bool done;
bool first = true;
- /*
- * freestack == false when we encounter an incompletely split page during
- * a scan, while freestack == true is used in the normal scenario that a
- * split is finished right after the initial insert.
- */
- if (!freestack)
- elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
- stack->blkno, RelationGetRelationName(btree->index));
-
/* this loop crawls up the stack until the insertion is complete */
do
{
@@ -699,7 +699,7 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
* would fail.
*/
if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
- ginFinishSplit(btree, parent, false, buildStats);
+ ginFinishOldSplit(btree, parent, buildStats, GIN_EXCLUSIVE);
/* move right if it's needed */
page = BufferGetPage(parent->buffer);
@@ -723,7 +723,7 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
page = BufferGetPage(parent->buffer);
if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
- ginFinishSplit(btree, parent, false, buildStats);
+ ginFinishOldSplit(btree, parent, buildStats, GIN_EXCLUSIVE);
}
/* insert the downlink */
@@ -760,6 +760,42 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
}
/*
+ * An entry point to ginFinishSplit() that is used when we stumble upon an
+ * existing incompletely split page in the tree, as opposed to completing a
+ * split that we just made ourselves. The difference is that stack->buffer may
+ * be merely share-locked on entry, and will be upgraded to exclusive mode.
+ *
+ * Note: Upgrading the lock momentarily releases it. Doing that in a scan
+ * would not be OK, because a concurrent VACUUM might delete the page while
+ * we're not holding the lock. It's OK in an insert, though, because VACUUM
+ * has a different mechanism that prevents it from running concurrently with
+ * inserts. (Namely, it holds a cleanup lock on the root.)
+ */
+static void
+ginFinishOldSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats, int access)
+{
+ elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
+ stack->blkno, RelationGetRelationName(btree->index));
+
+ if (access == GIN_SHARE)
+ {
+ LockBuffer(stack->buffer, GIN_UNLOCK);
+ LockBuffer(stack->buffer, GIN_EXCLUSIVE);
+
+ if (!GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
+ {
+ /*
+ * Someone else already completed the split while we were not
+ * holding the lock.
+ */
+ return;
+ }
+ }
+
+ ginFinishSplit(btree, stack, false, buildStats);
+}
+
+/*
* Insert a value to tree described by stack.
*
* The value to be inserted is given in 'insertdata'. Its format depends
@@ -779,7 +815,7 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
/* If the leaf page was incompletely split, finish the split first */
if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
- ginFinishSplit(btree, stack, false, buildStats);
+ ginFinishOldSplit(btree, stack, buildStats, GIN_EXCLUSIVE);
done = ginPlaceToPage(btree, stack,
insertdata, InvalidBlockNumber,
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index c74fbd0..30bc88e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2856,13 +2856,7 @@ l1:
result = TM_Deleted;
}
- if (crosscheck != InvalidSnapshot && result == TM_Ok)
- {
- /* Perform additional check for transaction-snapshot mode RI updates */
- if (!HeapTupleSatisfiesVisibility(&tp, crosscheck, buffer))
- result = TM_Updated;
- }
-
+ /* Sanity check the result of HeapTupleSatisfiesUpdate() and the logic above */
if (result != TM_Ok)
{
Assert(result == TM_SelfModified ||
@@ -2872,6 +2866,17 @@ l1:
Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID));
Assert(result != TM_Updated ||
!ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid));
+ }
+
+ if (crosscheck != InvalidSnapshot && result == TM_Ok)
+ {
+ /* Perform additional check for transaction-snapshot mode RI updates */
+ if (!HeapTupleSatisfiesVisibility(&tp, crosscheck, buffer))
+ result = TM_Updated;
+ }
+
+ if (result != TM_Ok)
+ {
tmfd->ctid = tp.t_data->t_ctid;
tmfd->xmax = HeapTupleHeaderGetUpdateXid(tp.t_data);
if (result == TM_SelfModified)
@@ -3483,16 +3488,7 @@ l2:
result = TM_Deleted;
}
- if (crosscheck != InvalidSnapshot && result == TM_Ok)
- {
- /* Perform additional check for transaction-snapshot mode RI updates */
- if (!HeapTupleSatisfiesVisibility(&oldtup, crosscheck, buffer))
- {
- result = TM_Updated;
- Assert(!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid));
- }
- }
-
+ /* Sanity check the result of HeapTupleSatisfiesUpdate() and the logic above */
if (result != TM_Ok)
{
Assert(result == TM_SelfModified ||
@@ -3502,6 +3498,17 @@ l2:
Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
Assert(result != TM_Updated ||
!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid));
+ }
+
+ if (crosscheck != InvalidSnapshot && result == TM_Ok)
+ {
+ /* Perform additional check for transaction-snapshot mode RI updates */
+ if (!HeapTupleSatisfiesVisibility(&oldtup, crosscheck, buffer))
+ result = TM_Updated;
+ }
+
+ if (result != TM_Ok)
+ {
tmfd->ctid = oldtup.t_data->t_ctid;
tmfd->xmax = HeapTupleHeaderGetUpdateXid(oldtup.t_data);
if (result == TM_SelfModified)
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index b0ece66..6e1ff45 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -387,7 +387,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
* on, as cached in the BulkInsertState or relcache entry. If that
* doesn't work, we ask the Free Space Map to locate a suitable page.
* Since the FSM's info might be out of date, we have to be prepared to
- * loop around and retry multiple times. (To insure this isn't an infinite
+ * loop around and retry multiple times. (To ensure this isn't an infinite
* loop, we must update the FSM with the correct amount of free space on
* each page that proves not to be suitable.) If the FSM has no record of
* a page with enough free space, we give up and extend the relation.
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index fe80b8b..cd5f07f 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -107,6 +107,7 @@ do { \
static IndexScanDesc index_beginscan_internal(Relation indexRelation,
int nkeys, int norderbys, Snapshot snapshot,
ParallelIndexScanDesc pscan, bool temp_snap);
+static inline void validate_relation_kind(Relation r);
/* ----------------------------------------------------------------
@@ -135,12 +136,30 @@ index_open(Oid relationId, LOCKMODE lockmode)
r = relation_open(relationId, lockmode);
- if (r->rd_rel->relkind != RELKIND_INDEX &&
- r->rd_rel->relkind != RELKIND_PARTITIONED_INDEX)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("\"%s\" is not an index",
- RelationGetRelationName(r))));
+ validate_relation_kind(r);
+
+ return r;
+}
+
+/* ----------------
+ * try_index_open - open an index relation by relation OID
+ *
+ * Same as index_open, except return NULL instead of failing
+ * if the relation does not exist.
+ * ----------------
+ */
+Relation
+try_index_open(Oid relationId, LOCKMODE lockmode)
+{
+ Relation r;
+
+ r = try_relation_open(relationId, lockmode);
+
+ /* leave if index does not exist */
+ if (!r)
+ return NULL;
+
+ validate_relation_kind(r);
return r;
}
@@ -169,6 +188,24 @@ index_close(Relation relation, LOCKMODE lockmode)
}
/* ----------------
+ * validate_relation_kind - check the relation's kind
+ *
+ * Make sure relkind is an index or a partitioned index.
+ * ----------------
+ */
+static inline void
+validate_relation_kind(Relation r)
+{
+ if (r->rd_rel->relkind != RELKIND_INDEX &&
+ r->rd_rel->relkind != RELKIND_PARTITIONED_INDEX)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is not an index",
+ RelationGetRelationName(r))));
+}
+
+
+/* ----------------
* index_insert - insert an index tuple into a relation
* ----------------
*/
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index f2da02e..bff33f3 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -185,8 +185,6 @@ spgGetCache(Relation index)
Oid atttype;
spgConfigIn in;
FmgrInfo *procinfo;
- Buffer metabuffer;
- SpGistMetaPageData *metadata;
cache = MemoryContextAllocZero(index->rd_indexcxt,
sizeof(SpGistCache));
@@ -254,19 +252,28 @@ spgGetCache(Relation index)
fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
fillTypeDesc(&cache->attLabelType, cache->config.labelType);
- /* Last, get the lastUsedPages data from the metapage */
- metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
- LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
+ /*
+ * Finally, if it's a real index (not a partitioned one), get the
+ * lastUsedPages data from the metapage
+ */
+ if (index->rd_rel->relkind != RELKIND_PARTITIONED_INDEX)
+ {
+ Buffer metabuffer;
+ SpGistMetaPageData *metadata;
+
+ metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
+ LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
- metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
+ metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
- if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
- elog(ERROR, "index \"%s\" is not an SP-GiST index",
- RelationGetRelationName(index));
+ if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
+ elog(ERROR, "index \"%s\" is not an SP-GiST index",
+ RelationGetRelationName(index));
- cache->lastUsedPages = metadata->lastUsedPages;
+ cache->lastUsedPages = metadata->lastUsedPages;
- UnlockReleaseBuffer(metabuffer);
+ UnlockReleaseBuffer(metabuffer);
+ }
index->rd_amcache = (void *) cache;
}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 5293c69..ca7037e 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -486,7 +486,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
proc->roleId = owner;
proc->tempNamespaceId = InvalidOid;
proc->isBackgroundWorker = false;
- proc->lwWaiting = false;
+ proc->lwWaiting = LW_WS_NOT_WAITING;
proc->lwWaitMode = 0;
proc->waitLock = NULL;
proc->waitProcLock = NULL;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e0c7ad1..7a3d9b4 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5265,6 +5265,7 @@ PushTransaction(void)
s->blockState = TBLOCK_SUBBEGIN;
GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
s->prevXactReadOnly = XactReadOnly;
+ s->startedInRecovery = p->startedInRecovery;
s->parallelModeLevel = 0;
s->topXidLogged = false;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 59f94b0..9795ce7 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -974,8 +974,10 @@ XLogInsertRecord(XLogRecData *rdata,
if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
- XL_ROUTINE(), NULL);
-
+ XL_ROUTINE(.page_read = NULL,
+ .segment_open = NULL,
+ .segment_close = NULL),
+ NULL);
if (!debug_reader)
{
appendStringInfoString(&buf, "error decoding record: out of memory while allocating a WAL reading processor");
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c15da9d..e7ad331 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -457,18 +457,37 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi
if (state->decode_buffer_tail >= state->decode_buffer_head)
{
/* Empty, or tail is to the right of head. */
- if (state->decode_buffer_tail + required_space <=
- state->decode_buffer + state->decode_buffer_size)
+ if (required_space <=
+ state->decode_buffer_size -
+ (state->decode_buffer_tail - state->decode_buffer))
{
- /* There is space between tail and end. */
+ /*-
+ * There is space between tail and end.
+ *
+ * +-----+--------------------+-----+
+ * | |////////////////////|here!|
+ * +-----+--------------------+-----+
+ * ^ ^
+ * | |
+ * h t
+ */
decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
decoded->oversized = false;
return decoded;
}
- else if (state->decode_buffer + required_space <
- state->decode_buffer_head)
+ else if (required_space <
+ state->decode_buffer_head - state->decode_buffer)
{
- /* There is space between start and head. */
+ /*-
+ * There is space between start and head.
+ *
+ * +-----+--------------------+-----+
+ * |here!|////////////////////| |
+ * +-----+--------------------+-----+
+ * ^ ^
+ * | |
+ * h t
+ */
decoded = (DecodedXLogRecord *) state->decode_buffer;
decoded->oversized = false;
return decoded;
@@ -477,10 +496,19 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi
else
{
/* Tail is to the left of head. */
- if (state->decode_buffer_tail + required_space <
- state->decode_buffer_head)
+ if (required_space <
+ state->decode_buffer_head - state->decode_buffer_tail)
{
- /* There is space between tail and head. */
+ /*-
+ * There is space between tail and head.
+ *
+ * +-----+--------------------+-----+
+ * |/////|here! |/////|
+ * +-----+--------------------+-----+
+ * ^ ^
+ * | |
+ * t h
+ */
decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
decoded->oversized = false;
return decoded;
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 166f7b7..1503b21 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -622,6 +622,22 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
EnableStandbyMode();
/*
+ * Omitting backup_label when creating a new replica, PITR node etc.
+ * unfortunately is a common cause of corruption. Logging that
+ * backup_label was used makes it a bit easier to exclude that as the
+ * cause of observed corruption.
+ *
+ * Do so before we try to read the checkpoint record (which can fail),
+ * as otherwise it can be hard to understand why a checkpoint other
+ * than ControlFile->checkPoint is used.
+ */
+ ereport(LOG,
+ (errmsg("starting backup recovery with redo LSN %X/%X, checkpoint LSN %X/%X, on timeline ID %u",
+ LSN_FORMAT_ARGS(RedoStartLSN),
+ LSN_FORMAT_ARGS(CheckPointLoc),
+ CheckPointTLI)));
+
+ /*
* When a backup_label file is present, we want to roll forward from
* the checkpoint it identifies, rather than using pg_control.
*/
@@ -759,6 +775,16 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
EnableStandbyMode();
}
+ /*
+ * For the same reason as when starting up with backup_label present,
+ * emit a log message when we continue initializing from a base
+ * backup.
+ */
+ if (!XLogRecPtrIsInvalid(ControlFile->backupStartPoint))
+ ereport(LOG,
+ (errmsg("restarting backup recovery with redo LSN %X/%X",
+ LSN_FORMAT_ARGS(ControlFile->backupStartPoint))));
+
/* Get the last valid checkpoint record. */
CheckPointLoc = ControlFile->checkPoint;
CheckPointTLI = ControlFile->checkPointCopy.ThisTimeLineID;
@@ -2102,6 +2128,9 @@ CheckRecoveryConsistency(void)
if (!XLogRecPtrIsInvalid(backupEndPoint) &&
backupEndPoint <= lastReplayedEndRecPtr)
{
+ XLogRecPtr saveBackupStartPoint = backupStartPoint;
+ XLogRecPtr saveBackupEndPoint = backupEndPoint;
+
elog(DEBUG1, "end of backup reached");
/*
@@ -2112,6 +2141,11 @@ CheckRecoveryConsistency(void)
backupStartPoint = InvalidXLogRecPtr;
backupEndPoint = InvalidXLogRecPtr;
backupEndRequired = false;
+
+ ereport(LOG,
+ (errmsg("completed backup recovery with redo LSN %X/%X and end LSN %X/%X",
+ LSN_FORMAT_ARGS(saveBackupStartPoint),
+ LSN_FORMAT_ARGS(saveBackupEndPoint))));
}
/*