Diffstat (limited to 'src/backend/access/transam/xloginsert.c')
-rw-r--r-- | src/backend/access/transam/xloginsert.c | 1318
1 file changed, 1318 insertions, 0 deletions
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c new file mode 100644 index 0000000..35cc055 --- /dev/null +++ b/src/backend/access/transam/xloginsert.c @@ -0,0 +1,1318 @@ +/*------------------------------------------------------------------------- + * + * xloginsert.c + * Functions for constructing WAL records + * + * Constructing a WAL record begins with a call to XLogBeginInsert, + * followed by a number of XLogRegister* calls. The registered data is + * collected in private working memory, and finally assembled into a chain + * of XLogRecData structs by a call to XLogRecordAssemble(). See + * access/transam/README for details. + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/transam/xloginsert.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#ifdef USE_LZ4 +#include <lz4.h> +#endif + +#ifdef USE_ZSTD +#include <zstd.h> +#endif + +#include "access/xact.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xloginsert.h" +#include "catalog/pg_control.h" +#include "common/pg_lzcompress.h" +#include "executor/instrument.h" +#include "miscadmin.h" +#include "pg_trace.h" +#include "replication/origin.h" +#include "storage/bufmgr.h" +#include "storage/proc.h" +#include "utils/memutils.h" + +/* + * Guess the maximum buffer size required to store a compressed version of + * backup block image. + */ +#ifdef USE_LZ4 +#define LZ4_MAX_BLCKSZ LZ4_COMPRESSBOUND(BLCKSZ) +#else +#define LZ4_MAX_BLCKSZ 0 +#endif + +#ifdef USE_ZSTD +#define ZSTD_MAX_BLCKSZ ZSTD_COMPRESSBOUND(BLCKSZ) +#else +#define ZSTD_MAX_BLCKSZ 0 +#endif + +#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ) + +/* Buffer size required to store a compressed version of backup block image */ +#define COMPRESS_BUFSIZE Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ) + +/* + * For each block reference registered with XLogRegisterBuffer, we fill in + * a registered_buffer struct. + */ +typedef struct +{ + bool in_use; /* is this slot in use? */ + uint8 flags; /* REGBUF_* flags */ + RelFileNode rnode; /* identifies the relation and block */ + ForkNumber forkno; + BlockNumber block; + Page page; /* page content */ + uint32 rdata_len; /* total length of data in rdata chain */ + XLogRecData *rdata_head; /* head of the chain of data registered with + * this block */ + XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if + * empty */ + + XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to + * backup block data in XLogRecordAssemble() */ + + /* buffer to store a compressed version of backup block image */ + char compressed_page[COMPRESS_BUFSIZE]; +} registered_buffer; + +static registered_buffer *registered_buffers; +static int max_registered_buffers; /* allocated size */ +static int max_registered_block_id = 0; /* highest block_id + 1 currently + * registered */ + +/* + * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered + * with XLogRegisterData(...). + */ +static XLogRecData *mainrdata_head; +static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head; +static uint32 mainrdata_len; /* total # of bytes in chain */ + +/* flags for the in-progress insertion */ +static uint8 curinsert_flags = 0; + +/* + * These are used to hold the record header while constructing a record. 
+ * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization, + * because we want it to be MAXALIGNed and padding bytes zeroed. + * + * For simplicity, it's allocated large enough to hold the headers for any + * WAL record. + */ +static XLogRecData hdr_rdt; +static char *hdr_scratch = NULL; + +#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) +#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) + +#define HEADER_SCRATCH_SIZE \ + (SizeOfXLogRecord + \ + MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ + SizeOfXLogTransactionId) + +/* + * An array of XLogRecData structs, to hold registered data. + */ +static XLogRecData *rdatas; +static int num_rdatas; /* entries currently used */ +static int max_rdatas; /* allocated size */ + +static bool begininsert_called = false; + +/* Memory context to hold the registered buffer and data references. */ +static MemoryContext xloginsert_cxt; + +static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, + XLogRecPtr RedoRecPtr, bool doPageWrites, + XLogRecPtr *fpw_lsn, int *num_fpi, + bool *topxid_included); +static bool XLogCompressBackupBlock(char *page, uint16 hole_offset, + uint16 hole_length, char *dest, uint16 *dlen); + +/* + * Begin constructing a WAL record. This must be called before the + * XLogRegister* functions and XLogInsert(). + */ +void +XLogBeginInsert(void) +{ + Assert(max_registered_block_id == 0); + Assert(mainrdata_last == (XLogRecData *) &mainrdata_head); + Assert(mainrdata_len == 0); + + /* cross-check on whether we should be here or not */ + if (!XLogInsertAllowed()) + elog(ERROR, "cannot make new WAL entries during recovery"); + + if (begininsert_called) + elog(ERROR, "XLogBeginInsert was already called"); + + begininsert_called = true; +} + +/* + * Ensure that there are enough buffer and data slots in the working area, + * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData + * calls. + * + * There is always space for a small number of buffers and data chunks, enough + * for most record types. This function is for the exceptional cases that need + * more. + */ +void +XLogEnsureRecordSpace(int max_block_id, int ndatas) +{ + int nbuffers; + + /* + * This must be called before entering a critical section, because + * allocating memory inside a critical section can fail. repalloc() will + * check the same, but better to check it here too so that we fail + * consistently even if the arrays happen to be large enough already. + */ + Assert(CritSectionCount == 0); + + /* the minimum values can't be decreased */ + if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID) + max_block_id = XLR_NORMAL_MAX_BLOCK_ID; + if (ndatas < XLR_NORMAL_RDATAS) + ndatas = XLR_NORMAL_RDATAS; + + if (max_block_id > XLR_MAX_BLOCK_ID) + elog(ERROR, "maximum number of WAL record block references exceeded"); + nbuffers = max_block_id + 1; + + if (nbuffers > max_registered_buffers) + { + registered_buffers = (registered_buffer *) + repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers); + + /* + * At least the padding bytes in the structs must be zeroed, because + * they are included in WAL data, but initialize it all for tidiness. 
+ */ + MemSet(®istered_buffers[max_registered_buffers], 0, + (nbuffers - max_registered_buffers) * sizeof(registered_buffer)); + max_registered_buffers = nbuffers; + } + + if (ndatas > max_rdatas) + { + rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas); + max_rdatas = ndatas; + } +} + +/* + * Reset WAL record construction buffers. + */ +void +XLogResetInsertion(void) +{ + int i; + + for (i = 0; i < max_registered_block_id; i++) + registered_buffers[i].in_use = false; + + num_rdatas = 0; + max_registered_block_id = 0; + mainrdata_len = 0; + mainrdata_last = (XLogRecData *) &mainrdata_head; + curinsert_flags = 0; + begininsert_called = false; +} + +/* + * Register a reference to a buffer with the WAL record being constructed. + * This must be called for every page that the WAL-logged operation modifies. + */ +void +XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags) +{ + registered_buffer *regbuf; + + /* NO_IMAGE doesn't make sense with FORCE_IMAGE */ + Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE)))); + Assert(begininsert_called); + + if (block_id >= max_registered_block_id) + { + if (block_id >= max_registered_buffers) + elog(ERROR, "too many registered buffers"); + max_registered_block_id = block_id + 1; + } + + regbuf = ®istered_buffers[block_id]; + + BufferGetTag(buffer, ®buf->rnode, ®buf->forkno, ®buf->block); + regbuf->page = BufferGetPage(buffer); + regbuf->flags = flags; + regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head; + regbuf->rdata_len = 0; + + /* + * Check that this page hasn't already been registered with some other + * block_id. + */ +#ifdef USE_ASSERT_CHECKING + { + int i; + + for (i = 0; i < max_registered_block_id; i++) + { + registered_buffer *regbuf_old = ®istered_buffers[i]; + + if (i == block_id || !regbuf_old->in_use) + continue; + + Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) || + regbuf_old->forkno != regbuf->forkno || + regbuf_old->block != regbuf->block); + } + } +#endif + + regbuf->in_use = true; +} + +/* + * Like XLogRegisterBuffer, but for registering a block that's not in the + * shared buffer pool (i.e. when you don't have a Buffer for it). + */ +void +XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum, + BlockNumber blknum, Page page, uint8 flags) +{ + registered_buffer *regbuf; + + Assert(begininsert_called); + + if (block_id >= max_registered_block_id) + max_registered_block_id = block_id + 1; + + if (block_id >= max_registered_buffers) + elog(ERROR, "too many registered buffers"); + + regbuf = ®istered_buffers[block_id]; + + regbuf->rnode = *rnode; + regbuf->forkno = forknum; + regbuf->block = blknum; + regbuf->page = page; + regbuf->flags = flags; + regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head; + regbuf->rdata_len = 0; + + /* + * Check that this page hasn't already been registered with some other + * block_id. + */ +#ifdef USE_ASSERT_CHECKING + { + int i; + + for (i = 0; i < max_registered_block_id; i++) + { + registered_buffer *regbuf_old = ®istered_buffers[i]; + + if (i == block_id || !regbuf_old->in_use) + continue; + + Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) || + regbuf_old->forkno != regbuf->forkno || + regbuf_old->block != regbuf->block); + } + } +#endif + + regbuf->in_use = true; +} + +/* + * Add data to the WAL record that's being constructed. + * + * The data is appended to the "main chunk", available at replay with + * XLogRecGetData(). 
+ */ +void +XLogRegisterData(char *data, int len) +{ + XLogRecData *rdata; + + Assert(begininsert_called); + + if (num_rdatas >= max_rdatas) + elog(ERROR, "too much WAL data"); + rdata = &rdatas[num_rdatas++]; + + rdata->data = data; + rdata->len = len; + + /* + * we use the mainrdata_last pointer to track the end of the chain, so no + * need to clear 'next' here. + */ + + mainrdata_last->next = rdata; + mainrdata_last = rdata; + + mainrdata_len += len; +} + +/* + * Add buffer-specific data to the WAL record that's being constructed. + * + * Block_id must reference a block previously registered with + * XLogRegisterBuffer(). If this is called more than once for the same + * block_id, the data is appended. + * + * The maximum amount of data that can be registered per block is 65535 + * bytes. That should be plenty; if you need more than BLCKSZ bytes to + * reconstruct the changes to the page, you might as well just log a full + * copy of it. (the "main data" that's not associated with a block is not + * limited) + */ +void +XLogRegisterBufData(uint8 block_id, char *data, int len) +{ + registered_buffer *regbuf; + XLogRecData *rdata; + + Assert(begininsert_called); + + /* find the registered buffer struct */ + regbuf = ®istered_buffers[block_id]; + if (!regbuf->in_use) + elog(ERROR, "no block with id %d registered with WAL insertion", + block_id); + + if (num_rdatas >= max_rdatas) + elog(ERROR, "too much WAL data"); + rdata = &rdatas[num_rdatas++]; + + rdata->data = data; + rdata->len = len; + + regbuf->rdata_tail->next = rdata; + regbuf->rdata_tail = rdata; + regbuf->rdata_len += len; +} + +/* + * Set insert status flags for the upcoming WAL record. + * + * The flags that can be used here are: + * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be + * included in the record. + * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for + * durability, which allows to avoid triggering WAL archiving and other + * background activity. + */ +void +XLogSetRecordFlags(uint8 flags) +{ + Assert(begininsert_called); + curinsert_flags |= flags; +} + +/* + * Insert an XLOG record having the specified RMID and info bytes, with the + * body of the record being the data and buffer references registered earlier + * with XLogRegister* calls. + * + * Returns XLOG pointer to end of record (beginning of next record). + * This can be used as LSN for data pages affected by the logged action. + * (LSN is the XLOG point up to which the XLOG must be flushed to disk + * before the data page can be written out. This implements the basic + * WAL rule "write the log before the data".) + */ +XLogRecPtr +XLogInsert(RmgrId rmid, uint8 info) +{ + XLogRecPtr EndPos; + + /* XLogBeginInsert() must have been called. */ + if (!begininsert_called) + elog(ERROR, "XLogBeginInsert was not called"); + + /* + * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and + * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me. + */ + if ((info & ~(XLR_RMGR_INFO_MASK | + XLR_SPECIAL_REL_UPDATE | + XLR_CHECK_CONSISTENCY)) != 0) + elog(PANIC, "invalid xlog info mask %02X", info); + + TRACE_POSTGRESQL_WAL_INSERT(rmid, info); + + /* + * In bootstrap mode, we don't actually log anything but XLOG resources; + * return a phony record pointer. 
+ */ + if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) + { + XLogResetInsertion(); + EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */ + return EndPos; + } + + do + { + XLogRecPtr RedoRecPtr; + bool doPageWrites; + bool topxid_included = false; + XLogRecPtr fpw_lsn; + XLogRecData *rdt; + int num_fpi = 0; + + /* + * Get values needed to decide whether to do full-page writes. Since + * we don't yet have an insertion lock, these could change under us, + * but XLogInsertRecord will recheck them once it has a lock. + */ + GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); + + rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, + &fpw_lsn, &num_fpi, &topxid_included); + + EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi, + topxid_included); + } while (EndPos == InvalidXLogRecPtr); + + XLogResetInsertion(); + + return EndPos; +} + +/* + * Assemble a WAL record from the registered data and buffers into an + * XLogRecData chain, ready for insertion with XLogInsertRecord(). + * + * The record header fields are filled in, except for the xl_prev field. The + * calculated CRC does not include the record header yet. + * + * If there are any registered buffers, and a full-page image was not taken + * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This + * signals that the assembled record is only good for insertion on the + * assumption that the RedoRecPtr and doPageWrites values were up-to-date. + * + * *topxid_included is set if the topmost transaction ID is logged with the + * current subtransaction. + */ +static XLogRecData * +XLogRecordAssemble(RmgrId rmid, uint8 info, + XLogRecPtr RedoRecPtr, bool doPageWrites, + XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included) +{ + XLogRecData *rdt; + uint32 total_len = 0; + int block_id; + pg_crc32c rdata_crc; + registered_buffer *prev_regbuf = NULL; + XLogRecData *rdt_datas_last; + XLogRecord *rechdr; + char *scratch = hdr_scratch; + + /* + * Note: this function can be called multiple times for the same record. + * All the modifications we do to the rdata chains below must handle that. + */ + + /* The record begins with the fixed-size header */ + rechdr = (XLogRecord *) scratch; + scratch += SizeOfXLogRecord; + + hdr_rdt.next = NULL; + rdt_datas_last = &hdr_rdt; + hdr_rdt.data = hdr_scratch; + + /* + * Enforce consistency checks for this record if user is looking for it. + * Do this before at the beginning of this routine to give the possibility + * for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for + * a record. + */ + if (wal_consistency_checking[rmid]) + info |= XLR_CHECK_CONSISTENCY; + + /* + * Make an rdata chain containing all the data portions of all block + * references. This includes the data for full-page images. Also append + * the headers for the block references in the scratch buffer. 
+ */ + *fpw_lsn = InvalidXLogRecPtr; + for (block_id = 0; block_id < max_registered_block_id; block_id++) + { + registered_buffer *regbuf = ®istered_buffers[block_id]; + bool needs_backup; + bool needs_data; + XLogRecordBlockHeader bkpb; + XLogRecordBlockImageHeader bimg; + XLogRecordBlockCompressHeader cbimg = {0}; + bool samerel; + bool is_compressed = false; + bool include_image; + + if (!regbuf->in_use) + continue; + + /* Determine if this block needs to be backed up */ + if (regbuf->flags & REGBUF_FORCE_IMAGE) + needs_backup = true; + else if (regbuf->flags & REGBUF_NO_IMAGE) + needs_backup = false; + else if (!doPageWrites) + needs_backup = false; + else + { + /* + * We assume page LSN is first data on *every* page that can be + * passed to XLogInsert, whether it has the standard page layout + * or not. + */ + XLogRecPtr page_lsn = PageGetLSN(regbuf->page); + + needs_backup = (page_lsn <= RedoRecPtr); + if (!needs_backup) + { + if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn) + *fpw_lsn = page_lsn; + } + } + + /* Determine if the buffer data needs to included */ + if (regbuf->rdata_len == 0) + needs_data = false; + else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0) + needs_data = true; + else + needs_data = !needs_backup; + + bkpb.id = block_id; + bkpb.fork_flags = regbuf->forkno; + bkpb.data_length = 0; + + if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT) + bkpb.fork_flags |= BKPBLOCK_WILL_INIT; + + /* + * If needs_backup is true or WAL checking is enabled for current + * resource manager, log a full-page write for the current block. + */ + include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0; + + if (include_image) + { + Page page = regbuf->page; + uint16 compressed_len = 0; + + /* + * The page needs to be backed up, so calculate its hole length + * and offset. + */ + if (regbuf->flags & REGBUF_STANDARD) + { + /* Assume we can omit data between pd_lower and pd_upper */ + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; + + if (lower >= SizeOfPageHeaderData && + upper > lower && + upper <= BLCKSZ) + { + bimg.hole_offset = lower; + cbimg.hole_length = upper - lower; + } + else + { + /* No "hole" to remove */ + bimg.hole_offset = 0; + cbimg.hole_length = 0; + } + } + else + { + /* Not a standard page header, don't try to eliminate "hole" */ + bimg.hole_offset = 0; + cbimg.hole_length = 0; + } + + /* + * Try to compress a block image if wal_compression is enabled + */ + if (wal_compression != WAL_COMPRESSION_NONE) + { + is_compressed = + XLogCompressBackupBlock(page, bimg.hole_offset, + cbimg.hole_length, + regbuf->compressed_page, + &compressed_len); + } + + /* + * Fill in the remaining fields in the XLogRecordBlockHeader + * struct + */ + bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE; + + /* Report a full page image constructed for the WAL record */ + *num_fpi += 1; + + /* + * Construct XLogRecData entries for the page content. + */ + rdt_datas_last->next = ®buf->bkp_rdatas[0]; + rdt_datas_last = rdt_datas_last->next; + + bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE; + + /* + * If WAL consistency checking is enabled for the resource manager + * of this WAL record, a full-page image is included in the record + * for the block modified. During redo, the full-page is replayed + * only if BKPIMAGE_APPLY is set. 
+ */ + if (needs_backup) + bimg.bimg_info |= BKPIMAGE_APPLY; + + if (is_compressed) + { + /* The current compression is stored in the WAL record */ + bimg.length = compressed_len; + + /* Set the compression method used for this block */ + switch ((WalCompression) wal_compression) + { + case WAL_COMPRESSION_PGLZ: + bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ; + break; + + case WAL_COMPRESSION_LZ4: +#ifdef USE_LZ4 + bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4; +#else + elog(ERROR, "LZ4 is not supported by this build"); +#endif + break; + + case WAL_COMPRESSION_ZSTD: +#ifdef USE_ZSTD + bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD; +#else + elog(ERROR, "zstd is not supported by this build"); +#endif + break; + + case WAL_COMPRESSION_NONE: + Assert(false); /* cannot happen */ + break; + /* no default case, so that compiler will warn */ + } + + rdt_datas_last->data = regbuf->compressed_page; + rdt_datas_last->len = compressed_len; + } + else + { + bimg.length = BLCKSZ - cbimg.hole_length; + + if (cbimg.hole_length == 0) + { + rdt_datas_last->data = page; + rdt_datas_last->len = BLCKSZ; + } + else + { + /* must skip the hole */ + rdt_datas_last->data = page; + rdt_datas_last->len = bimg.hole_offset; + + rdt_datas_last->next = ®buf->bkp_rdatas[1]; + rdt_datas_last = rdt_datas_last->next; + + rdt_datas_last->data = + page + (bimg.hole_offset + cbimg.hole_length); + rdt_datas_last->len = + BLCKSZ - (bimg.hole_offset + cbimg.hole_length); + } + } + + total_len += bimg.length; + } + + if (needs_data) + { + /* + * Link the caller-supplied rdata chain for this buffer to the + * overall list. + */ + bkpb.fork_flags |= BKPBLOCK_HAS_DATA; + bkpb.data_length = regbuf->rdata_len; + total_len += regbuf->rdata_len; + + rdt_datas_last->next = regbuf->rdata_head; + rdt_datas_last = regbuf->rdata_tail; + } + + if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode)) + { + samerel = true; + bkpb.fork_flags |= BKPBLOCK_SAME_REL; + } + else + samerel = false; + prev_regbuf = regbuf; + + /* Ok, copy the header to the scratch buffer */ + memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader); + scratch += SizeOfXLogRecordBlockHeader; + if (include_image) + { + memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader); + scratch += SizeOfXLogRecordBlockImageHeader; + if (cbimg.hole_length != 0 && is_compressed) + { + memcpy(scratch, &cbimg, + SizeOfXLogRecordBlockCompressHeader); + scratch += SizeOfXLogRecordBlockCompressHeader; + } + } + if (!samerel) + { + memcpy(scratch, ®buf->rnode, sizeof(RelFileNode)); + scratch += sizeof(RelFileNode); + } + memcpy(scratch, ®buf->block, sizeof(BlockNumber)); + scratch += sizeof(BlockNumber); + } + + /* followed by the record's origin, if any */ + if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) && + replorigin_session_origin != InvalidRepOriginId) + { + *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN; + memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin)); + scratch += sizeof(replorigin_session_origin); + } + + /* followed by toplevel XID, if not already included in previous record */ + if (IsSubxactTopXidLogPending()) + { + TransactionId xid = GetTopTransactionIdIfAny(); + + /* Set the flag that the top xid is included in the WAL */ + *topxid_included = true; + + *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; + memcpy(scratch, &xid, sizeof(TransactionId)); + scratch += sizeof(TransactionId); + } + + /* followed by main data, if any */ + if (mainrdata_len > 0) + { + if (mainrdata_len > 255) + { + *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG; + memcpy(scratch, 
&mainrdata_len, sizeof(uint32)); + scratch += sizeof(uint32); + } + else + { + *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT; + *(scratch++) = (uint8) mainrdata_len; + } + rdt_datas_last->next = mainrdata_head; + rdt_datas_last = mainrdata_last; + total_len += mainrdata_len; + } + rdt_datas_last->next = NULL; + + hdr_rdt.len = (scratch - hdr_scratch); + total_len += hdr_rdt.len; + + /* + * Calculate CRC of the data + * + * Note that the record header isn't added into the CRC initially since we + * don't know the prev-link yet. Thus, the CRC will represent the CRC of + * the whole record in the order: rdata, then backup blocks, then record + * header. + */ + INIT_CRC32C(rdata_crc); + COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); + for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next) + COMP_CRC32C(rdata_crc, rdt->data, rdt->len); + + /* + * Fill in the fields in the record header. Prev-link is filled in later, + * once we know where in the WAL the record will be inserted. The CRC does + * not include the record header yet. + */ + rechdr->xl_xid = GetCurrentTransactionIdIfAny(); + rechdr->xl_tot_len = total_len; + rechdr->xl_info = info; + rechdr->xl_rmid = rmid; + rechdr->xl_prev = InvalidXLogRecPtr; + rechdr->xl_crc = rdata_crc; + + return &hdr_rdt; +} + +/* + * Create a compressed version of a backup block image. + * + * Returns false if compression fails (i.e., compressed result is actually + * bigger than original). Otherwise, returns true and sets 'dlen' to + * the length of compressed block image. + */ +static bool +XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length, + char *dest, uint16 *dlen) +{ + int32 orig_len = BLCKSZ - hole_length; + int32 len = -1; + int32 extra_bytes = 0; + char *source; + PGAlignedBlock tmp; + + if (hole_length != 0) + { + /* must skip the hole */ + source = tmp.data; + memcpy(source, page, hole_offset); + memcpy(source + hole_offset, + page + (hole_offset + hole_length), + BLCKSZ - (hole_length + hole_offset)); + + /* + * Extra data needs to be stored in WAL record for the compressed + * version of block image if the hole exists. + */ + extra_bytes = SizeOfXLogRecordBlockCompressHeader; + } + else + source = page; + + switch ((WalCompression) wal_compression) + { + case WAL_COMPRESSION_PGLZ: + len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default); + break; + + case WAL_COMPRESSION_LZ4: +#ifdef USE_LZ4 + len = LZ4_compress_default(source, dest, orig_len, + COMPRESS_BUFSIZE); + if (len <= 0) + len = -1; /* failure */ +#else + elog(ERROR, "LZ4 is not supported by this build"); +#endif + break; + + case WAL_COMPRESSION_ZSTD: +#ifdef USE_ZSTD + len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len, + ZSTD_CLEVEL_DEFAULT); + if (ZSTD_isError(len)) + len = -1; /* failure */ +#else + elog(ERROR, "zstd is not supported by this build"); +#endif + break; + + case WAL_COMPRESSION_NONE: + Assert(false); /* cannot happen */ + break; + /* no default case, so that compiler will warn */ + } + + /* + * We recheck the actual size even if compression reports success and see + * if the number of bytes saved by compression is larger than the length + * of extra data needed for the compressed version of block image. + */ + if (len >= 0 && + len + extra_bytes < orig_len) + { + *dlen = (uint16) len; /* successful compression */ + return true; + } + return false; +} + +/* + * Determine whether the buffer referenced has to be backed up. 
+ * + * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites + * could change later, so the result should be used for optimization purposes + * only. + */ +bool +XLogCheckBufferNeedsBackup(Buffer buffer) +{ + XLogRecPtr RedoRecPtr; + bool doPageWrites; + Page page; + + GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); + + page = BufferGetPage(buffer); + + if (doPageWrites && PageGetLSN(page) <= RedoRecPtr) + return true; /* buffer requires backup */ + + return false; /* buffer does not need to be backed up */ +} + +/* + * Write a backup block if needed when we are setting a hint. Note that + * this may be called for a variety of page types, not just heaps. + * + * Callable while holding just share lock on the buffer content. + * + * We can't use the plain backup block mechanism since that relies on the + * Buffer being exclusively locked. Since some modifications (setting LSN, hint + * bits) are allowed in a sharelocked buffer that can lead to wal checksum + * failures. So instead we copy the page and insert the copied data as normal + * record data. + * + * We only need to do something if page has not yet been full page written in + * this checkpoint round. The LSN of the inserted wal record is returned if we + * had to write, InvalidXLogRecPtr otherwise. + * + * It is possible that multiple concurrent backends could attempt to write WAL + * records. In that case, multiple copies of the same block would be recorded + * in separate WAL records by different backends, though that is still OK from + * a correctness perspective. + */ +XLogRecPtr +XLogSaveBufferForHint(Buffer buffer, bool buffer_std) +{ + XLogRecPtr recptr = InvalidXLogRecPtr; + XLogRecPtr lsn; + XLogRecPtr RedoRecPtr; + + /* + * Ensure no checkpoint can change our view of RedoRecPtr. + */ + Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0); + + /* + * Update RedoRecPtr so that we can make the right decision + */ + RedoRecPtr = GetRedoRecPtr(); + + /* + * We assume page LSN is first data on *every* page that can be passed to + * XLogInsert, whether it has the standard page layout or not. Since we're + * only holding a share-lock on the page, we must take the buffer header + * lock when we look at the LSN. + */ + lsn = BufferGetLSNAtomic(buffer); + + if (lsn <= RedoRecPtr) + { + int flags = 0; + PGAlignedBlock copied_buffer; + char *origdata = (char *) BufferGetBlock(buffer); + RelFileNode rnode; + ForkNumber forkno; + BlockNumber blkno; + + /* + * Copy buffer so we don't have to worry about concurrent hint bit or + * lsn updates. We assume pd_lower/upper cannot be changed without an + * exclusive lock, so the contents bkp are not racy. + */ + if (buffer_std) + { + /* Assume we can omit data between pd_lower and pd_upper */ + Page page = BufferGetPage(buffer); + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; + + memcpy(copied_buffer.data, origdata, lower); + memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper); + } + else + memcpy(copied_buffer.data, origdata, BLCKSZ); + + XLogBeginInsert(); + + if (buffer_std) + flags |= REGBUF_STANDARD; + + BufferGetTag(buffer, &rnode, &forkno, &blkno); + XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer.data, flags); + + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT); + } + + return recptr; +} + +/* + * Write a WAL record containing a full image of a page. Caller is responsible + * for writing the page to disk after calling this routine. 
+ * + * Note: If you're using this function, you should be building pages in private + * memory and writing them directly to smgr. If you're using buffers, call + * log_newpage_buffer instead. + * + * If the page follows the standard page layout, with a PageHeader and unused + * space between pd_lower and pd_upper, set 'page_std' to true. That allows + * the unused space to be left out from the WAL record, making it smaller. + */ +XLogRecPtr +log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, + Page page, bool page_std) +{ + int flags; + XLogRecPtr recptr; + + flags = REGBUF_FORCE_IMAGE; + if (page_std) + flags |= REGBUF_STANDARD; + + XLogBeginInsert(); + XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags); + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); + + /* + * The page may be uninitialized. If so, we can't set the LSN because that + * would corrupt the page. + */ + if (!PageIsNew(page)) + { + PageSetLSN(page, recptr); + } + + return recptr; +} + +/* + * Like log_newpage(), but allows logging multiple pages in one operation. + * It is more efficient than calling log_newpage() for each page separately, + * because we can write multiple pages in a single WAL record. + */ +void +log_newpages(RelFileNode *rnode, ForkNumber forkNum, int num_pages, + BlockNumber *blknos, Page *pages, bool page_std) +{ + int flags; + XLogRecPtr recptr; + int i; + int j; + + flags = REGBUF_FORCE_IMAGE; + if (page_std) + flags |= REGBUF_STANDARD; + + /* + * Iterate over all the pages. They are collected into batches of + * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each + * batch. + */ + XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0); + + i = 0; + while (i < num_pages) + { + int batch_start = i; + int nbatch; + + XLogBeginInsert(); + + nbatch = 0; + while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages) + { + XLogRegisterBlock(nbatch, rnode, forkNum, blknos[i], pages[i], flags); + i++; + nbatch++; + } + + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); + + for (j = batch_start; j < i; j++) + { + /* + * The page may be uninitialized. If so, we can't set the LSN + * because that would corrupt the page. + */ + if (!PageIsNew(pages[j])) + { + PageSetLSN(pages[j], recptr); + } + } + } +} + +/* + * Write a WAL record containing a full image of a page. + * + * Caller should initialize the buffer and mark it dirty before calling this + * function. This function will set the page LSN. + * + * If the page follows the standard page layout, with a PageHeader and unused + * space between pd_lower and pd_upper, set 'page_std' to true. That allows + * the unused space to be left out from the WAL record, making it smaller. + */ +XLogRecPtr +log_newpage_buffer(Buffer buffer, bool page_std) +{ + Page page = BufferGetPage(buffer); + RelFileNode rnode; + ForkNumber forkNum; + BlockNumber blkno; + + /* Shared buffers should be modified in a critical section. */ + Assert(CritSectionCount > 0); + + BufferGetTag(buffer, &rnode, &forkNum, &blkno); + + return log_newpage(&rnode, forkNum, blkno, page, page_std); +} + +/* + * WAL-log a range of blocks in a relation. + * + * An image of all pages with block numbers 'startblk' <= X < 'endblk' is + * written to the WAL. If the range is large, this is done in multiple WAL + * records. + * + * If all page follows the standard page layout, with a PageHeader and unused + * space between pd_lower and pd_upper, set 'page_std' to true. That allows + * the unused space to be left out from the WAL records, making them smaller. 
+ * + * NOTE: This function acquires exclusive-locks on the pages. Typically, this + * is used on a newly-built relation, and the caller is holding a + * AccessExclusiveLock on it, so no other backend can be accessing it at the + * same time. If that's not the case, you must ensure that this does not + * cause a deadlock through some other means. + */ +void +log_newpage_range(Relation rel, ForkNumber forkNum, + BlockNumber startblk, BlockNumber endblk, + bool page_std) +{ + int flags; + BlockNumber blkno; + + flags = REGBUF_FORCE_IMAGE; + if (page_std) + flags |= REGBUF_STANDARD; + + /* + * Iterate over all the pages in the range. They are collected into + * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written + * for each batch. + */ + XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0); + + blkno = startblk; + while (blkno < endblk) + { + Buffer bufpack[XLR_MAX_BLOCK_ID]; + XLogRecPtr recptr; + int nbufs; + int i; + + CHECK_FOR_INTERRUPTS(); + + /* Collect a batch of blocks. */ + nbufs = 0; + while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk) + { + Buffer buf = ReadBufferExtended(rel, forkNum, blkno, + RBM_NORMAL, NULL); + + LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); + + /* + * Completely empty pages are not WAL-logged. Writing a WAL record + * would change the LSN, and we don't want that. We want the page + * to stay empty. + */ + if (!PageIsNew(BufferGetPage(buf))) + bufpack[nbufs++] = buf; + else + UnlockReleaseBuffer(buf); + blkno++; + } + + /* Nothing more to do if all remaining blocks were empty. */ + if (nbufs == 0) + break; + + /* Write WAL record for this batch. */ + XLogBeginInsert(); + + START_CRIT_SECTION(); + for (i = 0; i < nbufs; i++) + { + XLogRegisterBuffer(i, bufpack[i], flags); + MarkBufferDirty(bufpack[i]); + } + + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); + + for (i = 0; i < nbufs; i++) + { + PageSetLSN(BufferGetPage(bufpack[i]), recptr); + UnlockReleaseBuffer(bufpack[i]); + } + END_CRIT_SECTION(); + } +} + +/* + * Allocate working buffers needed for WAL record construction. + */ +void +InitXLogInsert(void) +{ + /* Initialize the working areas */ + if (xloginsert_cxt == NULL) + { + xloginsert_cxt = AllocSetContextCreate(TopMemoryContext, + "WAL record construction", + ALLOCSET_DEFAULT_SIZES); + } + + if (registered_buffers == NULL) + { + registered_buffers = (registered_buffer *) + MemoryContextAllocZero(xloginsert_cxt, + sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1)); + max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1; + } + if (rdatas == NULL) + { + rdatas = MemoryContextAlloc(xloginsert_cxt, + sizeof(XLogRecData) * XLR_NORMAL_RDATAS); + max_rdatas = XLR_NORMAL_RDATAS; + } + + /* + * Allocate a buffer to hold the header information for a WAL record. + */ + if (hdr_scratch == NULL) + hdr_scratch = MemoryContextAllocZero(xloginsert_cxt, + HEADER_SCRATCH_SIZE); +} |
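As the file's header comment describes, a record is built by calling XLogBeginInsert(), registering buffers and data, and finishing with XLogInsert(). A minimal sketch of that flow from a caller's point of view is below; RM_FOO_ID, XLOG_FOO_OP and xl_foo_op are hypothetical placeholders invented for illustration, while the xloginsert.h/bufmgr.h calls are the real ones shown in this file.

#include "postgres.h"

#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"

/* Hypothetical redo data; a real rmgr defines its own record struct. */
typedef struct xl_foo_op
{
	OffsetNumber offnum;		/* illustrative payload only */
} xl_foo_op;

static void
log_foo_op(Buffer buf, OffsetNumber offnum)
{
	xl_foo_op	xlrec;
	XLogRecPtr	recptr;

	xlrec.offnum = offnum;

	START_CRIT_SECTION();

	/* ... apply the change to the buffer's page here ... */
	MarkBufferDirty(buf);

	XLogBeginInsert();
	/* "main data", available at replay via XLogRecGetData() */
	XLogRegisterData((char *) &xlrec, sizeof(xlrec));
	/* block 0: the modified page; REGBUF_STANDARD lets the hole be omitted */
	XLogRegisterBuffer(0, buf, REGBUF_STANDARD);

	/* RM_FOO_ID / XLOG_FOO_OP are placeholders for a real rmgr id and info bits */
	recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_OP);

	/* "write the log before the data": stamp the page with the record's LSN */
	PageSetLSN(BufferGetPage(buf), recptr);

	END_CRIT_SECTION();
}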
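Similarly, a hedged sketch of the private-memory path documented above log_newpage(): the page is built outside shared buffers and its image is WAL-logged directly. The relation, fork and block values are assumptions supplied by the caller, and writing the page to storage afterwards remains the caller's responsibility.

#include "postgres.h"

#include "access/xloginsert.h"
#include "storage/bufpage.h"

static XLogRecPtr
log_private_page(RelFileNode *rnode, BlockNumber blkno)
{
	PGAlignedBlock buf;
	Page		page = (Page) buf.data;

	/* standard page header, no special space */
	PageInit(page, BLCKSZ, 0);

	/* ... fill in the page contents here ... */

	/*
	 * page_std = true allows the unused space between pd_lower and pd_upper
	 * to be left out of the WAL record, making it smaller.
	 */
	return log_newpage(rnode, MAIN_FORKNUM, blkno, page, true);
}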