diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/replication/logical/reorderbuffer.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-upstream.tar.xz postgresql-14-upstream.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 5156 |
1 files changed, 5156 insertions, 0 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c new file mode 100644 index 0000000..e59d139 --- /dev/null +++ b/src/backend/replication/logical/reorderbuffer.c @@ -0,0 +1,5156 @@ +/*------------------------------------------------------------------------- + * + * reorderbuffer.c + * PostgreSQL logical replay/reorder buffer management + * + * + * Copyright (c) 2012-2021, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * src/backend/replication/reorderbuffer.c + * + * NOTES + * This module gets handed individual pieces of transactions in the order + * they are written to the WAL and is responsible to reassemble them into + * toplevel transaction sized pieces. When a transaction is completely + * reassembled - signaled by reading the transaction commit record - it + * will then call the output plugin (cf. ReorderBufferCommit()) with the + * individual changes. The output plugins rely on snapshots built by + * snapbuild.c which hands them to us. + * + * Transactions and subtransactions/savepoints in postgres are not + * immediately linked to each other from outside the performing + * backend. Only at commit/abort (or special xact_assignment records) they + * are linked together. Which means that we will have to splice together a + * toplevel transaction from its subtransactions. To do that efficiently we + * build a binary heap indexed by the smallest current lsn of the individual + * subtransactions' changestreams. As the individual streams are inherently + * ordered by LSN - since that is where we build them from - the transaction + * can easily be reassembled by always using the subtransaction with the + * smallest current LSN from the heap. + * + * In order to cope with large transactions - which can be several times as + * big as the available memory - this module supports spooling the contents + * of a large transactions to disk. When the transaction is replayed the + * contents of individual (sub-)transactions will be read from disk in + * chunks. + * + * This module also has to deal with reassembling toast records from the + * individual chunks stored in WAL. When a new (or initial) version of a + * tuple is stored in WAL it will always be preceded by the toast chunks + * emitted for the columns stored out of line. Within a single toplevel + * transaction there will be no other data carrying records between a row's + * toast chunks and the row data itself. See ReorderBufferToast* for + * details. + * + * ReorderBuffer uses two special memory context types - SlabContext for + * allocations of fixed-length structures (changes and transactions), and + * GenerationContext for the variable-length transaction data (allocated + * and freed in groups with similar lifespans). + * + * To limit the amount of memory used by decoded changes, we track memory + * used at the reorder buffer level (i.e. total amount of memory), and for + * each transaction. When the total amount of used memory exceeds the + * limit, the transaction consuming the most memory is then serialized to + * disk. + * + * Only decoded changes are evicted from memory (spilled to disk), not the + * transaction records. The number of toplevel transactions is limited, + * but a transaction with many subtransactions may still consume significant + * amounts of memory. However, the transaction records are fairly small and + * are not included in the memory limit. + * + * The current eviction algorithm is very simple - the transaction is + * picked merely by size, while it might be useful to also consider age + * (LSN) of the changes for example. With the new Generational memory + * allocator, evicting the oldest changes would make it more likely the + * memory gets actually freed. + * + * We still rely on max_changes_in_memory when loading serialized changes + * back into memory. At that point we can't use the memory limit directly + * as we load the subxacts independently. One option to deal with this + * would be to count the subxacts, and allow each to allocate 1/N of the + * memory limit. That however does not seem very appealing, because with + * many subtransactions it may easily cause thrashing (short cycles of + * deserializing and applying very few changes). We probably should give + * a bit more memory to the oldest subtransactions, because it's likely + * they are the source for the next sequence of changes. + * + * ------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> +#include <sys/stat.h> + +#include "access/detoast.h" +#include "access/heapam.h" +#include "access/rewriteheap.h" +#include "access/transam.h" +#include "access/xact.h" +#include "access/xlog_internal.h" +#include "catalog/catalog.h" +#include "lib/binaryheap.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "replication/logical.h" +#include "replication/reorderbuffer.h" +#include "replication/slot.h" +#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ +#include "storage/bufmgr.h" +#include "storage/fd.h" +#include "storage/sinval.h" +#include "utils/builtins.h" +#include "utils/combocid.h" +#include "utils/memdebug.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/relfilenodemap.h" + + +/* entry for a hash table we use to map from xid to our transaction state */ +typedef struct ReorderBufferTXNByIdEnt +{ + TransactionId xid; + ReorderBufferTXN *txn; +} ReorderBufferTXNByIdEnt; + +/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */ +typedef struct ReorderBufferTupleCidKey +{ + RelFileNode relnode; + ItemPointerData tid; +} ReorderBufferTupleCidKey; + +typedef struct ReorderBufferTupleCidEnt +{ + ReorderBufferTupleCidKey key; + CommandId cmin; + CommandId cmax; + CommandId combocid; /* just for debugging */ +} ReorderBufferTupleCidEnt; + +/* Virtual file descriptor with file offset tracking */ +typedef struct TXNEntryFile +{ + File vfd; /* -1 when the file is closed */ + off_t curOffset; /* offset for next write or read. Reset to 0 + * when vfd is opened. */ +} TXNEntryFile; + +/* k-way in-order change iteration support structures */ +typedef struct ReorderBufferIterTXNEntry +{ + XLogRecPtr lsn; + ReorderBufferChange *change; + ReorderBufferTXN *txn; + TXNEntryFile file; + XLogSegNo segno; +} ReorderBufferIterTXNEntry; + +typedef struct ReorderBufferIterTXNState +{ + binaryheap *heap; + Size nr_txns; + dlist_head old_change; + ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]; +} ReorderBufferIterTXNState; + +/* toast datastructures */ +typedef struct ReorderBufferToastEnt +{ + Oid chunk_id; /* toast_table.chunk_id */ + int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we + * have seen */ + Size num_chunks; /* number of chunks we've already seen */ + Size size; /* combined size of chunks seen */ + dlist_head chunks; /* linked list of chunks */ + struct varlena *reconstructed; /* reconstructed varlena now pointed to in + * main tup */ +} ReorderBufferToastEnt; + +/* Disk serialization support datastructures */ +typedef struct ReorderBufferDiskChange +{ + Size size; + ReorderBufferChange change; + /* data follows */ +} ReorderBufferDiskChange; + +#define IsSpecInsert(action) \ +( \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \ +) +#define IsSpecConfirmOrAbort(action) \ +( \ + (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \ +) +#define IsInsertOrUpdate(action) \ +( \ + (((action) == REORDER_BUFFER_CHANGE_INSERT) || \ + ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \ + ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \ +) + +/* + * Maximum number of changes kept in memory, per transaction. After that, + * changes are spooled to disk. + * + * The current value should be sufficient to decode the entire transaction + * without hitting disk in OLTP workloads, while starting to spool to disk in + * other workloads reasonably fast. + * + * At some point in the future it probably makes sense to have a more elaborate + * resource management here, but it's not entirely clear what that would look + * like. + */ +int logical_decoding_work_mem; +static const Size max_changes_in_memory = 4096; /* XXX for restore only */ + +/* --------------------------------------- + * primary reorderbuffer support routines + * --------------------------------------- + */ +static ReorderBufferTXN *ReorderBufferGetTXN(ReorderBuffer *rb); +static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, + TransactionId xid, bool create, bool *is_new, + XLogRecPtr lsn, bool create_as_top); +static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, + ReorderBufferTXN *subtxn); + +static void AssertTXNLsnOrder(ReorderBuffer *rb); + +/* --------------------------------------- + * support functions for lsn-order iterating over the ->changes of a + * transaction and its subtransactions + * + * used for iteration over the k-way heap merge of a transaction and its + * subtransactions + * --------------------------------------- + */ +static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state); +static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); +static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, + ReorderBufferIterTXNState *state); +static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs); + +/* + * --------------------------------------- + * Disk serialization support functions + * --------------------------------------- + */ +static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb); +static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + int fd, ReorderBufferChange *change); +static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, + TXNEntryFile *file, XLogSegNo *segno); +static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + char *change); +static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + bool txn_prepared); +static void ReorderBufferCleanupSerializedTXNs(const char *slotname); +static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, + TransactionId xid, XLogSegNo segno); + +static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); +static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, + ReorderBufferTXN *txn, CommandId cid); + +/* + * --------------------------------------- + * Streaming support functions + * --------------------------------------- + */ +static inline bool ReorderBufferCanStream(ReorderBuffer *rb); +static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb); +static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn); + +/* --------------------------------------- + * toast reassembly support + * --------------------------------------- + */ +static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn); +static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); +static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change); + +/* + * --------------------------------------- + * memory accounting + * --------------------------------------- + */ +static Size ReorderBufferChangeSize(ReorderBufferChange *change); +static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, + ReorderBufferChange *change, + bool addition, Size sz); + +/* + * Allocate a new ReorderBuffer and clean out any old serialized state from + * prior ReorderBuffer instances for the same slot. + */ +ReorderBuffer * +ReorderBufferAllocate(void) +{ + ReorderBuffer *buffer; + HASHCTL hash_ctl; + MemoryContext new_ctx; + + Assert(MyReplicationSlot != NULL); + + /* allocate memory in own context, to have better accountability */ + new_ctx = AllocSetContextCreate(CurrentMemoryContext, + "ReorderBuffer", + ALLOCSET_DEFAULT_SIZES); + + buffer = + (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer)); + + memset(&hash_ctl, 0, sizeof(hash_ctl)); + + buffer->context = new_ctx; + + buffer->change_context = SlabContextCreate(new_ctx, + "Change", + SLAB_DEFAULT_BLOCK_SIZE, + sizeof(ReorderBufferChange)); + + buffer->txn_context = SlabContextCreate(new_ctx, + "TXN", + SLAB_DEFAULT_BLOCK_SIZE, + sizeof(ReorderBufferTXN)); + + buffer->tup_context = GenerationContextCreate(new_ctx, + "Tuples", + SLAB_LARGE_BLOCK_SIZE); + + hash_ctl.keysize = sizeof(TransactionId); + hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt); + hash_ctl.hcxt = buffer->context; + + buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + buffer->by_txn_last_xid = InvalidTransactionId; + buffer->by_txn_last_txn = NULL; + + buffer->outbuf = NULL; + buffer->outbufsize = 0; + buffer->size = 0; + + buffer->spillTxns = 0; + buffer->spillCount = 0; + buffer->spillBytes = 0; + buffer->streamTxns = 0; + buffer->streamCount = 0; + buffer->streamBytes = 0; + buffer->totalTxns = 0; + buffer->totalBytes = 0; + + buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; + + dlist_init(&buffer->toplevel_by_lsn); + dlist_init(&buffer->txns_by_base_snapshot_lsn); + + /* + * Ensure there's no stale data from prior uses of this slot, in case some + * prior exit avoided calling ReorderBufferFree. Failure to do this can + * produce duplicated txns, and it's very cheap if there's nothing there. + */ + ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); + + return buffer; +} + +/* + * Free a ReorderBuffer + */ +void +ReorderBufferFree(ReorderBuffer *rb) +{ + MemoryContext context = rb->context; + + /* + * We free separately allocated data by entirely scrapping reorderbuffer's + * memory context. + */ + MemoryContextDelete(context); + + /* Free disk space used by unconsumed reorder buffers */ + ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); +} + +/* + * Get an unused, possibly preallocated, ReorderBufferTXN. + */ +static ReorderBufferTXN * +ReorderBufferGetTXN(ReorderBuffer *rb) +{ + ReorderBufferTXN *txn; + + txn = (ReorderBufferTXN *) + MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN)); + + memset(txn, 0, sizeof(ReorderBufferTXN)); + + dlist_init(&txn->changes); + dlist_init(&txn->tuplecids); + dlist_init(&txn->subtxns); + + /* InvalidCommandId is not zero, so set it explicitly */ + txn->command_id = InvalidCommandId; + txn->output_plugin_private = NULL; + + return txn; +} + +/* + * Free a ReorderBufferTXN. + */ +static void +ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + /* clean the lookup cache if we were cached (quite likely) */ + if (rb->by_txn_last_xid == txn->xid) + { + rb->by_txn_last_xid = InvalidTransactionId; + rb->by_txn_last_txn = NULL; + } + + /* free data that's contained */ + + if (txn->gid != NULL) + { + pfree(txn->gid); + txn->gid = NULL; + } + + if (txn->tuplecid_hash != NULL) + { + hash_destroy(txn->tuplecid_hash); + txn->tuplecid_hash = NULL; + } + + if (txn->invalidations) + { + pfree(txn->invalidations); + txn->invalidations = NULL; + } + + /* Reset the toast hash */ + ReorderBufferToastReset(rb, txn); + + pfree(txn); +} + +/* + * Get a fresh ReorderBufferChange. + */ +ReorderBufferChange * +ReorderBufferGetChange(ReorderBuffer *rb) +{ + ReorderBufferChange *change; + + change = (ReorderBufferChange *) + MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange)); + + memset(change, 0, sizeof(ReorderBufferChange)); + return change; +} + +/* + * Free a ReorderBufferChange and update memory accounting, if requested. + */ +void +ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, + bool upd_mem) +{ + /* update memory accounting info */ + if (upd_mem) + ReorderBufferChangeMemoryUpdate(rb, change, false, + ReorderBufferChangeSize(change)); + + /* free contained data */ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + if (change->data.tp.newtuple) + { + ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple); + change->data.tp.newtuple = NULL; + } + + if (change->data.tp.oldtuple) + { + ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple); + change->data.tp.oldtuple = NULL; + } + break; + case REORDER_BUFFER_CHANGE_MESSAGE: + if (change->data.msg.prefix != NULL) + pfree(change->data.msg.prefix); + change->data.msg.prefix = NULL; + if (change->data.msg.message != NULL) + pfree(change->data.msg.message); + change->data.msg.message = NULL; + break; + case REORDER_BUFFER_CHANGE_INVALIDATION: + if (change->data.inval.invalidations) + pfree(change->data.inval.invalidations); + change->data.inval.invalidations = NULL; + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: + if (change->data.snapshot) + { + ReorderBufferFreeSnap(rb, change->data.snapshot); + change->data.snapshot = NULL; + } + break; + /* no data in addition to the struct itself */ + case REORDER_BUFFER_CHANGE_TRUNCATE: + if (change->data.truncate.relids != NULL) + { + ReorderBufferReturnRelids(rb, change->data.truncate.relids); + change->data.truncate.relids = NULL; + } + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + break; + } + + pfree(change); +} + +/* + * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size + * tuple_len (excluding header overhead). + */ +ReorderBufferTupleBuf * +ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len) +{ + ReorderBufferTupleBuf *tuple; + Size alloc_len; + + alloc_len = tuple_len + SizeofHeapTupleHeader; + + tuple = (ReorderBufferTupleBuf *) + MemoryContextAlloc(rb->tup_context, + sizeof(ReorderBufferTupleBuf) + + MAXIMUM_ALIGNOF + alloc_len); + tuple->alloc_tuple_size = alloc_len; + tuple->tuple.t_data = ReorderBufferTupleBufData(tuple); + + return tuple; +} + +/* + * Free a ReorderBufferTupleBuf. + */ +void +ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple) +{ + pfree(tuple); +} + +/* + * Get an array for relids of truncated relations. + * + * We use the global memory context (for the whole reorder buffer), because + * none of the existing ones seems like a good match (some are SLAB, so we + * can't use those, and tup_context is meant for tuple data, not relids). We + * could add yet another context, but it seems like an overkill - TRUNCATE is + * not particularly common operation, so it does not seem worth it. + */ +Oid * +ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids) +{ + Oid *relids; + Size alloc_len; + + alloc_len = sizeof(Oid) * nrelids; + + relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len); + + return relids; +} + +/* + * Free an array of relids. + */ +void +ReorderBufferReturnRelids(ReorderBuffer *rb, Oid *relids) +{ + pfree(relids); +} + +/* + * Return the ReorderBufferTXN from the given buffer, specified by Xid. + * If create is true, and a transaction doesn't already exist, create it + * (with the given LSN, and as top transaction if that's specified); + * when this happens, is_new is set to true. + */ +static ReorderBufferTXN * +ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, + bool *is_new, XLogRecPtr lsn, bool create_as_top) +{ + ReorderBufferTXN *txn; + ReorderBufferTXNByIdEnt *ent; + bool found; + + Assert(TransactionIdIsValid(xid)); + + /* + * Check the one-entry lookup cache first + */ + if (TransactionIdIsValid(rb->by_txn_last_xid) && + rb->by_txn_last_xid == xid) + { + txn = rb->by_txn_last_txn; + + if (txn != NULL) + { + /* found it, and it's valid */ + if (is_new) + *is_new = false; + return txn; + } + + /* + * cached as non-existent, and asked not to create? Then nothing else + * to do. + */ + if (!create) + return NULL; + /* otherwise fall through to create it */ + } + + /* + * If the cache wasn't hit or it yielded a "does-not-exist" and we want + * to create an entry. + */ + + /* search the lookup table */ + ent = (ReorderBufferTXNByIdEnt *) + hash_search(rb->by_txn, + (void *) &xid, + create ? HASH_ENTER : HASH_FIND, + &found); + if (found) + txn = ent->txn; + else if (create) + { + /* initialize the new entry, if creation was requested */ + Assert(ent != NULL); + Assert(lsn != InvalidXLogRecPtr); + + ent->txn = ReorderBufferGetTXN(rb); + ent->txn->xid = xid; + txn = ent->txn; + txn->first_lsn = lsn; + txn->restart_decoding_lsn = rb->current_restart_decoding_lsn; + + if (create_as_top) + { + dlist_push_tail(&rb->toplevel_by_lsn, &txn->node); + AssertTXNLsnOrder(rb); + } + } + else + txn = NULL; /* not found and not asked to create */ + + /* update cache */ + rb->by_txn_last_xid = xid; + rb->by_txn_last_txn = txn; + + if (is_new) + *is_new = !found; + + Assert(!create || txn != NULL); + return txn; +} + +/* + * Record the partial change for the streaming of in-progress transactions. We + * can stream only complete changes so if we have a partial change like toast + * table insert or speculative insert then we mark such a 'txn' so that it + * can't be streamed. We also ensure that if the changes in such a 'txn' are + * above logical_decoding_work_mem threshold then we stream them as soon as we + * have a complete change. + */ +static void +ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferChange *change, + bool toast_insert) +{ + ReorderBufferTXN *toptxn; + + /* + * The partial changes need to be processed only while streaming + * in-progress transactions. + */ + if (!ReorderBufferCanStream(rb)) + return; + + /* Get the top transaction. */ + if (txn->toptxn != NULL) + toptxn = txn->toptxn; + else + toptxn = txn; + + /* + * Indicate a partial change for toast inserts. The change will be + * considered as complete once we get the insert or update on the main + * table and we are sure that the pending toast chunks are not required + * anymore. + * + * If we allow streaming when there are pending toast chunks then such + * chunks won't be released till the insert (multi_insert) is complete and + * we expect the txn to have streamed all changes after streaming. This + * restriction is mainly to ensure the correctness of streamed + * transactions and it doesn't seem worth uplifting such a restriction + * just to allow this case because anyway we will stream the transaction + * once such an insert is complete. + */ + if (toast_insert) + toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE; + else if (rbtxn_has_partial_change(toptxn) && + IsInsertOrUpdate(change->action) && + change->data.tp.clear_toast_afterwards) + toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE; + + /* + * Indicate a partial change for speculative inserts. The change will be + * considered as complete once we get the speculative confirm or abort + * token. + */ + if (IsSpecInsert(change->action)) + toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE; + else if (rbtxn_has_partial_change(toptxn) && + IsSpecConfirmOrAbort(change->action)) + toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE; + + /* + * Stream the transaction if it is serialized before and the changes are + * now complete in the top-level transaction. + * + * The reason for doing the streaming of such a transaction as soon as we + * get the complete change for it is that previously it would have reached + * the memory threshold and wouldn't get streamed because of incomplete + * changes. Delaying such transactions would increase apply lag for them. + */ + if (ReorderBufferCanStartStreaming(rb) && + !(rbtxn_has_partial_change(toptxn)) && + rbtxn_is_serialized(txn)) + ReorderBufferStreamTXN(rb, toptxn); +} + +/* + * Queue a change into a transaction so it can be replayed upon commit or will be + * streamed when we reach logical_decoding_work_mem threshold. + */ +void +ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, + ReorderBufferChange *change, bool toast_insert) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* + * While streaming the previous changes we have detected that the + * transaction is aborted. So there is no point in collecting further + * changes for it. + */ + if (txn->concurrent_abort) + { + /* + * We don't need to update memory accounting for this change as we + * have not added it to the queue yet. + */ + ReorderBufferReturnChange(rb, change, false); + return; + } + + change->lsn = lsn; + change->txn = txn; + + Assert(InvalidXLogRecPtr != lsn); + dlist_push_tail(&txn->changes, &change->node); + txn->nentries++; + txn->nentries_mem++; + + /* update memory accounting information */ + ReorderBufferChangeMemoryUpdate(rb, change, true, + ReorderBufferChangeSize(change)); + + /* process partial change */ + ReorderBufferProcessPartialChange(rb, txn, change, toast_insert); + + /* check the memory limits and evict something if needed */ + ReorderBufferCheckMemoryLimit(rb); +} + +/* + * A transactional message is queued to be processed upon commit and a + * non-transactional message gets processed immediately. + */ +void +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, + bool transactional, const char *prefix, + Size message_size, const char *message) +{ + if (transactional) + { + MemoryContext oldcontext; + ReorderBufferChange *change; + + Assert(xid != InvalidTransactionId); + + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_MESSAGE; + change->data.msg.prefix = pstrdup(prefix); + change->data.msg.message_size = message_size; + change->data.msg.message = palloc(message_size); + memcpy(change->data.msg.message, message, message_size); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); + } + else + { + ReorderBufferTXN *txn = NULL; + volatile Snapshot snapshot_now = snapshot; + + if (xid != InvalidTransactionId) + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* setup snapshot to allow catalog access */ + SetupHistoricSnapshot(snapshot_now, NULL); + PG_TRY(); + { + rb->message(rb, txn, lsn, false, prefix, message_size, message); + + TeardownHistoricSnapshot(false); + } + PG_CATCH(); + { + TeardownHistoricSnapshot(true); + PG_RE_THROW(); + } + PG_END_TRY(); + } +} + +/* + * AssertTXNLsnOrder + * Verify LSN ordering of transaction lists in the reorderbuffer + * + * Other LSN-related invariants are checked too. + * + * No-op if assertions are not in use. + */ +static void +AssertTXNLsnOrder(ReorderBuffer *rb) +{ +#ifdef USE_ASSERT_CHECKING + dlist_iter iter; + XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; + XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; + + dlist_foreach(iter, &rb->toplevel_by_lsn) + { + ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node, + iter.cur); + + /* start LSN must be set */ + Assert(cur_txn->first_lsn != InvalidXLogRecPtr); + + /* If there is an end LSN, it must be higher than start LSN */ + if (cur_txn->end_lsn != InvalidXLogRecPtr) + Assert(cur_txn->first_lsn <= cur_txn->end_lsn); + + /* Current initial LSN must be strictly higher than previous */ + if (prev_first_lsn != InvalidXLogRecPtr) + Assert(prev_first_lsn < cur_txn->first_lsn); + + /* known-as-subtxn txns must not be listed */ + Assert(!rbtxn_is_known_subxact(cur_txn)); + + prev_first_lsn = cur_txn->first_lsn; + } + + dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn) + { + ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, + base_snapshot_node, + iter.cur); + + /* base snapshot (and its LSN) must be set */ + Assert(cur_txn->base_snapshot != NULL); + Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr); + + /* current LSN must be strictly higher than previous */ + if (prev_base_snap_lsn != InvalidXLogRecPtr) + Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn); + + /* known-as-subtxn txns must not be listed */ + Assert(!rbtxn_is_known_subxact(cur_txn)); + + prev_base_snap_lsn = cur_txn->base_snapshot_lsn; + } +#endif +} + +/* + * AssertChangeLsnOrder + * + * Check ordering of changes in the (sub)transaction. + */ +static void +AssertChangeLsnOrder(ReorderBufferTXN *txn) +{ +#ifdef USE_ASSERT_CHECKING + dlist_iter iter; + XLogRecPtr prev_lsn = txn->first_lsn; + + dlist_foreach(iter, &txn->changes) + { + ReorderBufferChange *cur_change; + + cur_change = dlist_container(ReorderBufferChange, node, iter.cur); + + Assert(txn->first_lsn != InvalidXLogRecPtr); + Assert(cur_change->lsn != InvalidXLogRecPtr); + Assert(txn->first_lsn <= cur_change->lsn); + + if (txn->end_lsn != InvalidXLogRecPtr) + Assert(cur_change->lsn <= txn->end_lsn); + + Assert(prev_lsn <= cur_change->lsn); + + prev_lsn = cur_change->lsn; + } +#endif +} + +/* + * ReorderBufferGetOldestTXN + * Return oldest transaction in reorderbuffer + */ +ReorderBufferTXN * +ReorderBufferGetOldestTXN(ReorderBuffer *rb) +{ + ReorderBufferTXN *txn; + + AssertTXNLsnOrder(rb); + + if (dlist_is_empty(&rb->toplevel_by_lsn)) + return NULL; + + txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn); + + Assert(!rbtxn_is_known_subxact(txn)); + Assert(txn->first_lsn != InvalidXLogRecPtr); + return txn; +} + +/* + * ReorderBufferGetOldestXmin + * Return oldest Xmin in reorderbuffer + * + * Returns oldest possibly running Xid from the point of view of snapshots + * used in the transactions kept by reorderbuffer, or InvalidTransactionId if + * there are none. + * + * Since snapshots are assigned monotonically, this equals the Xmin of the + * base snapshot with minimal base_snapshot_lsn. + */ +TransactionId +ReorderBufferGetOldestXmin(ReorderBuffer *rb) +{ + ReorderBufferTXN *txn; + + AssertTXNLsnOrder(rb); + + if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn)) + return InvalidTransactionId; + + txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node, + &rb->txns_by_base_snapshot_lsn); + return txn->base_snapshot->xmin; +} + +void +ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr) +{ + rb->current_restart_decoding_lsn = ptr; +} + +/* + * ReorderBufferAssignChild + * + * Make note that we know that subxid is a subtransaction of xid, seen as of + * the given lsn. + */ +void +ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, + TransactionId subxid, XLogRecPtr lsn) +{ + ReorderBufferTXN *txn; + ReorderBufferTXN *subtxn; + bool new_top; + bool new_sub; + + txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true); + subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false); + + if (!new_sub) + { + if (rbtxn_is_known_subxact(subtxn)) + { + /* already associated, nothing to do */ + return; + } + else + { + /* + * We already saw this transaction, but initially added it to the + * list of top-level txns. Now that we know it's not top-level, + * remove it from there. + */ + dlist_delete(&subtxn->node); + } + } + + subtxn->txn_flags |= RBTXN_IS_SUBXACT; + subtxn->toplevel_xid = xid; + Assert(subtxn->nsubtxns == 0); + + /* set the reference to top-level transaction */ + subtxn->toptxn = txn; + + /* add to subtransaction list */ + dlist_push_tail(&txn->subtxns, &subtxn->node); + txn->nsubtxns++; + + /* Possibly transfer the subtxn's snapshot to its top-level txn. */ + ReorderBufferTransferSnapToParent(txn, subtxn); + + /* Verify LSN-ordering invariant */ + AssertTXNLsnOrder(rb); +} + +/* + * ReorderBufferTransferSnapToParent + * Transfer base snapshot from subtxn to top-level txn, if needed + * + * This is done if the top-level txn doesn't have a base snapshot, or if the + * subtxn's base snapshot has an earlier LSN than the top-level txn's base + * snapshot's LSN. This can happen if there are no changes in the toplevel + * txn but there are some in the subtxn, or the first change in subtxn has + * earlier LSN than first change in the top-level txn and we learned about + * their kinship only now. + * + * The subtransaction's snapshot is cleared regardless of the transfer + * happening, since it's not needed anymore in either case. + * + * We do this as soon as we become aware of their kinship, to avoid queueing + * extra snapshots to txns known-as-subtxns -- only top-level txns will + * receive further snapshots. + */ +static void +ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, + ReorderBufferTXN *subtxn) +{ + Assert(subtxn->toplevel_xid == txn->xid); + + if (subtxn->base_snapshot != NULL) + { + if (txn->base_snapshot == NULL || + subtxn->base_snapshot_lsn < txn->base_snapshot_lsn) + { + /* + * If the toplevel transaction already has a base snapshot but + * it's newer than the subxact's, purge it. + */ + if (txn->base_snapshot != NULL) + { + SnapBuildSnapDecRefcount(txn->base_snapshot); + dlist_delete(&txn->base_snapshot_node); + } + + /* + * The snapshot is now the top transaction's; transfer it, and + * adjust the list position of the top transaction in the list by + * moving it to where the subtransaction is. + */ + txn->base_snapshot = subtxn->base_snapshot; + txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; + dlist_insert_before(&subtxn->base_snapshot_node, + &txn->base_snapshot_node); + + /* + * The subtransaction doesn't have a snapshot anymore (so it + * mustn't be in the list.) + */ + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = InvalidXLogRecPtr; + dlist_delete(&subtxn->base_snapshot_node); + } + else + { + /* Base snap of toplevel is fine, so subxact's is not needed */ + SnapBuildSnapDecRefcount(subtxn->base_snapshot); + dlist_delete(&subtxn->base_snapshot_node); + subtxn->base_snapshot = NULL; + subtxn->base_snapshot_lsn = InvalidXLogRecPtr; + } + } +} + +/* + * Associate a subtransaction with its toplevel transaction at commit + * time. There may be no further changes added after this. + */ +void +ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, + TransactionId subxid, XLogRecPtr commit_lsn, + XLogRecPtr end_lsn) +{ + ReorderBufferTXN *subtxn; + + subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL, + InvalidXLogRecPtr, false); + + /* + * No need to do anything if that subtxn didn't contain any changes + */ + if (!subtxn) + return; + + subtxn->final_lsn = commit_lsn; + subtxn->end_lsn = end_lsn; + + /* + * Assign this subxact as a child of the toplevel xact (no-op if already + * done.) + */ + ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr); +} + + +/* + * Support for efficiently iterating over a transaction's and its + * subtransactions' changes. + * + * We do by doing a k-way merge between transactions/subtransactions. For that + * we model the current heads of the different transactions as a binary heap + * so we easily know which (sub-)transaction has the change with the smallest + * lsn next. + * + * We assume the changes in individual transactions are already sorted by LSN. + */ + +/* + * Binary heap comparison function. + */ +static int +ReorderBufferIterCompare(Datum a, Datum b, void *arg) +{ + ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg; + XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn; + XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn; + + if (pos_a < pos_b) + return 1; + else if (pos_a == pos_b) + return 0; + return -1; +} + +/* + * Allocate & initialize an iterator which iterates in lsn order over a + * transaction and all its subtransactions. + * + * Note: The iterator state is returned through iter_state parameter rather + * than the function's return value. This is because the state gets cleaned up + * in a PG_CATCH block in the caller, so we want to make sure the caller gets + * back the state even if this function throws an exception. + */ +static void +ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferIterTXNState *volatile *iter_state) +{ + Size nr_txns = 0; + ReorderBufferIterTXNState *state; + dlist_iter cur_txn_i; + int32 off; + + *iter_state = NULL; + + /* Check ordering of changes in the toplevel transaction. */ + AssertChangeLsnOrder(txn); + + /* + * Calculate the size of our heap: one element for every transaction that + * contains changes. (Besides the transactions already in the reorder + * buffer, we count the one we were directly passed.) + */ + if (txn->nentries > 0) + nr_txns++; + + dlist_foreach(cur_txn_i, &txn->subtxns) + { + ReorderBufferTXN *cur_txn; + + cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + + /* Check ordering of changes in this subtransaction. */ + AssertChangeLsnOrder(cur_txn); + + if (cur_txn->nentries > 0) + nr_txns++; + } + + /* allocate iteration state */ + state = (ReorderBufferIterTXNState *) + MemoryContextAllocZero(rb->context, + sizeof(ReorderBufferIterTXNState) + + sizeof(ReorderBufferIterTXNEntry) * nr_txns); + + state->nr_txns = nr_txns; + dlist_init(&state->old_change); + + for (off = 0; off < state->nr_txns; off++) + { + state->entries[off].file.vfd = -1; + state->entries[off].segno = 0; + } + + /* allocate heap */ + state->heap = binaryheap_allocate(state->nr_txns, + ReorderBufferIterCompare, + state); + + /* Now that the state fields are initialized, it is safe to return it. */ + *iter_state = state; + + /* + * Now insert items into the binary heap, in an unordered fashion. (We + * will run a heap assembly step at the end; this is more efficient.) + */ + + off = 0; + + /* add toplevel transaction if it contains changes */ + if (txn->nentries > 0) + { + ReorderBufferChange *cur_change; + + if (rbtxn_is_serialized(txn)) + { + /* serialize remaining changes */ + ReorderBufferSerializeTXN(rb, txn); + ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file, + &state->entries[off].segno); + } + + cur_change = dlist_head_element(ReorderBufferChange, node, + &txn->changes); + + state->entries[off].lsn = cur_change->lsn; + state->entries[off].change = cur_change; + state->entries[off].txn = txn; + + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + } + + /* add subtransactions if they contain changes */ + dlist_foreach(cur_txn_i, &txn->subtxns) + { + ReorderBufferTXN *cur_txn; + + cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); + + if (cur_txn->nentries > 0) + { + ReorderBufferChange *cur_change; + + if (rbtxn_is_serialized(cur_txn)) + { + /* serialize remaining changes */ + ReorderBufferSerializeTXN(rb, cur_txn); + ReorderBufferRestoreChanges(rb, cur_txn, + &state->entries[off].file, + &state->entries[off].segno); + } + cur_change = dlist_head_element(ReorderBufferChange, node, + &cur_txn->changes); + + state->entries[off].lsn = cur_change->lsn; + state->entries[off].change = cur_change; + state->entries[off].txn = cur_txn; + + binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); + } + } + + /* assemble a valid binary heap */ + binaryheap_build(state->heap); +} + +/* + * Return the next change when iterating over a transaction and its + * subtransactions. + * + * Returns NULL when no further changes exist. + */ +static ReorderBufferChange * +ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) +{ + ReorderBufferChange *change; + ReorderBufferIterTXNEntry *entry; + int32 off; + + /* nothing there anymore */ + if (state->heap->bh_size == 0) + return NULL; + + off = DatumGetInt32(binaryheap_first(state->heap)); + entry = &state->entries[off]; + + /* free memory we might have "leaked" in the previous *Next call */ + if (!dlist_is_empty(&state->old_change)) + { + change = dlist_container(ReorderBufferChange, node, + dlist_pop_head_node(&state->old_change)); + ReorderBufferReturnChange(rb, change, true); + Assert(dlist_is_empty(&state->old_change)); + } + + change = entry->change; + + /* + * update heap with information about which transaction has the next + * relevant change in LSN order + */ + + /* there are in-memory changes */ + if (dlist_has_next(&entry->txn->changes, &entry->change->node)) + { + dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node); + ReorderBufferChange *next_change = + dlist_container(ReorderBufferChange, node, next); + + /* txn stays the same */ + state->entries[off].lsn = next_change->lsn; + state->entries[off].change = next_change; + + binaryheap_replace_first(state->heap, Int32GetDatum(off)); + return change; + } + + /* try to load changes from disk */ + if (entry->txn->nentries != entry->txn->nentries_mem) + { + /* + * Ugly: restoring changes will reuse *Change records, thus delete the + * current one from the per-tx list and only free in the next call. + */ + dlist_delete(&change->node); + dlist_push_tail(&state->old_change, &change->node); + + /* + * Update the total bytes processed by the txn for which we are + * releasing the current set of changes and restoring the new set of + * changes. + */ + rb->totalBytes += entry->txn->size; + if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, + &state->entries[off].segno)) + { + /* successfully restored changes from disk */ + ReorderBufferChange *next_change = + dlist_head_element(ReorderBufferChange, node, + &entry->txn->changes); + + elog(DEBUG2, "restored %u/%u changes from disk", + (uint32) entry->txn->nentries_mem, + (uint32) entry->txn->nentries); + + Assert(entry->txn->nentries_mem); + /* txn stays the same */ + state->entries[off].lsn = next_change->lsn; + state->entries[off].change = next_change; + binaryheap_replace_first(state->heap, Int32GetDatum(off)); + + return change; + } + } + + /* ok, no changes there anymore, remove */ + binaryheap_remove_first(state->heap); + + return change; +} + +/* + * Deallocate the iterator + */ +static void +ReorderBufferIterTXNFinish(ReorderBuffer *rb, + ReorderBufferIterTXNState *state) +{ + int32 off; + + for (off = 0; off < state->nr_txns; off++) + { + if (state->entries[off].file.vfd != -1) + FileClose(state->entries[off].file.vfd); + } + + /* free memory we might have "leaked" in the last *Next call */ + if (!dlist_is_empty(&state->old_change)) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, + dlist_pop_head_node(&state->old_change)); + ReorderBufferReturnChange(rb, change, true); + Assert(dlist_is_empty(&state->old_change)); + } + + binaryheap_free(state->heap); + pfree(state); +} + +/* + * Cleanup the contents of a transaction, usually after the transaction + * committed or aborted. + */ +static void +ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + bool found; + dlist_mutable_iter iter; + + /* cleanup subtransactions & their changes */ + dlist_foreach_modify(iter, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); + + /* + * Subtransactions are always associated to the toplevel TXN, even if + * they originally were happening inside another subtxn, so we won't + * ever recurse more than one level deep here. + */ + Assert(rbtxn_is_known_subxact(subtxn)); + Assert(subtxn->nsubtxns == 0); + + ReorderBufferCleanupTXN(rb, subtxn); + } + + /* cleanup changes in the txn */ + dlist_foreach_modify(iter, &txn->changes) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + + ReorderBufferReturnChange(rb, change, true); + } + + /* + * Cleanup the tuplecids we stored for decoding catalog snapshot access. + * They are always stored in the toplevel transaction. + */ + dlist_foreach_modify(iter, &txn->tuplecids) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + + ReorderBufferReturnChange(rb, change, true); + } + + /* + * Cleanup the base snapshot, if set. + */ + if (txn->base_snapshot != NULL) + { + SnapBuildSnapDecRefcount(txn->base_snapshot); + dlist_delete(&txn->base_snapshot_node); + } + + /* + * Cleanup the snapshot for the last streamed run. + */ + if (txn->snapshot_now != NULL) + { + Assert(rbtxn_is_streamed(txn)); + ReorderBufferFreeSnap(rb, txn->snapshot_now); + } + + /* + * Remove TXN from its containing list. + * + * Note: if txn is known as subxact, we are deleting the TXN from its + * parent's list of known subxacts; this leaves the parent's nsubxacts + * count too high, but we don't care. Otherwise, we are deleting the TXN + * from the LSN-ordered list of toplevel TXNs. + */ + dlist_delete(&txn->node); + + /* now remove reference from buffer */ + hash_search(rb->by_txn, + (void *) &txn->xid, + HASH_REMOVE, + &found); + Assert(found); + + /* remove entries spilled to disk */ + if (rbtxn_is_serialized(txn)) + ReorderBufferRestoreCleanup(rb, txn); + + /* deallocate */ + ReorderBufferReturnTXN(rb, txn); +} + +/* + * Discard changes from a transaction (and subtransactions), either after + * streaming or decoding them at PREPARE. Keep the remaining info - + * transactions, tuplecids, invalidations and snapshots. + * + * We additionaly remove tuplecids after decoding the transaction at prepare + * time as we only need to perform invalidation at rollback or commit prepared. + * + * 'txn_prepared' indicates that we have decoded the transaction at prepare + * time. + */ +static void +ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) +{ + dlist_mutable_iter iter; + + /* cleanup subtransactions & their changes */ + dlist_foreach_modify(iter, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); + + /* + * Subtransactions are always associated to the toplevel TXN, even if + * they originally were happening inside another subtxn, so we won't + * ever recurse more than one level deep here. + */ + Assert(rbtxn_is_known_subxact(subtxn)); + Assert(subtxn->nsubtxns == 0); + + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); + } + + /* cleanup changes in the txn */ + dlist_foreach_modify(iter, &txn->changes) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + + /* remove the change from it's containing list */ + dlist_delete(&change->node); + + ReorderBufferReturnChange(rb, change, true); + } + + /* + * Mark the transaction as streamed. + * + * The toplevel transaction, identified by (toptxn==NULL), is marked as + * streamed always, even if it does not contain any changes (that is, when + * all the changes are in subtransactions). + * + * For subtransactions, we only mark them as streamed when there are + * changes in them. + * + * We do it this way because of aborts - we don't want to send aborts for + * XIDs the downstream is not aware of. And of course, it always knows + * about the toplevel xact (we send the XID in all messages), but we never + * stream XIDs of empty subxacts. + */ + if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0))) + txn->txn_flags |= RBTXN_IS_STREAMED; + + if (txn_prepared) + { + /* + * If this is a prepared txn, cleanup the tuplecids we stored for + * decoding catalog snapshot access. They are always stored in the + * toplevel transaction. + */ + dlist_foreach_modify(iter, &txn->tuplecids) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + /* Check we're not mixing changes from different transactions. */ + Assert(change->txn == txn); + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + + /* Remove the change from its containing list. */ + dlist_delete(&change->node); + + ReorderBufferReturnChange(rb, change, true); + } + } + + /* + * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any + * memory. We could also keep the hash table and update it with new ctid + * values, but this seems simpler and good enough for now. + */ + if (txn->tuplecid_hash != NULL) + { + hash_destroy(txn->tuplecid_hash); + txn->tuplecid_hash = NULL; + } + + /* If this txn is serialized then clean the disk space. */ + if (rbtxn_is_serialized(txn)) + { + ReorderBufferRestoreCleanup(rb, txn); + txn->txn_flags &= ~RBTXN_IS_SERIALIZED; + + /* + * We set this flag to indicate if the transaction is ever serialized. + * We need this to accurately update the stats as otherwise the same + * transaction can be counted as serialized multiple times. + */ + txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR; + } + + /* also reset the number of entries in the transaction */ + txn->nentries_mem = 0; + txn->nentries = 0; +} + +/* + * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by + * HeapTupleSatisfiesHistoricMVCC. + */ +static void +ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + dlist_iter iter; + HASHCTL hash_ctl; + + if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids)) + return; + + hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey); + hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt); + hash_ctl.hcxt = rb->context; + + /* + * create the hash with the exact number of to-be-stored tuplecids from + * the start + */ + txn->tuplecid_hash = + hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + dlist_foreach(iter, &txn->tuplecids) + { + ReorderBufferTupleCidKey key; + ReorderBufferTupleCidEnt *ent; + bool found; + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); + + /* be careful about padding */ + memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); + + key.relnode = change->data.tuplecid.node; + + ItemPointerCopy(&change->data.tuplecid.tid, + &key.tid); + + ent = (ReorderBufferTupleCidEnt *) + hash_search(txn->tuplecid_hash, + (void *) &key, + HASH_ENTER, + &found); + if (!found) + { + ent->cmin = change->data.tuplecid.cmin; + ent->cmax = change->data.tuplecid.cmax; + ent->combocid = change->data.tuplecid.combocid; + } + else + { + /* + * Maybe we already saw this tuple before in this transaction, but + * if so it must have the same cmin. + */ + Assert(ent->cmin == change->data.tuplecid.cmin); + + /* + * cmax may be initially invalid, but once set it can only grow, + * and never become invalid again. + */ + Assert((ent->cmax == InvalidCommandId) || + ((change->data.tuplecid.cmax != InvalidCommandId) && + (change->data.tuplecid.cmax > ent->cmax))); + ent->cmax = change->data.tuplecid.cmax; + } + } +} + +/* + * Copy a provided snapshot so we can modify it privately. This is needed so + * that catalog modifying transactions can look into intermediate catalog + * states. + */ +static Snapshot +ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, + ReorderBufferTXN *txn, CommandId cid) +{ + Snapshot snap; + dlist_iter iter; + int i = 0; + Size size; + + size = sizeof(SnapshotData) + + sizeof(TransactionId) * orig_snap->xcnt + + sizeof(TransactionId) * (txn->nsubtxns + 1); + + snap = MemoryContextAllocZero(rb->context, size); + memcpy(snap, orig_snap, sizeof(SnapshotData)); + + snap->copied = true; + snap->active_count = 1; /* mark as active so nobody frees it */ + snap->regd_count = 0; + snap->xip = (TransactionId *) (snap + 1); + + memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt); + + /* + * snap->subxip contains all txids that belong to our transaction which we + * need to check via cmin/cmax. That's why we store the toplevel + * transaction in there as well. + */ + snap->subxip = snap->xip + snap->xcnt; + snap->subxip[i++] = txn->xid; + + /* + * subxcnt isn't decreased when subtransactions abort, so count manually. + * Since it's an upper boundary it is safe to use it for the allocation + * above. + */ + snap->subxcnt = 1; + + dlist_foreach(iter, &txn->subtxns) + { + ReorderBufferTXN *sub_txn; + + sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur); + snap->subxip[i++] = sub_txn->xid; + snap->subxcnt++; + } + + /* sort so we can bsearch() later */ + qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator); + + /* store the specified current CommandId */ + snap->curcid = cid; + + return snap; +} + +/* + * Free a previously ReorderBufferCopySnap'ed snapshot + */ +static void +ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) +{ + if (snap->copied) + pfree(snap); + else + SnapBuildSnapDecRefcount(snap); +} + +/* + * If the transaction was (partially) streamed, we need to prepare or commit + * it in a 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_prepare or stream_commit message as per + * the case. + */ +static void +ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + /* we should only call this for previously streamed transactions */ + Assert(rbtxn_is_streamed(txn)); + + ReorderBufferStreamTXN(rb, txn); + + if (rbtxn_prepared(txn)) + { + /* + * Note, we send stream prepare even if a concurrent abort is + * detected. See DecodePrepare for more information. + */ + rb->stream_prepare(rb, txn, txn->final_lsn); + + /* + * This is a PREPARED transaction, part of a two-phase commit. The + * full cleanup will happen as part of the COMMIT PREPAREDs, so now + * just truncate txn by removing changes and tuple_cids. + */ + ReorderBufferTruncateTXN(rb, txn, true); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } + else + { + rb->stream_commit(rb, txn, txn->final_lsn); + ReorderBufferCleanupTXN(rb, txn); + } +} + +/* + * Set xid to detect concurrent aborts. + * + * While streaming an in-progress transaction or decoding a prepared + * transaction there is a possibility that the (sub)transaction might get + * aborted concurrently. In such case if the (sub)transaction has catalog + * update then we might decode the tuple using wrong catalog version. For + * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now, + * the transaction 501 updates the catalog tuple and after that we will have + * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is + * aborted and some other transaction say 502 updates the same catalog tuple + * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the + * problem is that when we try to decode the tuple inserted/updated in 501 + * after the catalog update, we will see the catalog tuple with (xmin: 500, + * xmax: 502) as visible because it will consider that the tuple is deleted by + * xid 502 which is not visible to our snapshot. And when we will try to + * decode with that catalog tuple, it can lead to a wrong result or a crash. + * So, it is necessary to detect concurrent aborts to allow streaming of + * in-progress transactions or decoding of prepared transactions. + * + * For detecting the concurrent abort we set CheckXidAlive to the current + * (sub)transaction's xid for which this change belongs to. And, during + * catalog scan we can check the status of the xid and if it is aborted we will + * report a specific error so that we can stop streaming current transaction + * and discard the already streamed changes on such an error. We might have + * already streamed some of the changes for the aborted (sub)transaction, but + * that is fine because when we decode the abort we will stream abort message + * to truncate the changes in the subscriber. Similarly, for prepared + * transactions, we stop decoding if concurrent abort is detected and then + * rollback the changes when rollback prepared is encountered. See + * DecodePrepare. + */ +static inline void +SetupCheckXidLive(TransactionId xid) +{ + /* + * If the input transaction id is already set as a CheckXidAlive then + * nothing to do. + */ + if (TransactionIdEquals(CheckXidAlive, xid)) + return; + + /* + * setup CheckXidAlive if it's not committed yet. We don't check if the + * xid is aborted. That will happen during catalog access. + */ + if (!TransactionIdDidCommit(xid)) + CheckXidAlive = xid; + else + CheckXidAlive = InvalidTransactionId; +} + +/* + * Helper function for ReorderBufferProcessTXN for applying change. + */ +static inline void +ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change, + bool streaming) +{ + if (streaming) + rb->stream_change(rb, txn, relation, change); + else + rb->apply_change(rb, txn, relation, change); +} + +/* + * Helper function for ReorderBufferProcessTXN for applying the truncate. + */ +static inline void +ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, + int nrelations, Relation *relations, + ReorderBufferChange *change, bool streaming) +{ + if (streaming) + rb->stream_truncate(rb, txn, nrelations, relations, change); + else + rb->apply_truncate(rb, txn, nrelations, relations, change); +} + +/* + * Helper function for ReorderBufferProcessTXN for applying the message. + */ +static inline void +ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, + ReorderBufferChange *change, bool streaming) +{ + if (streaming) + rb->stream_message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); + else + rb->message(rb, txn, change->lsn, true, + change->data.msg.prefix, + change->data.msg.message_size, + change->data.msg.message); +} + +/* + * Function to store the command id and snapshot at the end of the current + * stream so that we can reuse the same while sending the next stream. + */ +static inline void +ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, + Snapshot snapshot_now, CommandId command_id) +{ + txn->command_id = command_id; + + /* Avoid copying if it's already copied. */ + if (snapshot_now->copied) + txn->snapshot_now = snapshot_now; + else + txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); +} + +/* + * Helper function for ReorderBufferProcessTXN to handle the concurrent + * abort of the streaming transaction. This resets the TXN such that it + * can be used to stream the remaining data of transaction being processed. + * This can happen when the subtransaction is aborted and we still want to + * continue processing the main or other subtransactions data. + */ +static void +ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + Snapshot snapshot_now, + CommandId command_id, + XLogRecPtr last_lsn, + ReorderBufferChange *specinsert) +{ + /* Discard the changes that we just streamed */ + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + + /* Free all resources allocated for toast reconstruction */ + ReorderBufferToastReset(rb, txn); + + /* Return the spec insert change if it is not NULL */ + if (specinsert != NULL) + { + ReorderBufferReturnChange(rb, specinsert, true); + specinsert = NULL; + } + + /* + * For the streaming case, stop the stream and remember the command ID and + * snapshot for the streaming run. + */ + if (rbtxn_is_streamed(txn)) + { + rb->stream_stop(rb, txn, last_lsn); + ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); + } +} + +/* + * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN. + * + * Send data of a transaction (and its subtransactions) to the + * output plugin. We iterate over the top and subtransactions (using a k-way + * merge) and replay the changes in lsn order. + * + * If streaming is true then data will be sent using stream API. + * + * Note: "volatile" markers on some parameters are to avoid trouble with + * PG_TRY inside the function. + */ +static void +ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn, + volatile Snapshot snapshot_now, + volatile CommandId command_id, + bool streaming) +{ + bool using_subtxn; + MemoryContext ccxt = CurrentMemoryContext; + ReorderBufferIterTXNState *volatile iterstate = NULL; + volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr; + ReorderBufferChange *volatile specinsert = NULL; + volatile bool stream_started = false; + ReorderBufferTXN *volatile curtxn = NULL; + + /* build data to be able to lookup the CommandIds of catalog tuples */ + ReorderBufferBuildTupleCidHash(rb, txn); + + /* setup the initial snapshot */ + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + + /* + * Decoding needs access to syscaches et al., which in turn use + * heavyweight locks and such. Thus we need to have enough state around to + * keep track of those. The easiest way is to simply use a transaction + * internally. That also allows us to easily enforce that nothing writes + * to the database by checking for xid assignments. + * + * When we're called via the SQL SRF there's already a transaction + * started, so start an explicit subtransaction there. + */ + using_subtxn = IsTransactionOrTransactionBlock(); + + PG_TRY(); + { + ReorderBufferChange *change; + + if (using_subtxn) + BeginInternalSubTransaction(streaming ? "stream" : "replay"); + else + StartTransactionCommand(); + + /* + * We only need to send begin/begin-prepare for non-streamed + * transactions. + */ + if (!streaming) + { + if (rbtxn_prepared(txn)) + rb->begin_prepare(rb, txn); + else + rb->begin(rb, txn); + } + + ReorderBufferIterTXNInit(rb, txn, &iterstate); + while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) + { + Relation relation = NULL; + Oid reloid; + + /* + * We can't call start stream callback before processing first + * change. + */ + if (prev_lsn == InvalidXLogRecPtr) + { + if (streaming) + { + txn->origin_id = change->origin_id; + rb->stream_start(rb, txn, change->lsn); + stream_started = true; + } + } + + /* + * Enforce correct ordering of changes, merged from multiple + * subtransactions. The changes may have the same LSN due to + * MULTI_INSERT xlog records. + */ + Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn); + + prev_lsn = change->lsn; + + /* + * Set the current xid to detect concurrent aborts. This is + * required for the cases when we decode the changes before the + * COMMIT record is processed. + */ + if (streaming || rbtxn_prepared(change->txn)) + { + curtxn = change->txn; + SetupCheckXidLive(curtxn->xid); + } + + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + + /* + * Confirmation for speculative insertion arrived. Simply + * use as a normal record. It'll be cleaned up at the end + * of INSERT processing. + */ + if (specinsert == NULL) + elog(ERROR, "invalid ordering of speculative insertion changes"); + Assert(specinsert->data.tp.oldtuple == NULL); + change = specinsert; + change->action = REORDER_BUFFER_CHANGE_INSERT; + + /* intentionally fall through */ + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + Assert(snapshot_now); + + reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, + change->data.tp.relnode.relNode); + + /* + * Mapped catalog tuple without data, emitted while + * catalog table was in the process of being rewritten. We + * can fail to look up the relfilenode, because the + * relmapper has no "historic" view, in contrast to the + * normal catalog during decoding. Thus repeated rewrites + * can cause a lookup failure. That's OK because we do not + * decode catalog changes anyway. Normally such tuples + * would be skipped over below, but we can't identify + * whether the table should be logically logged without + * mapping the relfilenode to the oid. + */ + if (reloid == InvalidOid && + change->data.tp.newtuple == NULL && + change->data.tp.oldtuple == NULL) + goto change_done; + else if (reloid == InvalidOid) + elog(ERROR, "could not map filenode \"%s\" to relation OID", + relpathperm(change->data.tp.relnode, + MAIN_FORKNUM)); + + relation = RelationIdGetRelation(reloid); + + if (!RelationIsValid(relation)) + elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", + reloid, + relpathperm(change->data.tp.relnode, + MAIN_FORKNUM)); + + if (!RelationIsLogicallyLogged(relation)) + goto change_done; + + /* + * Ignore temporary heaps created during DDL unless the + * plugin has asked for them. + */ + if (relation->rd_rel->relrewrite && !rb->output_rewrites) + goto change_done; + + /* + * For now ignore sequence changes entirely. Most of the + * time they don't log changes using records we + * understand, so it doesn't make sense to handle the few + * cases we do. + */ + if (relation->rd_rel->relkind == RELKIND_SEQUENCE) + goto change_done; + + /* user-triggered change */ + if (!IsToastRelation(relation)) + { + ReorderBufferToastReplace(rb, txn, relation, change); + ReorderBufferApplyChange(rb, txn, relation, change, + streaming); + + /* + * Only clear reassembled toast chunks if we're sure + * they're not required anymore. The creator of the + * tuple tells us. + */ + if (change->data.tp.clear_toast_afterwards) + ReorderBufferToastReset(rb, txn); + } + /* we're not interested in toast deletions */ + else if (change->action == REORDER_BUFFER_CHANGE_INSERT) + { + /* + * Need to reassemble the full toasted Datum in + * memory, to ensure the chunks don't get reused till + * we're done remove it from the list of this + * transaction's changes. Otherwise it will get + * freed/reused while restoring spooled data from + * disk. + */ + Assert(change->data.tp.newtuple != NULL); + + dlist_delete(&change->node); + ReorderBufferToastAppendChunk(rb, txn, relation, + change); + } + + change_done: + + /* + * If speculative insertion was confirmed, the record + * isn't needed anymore. + */ + if (specinsert != NULL) + { + ReorderBufferReturnChange(rb, specinsert, true); + specinsert = NULL; + } + + if (RelationIsValid(relation)) + { + RelationClose(relation); + relation = NULL; + } + break; + + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + + /* + * Speculative insertions are dealt with by delaying the + * processing of the insert until the confirmation record + * arrives. For that we simply unlink the record from the + * chain, so it does not get freed/reused while restoring + * spooled data from disk. + * + * This is safe in the face of concurrent catalog changes + * because the relevant relation can't be changed between + * speculative insertion and confirmation due to + * CheckTableNotInUse() and locking. + */ + + /* clear out a pending (and thus failed) speculation */ + if (specinsert != NULL) + { + ReorderBufferReturnChange(rb, specinsert, true); + specinsert = NULL; + } + + /* and memorize the pending insertion */ + dlist_delete(&change->node); + specinsert = change; + break; + + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + + /* + * Abort for speculative insertion arrived. So cleanup the + * specinsert tuple and toast hash. + * + * Note that we get the spec abort change for each toast + * entry but we need to perform the cleanup only the first + * time we get it for the main table. + */ + if (specinsert != NULL) + { + /* + * We must clean the toast hash before processing a + * completely new tuple to avoid confusion about the + * previous tuple's toast chunks. + */ + Assert(change->data.tp.clear_toast_afterwards); + ReorderBufferToastReset(rb, txn); + + /* We don't need this record anymore. */ + ReorderBufferReturnChange(rb, specinsert, true); + specinsert = NULL; + } + break; + + case REORDER_BUFFER_CHANGE_TRUNCATE: + { + int i; + int nrelids = change->data.truncate.nrelids; + int nrelations = 0; + Relation *relations; + + relations = palloc0(nrelids * sizeof(Relation)); + for (i = 0; i < nrelids; i++) + { + Oid relid = change->data.truncate.relids[i]; + Relation relation; + + relation = RelationIdGetRelation(relid); + + if (!RelationIsValid(relation)) + elog(ERROR, "could not open relation with OID %u", relid); + + if (!RelationIsLogicallyLogged(relation)) + continue; + + relations[nrelations++] = relation; + } + + /* Apply the truncate. */ + ReorderBufferApplyTruncate(rb, txn, nrelations, + relations, change, + streaming); + + for (i = 0; i < nrelations; i++) + RelationClose(relations[i]); + + break; + } + + case REORDER_BUFFER_CHANGE_MESSAGE: + ReorderBufferApplyMessage(rb, txn, change, streaming); + break; + + case REORDER_BUFFER_CHANGE_INVALIDATION: + /* Execute the invalidation messages locally */ + ReorderBufferExecuteInvalidations( + change->data.inval.ninvalidations, + change->data.inval.invalidations); + break; + + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: + /* get rid of the old */ + TeardownHistoricSnapshot(false); + + if (snapshot_now->copied) + { + ReorderBufferFreeSnap(rb, snapshot_now); + snapshot_now = + ReorderBufferCopySnap(rb, change->data.snapshot, + txn, command_id); + } + + /* + * Restored from disk, need to be careful not to double + * free. We could introduce refcounting for that, but for + * now this seems infrequent enough not to care. + */ + else if (change->data.snapshot->copied) + { + snapshot_now = + ReorderBufferCopySnap(rb, change->data.snapshot, + txn, command_id); + } + else + { + snapshot_now = change->data.snapshot; + } + + /* and continue with the new one */ + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + break; + + case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: + Assert(change->data.command_id != InvalidCommandId); + + if (command_id < change->data.command_id) + { + command_id = change->data.command_id; + + if (!snapshot_now->copied) + { + /* we don't use the global one anymore */ + snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, + txn, command_id); + } + + snapshot_now->curcid = command_id; + + TeardownHistoricSnapshot(false); + SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); + } + + break; + + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + elog(ERROR, "tuplecid value in changequeue"); + break; + } + } + + /* speculative insertion record must be freed by now */ + Assert(!specinsert); + + /* clean up the iterator */ + ReorderBufferIterTXNFinish(rb, iterstate); + iterstate = NULL; + + /* + * Update total transaction count and total bytes processed by the + * transaction and its subtransactions. Ensure to not count the + * streamed transaction multiple times. + * + * Note that the statistics computation has to be done after + * ReorderBufferIterTXNFinish as it releases the serialized change + * which we have already accounted in ReorderBufferIterTXNNext. + */ + if (!rbtxn_is_streamed(txn)) + rb->totalTxns++; + + rb->totalBytes += txn->total_size; + + /* + * Done with current changes, send the last message for this set of + * changes depending upon streaming mode. + */ + if (streaming) + { + if (stream_started) + { + rb->stream_stop(rb, txn, prev_lsn); + stream_started = false; + } + } + else + { + /* + * Call either PREPARE (for two-phase transactions) or COMMIT (for + * regular ones). + */ + if (rbtxn_prepared(txn)) + rb->prepare(rb, txn, commit_lsn); + else + rb->commit(rb, txn, commit_lsn); + } + + /* this is just a sanity check against bad output plugin behaviour */ + if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) + elog(ERROR, "output plugin used XID %u", + GetCurrentTransactionId()); + + /* + * Remember the command ID and snapshot for the next set of changes in + * streaming mode. + */ + if (streaming) + ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); + else if (snapshot_now->copied) + ReorderBufferFreeSnap(rb, snapshot_now); + + /* cleanup */ + TeardownHistoricSnapshot(false); + + /* + * Aborting the current (sub-)transaction as a whole has the right + * semantics. We want all locks acquired in here to be released, not + * reassigned to the parent and we do not want any database access + * have persistent effects. + */ + AbortCurrentTransaction(); + + /* make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + + /* + * We are here due to one of the four reasons: 1. Decoding an + * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a + * prepared txn that was (partially) streamed. 4. Decoding a committed + * txn. + * + * For 1, we allow truncation of txn data by removing the changes + * already streamed but still keeping other things like invalidations, + * snapshot, and tuplecids. For 2 and 3, we indicate + * ReorderBufferTruncateTXN to do more elaborate truncation of txn + * data as the entire transaction has been decoded except for commit. + * For 4, as the entire txn has been decoded, we can fully clean up + * the TXN reorder buffer. + */ + if (streaming || rbtxn_prepared(txn)) + { + ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn)); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } + else + ReorderBufferCleanupTXN(rb, txn); + } + PG_CATCH(); + { + MemoryContext ecxt = MemoryContextSwitchTo(ccxt); + ErrorData *errdata = CopyErrorData(); + + /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ + if (iterstate) + ReorderBufferIterTXNFinish(rb, iterstate); + + TeardownHistoricSnapshot(true); + + /* + * Force cache invalidation to happen outside of a valid transaction + * to prevent catalog access as we just caught an error. + */ + AbortCurrentTransaction(); + + /* make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + + /* + * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent + * abort of the (sub)transaction we are streaming or preparing. We + * need to do the cleanup and return gracefully on this error, see + * SetupCheckXidLive. + * + * This error code can be thrown by one of the callbacks we call + * during decoding so we need to ensure that we return gracefully only + * when we are sending the data in streaming mode and the streaming is + * not finished yet or when we are sending the data out on a PREPARE + * during a two-phase commit. + */ + if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK && + (stream_started || rbtxn_prepared(txn))) + { + /* curtxn must be set for streaming or prepared transactions */ + Assert(curtxn); + + /* Cleanup the temporary error state. */ + FlushErrorState(); + FreeErrorData(errdata); + errdata = NULL; + curtxn->concurrent_abort = true; + + /* Reset the TXN so that it is allowed to stream remaining data. */ + ReorderBufferResetTXN(rb, txn, snapshot_now, + command_id, prev_lsn, + specinsert); + } + else + { + ReorderBufferCleanupTXN(rb, txn); + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + } + PG_END_TRY(); +} + +/* + * Perform the replay of a transaction and its non-aborted subtransactions. + * + * Subtransactions previously have to be processed by + * ReorderBufferCommitChild(), even if previously assigned to the toplevel + * transaction with ReorderBufferAssignChild. + * + * This interface is called once a prepare or toplevel commit is read for both + * streamed as well as non-streamed transactions. + */ +static void +ReorderBufferReplay(ReorderBufferTXN *txn, + ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + Snapshot snapshot_now; + CommandId command_id = FirstCommandId; + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + /* + * If the transaction was (partially) streamed, we need to commit it in a + * 'streamed' way. That is, we first stream the remaining part of the + * transaction, and then invoke stream_commit message. + * + * Called after everything (origin ID, LSN, ...) is stored in the + * transaction to avoid passing that information directly. + */ + if (rbtxn_is_streamed(txn)) + { + ReorderBufferStreamCommit(rb, txn); + return; + } + + /* + * If this transaction has no snapshot, it didn't make any changes to the + * database, so there's nothing to decode. Note that + * ReorderBufferCommitChild will have transferred any snapshots from + * subtransactions if there were any. + */ + if (txn->base_snapshot == NULL) + { + Assert(txn->ninvalidations == 0); + + /* + * Removing this txn before a commit might result in the computation + * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts. + */ + if (!rbtxn_prepared(txn)) + ReorderBufferCleanupTXN(rb, txn); + return; + } + + snapshot_now = txn->base_snapshot; + + /* Process and send the changes to output plugin. */ + ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now, + command_id, false); +} + +/* + * Commit a transaction. + * + * See comments for ReorderBufferReplay(). + */ +void +ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + TimestampTz commit_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time, + origin_id, origin_lsn); +} + +/* + * Record the prepare information for a transaction. + */ +bool +ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, + TimestampTz prepare_time, + RepOriginId origin_id, XLogRecPtr origin_lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + /* unknown transaction, nothing to do */ + if (txn == NULL) + return false; + + /* + * Remember the prepare information to be later used by commit prepared in + * case we skip doing prepare. + */ + txn->final_lsn = prepare_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = prepare_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + return true; +} + +/* Remember that we have skipped prepare */ +void +ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); + + /* unknown transaction, nothing to do */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_SKIPPED_PREPARE; +} + +/* + * Prepare a two-phase transaction. + * + * See comments for ReorderBufferReplay(). + */ +void +ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, + char *gid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown transaction, nothing to replay */ + if (txn == NULL) + return; + + txn->txn_flags |= RBTXN_PREPARE; + txn->gid = pstrdup(gid); + + /* The prepare info must have been updated in txn by now. */ + Assert(txn->final_lsn != InvalidXLogRecPtr); + + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, + txn->commit_time, txn->origin_id, txn->origin_lsn); + + /* + * We send the prepare for the concurrently aborted xacts so that later + * when rollback prepared is decoded and sent, the downstream should be + * able to rollback such a xact. See comments atop DecodePrepare. + * + * Note, for the concurrent_abort + streaming case a stream_prepare was + * already sent within the ReorderBufferReplay call above. + */ + if (txn->concurrent_abort && !rbtxn_is_streamed(txn)) + rb->prepare(rb, txn, txn->final_lsn); +} + +/* + * This is used to handle COMMIT/ROLLBACK PREPARED. + */ +void +ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr commit_lsn, XLogRecPtr end_lsn, + XLogRecPtr initial_consistent_point, + TimestampTz commit_time, RepOriginId origin_id, + XLogRecPtr origin_lsn, char *gid, bool is_commit) +{ + ReorderBufferTXN *txn; + XLogRecPtr prepare_end_lsn; + TimestampTz prepare_time; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false); + + /* unknown transaction, nothing to do */ + if (txn == NULL) + return; + + /* + * By this time the txn has the prepare record information, remember it to + * be later used for rollback. + */ + prepare_end_lsn = txn->end_lsn; + prepare_time = txn->commit_time; + + /* add the gid in the txn */ + txn->gid = pstrdup(gid); + + /* + * It is possible that this transaction is not decoded at prepare time + * either because by that time we didn't have a consistent snapshot or it + * was decoded earlier but we have restarted. We only need to send the + * prepare if it was not decoded earlier. We don't need to decode the xact + * for aborts if it is not done already. + */ + if ((txn->final_lsn < initial_consistent_point) && is_commit) + { + txn->txn_flags |= RBTXN_PREPARE; + + /* + * The prepare info must have been updated in txn even if we skip + * prepare. + */ + Assert(txn->final_lsn != InvalidXLogRecPtr); + + /* + * By this time the txn has the prepare record information and it is + * important to use that so that downstream gets the accurate + * information. If instead, we have passed commit information here + * then downstream can behave as it has already replayed commit + * prepared after the restart. + */ + ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, + txn->commit_time, txn->origin_id, txn->origin_lsn); + } + + txn->final_lsn = commit_lsn; + txn->end_lsn = end_lsn; + txn->commit_time = commit_time; + txn->origin_id = origin_id; + txn->origin_lsn = origin_lsn; + + if (is_commit) + rb->commit_prepared(rb, txn, commit_lsn); + else + rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time); + + /* cleanup: make sure there's no cache pollution */ + ReorderBufferExecuteInvalidations(txn->ninvalidations, + txn->invalidations); + ReorderBufferCleanupTXN(rb, txn); +} + +/* + * Abort a transaction that possibly has previous changes. Needs to be first + * called for subtransactions and then for the toplevel xid. + * + * NB: Transactions handled here have to have actively aborted (i.e. have + * produced an abort record). Implicitly aborted transactions are handled via + * ReorderBufferAbortOld(); transactions we're just not interested in, but + * which have committed are handled in ReorderBufferForget(). + * + * This function purges this transaction and its contents from memory and + * disk. + */ +void +ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown, nothing to remove */ + if (txn == NULL) + return; + + /* For streamed transactions notify the remote node about the abort. */ + if (rbtxn_is_streamed(txn)) + { + rb->stream_abort(rb, txn, lsn); + + /* + * We might have decoded changes for this transaction that could load + * the cache as per the current transaction's view (consider DDL's + * happened in this transaction). We don't want the decoding of future + * transactions to use those cache entries so execute invalidations. + */ + if (txn->ninvalidations > 0) + ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, + txn->invalidations); + } + + /* cosmetic... */ + txn->final_lsn = lsn; + + /* remove potential on-disk data, and deallocate */ + ReorderBufferCleanupTXN(rb, txn); +} + +/* + * Abort all transactions that aren't actually running anymore because the + * server restarted. + * + * NB: These really have to be transactions that have aborted due to a server + * crash/immediate restart, as we don't deal with invalidations here. + */ +void +ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) +{ + dlist_mutable_iter it; + + /* + * Iterate through all (potential) toplevel TXNs and abort all that are + * older than what possibly can be running. Once we've found the first + * that is alive we stop, there might be some that acquired an xid earlier + * but started writing later, but it's unlikely and they will be cleaned + * up in a later call to this function. + */ + dlist_foreach_modify(it, &rb->toplevel_by_lsn) + { + ReorderBufferTXN *txn; + + txn = dlist_container(ReorderBufferTXN, node, it.cur); + + if (TransactionIdPrecedes(txn->xid, oldestRunningXid)) + { + elog(DEBUG2, "aborting old transaction %u", txn->xid); + + /* remove potential on-disk data, and deallocate this tx */ + ReorderBufferCleanupTXN(rb, txn); + } + else + return; + } +} + +/* + * Forget the contents of a transaction if we aren't interested in its + * contents. Needs to be first called for subtransactions and then for the + * toplevel xid. + * + * This is significantly different to ReorderBufferAbort() because + * transactions that have committed need to be treated differently from aborted + * ones since they may have modified the catalog. + * + * Note that this is only allowed to be called in the moment a transaction + * commit has just been read, not earlier; otherwise later records referring + * to this xid might re-create the transaction incompletely. + */ +void +ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown, nothing to forget */ + if (txn == NULL) + return; + + /* For streamed transactions notify the remote node about the abort. */ + if (rbtxn_is_streamed(txn)) + rb->stream_abort(rb, txn, lsn); + + /* cosmetic... */ + txn->final_lsn = lsn; + + /* + * Process cache invalidation messages if there are any. Even if we're not + * interested in the transaction's contents, it could have manipulated the + * catalog and we need to update the caches according to that. + */ + if (txn->base_snapshot != NULL && txn->ninvalidations > 0) + ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, + txn->invalidations); + else + Assert(txn->ninvalidations == 0); + + /* remove potential on-disk data, and deallocate */ + ReorderBufferCleanupTXN(rb, txn); +} + +/* + * Invalidate cache for those transactions that need to be skipped just in case + * catalogs were manipulated as part of the transaction. + * + * Note that this is a special-purpose function for prepared transactions where + * we don't want to clean up the TXN even when we decide to skip it. See + * DecodePrepare. + */ +void +ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + + /* unknown, nothing to do */ + if (txn == NULL) + return; + + /* + * Process cache invalidation messages if there are any. Even if we're not + * interested in the transaction's contents, it could have manipulated the + * catalog and we need to update the caches according to that. + */ + if (txn->base_snapshot != NULL && txn->ninvalidations > 0) + ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, + txn->invalidations); + else + Assert(txn->ninvalidations == 0); +} + + +/* + * Execute invalidations happening outside the context of a decoded + * transaction. That currently happens either for xid-less commits + * (cf. RecordTransactionCommit()) or for invalidations in uninteresting + * transactions (via ReorderBufferForget()). + */ +void +ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, + SharedInvalidationMessage *invalidations) +{ + bool use_subtxn = IsTransactionOrTransactionBlock(); + int i; + + if (use_subtxn) + BeginInternalSubTransaction("replay"); + + /* + * Force invalidations to happen outside of a valid transaction - that way + * entries will just be marked as invalid without accessing the catalog. + * That's advantageous because we don't need to setup the full state + * necessary for catalog access. + */ + if (use_subtxn) + AbortCurrentTransaction(); + + for (i = 0; i < ninvalidations; i++) + LocalExecuteInvalidationMessage(&invalidations[i]); + + if (use_subtxn) + RollbackAndReleaseCurrentSubTransaction(); +} + +/* + * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at + * least once for every xid in XLogRecord->xl_xid (other places in records + * may, but do not have to be passed through here). + * + * Reorderbuffer keeps some datastructures about transactions in LSN order, + * for efficiency. To do that it has to know about when transactions are seen + * first in the WAL. As many types of records are not actually interesting for + * logical decoding, they do not necessarily pass though here. + */ +void +ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) +{ + /* many records won't have an xid assigned, centralize check here */ + if (xid != InvalidTransactionId) + ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); +} + +/* + * Add a new snapshot to this transaction that may only used after lsn 'lsn' + * because the previous snapshot doesn't describe the catalog correctly for + * following rows. + */ +void +ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Snapshot snap) +{ + ReorderBufferChange *change = ReorderBufferGetChange(rb); + + change->data.snapshot = snap; + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; + + ReorderBufferQueueChange(rb, xid, lsn, change, false); +} + +/* + * Set up the transaction's base snapshot. + * + * If we know that xid is a subtransaction, set the base snapshot on the + * top-level transaction instead. + */ +void +ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Snapshot snap) +{ + ReorderBufferTXN *txn; + bool is_new; + + AssertArg(snap != NULL); + + /* + * Fetch the transaction to operate on. If we know it's a subtransaction, + * operate on its top-level transaction instead. + */ + txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true); + if (rbtxn_is_known_subxact(txn)) + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); + Assert(txn->base_snapshot == NULL); + + txn->base_snapshot = snap; + txn->base_snapshot_lsn = lsn; + dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node); + + AssertTXNLsnOrder(rb); +} + +/* + * Access the catalog with this CommandId at this point in the changestream. + * + * May only be called for command ids > 1 + */ +void +ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, CommandId cid) +{ + ReorderBufferChange *change = ReorderBufferGetChange(rb); + + change->data.command_id = cid; + change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; + + ReorderBufferQueueChange(rb, xid, lsn, change, false); +} + +/* + * Update memory counters to account for the new or removed change. + * + * We update two counters - in the reorder buffer, and in the transaction + * containing the change. The reorder buffer counter allows us to quickly + * decide if we reached the memory limit, the transaction counter allows + * us to quickly pick the largest transaction for eviction. + * + * When streaming is enabled, we need to update the toplevel transaction + * counters instead - we don't really care about subtransactions as we + * can't stream them individually anyway, and we only pick toplevel + * transactions for eviction. So only toplevel transactions matter. + */ +static void +ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, + ReorderBufferChange *change, + bool addition, Size sz) +{ + ReorderBufferTXN *txn; + ReorderBufferTXN *toptxn; + + Assert(change->txn); + + /* + * Ignore tuple CID changes, because those are not evicted when reaching + * memory limit. So we just don't count them, because it might easily + * trigger a pointless attempt to spill. + */ + if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) + return; + + txn = change->txn; + + /* + * Update the total size in top level as well. This is later used to + * compute the decoding stats. + */ + if (txn->toptxn != NULL) + toptxn = txn->toptxn; + else + toptxn = txn; + + if (addition) + { + txn->size += sz; + rb->size += sz; + + /* Update the total size in the top transaction. */ + toptxn->total_size += sz; + } + else + { + Assert((rb->size >= sz) && (txn->size >= sz)); + txn->size -= sz; + rb->size -= sz; + + /* Update the total size in the top transaction. */ + toptxn->total_size -= sz; + } + + Assert(txn->size <= rb->size); +} + +/* + * Add new (relfilenode, tid) -> (cmin, cmax) mappings. + * + * We do not include this change type in memory accounting, because we + * keep CIDs in a separate list and do not evict them when reaching + * the memory limit. + */ +void +ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, RelFileNode node, + ItemPointerData tid, CommandId cmin, + CommandId cmax, CommandId combocid) +{ + ReorderBufferChange *change = ReorderBufferGetChange(rb); + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + change->data.tuplecid.node = node; + change->data.tuplecid.tid = tid; + change->data.tuplecid.cmin = cmin; + change->data.tuplecid.cmax = cmax; + change->data.tuplecid.combocid = combocid; + change->lsn = lsn; + change->txn = txn; + change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; + + dlist_push_tail(&txn->tuplecids, &change->node); + txn->ntuplecids++; +} + +/* + * Setup the invalidation of the toplevel transaction. + * + * This needs to be called for each XLOG_XACT_INVALIDATIONS message and + * accumulates all the invalidation messages in the toplevel transaction as + * well as in the form of change in reorder buffer. We require to record it in + * form of the change so that we can execute only the required invalidations + * instead of executing all the invalidations on each CommandId increment. We + * also need to accumulate these in the toplevel transaction because in some + * cases we skip processing the transaction (see ReorderBufferForget), we need + * to execute all the invalidations together. + */ +void +ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn, Size nmsgs, + SharedInvalidationMessage *msgs) +{ + ReorderBufferTXN *txn; + MemoryContext oldcontext; + ReorderBufferChange *change; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + oldcontext = MemoryContextSwitchTo(rb->context); + + /* + * Collect all the invalidations under the top transaction so that we can + * execute them all together. See comment atop this function + */ + if (txn->toptxn) + txn = txn->toptxn; + + Assert(nmsgs > 0); + + /* Accumulate invalidations. */ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * + (txn->ninvalidations + nmsgs)); + + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } + + change = ReorderBufferGetChange(rb); + change->action = REORDER_BUFFER_CHANGE_INVALIDATION; + change->data.inval.ninvalidations = nmsgs; + change->data.inval.invalidations = (SharedInvalidationMessage *) + palloc(sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(change->data.inval.invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + + ReorderBufferQueueChange(rb, xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Apply all invalidations we know. Possibly we only need parts at this point + * in the changestream but we don't know which those are. + */ +static void +ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs) +{ + int i; + + for (i = 0; i < nmsgs; i++) + LocalExecuteInvalidationMessage(&msgs[i]); +} + +/* + * Mark a transaction as containing catalog changes + */ +void +ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + + /* + * Mark top-level transaction as having catalog changes too if one of its + * children has so that the ReorderBufferBuildTupleCidHash can + * conveniently check just top-level transaction and decide whether to + * build the hash table or not. + */ + if (txn->toptxn != NULL) + txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; +} + +/* + * Query whether a transaction is already *known* to contain catalog + * changes. This can be wrong until directly before the commit! + */ +bool +ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + if (txn == NULL) + return false; + + return rbtxn_has_catalog_changes(txn); +} + +/* + * ReorderBufferXidHasBaseSnapshot + * Have we already set the base snapshot for the given txn/subtxn? + */ +bool +ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, + NULL, InvalidXLogRecPtr, false); + + /* transaction isn't known yet, ergo no snapshot */ + if (txn == NULL) + return false; + + /* a known subtxn? operate on top-level txn instead */ + if (rbtxn_is_known_subxact(txn)) + txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, + NULL, InvalidXLogRecPtr, false); + + return txn->base_snapshot != NULL; +} + + +/* + * --------------------------------------- + * Disk serialization support + * --------------------------------------- + */ + +/* + * Ensure the IO buffer is >= sz. + */ +static void +ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz) +{ + if (!rb->outbufsize) + { + rb->outbuf = MemoryContextAlloc(rb->context, sz); + rb->outbufsize = sz; + } + else if (rb->outbufsize < sz) + { + rb->outbuf = repalloc(rb->outbuf, sz); + rb->outbufsize = sz; + } +} + +/* + * Find the largest transaction (toplevel or subxact) to evict (spill to disk). + * + * XXX With many subtransactions this might be quite slow, because we'll have + * to walk through all of them. There are some options how we could improve + * that: (a) maintain some secondary structure with transactions sorted by + * amount of changes, (b) not looking for the entirely largest transaction, + * but e.g. for transaction using at least some fraction of the memory limit, + * and (c) evicting multiple transactions at once, e.g. to free a given portion + * of the memory limit (e.g. 50%). + */ +static ReorderBufferTXN * +ReorderBufferLargestTXN(ReorderBuffer *rb) +{ + HASH_SEQ_STATUS hash_seq; + ReorderBufferTXNByIdEnt *ent; + ReorderBufferTXN *largest = NULL; + + hash_seq_init(&hash_seq, rb->by_txn); + while ((ent = hash_seq_search(&hash_seq)) != NULL) + { + ReorderBufferTXN *txn = ent->txn; + + /* if the current transaction is larger, remember it */ + if ((!largest) || (txn->size > largest->size)) + largest = txn; + } + + Assert(largest); + Assert(largest->size > 0); + Assert(largest->size <= rb->size); + + return largest; +} + +/* + * Find the largest toplevel transaction to evict (by streaming). + * + * This can be seen as an optimized version of ReorderBufferLargestTXN, which + * should give us the same transaction (because we don't update memory account + * for subtransaction with streaming, so it's always 0). But we can simply + * iterate over the limited number of toplevel transactions that have a base + * snapshot. There is no use of selecting a transaction that doesn't have base + * snapshot because we don't decode such transactions. + * + * Note that, we skip transactions that contains incomplete changes. There + * is a scope of optimization here such that we can select the largest + * transaction which has incomplete changes. But that will make the code and + * design quite complex and that might not be worth the benefit. If we plan to + * stream the transactions that contains incomplete changes then we need to + * find a way to partially stream/truncate the transaction changes in-memory + * and build a mechanism to partially truncate the spilled files. + * Additionally, whenever we partially stream the transaction we need to + * maintain the last streamed lsn and next time we need to restore from that + * segment and the offset in WAL. As we stream the changes from the top + * transaction and restore them subtransaction wise, we need to even remember + * the subxact from where we streamed the last change. + */ +static ReorderBufferTXN * +ReorderBufferLargestTopTXN(ReorderBuffer *rb) +{ + dlist_iter iter; + Size largest_size = 0; + ReorderBufferTXN *largest = NULL; + + /* Find the largest top-level transaction having a base snapshot. */ + dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn) + { + ReorderBufferTXN *txn; + + txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur); + + /* must not be a subtxn */ + Assert(!rbtxn_is_known_subxact(txn)); + /* base_snapshot must be set */ + Assert(txn->base_snapshot != NULL); + + if ((largest == NULL || txn->total_size > largest_size) && + (txn->total_size > 0) && !(rbtxn_has_partial_change(txn))) + { + largest = txn; + largest_size = txn->total_size; + } + } + + return largest; +} + +/* + * Check whether the logical_decoding_work_mem limit was reached, and if yes + * pick the largest (sub)transaction at-a-time to evict and spill its changes to + * disk until we reach under the memory limit. + * + * XXX At this point we select the transactions until we reach under the memory + * limit, but we might also adapt a more elaborate eviction strategy - for example + * evicting enough transactions to free certain fraction (e.g. 50%) of the memory + * limit. + */ +static void +ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) +{ + ReorderBufferTXN *txn; + + /* bail out if we haven't exceeded the memory limit */ + if (rb->size < logical_decoding_work_mem * 1024L) + return; + + /* + * Loop until we reach under the memory limit. One might think that just + * by evicting the largest (sub)transaction we will come under the memory + * limit based on assumption that the selected transaction is at least as + * large as the most recent change (which caused us to go over the memory + * limit). However, that is not true because a user can reduce the + * logical_decoding_work_mem to a smaller value before the most recent + * change. + */ + while (rb->size >= logical_decoding_work_mem * 1024L) + { + /* + * Pick the largest transaction (or subtransaction) and evict it from + * memory by streaming, if possible. Otherwise, spill to disk. + */ + if (ReorderBufferCanStartStreaming(rb) && + (txn = ReorderBufferLargestTopTXN(rb)) != NULL) + { + /* we know there has to be one, because the size is not zero */ + Assert(txn && !txn->toptxn); + Assert(txn->total_size > 0); + Assert(rb->size >= txn->total_size); + + ReorderBufferStreamTXN(rb, txn); + } + else + { + /* + * Pick the largest transaction (or subtransaction) and evict it + * from memory by serializing it to disk. + */ + txn = ReorderBufferLargestTXN(rb); + + /* we know there has to be one, because the size is not zero */ + Assert(txn); + Assert(txn->size > 0); + Assert(rb->size >= txn->size); + + ReorderBufferSerializeTXN(rb, txn); + } + + /* + * After eviction, the transaction should have no entries in memory, + * and should use 0 bytes for changes. + */ + Assert(txn->size == 0); + Assert(txn->nentries_mem == 0); + } + + /* We must be under the memory limit now. */ + Assert(rb->size < logical_decoding_work_mem * 1024L); +} + +/* + * Spill data of a large transaction (and its subtransactions) to disk. + */ +static void +ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + dlist_iter subtxn_i; + dlist_mutable_iter change_i; + int fd = -1; + XLogSegNo curOpenSegNo = 0; + Size spilled = 0; + Size size = txn->size; + + elog(DEBUG2, "spill %u changes in XID %u to disk", + (uint32) txn->nentries_mem, txn->xid); + + /* do the same to all child TXs */ + dlist_foreach(subtxn_i, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur); + ReorderBufferSerializeTXN(rb, subtxn); + } + + /* serialize changestream */ + dlist_foreach_modify(change_i, &txn->changes) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, change_i.cur); + + /* + * store in segment in which it belongs by start lsn, don't split over + * multiple segments tho + */ + if (fd == -1 || + !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size)) + { + char path[MAXPGPATH]; + + if (fd != -1) + CloseTransientFile(fd); + + XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size); + + /* + * No need to care about TLIs here, only used during a single run, + * so each LSN only maps to a specific WAL record. + */ + ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, + curOpenSegNo); + + /* open segment, create it if necessary */ + fd = OpenTransientFile(path, + O_CREAT | O_WRONLY | O_APPEND | PG_BINARY); + + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + } + + ReorderBufferSerializeChange(rb, txn, fd, change); + dlist_delete(&change->node); + ReorderBufferReturnChange(rb, change, true); + + spilled++; + } + + /* update the statistics iff we have spilled anything */ + if (spilled) + { + rb->spillCount += 1; + rb->spillBytes += size; + + /* don't consider already serialized transactions */ + rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + + /* update the decoding stats */ + UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); + } + + Assert(spilled == txn->nentries_mem); + Assert(dlist_is_empty(&txn->changes)); + txn->nentries_mem = 0; + txn->txn_flags |= RBTXN_IS_SERIALIZED; + + if (fd != -1) + CloseTransientFile(fd); +} + +/* + * Serialize individual change to disk. + */ +static void +ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + int fd, ReorderBufferChange *change) +{ + ReorderBufferDiskChange *ondisk; + Size sz = sizeof(ReorderBufferDiskChange); + + ReorderBufferSerializeReserve(rb, sz); + + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + memcpy(&ondisk->change, change, sizeof(ReorderBufferChange)); + + switch (change->action) + { + /* fall through these, they're all similar enough */ + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + { + char *data; + ReorderBufferTupleBuf *oldtup, + *newtup; + Size oldlen = 0; + Size newlen = 0; + + oldtup = change->data.tp.oldtuple; + newtup = change->data.tp.newtuple; + + if (oldtup) + { + sz += sizeof(HeapTupleData); + oldlen = oldtup->tuple.t_len; + sz += oldlen; + } + + if (newtup) + { + sz += sizeof(HeapTupleData); + newlen = newtup->tuple.t_len; + sz += newlen; + } + + /* make sure we have enough space */ + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + if (oldlen) + { + memcpy(data, &oldtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, oldtup->tuple.t_data, oldlen); + data += oldlen; + } + + if (newlen) + { + memcpy(data, &newtup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, newtup->tuple.t_data, newlen); + data += newlen; + } + break; + } + case REORDER_BUFFER_CHANGE_MESSAGE: + { + char *data; + Size prefix_size = strlen(change->data.msg.prefix) + 1; + + sz += prefix_size + change->data.msg.message_size + + sizeof(Size) + sizeof(Size); + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + /* write the prefix including the size */ + memcpy(data, &prefix_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.msg.prefix, + prefix_size); + data += prefix_size; + + /* write the message including the size */ + memcpy(data, &change->data.msg.message_size, sizeof(Size)); + data += sizeof(Size); + memcpy(data, change->data.msg.message, + change->data.msg.message_size); + data += change->data.msg.message_size; + + break; + } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + char *data; + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + sz += inval_size; + + ReorderBufferSerializeReserve(rb, sz); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + memcpy(data, change->data.inval.invalidations, inval_size); + data += inval_size; + + break; + } + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: + { + Snapshot snap; + char *data; + + snap = change->data.snapshot; + + sz += sizeof(SnapshotData) + + sizeof(TransactionId) * snap->xcnt + + sizeof(TransactionId) * snap->subxcnt; + + /* make sure we have enough space */ + ReorderBufferSerializeReserve(rb, sz); + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + memcpy(data, snap, sizeof(SnapshotData)); + data += sizeof(SnapshotData); + + if (snap->xcnt) + { + memcpy(data, snap->xip, + sizeof(TransactionId) * snap->xcnt); + data += sizeof(TransactionId) * snap->xcnt; + } + + if (snap->subxcnt) + { + memcpy(data, snap->subxip, + sizeof(TransactionId) * snap->subxcnt); + data += sizeof(TransactionId) * snap->subxcnt; + } + break; + } + case REORDER_BUFFER_CHANGE_TRUNCATE: + { + Size size; + char *data; + + /* account for the OIDs of truncated relations */ + size = sizeof(Oid) * change->data.truncate.nrelids; + sz += size; + + /* make sure we have enough space */ + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + memcpy(data, change->data.truncate.relids, size); + data += size; + + break; + } + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + /* ReorderBufferChange contains everything important */ + break; + } + + ondisk->size = sz; + + errno = 0; + pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE); + if (write(fd, rb->outbuf, ondisk->size) != ondisk->size) + { + int save_errno = errno; + + CloseTransientFile(fd); + + /* if write didn't set errno, assume problem is no disk space */ + errno = save_errno ? save_errno : ENOSPC; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to data file for XID %u: %m", + txn->xid))); + } + pgstat_report_wait_end(); + + /* + * Keep the transaction's final_lsn up to date with each change we send to + * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to + * only do this on commit and abort records, but that doesn't work if a + * system crash leaves a transaction without its abort record). + * + * Make sure not to move it backwards. + */ + if (txn->final_lsn < change->lsn) + txn->final_lsn = change->lsn; + + Assert(ondisk->change.action == change->action); +} + +/* Returns true, if the output plugin supports streaming, false, otherwise. */ +static inline bool +ReorderBufferCanStream(ReorderBuffer *rb) +{ + LogicalDecodingContext *ctx = rb->private_data; + + return ctx->streaming; +} + +/* Returns true, if the streaming can be started now, false, otherwise. */ +static inline bool +ReorderBufferCanStartStreaming(ReorderBuffer *rb) +{ + LogicalDecodingContext *ctx = rb->private_data; + SnapBuild *builder = ctx->snapshot_builder; + + /* We can't start streaming unless a consistent state is reached. */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT) + return false; + + /* + * We can't start streaming immediately even if the streaming is enabled + * because we previously decoded this transaction and now just are + * restarting. + */ + if (ReorderBufferCanStream(rb) && + !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr)) + return true; + + return false; +} + +/* + * Send data of a large transaction (and its subtransactions) to the + * output plugin, but using the stream API. + */ +static void +ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + Snapshot snapshot_now; + CommandId command_id; + Size stream_bytes; + bool txn_is_streamed; + + /* We can never reach here for a subtransaction. */ + Assert(txn->toptxn == NULL); + + /* + * We can't make any assumptions about base snapshot here, similar to what + * ReorderBufferCommit() does. That relies on base_snapshot getting + * transferred from subxact in ReorderBufferCommitChild(), but that was + * not yet called as the transaction is in-progress. + * + * So just walk the subxacts and use the same logic here. But we only need + * to do that once, when the transaction is streamed for the first time. + * After that we need to reuse the snapshot from the previous run. + * + * Unlike DecodeCommit which adds xids of all the subtransactions in + * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here + * but we do add them to subxip array instead via ReorderBufferCopySnap. + * This allows the catalog changes made in subtransactions decoded till + * now to be visible. + */ + if (txn->snapshot_now == NULL) + { + dlist_iter subxact_i; + + /* make sure this transaction is streamed for the first time */ + Assert(!rbtxn_is_streamed(txn)); + + /* at the beginning we should have invalid command ID */ + Assert(txn->command_id == InvalidCommandId); + + dlist_foreach(subxact_i, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur); + ReorderBufferTransferSnapToParent(txn, subtxn); + } + + /* + * If this transaction has no snapshot, it didn't make any changes to + * the database till now, so there's nothing to decode. + */ + if (txn->base_snapshot == NULL) + { + Assert(txn->ninvalidations == 0); + return; + } + + command_id = FirstCommandId; + snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot, + txn, command_id); + } + else + { + /* the transaction must have been already streamed */ + Assert(rbtxn_is_streamed(txn)); + + /* + * Nah, we already have snapshot from the previous streaming run. We + * assume new subxacts can't move the LSN backwards, and so can't beat + * the LSN condition in the previous branch (so no need to walk + * through subxacts again). In fact, we must not do that as we may be + * using snapshot half-way through the subxact. + */ + command_id = txn->command_id; + + /* + * We can't use txn->snapshot_now directly because after the last + * streaming run, we might have got some new sub-transactions. So we + * need to add them to the snapshot. + */ + snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now, + txn, command_id); + + /* Free the previously copied snapshot. */ + Assert(txn->snapshot_now->copied); + ReorderBufferFreeSnap(rb, txn->snapshot_now); + txn->snapshot_now = NULL; + } + + /* + * Remember this information to be used later to update stats. We can't + * update the stats here as an error while processing the changes would + * lead to the accumulation of stats even though we haven't streamed all + * the changes. + */ + txn_is_streamed = rbtxn_is_streamed(txn); + stream_bytes = txn->total_size; + + /* Process and send the changes to output plugin. */ + ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, + command_id, true); + + rb->streamCount += 1; + rb->streamBytes += stream_bytes; + + /* Don't consider already streamed transaction. */ + rb->streamTxns += (txn_is_streamed) ? 0 : 1; + + /* update the decoding stats */ + UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); + + Assert(dlist_is_empty(&txn->changes)); + Assert(txn->nentries == 0); + Assert(txn->nentries_mem == 0); +} + +/* + * Size of a change in memory. + */ +static Size +ReorderBufferChangeSize(ReorderBufferChange *change) +{ + Size sz = sizeof(ReorderBufferChange); + + switch (change->action) + { + /* fall through these, they're all similar enough */ + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + { + ReorderBufferTupleBuf *oldtup, + *newtup; + Size oldlen = 0; + Size newlen = 0; + + oldtup = change->data.tp.oldtuple; + newtup = change->data.tp.newtuple; + + if (oldtup) + { + sz += sizeof(HeapTupleData); + oldlen = oldtup->tuple.t_len; + sz += oldlen; + } + + if (newtup) + { + sz += sizeof(HeapTupleData); + newlen = newtup->tuple.t_len; + sz += newlen; + } + + break; + } + case REORDER_BUFFER_CHANGE_MESSAGE: + { + Size prefix_size = strlen(change->data.msg.prefix) + 1; + + sz += prefix_size + change->data.msg.message_size + + sizeof(Size) + sizeof(Size); + + break; + } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + sz += sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + break; + } + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: + { + Snapshot snap; + + snap = change->data.snapshot; + + sz += sizeof(SnapshotData) + + sizeof(TransactionId) * snap->xcnt + + sizeof(TransactionId) * snap->subxcnt; + + break; + } + case REORDER_BUFFER_CHANGE_TRUNCATE: + { + sz += sizeof(Oid) * change->data.truncate.nrelids; + + break; + } + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + /* ReorderBufferChange contains everything important */ + break; + } + + return sz; +} + + +/* + * Restore a number of changes spilled to disk back into memory. + */ +static Size +ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, + TXNEntryFile *file, XLogSegNo *segno) +{ + Size restored = 0; + XLogSegNo last_segno; + dlist_mutable_iter cleanup_iter; + File *fd = &file->vfd; + + Assert(txn->first_lsn != InvalidXLogRecPtr); + Assert(txn->final_lsn != InvalidXLogRecPtr); + + /* free current entries, so we have memory for more */ + dlist_foreach_modify(cleanup_iter, &txn->changes) + { + ReorderBufferChange *cleanup = + dlist_container(ReorderBufferChange, node, cleanup_iter.cur); + + dlist_delete(&cleanup->node); + ReorderBufferReturnChange(rb, cleanup, true); + } + txn->nentries_mem = 0; + Assert(dlist_is_empty(&txn->changes)); + + XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size); + + while (restored < max_changes_in_memory && *segno <= last_segno) + { + int readBytes; + ReorderBufferDiskChange *ondisk; + + if (*fd == -1) + { + char path[MAXPGPATH]; + + /* first time in */ + if (*segno == 0) + XLByteToSeg(txn->first_lsn, *segno, wal_segment_size); + + Assert(*segno != 0 || dlist_is_empty(&txn->changes)); + + /* + * No need to care about TLIs here, only used during a single run, + * so each LSN only maps to a specific WAL record. + */ + ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, + *segno); + + *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY); + + /* No harm in resetting the offset even in case of failure */ + file->curOffset = 0; + + if (*fd < 0 && errno == ENOENT) + { + *fd = -1; + (*segno)++; + continue; + } + else if (*fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); + } + + /* + * Read the statically sized part of a change which has information + * about the total size. If we couldn't read a record, we're at the + * end of this file. + */ + ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); + readBytes = FileRead(file->vfd, rb->outbuf, + sizeof(ReorderBufferDiskChange), + file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ); + + /* eof */ + if (readBytes == 0) + { + FileClose(*fd); + *fd = -1; + (*segno)++; + continue; + } + else if (readBytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from reorderbuffer spill file: %m"))); + else if (readBytes != sizeof(ReorderBufferDiskChange)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", + readBytes, + (uint32) sizeof(ReorderBufferDiskChange)))); + + file->curOffset += readBytes; + + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + ReorderBufferSerializeReserve(rb, + sizeof(ReorderBufferDiskChange) + ondisk->size); + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + readBytes = FileRead(file->vfd, + rb->outbuf + sizeof(ReorderBufferDiskChange), + ondisk->size - sizeof(ReorderBufferDiskChange), + file->curOffset, + WAIT_EVENT_REORDER_BUFFER_READ); + + if (readBytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from reorderbuffer spill file: %m"))); + else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", + readBytes, + (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); + + file->curOffset += readBytes; + + /* + * ok, read a full change from disk, now restore it into proper + * in-memory format + */ + ReorderBufferRestoreChange(rb, txn, rb->outbuf); + restored++; + } + + return restored; +} + +/* + * Convert change from its on-disk format to in-memory format and queue it onto + * the TXN's ->changes list. + * + * Note: although "data" is declared char*, at entry it points to a + * maxalign'd buffer, making it safe in most of this function to assume + * that the pointed-to data is suitably aligned for direct access. + */ +static void +ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, + char *data) +{ + ReorderBufferDiskChange *ondisk; + ReorderBufferChange *change; + + ondisk = (ReorderBufferDiskChange *) data; + + change = ReorderBufferGetChange(rb); + + /* copy static part */ + memcpy(change, &ondisk->change, sizeof(ReorderBufferChange)); + + data += sizeof(ReorderBufferDiskChange); + + /* restore individual stuff */ + switch (change->action) + { + /* fall through these, they're all similar enough */ + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: + if (change->data.tp.oldtuple) + { + uint32 tuplelen = ((HeapTuple) data)->t_len; + + change->data.tp.oldtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); + + /* restore ->tuple */ + memcpy(&change->data.tp.oldtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ + change->data.tp.oldtuple->tuple.t_data = + ReorderBufferTupleBufData(change->data.tp.oldtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen); + data += tuplelen; + } + + if (change->data.tp.newtuple) + { + /* here, data might not be suitably aligned! */ + uint32 tuplelen; + + memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len), + sizeof(uint32)); + + change->data.tp.newtuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); + + /* restore ->tuple */ + memcpy(&change->data.tp.newtuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ + change->data.tp.newtuple->tuple.t_data = + ReorderBufferTupleBufData(change->data.tp.newtuple); + + /* restore tuple data itself */ + memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen); + data += tuplelen; + } + + break; + case REORDER_BUFFER_CHANGE_MESSAGE: + { + Size prefix_size; + + /* read prefix */ + memcpy(&prefix_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.msg.prefix = MemoryContextAlloc(rb->context, + prefix_size); + memcpy(change->data.msg.prefix, data, prefix_size); + Assert(change->data.msg.prefix[prefix_size - 1] == '\0'); + data += prefix_size; + + /* read the message */ + memcpy(&change->data.msg.message_size, data, sizeof(Size)); + data += sizeof(Size); + change->data.msg.message = MemoryContextAlloc(rb->context, + change->data.msg.message_size); + memcpy(change->data.msg.message, data, + change->data.msg.message_size); + data += change->data.msg.message_size; + + break; + } + case REORDER_BUFFER_CHANGE_INVALIDATION: + { + Size inval_size = sizeof(SharedInvalidationMessage) * + change->data.inval.ninvalidations; + + change->data.inval.invalidations = + MemoryContextAlloc(rb->context, inval_size); + + /* read the message */ + memcpy(change->data.inval.invalidations, data, inval_size); + + break; + } + case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: + { + Snapshot oldsnap; + Snapshot newsnap; + Size size; + + oldsnap = (Snapshot) data; + + size = sizeof(SnapshotData) + + sizeof(TransactionId) * oldsnap->xcnt + + sizeof(TransactionId) * (oldsnap->subxcnt + 0); + + change->data.snapshot = MemoryContextAllocZero(rb->context, size); + + newsnap = change->data.snapshot; + + memcpy(newsnap, data, size); + newsnap->xip = (TransactionId *) + (((char *) newsnap) + sizeof(SnapshotData)); + newsnap->subxip = newsnap->xip + newsnap->xcnt; + newsnap->copied = true; + break; + } + /* the base struct contains all the data, easy peasy */ + case REORDER_BUFFER_CHANGE_TRUNCATE: + { + Oid *relids; + + relids = ReorderBufferGetRelids(rb, + change->data.truncate.nrelids); + memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid)); + change->data.truncate.relids = relids; + + break; + } + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: + case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: + case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + break; + } + + dlist_push_tail(&txn->changes, &change->node); + txn->nentries_mem++; + + /* + * Update memory accounting for the restored change. We need to do this + * although we don't check the memory limit when restoring the changes in + * this branch (we only do that when initially queueing the changes after + * decoding), because we will release the changes later, and that will + * update the accounting too (subtracting the size from the counters). And + * we don't want to underflow there. + */ + ReorderBufferChangeMemoryUpdate(rb, change, true, + ReorderBufferChangeSize(change)); +} + +/* + * Remove all on-disk stored for the passed in transaction. + */ +static void +ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + XLogSegNo first; + XLogSegNo cur; + XLogSegNo last; + + Assert(txn->first_lsn != InvalidXLogRecPtr); + Assert(txn->final_lsn != InvalidXLogRecPtr); + + XLByteToSeg(txn->first_lsn, first, wal_segment_size); + XLByteToSeg(txn->final_lsn, last, wal_segment_size); + + /* iterate over all possible filenames, and delete them */ + for (cur = first; cur <= last; cur++) + { + char path[MAXPGPATH]; + + ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur); + if (unlink(path) != 0 && errno != ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\": %m", path))); + } +} + +/* + * Remove any leftover serialized reorder buffers from a slot directory after a + * prior crash or decoding session exit. + */ +static void +ReorderBufferCleanupSerializedTXNs(const char *slotname) +{ + DIR *spill_dir; + struct dirent *spill_de; + struct stat statbuf; + char path[MAXPGPATH * 2 + 12]; + + sprintf(path, "pg_replslot/%s", slotname); + + /* we're only handling directories here, skip if it's not ours */ + if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) + return; + + spill_dir = AllocateDir(path); + while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL) + { + /* only look at names that can be ours */ + if (strncmp(spill_de->d_name, "xid", 3) == 0) + { + snprintf(path, sizeof(path), + "pg_replslot/%s/%s", slotname, + spill_de->d_name); + + if (unlink(path) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/xid*: %m", + path, slotname))); + } + } + FreeDir(spill_dir); +} + +/* + * Given a replication slot, transaction ID and segment number, fill in the + * corresponding spill file into 'path', which is a caller-owned buffer of size + * at least MAXPGPATH. + */ +static void +ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, + XLogSegNo segno) +{ + XLogRecPtr recptr; + + XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr); + + snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.spill", + NameStr(MyReplicationSlot->data.name), + xid, LSN_FORMAT_ARGS(recptr)); +} + +/* + * Delete all data spilled to disk after we've restarted/crashed. It will be + * recreated when the respective slots are reused. + */ +void +StartupReorderBuffer(void) +{ + DIR *logical_dir; + struct dirent *logical_de; + + logical_dir = AllocateDir("pg_replslot"); + while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL) + { + if (strcmp(logical_de->d_name, ".") == 0 || + strcmp(logical_de->d_name, "..") == 0) + continue; + + /* if it cannot be a slot, skip the directory */ + if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2)) + continue; + + /* + * ok, has to be a surviving logical slot, iterate and delete + * everything starting with xid-* + */ + ReorderBufferCleanupSerializedTXNs(logical_de->d_name); + } + FreeDir(logical_dir); +} + +/* --------------------------------------- + * toast reassembly support + * --------------------------------------- + */ + +/* + * Initialize per tuple toast reconstruction support. + */ +static void +ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + HASHCTL hash_ctl; + + Assert(txn->toast_hash == NULL); + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(ReorderBufferToastEnt); + hash_ctl.hcxt = rb->context; + txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +/* + * Per toast-chunk handling for toast reconstruction + * + * Appends a toast chunk so we can reconstruct it when the tuple "owning" the + * toasted Datum comes along. + */ +static void +ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + ReorderBufferToastEnt *ent; + ReorderBufferTupleBuf *newtup; + bool found; + int32 chunksize; + bool isnull; + Pointer chunk; + TupleDesc desc = RelationGetDescr(relation); + Oid chunk_id; + int32 chunk_seq; + + if (txn->toast_hash == NULL) + ReorderBufferToastInitHash(rb, txn); + + Assert(IsToastRelation(relation)); + + newtup = change->data.tp.newtuple; + chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull)); + Assert(!isnull); + chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull)); + Assert(!isnull); + + ent = (ReorderBufferToastEnt *) + hash_search(txn->toast_hash, + (void *) &chunk_id, + HASH_ENTER, + &found); + + if (!found) + { + Assert(ent->chunk_id == chunk_id); + ent->num_chunks = 0; + ent->last_chunk_seq = 0; + ent->size = 0; + ent->reconstructed = NULL; + dlist_init(&ent->chunks); + + if (chunk_seq != 0) + elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0", + chunk_seq, chunk_id); + } + else if (found && chunk_seq != ent->last_chunk_seq + 1) + elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d", + chunk_seq, chunk_id, ent->last_chunk_seq + 1); + + chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull)); + Assert(!isnull); + + /* calculate size so we can allocate the right size at once later */ + if (!VARATT_IS_EXTENDED(chunk)) + chunksize = VARSIZE(chunk) - VARHDRSZ; + else if (VARATT_IS_SHORT(chunk)) + /* could happen due to heap_form_tuple doing its thing */ + chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT; + else + elog(ERROR, "unexpected type of toast chunk"); + + ent->size += chunksize; + ent->last_chunk_seq = chunk_seq; + ent->num_chunks++; + dlist_push_tail(&ent->chunks, &change->node); +} + +/* + * Rejigger change->newtuple to point to in-memory toast tuples instead to + * on-disk toast tuples that may not longer exist (think DROP TABLE or VACUUM). + * + * We cannot replace unchanged toast tuples though, so those will still point + * to on-disk toast data. + * + * While updating the existing change with detoasted tuple data, we need to + * update the memory accounting info, because the change size will differ. + * Otherwise the accounting may get out of sync, triggering serialization + * at unexpected times. + * + * We simply subtract size of the change before rejiggering the tuple, and + * then adding the new size. This makes it look like the change was removed + * and then added back, except it only tweaks the accounting info. + * + * In particular it can't trigger serialization, which would be pointless + * anyway as it happens during commit processing right before handing + * the change to the output plugin. + */ +static void +ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + TupleDesc desc; + int natt; + Datum *attrs; + bool *isnull; + bool *free; + HeapTuple tmphtup; + Relation toast_rel; + TupleDesc toast_desc; + MemoryContext oldcontext; + ReorderBufferTupleBuf *newtup; + Size old_size; + + /* no toast tuples changed */ + if (txn->toast_hash == NULL) + return; + + /* + * We're going to modify the size of the change. So, to make sure the + * accounting is correct we record the current change size and then after + * re-computing the change we'll subtract the recorded size and then + * re-add the new change size at the end. We don't immediately subtract + * the old size because if there is any error before we add the new size, + * we will release the changes and that will update the accounting info + * (subtracting the size from the counters). And we don't want to + * underflow there. + */ + old_size = ReorderBufferChangeSize(change); + + oldcontext = MemoryContextSwitchTo(rb->context); + + /* we should only have toast tuples in an INSERT or UPDATE */ + Assert(change->data.tp.newtuple); + + desc = RelationGetDescr(relation); + + toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid); + if (!RelationIsValid(toast_rel)) + elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")", + relation->rd_rel->reltoastrelid, RelationGetRelationName(relation)); + + toast_desc = RelationGetDescr(toast_rel); + + /* should we allocate from stack instead? */ + attrs = palloc0(sizeof(Datum) * desc->natts); + isnull = palloc0(sizeof(bool) * desc->natts); + free = palloc0(sizeof(bool) * desc->natts); + + newtup = change->data.tp.newtuple; + + heap_deform_tuple(&newtup->tuple, desc, attrs, isnull); + + for (natt = 0; natt < desc->natts; natt++) + { + Form_pg_attribute attr = TupleDescAttr(desc, natt); + ReorderBufferToastEnt *ent; + struct varlena *varlena; + + /* va_rawsize is the size of the original datum -- including header */ + struct varatt_external toast_pointer; + struct varatt_indirect redirect_pointer; + struct varlena *new_datum = NULL; + struct varlena *reconstructed; + dlist_iter it; + Size data_done = 0; + + /* system columns aren't toasted */ + if (attr->attnum < 0) + continue; + + if (attr->attisdropped) + continue; + + /* not a varlena datatype */ + if (attr->attlen != -1) + continue; + + /* no data */ + if (isnull[natt]) + continue; + + /* ok, we know we have a toast datum */ + varlena = (struct varlena *) DatumGetPointer(attrs[natt]); + + /* no need to do anything if the tuple isn't external */ + if (!VARATT_IS_EXTERNAL(varlena)) + continue; + + VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena); + + /* + * Check whether the toast tuple changed, replace if so. + */ + ent = (ReorderBufferToastEnt *) + hash_search(txn->toast_hash, + (void *) &toast_pointer.va_valueid, + HASH_FIND, + NULL); + if (ent == NULL) + continue; + + new_datum = + (struct varlena *) palloc0(INDIRECT_POINTER_SIZE); + + free[natt] = true; + + reconstructed = palloc0(toast_pointer.va_rawsize); + + ent->reconstructed = reconstructed; + + /* stitch toast tuple back together from its parts */ + dlist_foreach(it, &ent->chunks) + { + bool isnull; + ReorderBufferChange *cchange; + ReorderBufferTupleBuf *ctup; + Pointer chunk; + + cchange = dlist_container(ReorderBufferChange, node, it.cur); + ctup = cchange->data.tp.newtuple; + chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull)); + + Assert(!isnull); + Assert(!VARATT_IS_EXTERNAL(chunk)); + Assert(!VARATT_IS_SHORT(chunk)); + + memcpy(VARDATA(reconstructed) + data_done, + VARDATA(chunk), + VARSIZE(chunk) - VARHDRSZ); + data_done += VARSIZE(chunk) - VARHDRSZ; + } + Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)); + + /* make sure its marked as compressed or not */ + if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) + SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ); + else + SET_VARSIZE(reconstructed, data_done + VARHDRSZ); + + memset(&redirect_pointer, 0, sizeof(redirect_pointer)); + redirect_pointer.pointer = reconstructed; + + SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT); + memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer, + sizeof(redirect_pointer)); + + attrs[natt] = PointerGetDatum(new_datum); + } + + /* + * Build tuple in separate memory & copy tuple back into the tuplebuf + * passed to the output plugin. We can't directly heap_fill_tuple() into + * the tuplebuf because attrs[] will point back into the current content. + */ + tmphtup = heap_form_tuple(desc, attrs, isnull); + Assert(newtup->tuple.t_len <= MaxHeapTupleSize); + Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data); + + memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len); + newtup->tuple.t_len = tmphtup->t_len; + + /* + * free resources we won't further need, more persistent stuff will be + * free'd in ReorderBufferToastReset(). + */ + RelationClose(toast_rel); + pfree(tmphtup); + for (natt = 0; natt < desc->natts; natt++) + { + if (free[natt]) + pfree(DatumGetPointer(attrs[natt])); + } + pfree(attrs); + pfree(free); + pfree(isnull); + + MemoryContextSwitchTo(oldcontext); + + /* subtract the old change size */ + ReorderBufferChangeMemoryUpdate(rb, change, false, old_size); + /* now add the change back, with the correct size */ + ReorderBufferChangeMemoryUpdate(rb, change, true, + ReorderBufferChangeSize(change)); +} + +/* + * Free all resources allocated for toast reconstruction. + */ +static void +ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn) +{ + HASH_SEQ_STATUS hstat; + ReorderBufferToastEnt *ent; + + if (txn->toast_hash == NULL) + return; + + /* sequentially walk over the hash and free everything */ + hash_seq_init(&hstat, txn->toast_hash); + while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL) + { + dlist_mutable_iter it; + + if (ent->reconstructed != NULL) + pfree(ent->reconstructed); + + dlist_foreach_modify(it, &ent->chunks) + { + ReorderBufferChange *change = + dlist_container(ReorderBufferChange, node, it.cur); + + dlist_delete(&change->node); + ReorderBufferReturnChange(rb, change, true); + } + } + + hash_destroy(txn->toast_hash); + txn->toast_hash = NULL; +} + + +/* --------------------------------------- + * Visibility support for logical decoding + * + * + * Lookup actual cmin/cmax values when using decoding snapshot. We can't + * always rely on stored cmin/cmax values because of two scenarios: + * + * * A tuple got changed multiple times during a single transaction and thus + * has got a combo CID. Combo CIDs are only valid for the duration of a + * single transaction. + * * A tuple with a cmin but no cmax (and thus no combo CID) got + * deleted/updated in another transaction than the one which created it + * which we are looking at right now. As only one of cmin, cmax or combo CID + * is actually stored in the heap we don't have access to the value we + * need anymore. + * + * To resolve those problems we have a per-transaction hash of (cmin, + * cmax) tuples keyed by (relfilenode, ctid) which contains the actual + * (cmin, cmax) values. That also takes care of combo CIDs by simply + * not caring about them at all. As we have the real cmin/cmax values + * combo CIDs aren't interesting. + * + * As we only care about catalog tuples here the overhead of this + * hashtable should be acceptable. + * + * Heap rewrites complicate this a bit, check rewriteheap.c for + * details. + * ------------------------------------------------------------------------- + */ + +/* struct for sorting mapping files by LSN efficiently */ +typedef struct RewriteMappingFile +{ + XLogRecPtr lsn; + char fname[MAXPGPATH]; +} RewriteMappingFile; + +#ifdef NOT_USED +static void +DisplayMapping(HTAB *tuplecid_data) +{ + HASH_SEQ_STATUS hstat; + ReorderBufferTupleCidEnt *ent; + + hash_seq_init(&hstat, tuplecid_data); + while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL) + { + elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u", + ent->key.relnode.dbNode, + ent->key.relnode.spcNode, + ent->key.relnode.relNode, + ItemPointerGetBlockNumber(&ent->key.tid), + ItemPointerGetOffsetNumber(&ent->key.tid), + ent->cmin, + ent->cmax + ); + } +} +#endif + +/* + * Apply a single mapping file to tuplecid_data. + * + * The mapping file has to have been verified to be a) committed b) for our + * transaction c) applied in LSN order. + */ +static void +ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname) +{ + char path[MAXPGPATH]; + int fd; + int readBytes; + LogicalRewriteMappingData map; + + sprintf(path, "pg_logical/mappings/%s", fname); + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + + while (true) + { + ReorderBufferTupleCidKey key; + ReorderBufferTupleCidEnt *ent; + ReorderBufferTupleCidEnt *new_ent; + bool found; + + /* be careful about padding */ + memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); + + /* read all mappings till the end of the file */ + pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ); + readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData)); + pgstat_report_wait_end(); + + if (readBytes < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + path))); + else if (readBytes == 0) /* EOF */ + break; + else if (readBytes != sizeof(LogicalRewriteMappingData)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read from file \"%s\": read %d instead of %d bytes", + path, readBytes, + (int32) sizeof(LogicalRewriteMappingData)))); + + key.relnode = map.old_node; + ItemPointerCopy(&map.old_tid, + &key.tid); + + + ent = (ReorderBufferTupleCidEnt *) + hash_search(tuplecid_data, + (void *) &key, + HASH_FIND, + NULL); + + /* no existing mapping, no need to update */ + if (!ent) + continue; + + key.relnode = map.new_node; + ItemPointerCopy(&map.new_tid, + &key.tid); + + new_ent = (ReorderBufferTupleCidEnt *) + hash_search(tuplecid_data, + (void *) &key, + HASH_ENTER, + &found); + + if (found) + { + /* + * Make sure the existing mapping makes sense. We sometime update + * old records that did not yet have a cmax (e.g. pg_class' own + * entry while rewriting it) during rewrites, so allow that. + */ + Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin); + Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax); + } + else + { + /* update mapping */ + new_ent->cmin = ent->cmin; + new_ent->cmax = ent->cmax; + new_ent->combocid = ent->combocid; + } + } + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", path))); +} + + +/* + * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'. + */ +static bool +TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num) +{ + return bsearch(&xid, xip, num, + sizeof(TransactionId), xidComparator) != NULL; +} + +/* + * list_sort() comparator for sorting RewriteMappingFiles in LSN order. + */ +static int +file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p) +{ + RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p); + RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p); + + if (a->lsn < b->lsn) + return -1; + else if (a->lsn > b->lsn) + return 1; + return 0; +} + +/* + * Apply any existing logical remapping files if there are any targeted at our + * transaction for relid. + */ +static void +UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot) +{ + DIR *mapping_dir; + struct dirent *mapping_de; + List *files = NIL; + ListCell *file; + Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId; + + mapping_dir = AllocateDir("pg_logical/mappings"); + while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL) + { + Oid f_dboid; + Oid f_relid; + TransactionId f_mapped_xid; + TransactionId f_create_xid; + XLogRecPtr f_lsn; + uint32 f_hi, + f_lo; + RewriteMappingFile *f; + + if (strcmp(mapping_de->d_name, ".") == 0 || + strcmp(mapping_de->d_name, "..") == 0) + continue; + + /* Ignore files that aren't ours */ + if (strncmp(mapping_de->d_name, "map-", 4) != 0) + continue; + + if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT, + &f_dboid, &f_relid, &f_hi, &f_lo, + &f_mapped_xid, &f_create_xid) != 6) + elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name); + + f_lsn = ((uint64) f_hi) << 32 | f_lo; + + /* mapping for another database */ + if (f_dboid != dboid) + continue; + + /* mapping for another relation */ + if (f_relid != relid) + continue; + + /* did the creating transaction abort? */ + if (!TransactionIdDidCommit(f_create_xid)) + continue; + + /* not for our transaction */ + if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt)) + continue; + + /* ok, relevant, queue for apply */ + f = palloc(sizeof(RewriteMappingFile)); + f->lsn = f_lsn; + strcpy(f->fname, mapping_de->d_name); + files = lappend(files, f); + } + FreeDir(mapping_dir); + + /* sort files so we apply them in LSN order */ + list_sort(files, file_sort_by_lsn); + + foreach(file, files) + { + RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file); + + elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname, + snapshot->subxip[0]); + ApplyLogicalMappingFile(tuplecid_data, relid, f->fname); + pfree(f); + } +} + +/* + * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on + * combo CIDs. + */ +bool +ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, + Snapshot snapshot, + HeapTuple htup, Buffer buffer, + CommandId *cmin, CommandId *cmax) +{ + ReorderBufferTupleCidKey key; + ReorderBufferTupleCidEnt *ent; + ForkNumber forkno; + BlockNumber blockno; + bool updated_mapping = false; + + /* + * Return unresolved if tuplecid_data is not valid. That's because when + * streaming in-progress transactions we may run into tuples with the CID + * before actually decoding them. Think e.g. about INSERT followed by + * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the + * INSERT. So in such cases, we assume the CID is from the future + * command. + */ + if (tuplecid_data == NULL) + return false; + + /* be careful about padding */ + memset(&key, 0, sizeof(key)); + + Assert(!BufferIsLocal(buffer)); + + /* + * get relfilenode from the buffer, no convenient way to access it other + * than that. + */ + BufferGetTag(buffer, &key.relnode, &forkno, &blockno); + + /* tuples can only be in the main fork */ + Assert(forkno == MAIN_FORKNUM); + Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self)); + + ItemPointerCopy(&htup->t_self, + &key.tid); + +restart: + ent = (ReorderBufferTupleCidEnt *) + hash_search(tuplecid_data, + (void *) &key, + HASH_FIND, + NULL); + + /* + * failed to find a mapping, check whether the table was rewritten and + * apply mapping if so, but only do that once - there can be no new + * mappings while we are in here since we have to hold a lock on the + * relation. + */ + if (ent == NULL && !updated_mapping) + { + UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot); + /* now check but don't update for a mapping again */ + updated_mapping = true; + goto restart; + } + else if (ent == NULL) + return false; + + if (cmin) + *cmin = ent->cmin; + if (cmax) + *cmax = ent->cmax; + return true; +} |