Diffstat (limited to 'src/backend/access/transam/xloginsert.c')
-rw-r--r--   src/backend/access/transam/xloginsert.c   1318
1 file changed, 1318 insertions(+), 0 deletions(-)
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
new file mode 100644
index 0000000..35cc055
--- /dev/null
+++ b/src/backend/access/transam/xloginsert.c
@@ -0,0 +1,1318 @@
+/*-------------------------------------------------------------------------
+ *
+ * xloginsert.c
+ * Functions for constructing WAL records
+ *
+ * Constructing a WAL record begins with a call to XLogBeginInsert,
+ * followed by a number of XLogRegister* calls. The registered data is
+ * collected in private working memory, and finally assembled into a chain
+ * of XLogRecData structs by a call to XLogRecordAssemble(). See
+ * access/transam/README for details.
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/xloginsert.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+#ifdef USE_ZSTD
+#include <zstd.h>
+#endif
+
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xloginsert.h"
+#include "catalog/pg_control.h"
+#include "common/pg_lzcompress.h"
+#include "executor/instrument.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "replication/origin.h"
+#include "storage/bufmgr.h"
+#include "storage/proc.h"
+#include "utils/memutils.h"
+
+/*
+ * Guess the maximum buffer size required to store a compressed version of
+ * a backup block image.
+ */
+#ifdef USE_LZ4
+#define LZ4_MAX_BLCKSZ LZ4_COMPRESSBOUND(BLCKSZ)
+#else
+#define LZ4_MAX_BLCKSZ 0
+#endif
+
+#ifdef USE_ZSTD
+#define ZSTD_MAX_BLCKSZ ZSTD_COMPRESSBOUND(BLCKSZ)
+#else
+#define ZSTD_MAX_BLCKSZ 0
+#endif
+
+#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
+
+/* Buffer size required to store a compressed version of a backup block image */
+#define COMPRESS_BUFSIZE Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
+
+/*
+ * For each block reference registered with XLogRegisterBuffer, we fill in
+ * a registered_buffer struct.
+ */
+typedef struct
+{
+ bool in_use; /* is this slot in use? */
+ uint8 flags; /* REGBUF_* flags */
+ RelFileNode rnode; /* identifies the relation and block */
+ ForkNumber forkno;
+ BlockNumber block;
+ Page page; /* page content */
+ uint32 rdata_len; /* total length of data in rdata chain */
+ XLogRecData *rdata_head; /* head of the chain of data registered with
+ * this block */
+ XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if
+ * empty */
+
+ XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to
+ * backup block data in XLogRecordAssemble() */
+
+ /* buffer to store a compressed version of backup block image */
+ char compressed_page[COMPRESS_BUFSIZE];
+} registered_buffer;
+
+static registered_buffer *registered_buffers;
+static int max_registered_buffers; /* allocated size */
+static int max_registered_block_id = 0; /* highest block_id + 1 currently
+ * registered */
+
+/*
+ * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
+ * with XLogRegisterData(...).
+ */
+static XLogRecData *mainrdata_head;
+static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
+static uint32 mainrdata_len; /* total # of bytes in chain */
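+
+/*
+ * Note: when the chain is empty, mainrdata_last points at mainrdata_head
+ * itself, cast to XLogRecData *. This relies on 'next' being the first
+ * field of XLogRecData, so that appending via mainrdata_last->next works
+ * the same way whether the chain is empty or not.
+ */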
+
+/* flags for the in-progress insertion */
+static uint8 curinsert_flags = 0;
+
+/*
+ * These are used to hold the record header while constructing a record.
+ * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
+ * because we want it to be MAXALIGNed and padding bytes zeroed.
+ *
+ * For simplicity, it's allocated large enough to hold the headers for any
+ * WAL record.
+ */
+static XLogRecData hdr_rdt;
+static char *hdr_scratch = NULL;
+
+#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
+#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
+
+#define HEADER_SCRATCH_SIZE \
+ (SizeOfXLogRecord + \
+ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
+ SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
+ SizeOfXLogTransactionId)
+
+/*
+ * An array of XLogRecData structs, to hold registered data.
+ */
+static XLogRecData *rdatas;
+static int num_rdatas; /* entries currently used */
+static int max_rdatas; /* allocated size */
+
+static bool begininsert_called = false;
+
+/* Memory context to hold the registered buffer and data references. */
+static MemoryContext xloginsert_cxt;
+
+static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
+ XLogRecPtr RedoRecPtr, bool doPageWrites,
+ XLogRecPtr *fpw_lsn, int *num_fpi,
+ bool *topxid_included);
+static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
+ uint16 hole_length, char *dest, uint16 *dlen);
+
+/*
+ * Begin constructing a WAL record. This must be called before the
+ * XLogRegister* functions and XLogInsert().
+ */
+void
+XLogBeginInsert(void)
+{
+ Assert(max_registered_block_id == 0);
+ Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
+ Assert(mainrdata_len == 0);
+
+ /* cross-check on whether we should be here or not */
+ if (!XLogInsertAllowed())
+ elog(ERROR, "cannot make new WAL entries during recovery");
+
+ if (begininsert_called)
+ elog(ERROR, "XLogBeginInsert was already called");
+
+ begininsert_called = true;
+}
+
+/*
+ * Ensure that there are enough buffer and data slots in the working area
+ * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
+ * calls.
+ *
+ * There is always space for a small number of buffers and data chunks, enough
+ * for most record types. This function is for the exceptional cases that need
+ * more.
+ */
+void
+XLogEnsureRecordSpace(int max_block_id, int ndatas)
+{
+ int nbuffers;
+
+ /*
+ * This must be called before entering a critical section, because
+ * allocating memory inside a critical section can fail. repalloc() will
+ * check the same, but better to check it here too so that we fail
+ * consistently even if the arrays happen to be large enough already.
+ */
+ Assert(CritSectionCount == 0);
+
+ /* the minimum values can't be decreased */
+ if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
+ max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
+ if (ndatas < XLR_NORMAL_RDATAS)
+ ndatas = XLR_NORMAL_RDATAS;
+
+ if (max_block_id > XLR_MAX_BLOCK_ID)
+ elog(ERROR, "maximum number of WAL record block references exceeded");
+ nbuffers = max_block_id + 1;
+
+ if (nbuffers > max_registered_buffers)
+ {
+ registered_buffers = (registered_buffer *)
+ repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
+
+ /*
+ * At least the padding bytes in the structs must be zeroed, because
+ * they are included in WAL data, but initialize it all for tidiness.
+ */
+ MemSet(&registered_buffers[max_registered_buffers], 0,
+ (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
+ max_registered_buffers = nbuffers;
+ }
+
+ if (ndatas > max_rdatas)
+ {
+ rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
+ max_rdatas = ndatas;
+ }
+}
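+
+/*
+ * Illustrative sketch of a caller that references more blocks than the
+ * normal limit; "nblocks", "buffers", and the rmgr/info values are
+ * hypothetical. The call must happen before the critical section:
+ *
+ *		XLogEnsureRecordSpace(nblocks - 1, 0);
+ *		START_CRIT_SECTION();
+ *		XLogBeginInsert();
+ *		for (i = 0; i < nblocks; i++)
+ *			XLogRegisterBuffer(i, buffers[i], REGBUF_STANDARD);
+ *		recptr = XLogInsert(RM_MYRMGR_ID, XLOG_MYRECORD_OP);
+ *		END_CRIT_SECTION();
+ */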
+
+/*
+ * Reset WAL record construction buffers.
+ */
+void
+XLogResetInsertion(void)
+{
+ int i;
+
+ for (i = 0; i < max_registered_block_id; i++)
+ registered_buffers[i].in_use = false;
+
+ num_rdatas = 0;
+ max_registered_block_id = 0;
+ mainrdata_len = 0;
+ mainrdata_last = (XLogRecData *) &mainrdata_head;
+ curinsert_flags = 0;
+ begininsert_called = false;
+}
+
+/*
+ * Register a reference to a buffer with the WAL record being constructed.
+ * This must be called for every page that the WAL-logged operation modifies.
+ */
+void
+XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
+{
+ registered_buffer *regbuf;
+
+ /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
+ Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
+ Assert(begininsert_called);
+
+ if (block_id >= max_registered_block_id)
+ {
+ if (block_id >= max_registered_buffers)
+ elog(ERROR, "too many registered buffers");
+ max_registered_block_id = block_id + 1;
+ }
+
+ regbuf = &registered_buffers[block_id];
+
+ BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
+ regbuf->page = BufferGetPage(buffer);
+ regbuf->flags = flags;
+ regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
+ regbuf->rdata_len = 0;
+
+ /*
+ * Check that this page hasn't already been registered with some other
+ * block_id.
+ */
+#ifdef USE_ASSERT_CHECKING
+ {
+ int i;
+
+ for (i = 0; i < max_registered_block_id; i++)
+ {
+ registered_buffer *regbuf_old = &registered_buffers[i];
+
+ if (i == block_id || !regbuf_old->in_use)
+ continue;
+
+ Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
+ regbuf_old->forkno != regbuf->forkno ||
+ regbuf_old->block != regbuf->block);
+ }
+ }
+#endif
+
+ regbuf->in_use = true;
+}
+
+/*
+ * Like XLogRegisterBuffer, but for registering a block that's not in the
+ * shared buffer pool (i.e. when you don't have a Buffer for it).
+ */
+void
+XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
+ BlockNumber blknum, Page page, uint8 flags)
+{
+ registered_buffer *regbuf;
+
+ Assert(begininsert_called);
+
+ if (block_id >= max_registered_block_id)
+ max_registered_block_id = block_id + 1;
+
+ if (block_id >= max_registered_buffers)
+ elog(ERROR, "too many registered buffers");
+
+ regbuf = &registered_buffers[block_id];
+
+ regbuf->rnode = *rnode;
+ regbuf->forkno = forknum;
+ regbuf->block = blknum;
+ regbuf->page = page;
+ regbuf->flags = flags;
+ regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
+ regbuf->rdata_len = 0;
+
+ /*
+ * Check that this page hasn't already been registered with some other
+ * block_id.
+ */
+#ifdef USE_ASSERT_CHECKING
+ {
+ int i;
+
+ for (i = 0; i < max_registered_block_id; i++)
+ {
+ registered_buffer *regbuf_old = &registered_buffers[i];
+
+ if (i == block_id || !regbuf_old->in_use)
+ continue;
+
+ Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
+ regbuf_old->forkno != regbuf->forkno ||
+ regbuf_old->block != regbuf->block);
+ }
+ }
+#endif
+
+ regbuf->in_use = true;
+}
+
+/*
+ * Add data to the WAL record that's being constructed.
+ *
+ * The data is appended to the "main chunk", available at replay with
+ * XLogRecGetData().
+ */
+void
+XLogRegisterData(char *data, int len)
+{
+ XLogRecData *rdata;
+
+ Assert(begininsert_called);
+
+ if (num_rdatas >= max_rdatas)
+ elog(ERROR, "too much WAL data");
+ rdata = &rdatas[num_rdatas++];
+
+ rdata->data = data;
+ rdata->len = len;
+
+ /*
+ * we use the mainrdata_last pointer to track the end of the chain, so no
+ * need to clear 'next' here.
+ */
+
+ mainrdata_last->next = rdata;
+ mainrdata_last = rdata;
+
+ mainrdata_len += len;
+}
+
+/*
+ * Add buffer-specific data to the WAL record that's being constructed.
+ *
+ * Block_id must reference a block previously registered with
+ * XLogRegisterBuffer(). If this is called more than once for the same
+ * block_id, the data is appended.
+ *
+ * The maximum amount of data that can be registered per block is 65535
+ * bytes. That should be plenty; if you need more than BLCKSZ bytes to
+ * reconstruct the changes to the page, you might as well just log a full
+ * copy of it. (The "main data" that's not associated with a block is not
+ * limited.)
+ */
+void
+XLogRegisterBufData(uint8 block_id, char *data, int len)
+{
+ registered_buffer *regbuf;
+ XLogRecData *rdata;
+
+ Assert(begininsert_called);
+
+ /* find the registered buffer struct */
+ regbuf = &registered_buffers[block_id];
+ if (!regbuf->in_use)
+ elog(ERROR, "no block with id %d registered with WAL insertion",
+ block_id);
+
+ if (num_rdatas >= max_rdatas)
+ elog(ERROR, "too much WAL data");
+ rdata = &rdatas[num_rdatas++];
+
+ rdata->data = data;
+ rdata->len = len;
+
+ regbuf->rdata_tail->next = rdata;
+ regbuf->rdata_tail = rdata;
+ regbuf->rdata_len += len;
+}
+
+/*
+ * Set insert status flags for the upcoming WAL record.
+ *
+ * The flags that can be used here are:
+ * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
+ * included in the record.
+ * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
+ *   durability, which makes it possible to avoid triggering WAL archiving
+ *   and other background activity.
+ */
+void
+XLogSetRecordFlags(uint8 flags)
+{
+ Assert(begininsert_called);
+ curinsert_flags |= flags;
+}
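+
+/*
+ * Illustrative sketch (the rmgr ID, info value and xlrec are hypothetical):
+ * flags are set between XLogBeginInsert() and XLogInsert(), e.g. for a
+ * record whose loss would not matter for durability:
+ *
+ *		XLogBeginInsert();
+ *		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+ *		XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
+ *		(void) XLogInsert(RM_MYRMGR_ID, XLOG_MYRECORD_OP);
+ */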
+
+/*
+ * Insert an XLOG record having the specified RMID and info bytes, with the
+ * body of the record being the data and buffer references registered earlier
+ * with XLogRegister* calls.
+ *
+ * Returns XLOG pointer to end of record (beginning of next record).
+ * This can be used as LSN for data pages affected by the logged action.
+ * (LSN is the XLOG point up to which the XLOG must be flushed to disk
+ * before the data page can be written out. This implements the basic
+ * WAL rule "write the log before the data".)
+ */
+XLogRecPtr
+XLogInsert(RmgrId rmid, uint8 info)
+{
+ XLogRecPtr EndPos;
+
+ /* XLogBeginInsert() must have been called. */
+ if (!begininsert_called)
+ elog(ERROR, "XLogBeginInsert was not called");
+
+ /*
+ * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
+ * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
+ */
+ if ((info & ~(XLR_RMGR_INFO_MASK |
+ XLR_SPECIAL_REL_UPDATE |
+ XLR_CHECK_CONSISTENCY)) != 0)
+ elog(PANIC, "invalid xlog info mask %02X", info);
+
+ TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
+
+ /*
+ * In bootstrap mode, we don't actually log anything but XLOG resources;
+ * return a phony record pointer.
+ */
+ if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
+ {
+ XLogResetInsertion();
+ EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
+ return EndPos;
+ }
+
+ do
+ {
+ XLogRecPtr RedoRecPtr;
+ bool doPageWrites;
+ bool topxid_included = false;
+ XLogRecPtr fpw_lsn;
+ XLogRecData *rdt;
+ int num_fpi = 0;
+
+ /*
+ * Get values needed to decide whether to do full-page writes. Since
+ * we don't yet have an insertion lock, these could change under us,
+ * but XLogInsertRecord will recheck them once it has a lock.
+ */
+ GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
+
+ rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
+ &fpw_lsn, &num_fpi, &topxid_included);
+
+ EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
+ topxid_included);
+ } while (EndPos == InvalidXLogRecPtr);
+
+ XLogResetInsertion();
+
+ return EndPos;
+}
+
+/*
+ * Assemble a WAL record from the registered data and buffers into an
+ * XLogRecData chain, ready for insertion with XLogInsertRecord().
+ *
+ * The record header fields are filled in, except for the xl_prev field. The
+ * calculated CRC does not include the record header yet.
+ *
+ * If there are any registered buffers, and a full-page image was not taken
+ * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
+ * signals that the assembled record is only good for insertion on the
+ * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
+ *
+ * *topxid_included is set if the topmost transaction ID is logged with the
+ * current subtransaction.
+ */
+static XLogRecData *
+XLogRecordAssemble(RmgrId rmid, uint8 info,
+ XLogRecPtr RedoRecPtr, bool doPageWrites,
+ XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
+{
+ XLogRecData *rdt;
+ uint32 total_len = 0;
+ int block_id;
+ pg_crc32c rdata_crc;
+ registered_buffer *prev_regbuf = NULL;
+ XLogRecData *rdt_datas_last;
+ XLogRecord *rechdr;
+ char *scratch = hdr_scratch;
+
+ /*
+ * Note: this function can be called multiple times for the same record.
+ * All the modifications we do to the rdata chains below must handle that.
+ */
+
+ /* The record begins with the fixed-size header */
+ rechdr = (XLogRecord *) scratch;
+ scratch += SizeOfXLogRecord;
+
+ hdr_rdt.next = NULL;
+ rdt_datas_last = &hdr_rdt;
+ hdr_rdt.data = hdr_scratch;
+
+ /*
+ * Enforce consistency checks for this record if the user has requested
+ * them. Do this at the beginning of this routine so that callers of
+ * XLogInsert() can also pass XLR_CHECK_CONSISTENCY directly for a record.
+ */
+ if (wal_consistency_checking[rmid])
+ info |= XLR_CHECK_CONSISTENCY;
+
+ /*
+ * Make an rdata chain containing all the data portions of all block
+ * references. This includes the data for full-page images. Also append
+ * the headers for the block references in the scratch buffer.
+ */
+ *fpw_lsn = InvalidXLogRecPtr;
+ for (block_id = 0; block_id < max_registered_block_id; block_id++)
+ {
+ registered_buffer *regbuf = &registered_buffers[block_id];
+ bool needs_backup;
+ bool needs_data;
+ XLogRecordBlockHeader bkpb;
+ XLogRecordBlockImageHeader bimg;
+ XLogRecordBlockCompressHeader cbimg = {0};
+ bool samerel;
+ bool is_compressed = false;
+ bool include_image;
+
+ if (!regbuf->in_use)
+ continue;
+
+ /* Determine if this block needs to be backed up */
+ if (regbuf->flags & REGBUF_FORCE_IMAGE)
+ needs_backup = true;
+ else if (regbuf->flags & REGBUF_NO_IMAGE)
+ needs_backup = false;
+ else if (!doPageWrites)
+ needs_backup = false;
+ else
+ {
+ /*
+ * We assume page LSN is first data on *every* page that can be
+ * passed to XLogInsert, whether it has the standard page layout
+ * or not.
+ */
+ XLogRecPtr page_lsn = PageGetLSN(regbuf->page);
+
+ needs_backup = (page_lsn <= RedoRecPtr);
+ if (!needs_backup)
+ {
+ if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
+ *fpw_lsn = page_lsn;
+ }
+ }
+
+ /* Determine if the buffer data needs to be included */
+ if (regbuf->rdata_len == 0)
+ needs_data = false;
+ else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
+ needs_data = true;
+ else
+ needs_data = !needs_backup;
+
+ bkpb.id = block_id;
+ bkpb.fork_flags = regbuf->forkno;
+ bkpb.data_length = 0;
+
+ if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
+ bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
+
+ /*
+ * If needs_backup is true or WAL checking is enabled for the current
+ * resource manager, log a full-page write for the current block.
+ */
+ include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
+
+ if (include_image)
+ {
+ Page page = regbuf->page;
+ uint16 compressed_len = 0;
+
+ /*
+ * The page needs to be backed up, so calculate its hole length
+ * and offset.
+ */
+ if (regbuf->flags & REGBUF_STANDARD)
+ {
+ /* Assume we can omit data between pd_lower and pd_upper */
+ uint16 lower = ((PageHeader) page)->pd_lower;
+ uint16 upper = ((PageHeader) page)->pd_upper;
+
+ if (lower >= SizeOfPageHeaderData &&
+ upper > lower &&
+ upper <= BLCKSZ)
+ {
+ bimg.hole_offset = lower;
+ cbimg.hole_length = upper - lower;
+ }
+ else
+ {
+ /* No "hole" to remove */
+ bimg.hole_offset = 0;
+ cbimg.hole_length = 0;
+ }
+ }
+ else
+ {
+ /* Not a standard page header, don't try to eliminate "hole" */
+ bimg.hole_offset = 0;
+ cbimg.hole_length = 0;
+ }
+
+ /*
+ * Try to compress a block image if wal_compression is enabled
+ */
+ if (wal_compression != WAL_COMPRESSION_NONE)
+ {
+ is_compressed =
+ XLogCompressBackupBlock(page, bimg.hole_offset,
+ cbimg.hole_length,
+ regbuf->compressed_page,
+ &compressed_len);
+ }
+
+ /*
+ * Fill in the remaining fields in the XLogRecordBlockHeader
+ * struct
+ */
+ bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
+
+ /* Report a full page image constructed for the WAL record */
+ *num_fpi += 1;
+
+ /*
+ * Construct XLogRecData entries for the page content.
+ */
+ rdt_datas_last->next = &regbuf->bkp_rdatas[0];
+ rdt_datas_last = rdt_datas_last->next;
+
+ bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+
+ /*
+ * If WAL consistency checking is enabled for the resource manager
+ * of this WAL record, a full-page image is included in the record
+ * for the block modified. During redo, the full-page image is replayed
+ * only if BKPIMAGE_APPLY is set.
+ */
+ if (needs_backup)
+ bimg.bimg_info |= BKPIMAGE_APPLY;
+
+ if (is_compressed)
+ {
+ /* The current compression is stored in the WAL record */
+ bimg.length = compressed_len;
+
+ /* Set the compression method used for this block */
+ switch ((WalCompression) wal_compression)
+ {
+ case WAL_COMPRESSION_PGLZ:
+ bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
+ break;
+
+ case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+ bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
+#else
+ elog(ERROR, "LZ4 is not supported by this build");
+#endif
+ break;
+
+ case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+ bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
+#else
+ elog(ERROR, "zstd is not supported by this build");
+#endif
+ break;
+
+ case WAL_COMPRESSION_NONE:
+ Assert(false); /* cannot happen */
+ break;
+ /* no default case, so that compiler will warn */
+ }
+
+ rdt_datas_last->data = regbuf->compressed_page;
+ rdt_datas_last->len = compressed_len;
+ }
+ else
+ {
+ bimg.length = BLCKSZ - cbimg.hole_length;
+
+ if (cbimg.hole_length == 0)
+ {
+ rdt_datas_last->data = page;
+ rdt_datas_last->len = BLCKSZ;
+ }
+ else
+ {
+ /* must skip the hole */
+ rdt_datas_last->data = page;
+ rdt_datas_last->len = bimg.hole_offset;
+
+ rdt_datas_last->next = &regbuf->bkp_rdatas[1];
+ rdt_datas_last = rdt_datas_last->next;
+
+ rdt_datas_last->data =
+ page + (bimg.hole_offset + cbimg.hole_length);
+ rdt_datas_last->len =
+ BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+ }
+ }
+
+ total_len += bimg.length;
+ }
+
+ if (needs_data)
+ {
+ /*
+ * Link the caller-supplied rdata chain for this buffer to the
+ * overall list.
+ */
+ bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
+ bkpb.data_length = regbuf->rdata_len;
+ total_len += regbuf->rdata_len;
+
+ rdt_datas_last->next = regbuf->rdata_head;
+ rdt_datas_last = regbuf->rdata_tail;
+ }
+
+ if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
+ {
+ samerel = true;
+ bkpb.fork_flags |= BKPBLOCK_SAME_REL;
+ }
+ else
+ samerel = false;
+ prev_regbuf = regbuf;
+
+ /* Ok, copy the header to the scratch buffer */
+ memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
+ scratch += SizeOfXLogRecordBlockHeader;
+ if (include_image)
+ {
+ memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
+ scratch += SizeOfXLogRecordBlockImageHeader;
+ if (cbimg.hole_length != 0 && is_compressed)
+ {
+ memcpy(scratch, &cbimg,
+ SizeOfXLogRecordBlockCompressHeader);
+ scratch += SizeOfXLogRecordBlockCompressHeader;
+ }
+ }
+ if (!samerel)
+ {
+ memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
+ scratch += sizeof(RelFileNode);
+ }
+ memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
+ scratch += sizeof(BlockNumber);
+ }
+
+ /* followed by the record's origin, if any */
+ if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
+ replorigin_session_origin != InvalidRepOriginId)
+ {
+ *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
+ memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
+ scratch += sizeof(replorigin_session_origin);
+ }
+
+ /* followed by toplevel XID, if not already included in previous record */
+ if (IsSubxactTopXidLogPending())
+ {
+ TransactionId xid = GetTopTransactionIdIfAny();
+
+ /* Set the flag that the top xid is included in the WAL */
+ *topxid_included = true;
+
+ *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
+ memcpy(scratch, &xid, sizeof(TransactionId));
+ scratch += sizeof(TransactionId);
+ }
+
+ /* followed by main data, if any */
+ if (mainrdata_len > 0)
+ {
+ if (mainrdata_len > 255)
+ {
+ *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
+ memcpy(scratch, &mainrdata_len, sizeof(uint32));
+ scratch += sizeof(uint32);
+ }
+ else
+ {
+ *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
+ *(scratch++) = (uint8) mainrdata_len;
+ }
+ rdt_datas_last->next = mainrdata_head;
+ rdt_datas_last = mainrdata_last;
+ total_len += mainrdata_len;
+ }
+ rdt_datas_last->next = NULL;
+
+ hdr_rdt.len = (scratch - hdr_scratch);
+ total_len += hdr_rdt.len;
+
+ /*
+ * Calculate CRC of the data
+ *
+ * Note that the record header isn't added into the CRC initially since we
+ * don't know the prev-link yet. Thus, the CRC will represent the CRC of
+ * the whole record in the order: rdata, then backup blocks, then record
+ * header.
+ */
+ INIT_CRC32C(rdata_crc);
+ COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
+ for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
+ COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+
+ /*
+ * Fill in the fields in the record header. Prev-link is filled in later,
+ * once we know where in the WAL the record will be inserted. The CRC does
+ * not include the record header yet.
+ */
+ rechdr->xl_xid = GetCurrentTransactionIdIfAny();
+ rechdr->xl_tot_len = total_len;
+ rechdr->xl_info = info;
+ rechdr->xl_rmid = rmid;
+ rechdr->xl_prev = InvalidXLogRecPtr;
+ rechdr->xl_crc = rdata_crc;
+
+ return &hdr_rdt;
+}
+
+/*
+ * Create a compressed version of a backup block image.
+ *
+ * Returns false if compression fails (i.e., compressed result is actually
+ * bigger than the original). Otherwise, returns true and sets 'dlen' to
+ * the length of the compressed block image.
+ */
+static bool
+XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
+ char *dest, uint16 *dlen)
+{
+ int32 orig_len = BLCKSZ - hole_length;
+ int32 len = -1;
+ int32 extra_bytes = 0;
+ char *source;
+ PGAlignedBlock tmp;
+
+ if (hole_length != 0)
+ {
+ /* must skip the hole */
+ source = tmp.data;
+ memcpy(source, page, hole_offset);
+ memcpy(source + hole_offset,
+ page + (hole_offset + hole_length),
+ BLCKSZ - (hole_length + hole_offset));
+
+ /*
+ * If the hole exists, extra data needs to be stored in the WAL record
+ * for the compressed version of the block image.
+ */
+ extra_bytes = SizeOfXLogRecordBlockCompressHeader;
+ }
+ else
+ source = page;
+
+ switch ((WalCompression) wal_compression)
+ {
+ case WAL_COMPRESSION_PGLZ:
+ len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
+ break;
+
+ case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+ len = LZ4_compress_default(source, dest, orig_len,
+ COMPRESS_BUFSIZE);
+ if (len <= 0)
+ len = -1; /* failure */
+#else
+ elog(ERROR, "LZ4 is not supported by this build");
+#endif
+ break;
+
+ case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+ len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
+ ZSTD_CLEVEL_DEFAULT);
+ if (ZSTD_isError(len))
+ len = -1; /* failure */
+#else
+ elog(ERROR, "zstd is not supported by this build");
+#endif
+ break;
+
+ case WAL_COMPRESSION_NONE:
+ Assert(false); /* cannot happen */
+ break;
+ /* no default case, so that compiler will warn */
+ }
+
+ /*
+ * We recheck the actual size even if compression reports success, and see
+ * whether the number of bytes saved by compression is larger than the
+ * length of the extra data needed for the compressed version of the block
+ * image.
+ */
+ if (len >= 0 &&
+ len + extra_bytes < orig_len)
+ {
+ *dlen = (uint16) len; /* successful compression */
+ return true;
+ }
+ return false;
+}
+
+/*
+ * Determine whether the buffer referenced has to be backed up.
+ *
+ * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
+ * could change later, so the result should be used for optimization purposes
+ * only.
+ */
+bool
+XLogCheckBufferNeedsBackup(Buffer buffer)
+{
+ XLogRecPtr RedoRecPtr;
+ bool doPageWrites;
+ Page page;
+
+ GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
+
+ page = BufferGetPage(buffer);
+
+ if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
+ return true; /* buffer requires backup */
+
+ return false; /* buffer does not need to be backed up */
+}
+
+/*
+ * Write a backup block if needed when we are setting a hint. Note that
+ * this may be called for a variety of page types, not just heaps.
+ *
+ * Callable while holding just share lock on the buffer content.
+ *
+ * We can't use the plain backup block mechanism since that relies on the
+ * Buffer being exclusively locked. Because some modifications (setting the
+ * LSN, hint bits) are allowed while holding only a share lock, using the
+ * buffer directly could lead to WAL checksum failures. So instead we copy
+ * the page and insert the copied data as normal record data.
+ *
+ * We only need to do something if the page has not yet had a full-page image
+ * written in this checkpoint round. The LSN of the inserted WAL record is
+ * returned if we had to write, InvalidXLogRecPtr otherwise.
+ *
+ * It is possible that multiple concurrent backends could attempt to write WAL
+ * records. In that case, multiple copies of the same block would be recorded
+ * in separate WAL records by different backends, though that is still OK from
+ * a correctness perspective.
+ */
+XLogRecPtr
+XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
+{
+ XLogRecPtr recptr = InvalidXLogRecPtr;
+ XLogRecPtr lsn;
+ XLogRecPtr RedoRecPtr;
+
+ /*
+ * Ensure no checkpoint can change our view of RedoRecPtr.
+ */
+ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0);
+
+ /*
+ * Update RedoRecPtr so that we can make the right decision
+ */
+ RedoRecPtr = GetRedoRecPtr();
+
+ /*
+ * We assume page LSN is first data on *every* page that can be passed to
+ * XLogInsert, whether it has the standard page layout or not. Since we're
+ * only holding a share-lock on the page, we must take the buffer header
+ * lock when we look at the LSN.
+ */
+ lsn = BufferGetLSNAtomic(buffer);
+
+ if (lsn <= RedoRecPtr)
+ {
+ int flags = 0;
+ PGAlignedBlock copied_buffer;
+ char *origdata = (char *) BufferGetBlock(buffer);
+ RelFileNode rnode;
+ ForkNumber forkno;
+ BlockNumber blkno;
+
+ /*
+ * Copy the buffer so we don't have to worry about concurrent hint bit or
+ * LSN updates. We assume pd_lower/upper cannot be changed without an
+ * exclusive lock, so the backed-up contents are not racy.
+ */
+ if (buffer_std)
+ {
+ /* Assume we can omit data between pd_lower and pd_upper */
+ Page page = BufferGetPage(buffer);
+ uint16 lower = ((PageHeader) page)->pd_lower;
+ uint16 upper = ((PageHeader) page)->pd_upper;
+
+ memcpy(copied_buffer.data, origdata, lower);
+ memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
+ }
+ else
+ memcpy(copied_buffer.data, origdata, BLCKSZ);
+
+ XLogBeginInsert();
+
+ if (buffer_std)
+ flags |= REGBUF_STANDARD;
+
+ BufferGetTag(buffer, &rnode, &forkno, &blkno);
+ XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer.data, flags);
+
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT);
+ }
+
+ return recptr;
+}
+
+/*
+ * Write a WAL record containing a full image of a page. Caller is responsible
+ * for writing the page to disk after calling this routine.
+ *
+ * Note: If you're using this function, you should be building pages in private
+ * memory and writing them directly to smgr. If you're using buffers, call
+ * log_newpage_buffer instead.
+ *
+ * If the page follows the standard page layout, with a PageHeader and unused
+ * space between pd_lower and pd_upper, set 'page_std' to true. That allows
+ * the unused space to be left out from the WAL record, making it smaller.
+ */
+XLogRecPtr
+log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
+ Page page, bool page_std)
+{
+ int flags;
+ XLogRecPtr recptr;
+
+ flags = REGBUF_FORCE_IMAGE;
+ if (page_std)
+ flags |= REGBUF_STANDARD;
+
+ XLogBeginInsert();
+ XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
+
+ /*
+ * The page may be uninitialized. If so, we can't set the LSN because that
+ * would corrupt the page.
+ */
+ if (!PageIsNew(page))
+ {
+ PageSetLSN(page, recptr);
+ }
+
+ return recptr;
+}
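+
+/*
+ * Illustrative sketch of the intended usage pattern; "rel" and "blkno" are
+ * hypothetical, and error handling is omitted:
+ *
+ *		PGAlignedBlock buf;
+ *		Page		page = (Page) buf.data;
+ *
+ *		PageInit(page, BLCKSZ, 0);
+ *		... fill in the page contents ...
+ *		log_newpage(&rel->rd_node, MAIN_FORKNUM, blkno, page, true);
+ *		PageSetChecksumInplace(page, blkno);
+ *		smgrextend(RelationGetSmgr(rel), MAIN_FORKNUM, blkno, buf.data, true);
+ */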
+
+/*
+ * Like log_newpage(), but allows logging multiple pages in one operation.
+ * It is more efficient than calling log_newpage() for each page separately,
+ * because we can write multiple pages in a single WAL record.
+ */
+void
+log_newpages(RelFileNode *rnode, ForkNumber forkNum, int num_pages,
+ BlockNumber *blknos, Page *pages, bool page_std)
+{
+ int flags;
+ XLogRecPtr recptr;
+ int i;
+ int j;
+
+ flags = REGBUF_FORCE_IMAGE;
+ if (page_std)
+ flags |= REGBUF_STANDARD;
+
+ /*
+ * Iterate over all the pages. They are collected into batches of
+ * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each
+ * batch.
+ */
+ XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
+
+ i = 0;
+ while (i < num_pages)
+ {
+ int batch_start = i;
+ int nbatch;
+
+ XLogBeginInsert();
+
+ nbatch = 0;
+ while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages)
+ {
+ XLogRegisterBlock(nbatch, rnode, forkNum, blknos[i], pages[i], flags);
+ i++;
+ nbatch++;
+ }
+
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
+
+ for (j = batch_start; j < i; j++)
+ {
+ /*
+ * The page may be uninitialized. If so, we can't set the LSN
+ * because that would corrupt the page.
+ */
+ if (!PageIsNew(pages[j]))
+ {
+ PageSetLSN(pages[j], recptr);
+ }
+ }
+ }
+}
+
+/*
+ * Write a WAL record containing a full image of a page.
+ *
+ * Caller should initialize the buffer and mark it dirty before calling this
+ * function. This function will set the page LSN.
+ *
+ * If the page follows the standard page layout, with a PageHeader and unused
+ * space between pd_lower and pd_upper, set 'page_std' to true. That allows
+ * the unused space to be left out from the WAL record, making it smaller.
+ */
+XLogRecPtr
+log_newpage_buffer(Buffer buffer, bool page_std)
+{
+ Page page = BufferGetPage(buffer);
+ RelFileNode rnode;
+ ForkNumber forkNum;
+ BlockNumber blkno;
+
+ /* Shared buffers should be modified in a critical section. */
+ Assert(CritSectionCount > 0);
+
+ BufferGetTag(buffer, &rnode, &forkNum, &blkno);
+
+ return log_newpage(&rnode, forkNum, blkno, page, page_std);
+}
+
+/*
+ * WAL-log a range of blocks in a relation.
+ *
+ * An image of all pages with block numbers 'startblk' <= X < 'endblk' is
+ * written to the WAL. If the range is large, this is done in multiple WAL
+ * records.
+ *
+ * If all the pages follow the standard page layout, with a PageHeader and unused
+ * space between pd_lower and pd_upper, set 'page_std' to true. That allows
+ * the unused space to be left out from the WAL records, making them smaller.
+ *
+ * NOTE: This function acquires exclusive-locks on the pages. Typically, this
+ * is used on a newly-built relation, and the caller is holding an
+ * AccessExclusiveLock on it, so no other backend can be accessing it at the
+ * same time. If that's not the case, you must ensure that this does not
+ * cause a deadlock through some other means.
+ */
+void
+log_newpage_range(Relation rel, ForkNumber forkNum,
+ BlockNumber startblk, BlockNumber endblk,
+ bool page_std)
+{
+ int flags;
+ BlockNumber blkno;
+
+ flags = REGBUF_FORCE_IMAGE;
+ if (page_std)
+ flags |= REGBUF_STANDARD;
+
+ /*
+ * Iterate over all the pages in the range. They are collected into
+ * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
+ * for each batch.
+ */
+ XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
+
+ blkno = startblk;
+ while (blkno < endblk)
+ {
+ Buffer bufpack[XLR_MAX_BLOCK_ID];
+ XLogRecPtr recptr;
+ int nbufs;
+ int i;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Collect a batch of blocks. */
+ nbufs = 0;
+ while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
+ {
+ Buffer buf = ReadBufferExtended(rel, forkNum, blkno,
+ RBM_NORMAL, NULL);
+
+ LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * Completely empty pages are not WAL-logged. Writing a WAL record
+ * would change the LSN, and we don't want that. We want the page
+ * to stay empty.
+ */
+ if (!PageIsNew(BufferGetPage(buf)))
+ bufpack[nbufs++] = buf;
+ else
+ UnlockReleaseBuffer(buf);
+ blkno++;
+ }
+
+ /* Nothing more to do if all remaining blocks were empty. */
+ if (nbufs == 0)
+ break;
+
+ /* Write WAL record for this batch. */
+ XLogBeginInsert();
+
+ START_CRIT_SECTION();
+ for (i = 0; i < nbufs; i++)
+ {
+ XLogRegisterBuffer(i, bufpack[i], flags);
+ MarkBufferDirty(bufpack[i]);
+ }
+
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
+
+ for (i = 0; i < nbufs; i++)
+ {
+ PageSetLSN(BufferGetPage(bufpack[i]), recptr);
+ UnlockReleaseBuffer(bufpack[i]);
+ }
+ END_CRIT_SECTION();
+ }
+}
+
+/*
+ * Allocate working buffers needed for WAL record construction.
+ */
+void
+InitXLogInsert(void)
+{
+ /* Initialize the working areas */
+ if (xloginsert_cxt == NULL)
+ {
+ xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
+ "WAL record construction",
+ ALLOCSET_DEFAULT_SIZES);
+ }
+
+ if (registered_buffers == NULL)
+ {
+ registered_buffers = (registered_buffer *)
+ MemoryContextAllocZero(xloginsert_cxt,
+ sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
+ max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
+ }
+ if (rdatas == NULL)
+ {
+ rdatas = MemoryContextAlloc(xloginsert_cxt,
+ sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
+ max_rdatas = XLR_NORMAL_RDATAS;
+ }
+
+ /*
+ * Allocate a buffer to hold the header information for a WAL record.
+ */
+ if (hdr_scratch == NULL)
+ hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
+ HEADER_SCRATCH_SIZE);
+}