/*-------------------------------------------------------------------------
 *
 * xloginsert.c
 *		Functions for constructing WAL records
 *
 * Constructing a WAL record begins with a call to XLogBeginInsert,
 * followed by a number of XLogRegister* calls. The registered data is
 * collected in private working memory, and finally assembled into a chain
 * of XLogRecData structs by a call to XLogRecordAssemble(). See
 * access/transam/README for details.
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/xloginsert.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#ifdef USE_LZ4
#include <lz4.h>
#endif

#ifdef USE_ZSTD
#include <zstd.h>
#endif

#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "executor/instrument.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "utils/memutils.h"

/*
 * Guess the maximum buffer size required to store a compressed version of
 * backup block image.
 */
#ifdef USE_LZ4
#define LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
#else
#define LZ4_MAX_BLCKSZ		0
#endif

#ifdef USE_ZSTD
#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
#else
#define ZSTD_MAX_BLCKSZ		0
#endif

#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)

/* Buffer size required to store a compressed version of backup block image */
#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)

/*
 * For each block reference registered with XLogRegisterBuffer, we fill in
 * a registered_buffer struct.
 */
typedef struct
{
	bool		in_use;			/* is this slot in use? */
	uint8		flags;			/* REGBUF_* flags */
	RelFileLocator rlocator;	/* identifies the relation and block */
	ForkNumber	forkno;
	BlockNumber block;
	Page		page;			/* page content */
	uint32		rdata_len;		/* total length of data in rdata chain */
	XLogRecData *rdata_head;	/* head of the chain of data registered with
								 * this block */
	XLogRecData *rdata_tail;	/* last entry in the chain, or &rdata_head if
								 * empty */

	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
								 * backup block data in XLogRecordAssemble() */

	/* buffer to store a compressed version of backup block image */
	char		compressed_page[COMPRESS_BUFSIZE];
} registered_buffer;

static registered_buffer *registered_buffers;
static int	max_registered_buffers; /* allocated size */
static int	max_registered_block_id = 0;	/* highest block_id + 1 currently
											 * registered */

/*
 * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
 * with XLogRegisterData(...).
 */
static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint64 mainrdata_len;	/* total # of bytes in chain */

/* flags for the in-progress insertion */
static uint8 curinsert_flags = 0;

/*
 * These are used to hold the record header while constructing a record.
 * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
 * because we want it to be MAXALIGNed and padding bytes zeroed.
 *
 * For simplicity, it's allocated large enough to hold the headers for any
 * WAL record.
 */
static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;

#define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
#define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))

#define HEADER_SCRATCH_SIZE \
	(SizeOfXLogRecord + \
	 MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
	 SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \
	 SizeOfXLogTransactionId)

/*
 * An array of XLogRecData structs, to hold registered data.
 */
static XLogRecData *rdatas;
static int	num_rdatas;			/* entries currently used */
static int	max_rdatas;			/* allocated size */

static bool begininsert_called = false;

/* Memory context to hold the registered buffer and data references. */
static MemoryContext xloginsert_cxt;

static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
									   XLogRecPtr RedoRecPtr, bool doPageWrites,
									   XLogRecPtr *fpw_lsn, int *num_fpi,
									   bool *topxid_included);
static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
									uint16 hole_length, char *dest,
									uint16 *dlen);

/*
 * Begin constructing a WAL record. This must be called before the
 * XLogRegister* functions and XLogInsert().
 */
void
XLogBeginInsert(void)
{
	Assert(max_registered_block_id == 0);
	Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
	Assert(mainrdata_len == 0);

	/* cross-check on whether we should be here or not */
	if (!XLogInsertAllowed())
		elog(ERROR, "cannot make new WAL entries during recovery");

	if (begininsert_called)
		elog(ERROR, "XLogBeginInsert was already called");

	begininsert_called = true;
}

/*
 * Ensure that there are enough buffer and data slots in the working area,
 * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
 * calls.
 *
 * There is always space for a small number of buffers and data chunks, enough
 * for most record types. This function is for the exceptional cases that need
 * more.
 */
void
XLogEnsureRecordSpace(int max_block_id, int ndatas)
{
	int			nbuffers;

	/*
	 * This must be called before entering a critical section, because
	 * allocating memory inside a critical section can fail. repalloc() will
	 * check the same, but better to check it here too so that we fail
	 * consistently even if the arrays happen to be large enough already.
	 */
	Assert(CritSectionCount == 0);

	/* the minimum values can't be decreased */
	if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
		max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
	if (ndatas < XLR_NORMAL_RDATAS)
		ndatas = XLR_NORMAL_RDATAS;

	if (max_block_id > XLR_MAX_BLOCK_ID)
		elog(ERROR, "maximum number of WAL record block references exceeded");
	nbuffers = max_block_id + 1;

	if (nbuffers > max_registered_buffers)
	{
		registered_buffers = (registered_buffer *)
			repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);

		/*
		 * At least the padding bytes in the structs must be zeroed, because
		 * they are included in WAL data, but initialize it all for tidiness.
		 */
		MemSet(&registered_buffers[max_registered_buffers], 0,
			   (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
		max_registered_buffers = nbuffers;
	}

	if (ndatas > max_rdatas)
	{
		rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
		max_rdatas = ndatas;
	}
}

/*
 * Reset WAL record construction buffers.
 */
void
XLogResetInsertion(void)
{
	int			i;

	for (i = 0; i < max_registered_block_id; i++)
		registered_buffers[i].in_use = false;

	num_rdatas = 0;
	max_registered_block_id = 0;
	mainrdata_len = 0;
	mainrdata_last = (XLogRecData *) &mainrdata_head;
	curinsert_flags = 0;
	begininsert_called = false;
}
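
/*
 * The registration functions below are typically used in a sequence like
 * the following sketch (the rmgr, record type and payload struct here are
 * hypothetical; see access/transam/README for the authoritative pattern):
 *
 *		XLogBeginInsert();
 *		XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 *		XLogRegisterData((char *) &xlrec, sizeof(xlrec));
 *		recptr = XLogInsert(RM_FOO_ID, XLOG_FOO_DO_STUFF);
 *		PageSetLSN(BufferGetPage(buffer), recptr);
 */
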
/*
 * Register a reference to a buffer with the WAL record being constructed.
 * This must be called for every page that the WAL-logged operation modifies.
 */
void
XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
{
	registered_buffer *regbuf;

	/* NO_IMAGE doesn't make sense with FORCE_IMAGE */
	Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
	Assert(begininsert_called);

	if (block_id >= max_registered_block_id)
	{
		if (block_id >= max_registered_buffers)
			elog(ERROR, "too many registered buffers");
		max_registered_block_id = block_id + 1;
	}

	regbuf = &registered_buffers[block_id];

	BufferGetTag(buffer, &regbuf->rlocator, &regbuf->forkno, &regbuf->block);
	regbuf->page = BufferGetPage(buffer);
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	regbuf->in_use = true;
}

/*
 * Like XLogRegisterBuffer, but for registering a block that's not in the
 * shared buffer pool (i.e. when you don't have a Buffer for it).
 */
void
XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
				  BlockNumber blknum, Page page, uint8 flags)
{
	registered_buffer *regbuf;

	Assert(begininsert_called);

	if (block_id >= max_registered_block_id)
		max_registered_block_id = block_id + 1;

	if (block_id >= max_registered_buffers)
		elog(ERROR, "too many registered buffers");

	regbuf = &registered_buffers[block_id];

	regbuf->rlocator = *rlocator;
	regbuf->forkno = forknum;
	regbuf->block = blknum;
	regbuf->page = page;
	regbuf->flags = flags;
	regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
	regbuf->rdata_len = 0;

	/*
	 * Check that this page hasn't already been registered with some other
	 * block_id.
	 */
#ifdef USE_ASSERT_CHECKING
	{
		int			i;

		for (i = 0; i < max_registered_block_id; i++)
		{
			registered_buffer *regbuf_old = &registered_buffers[i];

			if (i == block_id || !regbuf_old->in_use)
				continue;

			Assert(!RelFileLocatorEquals(regbuf_old->rlocator, regbuf->rlocator) ||
				   regbuf_old->forkno != regbuf->forkno ||
				   regbuf_old->block != regbuf->block);
		}
	}
#endif

	regbuf->in_use = true;
}

/*
 * Add data to the WAL record that's being constructed.
 *
 * The data is appended to the "main chunk", available at replay with
 * XLogRecGetData().
 */
void
XLogRegisterData(char *data, uint32 len)
{
	XLogRecData *rdata;

	Assert(begininsert_called);

	if (num_rdatas >= max_rdatas)
		ereport(ERROR,
				(errmsg_internal("too much WAL data"),
				 errdetail_internal("%d out of %d data segments are already in use.",
									num_rdatas, max_rdatas)));
	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	/*
	 * we use the mainrdata_last pointer to track the end of the chain, so no
	 * need to clear 'next' here.
	 */

	mainrdata_last->next = rdata;
	mainrdata_last = rdata;

	mainrdata_len += len;
}
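
/*
 * At replay, data registered with XLogRegisterData() is retrieved with
 * XLogRecGetData(). A hedged sketch of the redo side ('xl_foo' is a
 * hypothetical record struct):
 *
 *		xl_foo *xlrec = (xl_foo *) XLogRecGetData(record);
 */
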
/*
 * Add buffer-specific data to the WAL record that's being constructed.
 *
 * Block_id must reference a block previously registered with
 * XLogRegisterBuffer(). If this is called more than once for the same
 * block_id, the data is appended.
 *
 * The maximum amount of data that can be registered per block is 65535
 * bytes. That should be plenty; if you need more than BLCKSZ bytes to
 * reconstruct the changes to the page, you might as well just log a full
 * copy of it. (the "main data" that's not associated with a block is not
 * limited)
 */
void
XLogRegisterBufData(uint8 block_id, char *data, uint32 len)
{
	registered_buffer *regbuf;
	XLogRecData *rdata;

	Assert(begininsert_called);

	/* find the registered buffer struct */
	regbuf = &registered_buffers[block_id];
	if (!regbuf->in_use)
		elog(ERROR, "no block with id %d registered with WAL insertion",
			 block_id);

	/*
	 * Check against max_rdatas and ensure we do not register more data per
	 * buffer than can be handled by the physical data format; i.e. that
	 * regbuf->rdata_len does not grow beyond what
	 * XLogRecordBlockHeader->data_length can hold.
	 */
	if (num_rdatas >= max_rdatas)
		ereport(ERROR,
				(errmsg_internal("too much WAL data"),
				 errdetail_internal("%d out of %d data segments are already in use.",
									num_rdatas, max_rdatas)));
	if (regbuf->rdata_len + len > UINT16_MAX || len > UINT16_MAX)
		ereport(ERROR,
				(errmsg_internal("too much WAL data"),
				 errdetail_internal("Registering more than maximum %u bytes allowed to block %u: current %u bytes, adding %u bytes.",
									UINT16_MAX, block_id,
									regbuf->rdata_len, len)));

	rdata = &rdatas[num_rdatas++];

	rdata->data = data;
	rdata->len = len;

	regbuf->rdata_tail->next = rdata;
	regbuf->rdata_tail = rdata;
	regbuf->rdata_len += len;
}

/*
 * Set insert status flags for the upcoming WAL record.
 *
 * The flags that can be used here are:
 * - XLOG_INCLUDE_ORIGIN, to determine if the replication origin should be
 *   included in the record.
 * - XLOG_MARK_UNIMPORTANT, to signal that the record is not important for
 *   durability, which makes it possible to avoid triggering WAL archiving
 *   and other background activity.
 */
void
XLogSetRecordFlags(uint8 flags)
{
	Assert(begininsert_called);
	curinsert_flags |= flags;
}

/*
 * Insert an XLOG record having the specified RMID and info bytes, with the
 * body of the record being the data and buffer references registered earlier
 * with XLogRegister* calls.
 *
 * Returns XLOG pointer to end of record (beginning of next record).
 * This can be used as LSN for data pages affected by the logged action.
 * (LSN is the XLOG point up to which the XLOG must be flushed to disk
 * before the data page can be written out. This implements the basic
 * WAL rule "write the log before the data".)
 */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info)
{
	XLogRecPtr	EndPos;

	/* XLogBeginInsert() must have been called. */
	if (!begininsert_called)
		elog(ERROR, "XLogBeginInsert was not called");

	/*
	 * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
	 * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
	 */
	if ((info & ~(XLR_RMGR_INFO_MASK |
				  XLR_SPECIAL_REL_UPDATE |
				  XLR_CHECK_CONSISTENCY)) != 0)
		elog(PANIC, "invalid xlog info mask %02X", info);

	TRACE_POSTGRESQL_WAL_INSERT(rmid, info);

	/*
	 * In bootstrap mode, we don't actually log anything but XLOG resources;
	 * return a phony record pointer.
	 */
	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
	{
		XLogResetInsertion();
		EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
		return EndPos;
	}

	do
	{
		XLogRecPtr	RedoRecPtr;
		bool		doPageWrites;
		bool		topxid_included = false;
		XLogRecPtr	fpw_lsn;
		XLogRecData *rdt;
		int			num_fpi = 0;

		/*
		 * Get values needed to decide whether to do full-page writes. Since
		 * we don't yet have an insertion lock, these could change under us,
		 * but XLogInsertRecord will recheck them once it has a lock.
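		 * If they turn out to be stale, XLogInsertRecord() returns
		 * InvalidXLogRecPtr and we loop back to reassemble the record with
		 * fresh values.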
		 */
		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
								 &fpw_lsn, &num_fpi, &topxid_included);

		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
								  topxid_included);
	} while (EndPos == InvalidXLogRecPtr);

	XLogResetInsertion();

	return EndPos;
}

/*
 * Assemble a WAL record from the registered data and buffers into an
 * XLogRecData chain, ready for insertion with XLogInsertRecord().
 *
 * The record header fields are filled in, except for the xl_prev field. The
 * calculated CRC does not include the record header yet.
 *
 * If there are any registered buffers, and a full-page image was not taken
 * of all of them, *fpw_lsn is set to the lowest LSN among such pages. This
 * signals that the assembled record is only good for insertion on the
 * assumption that the RedoRecPtr and doPageWrites values were up-to-date.
 *
 * *topxid_included is set if the topmost transaction ID is logged with the
 * current subtransaction.
 */
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
				   XLogRecPtr RedoRecPtr, bool doPageWrites,
				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
{
	XLogRecData *rdt;
	uint64		total_len = 0;
	int			block_id;
	pg_crc32c	rdata_crc;
	registered_buffer *prev_regbuf = NULL;
	XLogRecData *rdt_datas_last;
	XLogRecord *rechdr;
	char	   *scratch = hdr_scratch;

	/*
	 * Note: this function can be called multiple times for the same record.
	 * All the modifications we do to the rdata chains below must handle that.
	 */

	/* The record begins with the fixed-size header */
	rechdr = (XLogRecord *) scratch;
	scratch += SizeOfXLogRecord;

	hdr_rdt.next = NULL;
	rdt_datas_last = &hdr_rdt;
	hdr_rdt.data = hdr_scratch;

	/*
	 * Enforce consistency checks for this record if user is looking for it.
	 * Do this at the beginning of this routine so that callers of
	 * XLogInsert() can also pass XLR_CHECK_CONSISTENCY directly for a
	 * record.
	 */
	if (wal_consistency_checking[rmid])
		info |= XLR_CHECK_CONSISTENCY;

	/*
	 * Make an rdata chain containing all the data portions of all block
	 * references. This includes the data for full-page images. Also append
	 * the headers for the block references in the scratch buffer.
	 */
	*fpw_lsn = InvalidXLogRecPtr;
	for (block_id = 0; block_id < max_registered_block_id; block_id++)
	{
		registered_buffer *regbuf = &registered_buffers[block_id];
		bool		needs_backup;
		bool		needs_data;
		XLogRecordBlockHeader bkpb;
		XLogRecordBlockImageHeader bimg;
		XLogRecordBlockCompressHeader cbimg = {0};
		bool		samerel;
		bool		is_compressed = false;
		bool		include_image;

		if (!regbuf->in_use)
			continue;

		/* Determine if this block needs to be backed up */
		if (regbuf->flags & REGBUF_FORCE_IMAGE)
			needs_backup = true;
		else if (regbuf->flags & REGBUF_NO_IMAGE)
			needs_backup = false;
		else if (!doPageWrites)
			needs_backup = false;
		else
		{
			/*
			 * We assume page LSN is first data on *every* page that can be
			 * passed to XLogInsert, whether it has the standard page layout
			 * or not.
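			 * A page whose LSN is not past the current redo pointer has not
			 * had a full-page image written since the last checkpoint, so it
			 * still needs one.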
			 */
			XLogRecPtr	page_lsn = PageGetLSN(regbuf->page);

			needs_backup = (page_lsn <= RedoRecPtr);
			if (!needs_backup)
			{
				if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
					*fpw_lsn = page_lsn;
			}
		}

		/* Determine if the buffer data needs to be included */
		if (regbuf->rdata_len == 0)
			needs_data = false;
		else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
			needs_data = true;
		else
			needs_data = !needs_backup;

		bkpb.id = block_id;
		bkpb.fork_flags = regbuf->forkno;
		bkpb.data_length = 0;

		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;

		/*
		 * If needs_backup is true or WAL checking is enabled for current
		 * resource manager, log a full-page write for the current block.
		 */
		include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;

		if (include_image)
		{
			Page		page = regbuf->page;
			uint16		compressed_len = 0;

			/*
			 * The page needs to be backed up, so calculate its hole length
			 * and offset.
			 */
			if (regbuf->flags & REGBUF_STANDARD)
			{
				/* Assume we can omit data between pd_lower and pd_upper */
				uint16		lower = ((PageHeader) page)->pd_lower;
				uint16		upper = ((PageHeader) page)->pd_upper;

				if (lower >= SizeOfPageHeaderData &&
					upper > lower &&
					upper <= BLCKSZ)
				{
					bimg.hole_offset = lower;
					cbimg.hole_length = upper - lower;
				}
				else
				{
					/* No "hole" to remove */
					bimg.hole_offset = 0;
					cbimg.hole_length = 0;
				}
			}
			else
			{
				/* Not a standard page header, don't try to eliminate "hole" */
				bimg.hole_offset = 0;
				cbimg.hole_length = 0;
			}

			/*
			 * Try to compress a block image if wal_compression is enabled
			 */
			if (wal_compression != WAL_COMPRESSION_NONE)
			{
				is_compressed =
					XLogCompressBackupBlock(page, bimg.hole_offset,
											cbimg.hole_length,
											regbuf->compressed_page,
											&compressed_len);
			}

			/*
			 * Fill in the remaining fields in the XLogRecordBlockHeader
			 * struct
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

			/* Report a full page image constructed for the WAL record */
			*num_fpi += 1;

			/*
			 * Construct XLogRecData entries for the page content.
			 */
			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
			rdt_datas_last = rdt_datas_last->next;

			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;

			/*
			 * If WAL consistency checking is enabled for the resource manager
			 * of this WAL record, a full-page image is included in the record
			 * for the block modified. During redo, the full-page is replayed
			 * only if BKPIMAGE_APPLY is set.
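			 * Images included only because of consistency checking are
			 * instead compared against the page produced by redo.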
			 */
			if (needs_backup)
				bimg.bimg_info |= BKPIMAGE_APPLY;

			if (is_compressed)
			{
				/* The current compression is stored in the WAL record */
				bimg.length = compressed_len;

				/* Set the compression method used for this block */
				switch ((WalCompression) wal_compression)
				{
					case WAL_COMPRESSION_PGLZ:
						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
						break;

					case WAL_COMPRESSION_LZ4:
#ifdef USE_LZ4
						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
#else
						elog(ERROR, "LZ4 is not supported by this build");
#endif
						break;

					case WAL_COMPRESSION_ZSTD:
#ifdef USE_ZSTD
						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
#else
						elog(ERROR, "zstd is not supported by this build");
#endif
						break;

					case WAL_COMPRESSION_NONE:
						Assert(false);	/* cannot happen */
						break;
						/* no default case, so that compiler will warn */
				}

				rdt_datas_last->data = regbuf->compressed_page;
				rdt_datas_last->len = compressed_len;
			}
			else
			{
				bimg.length = BLCKSZ - cbimg.hole_length;

				if (cbimg.hole_length == 0)
				{
					rdt_datas_last->data = page;
					rdt_datas_last->len = BLCKSZ;
				}
				else
				{
					/* must skip the hole */
					rdt_datas_last->data = page;
					rdt_datas_last->len = bimg.hole_offset;

					rdt_datas_last->next = &regbuf->bkp_rdatas[1];
					rdt_datas_last = rdt_datas_last->next;

					rdt_datas_last->data = page + (bimg.hole_offset + cbimg.hole_length);
					rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
				}
			}

			total_len += bimg.length;
		}

		if (needs_data)
		{
			/*
			 * When copying to XLogRecordBlockHeader, the length is narrowed
			 * to an uint16. Double-check that it is still correct.
			 */
			Assert(regbuf->rdata_len <= UINT16_MAX);

			/*
			 * Link the caller-supplied rdata chain for this buffer to the
			 * overall list.
			 */
			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
			bkpb.data_length = (uint16) regbuf->rdata_len;
			total_len += regbuf->rdata_len;

			rdt_datas_last->next = regbuf->rdata_head;
			rdt_datas_last = regbuf->rdata_tail;
		}

		if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
		{
			samerel = true;
			bkpb.fork_flags |= BKPBLOCK_SAME_REL;
		}
		else
			samerel = false;
		prev_regbuf = regbuf;

		/* Ok, copy the header to the scratch buffer */
		memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
		scratch += SizeOfXLogRecordBlockHeader;
		if (include_image)
		{
			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
			scratch += SizeOfXLogRecordBlockImageHeader;
			if (cbimg.hole_length != 0 && is_compressed)
			{
				memcpy(scratch, &cbimg,
					   SizeOfXLogRecordBlockCompressHeader);
				scratch += SizeOfXLogRecordBlockCompressHeader;
			}
		}
		if (!samerel)
		{
			memcpy(scratch, &regbuf->rlocator, sizeof(RelFileLocator));
			scratch += sizeof(RelFileLocator);
		}
		memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
		scratch += sizeof(BlockNumber);
	}

	/* followed by the record's origin, if any */
	if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
		replorigin_session_origin != InvalidRepOriginId)
	{
		*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
		memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
		scratch += sizeof(replorigin_session_origin);
	}

	/* followed by toplevel XID, if not already included in previous record */
	if (IsSubxactTopXidLogPending())
	{
		TransactionId xid = GetTopTransactionIdIfAny();

		/* Set the flag that the top xid is included in the WAL */
		*topxid_included = true;

		*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
		memcpy(scratch, &xid, sizeof(TransactionId));
		scratch += sizeof(TransactionId);
	}

	/* followed by main data, if any */
	if (mainrdata_len > 0)
	{
		if (mainrdata_len > 255)
		{
			uint32		mainrdata_len_4b;

			if (mainrdata_len > PG_UINT32_MAX)
				ereport(ERROR,
						(errmsg_internal("too much WAL data"),
						 errdetail_internal("Main data length is 
%llu bytes for a maximum of %u bytes.", (unsigned long long) mainrdata_len, PG_UINT32_MAX))); mainrdata_len_4b = (uint32) mainrdata_len; *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG; memcpy(scratch, &mainrdata_len_4b, sizeof(uint32)); scratch += sizeof(uint32); } else { *(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT; *(scratch++) = (uint8) mainrdata_len; } rdt_datas_last->next = mainrdata_head; rdt_datas_last = mainrdata_last; total_len += mainrdata_len; } rdt_datas_last->next = NULL; hdr_rdt.len = (scratch - hdr_scratch); total_len += hdr_rdt.len; /* * Calculate CRC of the data * * Note that the record header isn't added into the CRC initially since we * don't know the prev-link yet. Thus, the CRC will represent the CRC of * the whole record in the order: rdata, then backup blocks, then record * header. */ INIT_CRC32C(rdata_crc); COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next) COMP_CRC32C(rdata_crc, rdt->data, rdt->len); /* * Ensure that the XLogRecord is not too large. * * XLogReader machinery is only able to handle records up to a certain * size (ignoring machine resource limitations), so make sure that we will * not emit records larger than the sizes advertised to be supported. This * cap is based on DecodeXLogRecordRequiredSpace(). */ if (total_len > XLogRecordMaxSize) ereport(ERROR, (errmsg_internal("oversized WAL record"), errdetail_internal("WAL record would be %llu bytes (of maximum %u bytes); rmid %u flags %u.", (unsigned long long) total_len, XLogRecordMaxSize, rmid, info))); /* * Fill in the fields in the record header. Prev-link is filled in later, * once we know where in the WAL the record will be inserted. The CRC does * not include the record header yet. */ rechdr->xl_xid = GetCurrentTransactionIdIfAny(); rechdr->xl_tot_len = (uint32) total_len; rechdr->xl_info = info; rechdr->xl_rmid = rmid; rechdr->xl_prev = InvalidXLogRecPtr; rechdr->xl_crc = rdata_crc; return &hdr_rdt; } /* * Create a compressed version of a backup block image. * * Returns false if compression fails (i.e., compressed result is actually * bigger than original). Otherwise, returns true and sets 'dlen' to * the length of compressed block image. */ static bool XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length, char *dest, uint16 *dlen) { int32 orig_len = BLCKSZ - hole_length; int32 len = -1; int32 extra_bytes = 0; char *source; PGAlignedBlock tmp; if (hole_length != 0) { /* must skip the hole */ source = tmp.data; memcpy(source, page, hole_offset); memcpy(source + hole_offset, page + (hole_offset + hole_length), BLCKSZ - (hole_length + hole_offset)); /* * Extra data needs to be stored in WAL record for the compressed * version of block image if the hole exists. 
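		 * That overhead is counted below when deciding whether compression
		 * actually saved any space.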
*/ extra_bytes = SizeOfXLogRecordBlockCompressHeader; } else source = page; switch ((WalCompression) wal_compression) { case WAL_COMPRESSION_PGLZ: len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default); break; case WAL_COMPRESSION_LZ4: #ifdef USE_LZ4 len = LZ4_compress_default(source, dest, orig_len, COMPRESS_BUFSIZE); if (len <= 0) len = -1; /* failure */ #else elog(ERROR, "LZ4 is not supported by this build"); #endif break; case WAL_COMPRESSION_ZSTD: #ifdef USE_ZSTD len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len, ZSTD_CLEVEL_DEFAULT); if (ZSTD_isError(len)) len = -1; /* failure */ #else elog(ERROR, "zstd is not supported by this build"); #endif break; case WAL_COMPRESSION_NONE: Assert(false); /* cannot happen */ break; /* no default case, so that compiler will warn */ } /* * We recheck the actual size even if compression reports success and see * if the number of bytes saved by compression is larger than the length * of extra data needed for the compressed version of block image. */ if (len >= 0 && len + extra_bytes < orig_len) { *dlen = (uint16) len; /* successful compression */ return true; } return false; } /* * Determine whether the buffer referenced has to be backed up. * * Since we don't yet have the insert lock, fullPageWrites and runningBackups * (which forces full-page writes) could change later, so the result should * be used for optimization purposes only. */ bool XLogCheckBufferNeedsBackup(Buffer buffer) { XLogRecPtr RedoRecPtr; bool doPageWrites; Page page; GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); page = BufferGetPage(buffer); if (doPageWrites && PageGetLSN(page) <= RedoRecPtr) return true; /* buffer requires backup */ return false; /* buffer does not need to be backed up */ } /* * Write a backup block if needed when we are setting a hint. Note that * this may be called for a variety of page types, not just heaps. * * Callable while holding just share lock on the buffer content. * * We can't use the plain backup block mechanism since that relies on the * Buffer being exclusively locked. Since some modifications (setting LSN, hint * bits) are allowed in a sharelocked buffer that can lead to wal checksum * failures. So instead we copy the page and insert the copied data as normal * record data. * * We only need to do something if page has not yet been full page written in * this checkpoint round. The LSN of the inserted wal record is returned if we * had to write, InvalidXLogRecPtr otherwise. * * It is possible that multiple concurrent backends could attempt to write WAL * records. In that case, multiple copies of the same block would be recorded * in separate WAL records by different backends, though that is still OK from * a correctness perspective. */ XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std) { XLogRecPtr recptr = InvalidXLogRecPtr; XLogRecPtr lsn; XLogRecPtr RedoRecPtr; /* * Ensure no checkpoint can change our view of RedoRecPtr. */ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) != 0); /* * Update RedoRecPtr so that we can make the right decision */ RedoRecPtr = GetRedoRecPtr(); /* * We assume page LSN is first data on *every* page that can be passed to * XLogInsert, whether it has the standard page layout or not. Since we're * only holding a share-lock on the page, we must take the buffer header * lock when we look at the LSN. 
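	 * BufferGetLSNAtomic() takes care of that locking for us.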
*/ lsn = BufferGetLSNAtomic(buffer); if (lsn <= RedoRecPtr) { int flags = 0; PGAlignedBlock copied_buffer; char *origdata = (char *) BufferGetBlock(buffer); RelFileLocator rlocator; ForkNumber forkno; BlockNumber blkno; /* * Copy buffer so we don't have to worry about concurrent hint bit or * lsn updates. We assume pd_lower/upper cannot be changed without an * exclusive lock, so the contents bkp are not racy. */ if (buffer_std) { /* Assume we can omit data between pd_lower and pd_upper */ Page page = BufferGetPage(buffer); uint16 lower = ((PageHeader) page)->pd_lower; uint16 upper = ((PageHeader) page)->pd_upper; memcpy(copied_buffer.data, origdata, lower); memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper); } else memcpy(copied_buffer.data, origdata, BLCKSZ); XLogBeginInsert(); if (buffer_std) flags |= REGBUF_STANDARD; BufferGetTag(buffer, &rlocator, &forkno, &blkno); XLogRegisterBlock(0, &rlocator, forkno, blkno, copied_buffer.data, flags); recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI_FOR_HINT); } return recptr; } /* * Write a WAL record containing a full image of a page. Caller is responsible * for writing the page to disk after calling this routine. * * Note: If you're using this function, you should be building pages in private * memory and writing them directly to smgr. If you're using buffers, call * log_newpage_buffer instead. * * If the page follows the standard page layout, with a PageHeader and unused * space between pd_lower and pd_upper, set 'page_std' to true. That allows * the unused space to be left out from the WAL record, making it smaller. */ XLogRecPtr log_newpage(RelFileLocator *rlocator, ForkNumber forknum, BlockNumber blkno, Page page, bool page_std) { int flags; XLogRecPtr recptr; flags = REGBUF_FORCE_IMAGE; if (page_std) flags |= REGBUF_STANDARD; XLogBeginInsert(); XLogRegisterBlock(0, rlocator, forknum, blkno, page, flags); recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); /* * The page may be uninitialized. If so, we can't set the LSN because that * would corrupt the page. */ if (!PageIsNew(page)) { PageSetLSN(page, recptr); } return recptr; } /* * Like log_newpage(), but allows logging multiple pages in one operation. * It is more efficient than calling log_newpage() for each page separately, * because we can write multiple pages in a single WAL record. */ void log_newpages(RelFileLocator *rlocator, ForkNumber forknum, int num_pages, BlockNumber *blknos, Page *pages, bool page_std) { int flags; XLogRecPtr recptr; int i; int j; flags = REGBUF_FORCE_IMAGE; if (page_std) flags |= REGBUF_STANDARD; /* * Iterate over all the pages. They are collected into batches of * XLR_MAX_BLOCK_ID pages, and a single WAL-record is written for each * batch. */ XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0); i = 0; while (i < num_pages) { int batch_start = i; int nbatch; XLogBeginInsert(); nbatch = 0; while (nbatch < XLR_MAX_BLOCK_ID && i < num_pages) { XLogRegisterBlock(nbatch, rlocator, forknum, blknos[i], pages[i], flags); i++; nbatch++; } recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); for (j = batch_start; j < i; j++) { /* * The page may be uninitialized. If so, we can't set the LSN * because that would corrupt the page. */ if (!PageIsNew(pages[j])) { PageSetLSN(pages[j], recptr); } } } } /* * Write a WAL record containing a full image of a page. * * Caller should initialize the buffer and mark it dirty before calling this * function. This function will set the page LSN. 
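 *
 * A hedged usage sketch (caller and buffer are hypothetical; the page has
 * been initialized and the buffer marked dirty inside a critical section):
 *
 *		recptr = log_newpage_buffer(buffer, true);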
* * If the page follows the standard page layout, with a PageHeader and unused * space between pd_lower and pd_upper, set 'page_std' to true. That allows * the unused space to be left out from the WAL record, making it smaller. */ XLogRecPtr log_newpage_buffer(Buffer buffer, bool page_std) { Page page = BufferGetPage(buffer); RelFileLocator rlocator; ForkNumber forknum; BlockNumber blkno; /* Shared buffers should be modified in a critical section. */ Assert(CritSectionCount > 0); BufferGetTag(buffer, &rlocator, &forknum, &blkno); return log_newpage(&rlocator, forknum, blkno, page, page_std); } /* * WAL-log a range of blocks in a relation. * * An image of all pages with block numbers 'startblk' <= X < 'endblk' is * written to the WAL. If the range is large, this is done in multiple WAL * records. * * If all page follows the standard page layout, with a PageHeader and unused * space between pd_lower and pd_upper, set 'page_std' to true. That allows * the unused space to be left out from the WAL records, making them smaller. * * NOTE: This function acquires exclusive-locks on the pages. Typically, this * is used on a newly-built relation, and the caller is holding a * AccessExclusiveLock on it, so no other backend can be accessing it at the * same time. If that's not the case, you must ensure that this does not * cause a deadlock through some other means. */ void log_newpage_range(Relation rel, ForkNumber forknum, BlockNumber startblk, BlockNumber endblk, bool page_std) { int flags; BlockNumber blkno; flags = REGBUF_FORCE_IMAGE; if (page_std) flags |= REGBUF_STANDARD; /* * Iterate over all the pages in the range. They are collected into * batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written * for each batch. */ XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0); blkno = startblk; while (blkno < endblk) { Buffer bufpack[XLR_MAX_BLOCK_ID]; XLogRecPtr recptr; int nbufs; int i; CHECK_FOR_INTERRUPTS(); /* Collect a batch of blocks. */ nbufs = 0; while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk) { Buffer buf = ReadBufferExtended(rel, forknum, blkno, RBM_NORMAL, NULL); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); /* * Completely empty pages are not WAL-logged. Writing a WAL record * would change the LSN, and we don't want that. We want the page * to stay empty. */ if (!PageIsNew(BufferGetPage(buf))) bufpack[nbufs++] = buf; else UnlockReleaseBuffer(buf); blkno++; } /* Nothing more to do if all remaining blocks were empty. */ if (nbufs == 0) break; /* Write WAL record for this batch. */ XLogBeginInsert(); START_CRIT_SECTION(); for (i = 0; i < nbufs; i++) { XLogRegisterBuffer(i, bufpack[i], flags); MarkBufferDirty(bufpack[i]); } recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); for (i = 0; i < nbufs; i++) { PageSetLSN(BufferGetPage(bufpack[i]), recptr); UnlockReleaseBuffer(bufpack[i]); } END_CRIT_SECTION(); } } /* * Allocate working buffers needed for WAL record construction. */ void InitXLogInsert(void) { #ifdef USE_ASSERT_CHECKING /* * Check that any records assembled can be decoded. This is capped based * on what XLogReader would require at its maximum bound. This code path * is called once per backend, more than enough for this check. 
*/ size_t max_required = DecodeXLogRecordRequiredSpace(XLogRecordMaxSize); Assert(AllocSizeIsValid(max_required)); #endif /* Initialize the working areas */ if (xloginsert_cxt == NULL) { xloginsert_cxt = AllocSetContextCreate(TopMemoryContext, "WAL record construction", ALLOCSET_DEFAULT_SIZES); } if (registered_buffers == NULL) { registered_buffers = (registered_buffer *) MemoryContextAllocZero(xloginsert_cxt, sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1)); max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1; } if (rdatas == NULL) { rdatas = MemoryContextAlloc(xloginsert_cxt, sizeof(XLogRecData) * XLR_NORMAL_RDATAS); max_rdatas = XLR_NORMAL_RDATAS; } /* * Allocate a buffer to hold the header information for a WAL record. */ if (hdr_scratch == NULL) hdr_scratch = MemoryContextAllocZero(xloginsert_cxt, HEADER_SCRATCH_SIZE); }