From ebe124eacd7c3faa36ed358e7cc1d7c5b419e5f6 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 4 May 2024 14:18:09 +0200 Subject: Merging upstream version 15.6. Signed-off-by: Daniel Baumann --- src/backend/executor/nodeAppend.c | 66 +++++++++++++++++++--------------- src/backend/executor/nodeHash.c | 13 +++++-- src/backend/executor/nodeModifyTable.c | 32 +++++++++++++---- 3 files changed, 74 insertions(+), 37 deletions(-) (limited to 'src/backend/executor') diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 357e10a..68f13f2 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -1016,43 +1016,51 @@ ExecAppendAsyncEventWait(AppendState *node) /* We should never be called when there are no valid async subplans. */ Assert(node->as_nasyncremain > 0); + Assert(node->as_eventset == NULL); node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents); - AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); - - /* Give each waiting subplan a chance to add an event. */ - i = -1; - while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) + PG_TRY(); { - AsyncRequest *areq = node->as_asyncrequests[i]; + AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); - if (areq->callback_pending) - ExecAsyncConfigureWait(areq); - } + /* Give each waiting subplan a chance to add an event. */ + i = -1; + while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; - /* - * No need for further processing if there are no configured events other - * than the postmaster death event. - */ - if (GetNumRegisteredWaitEvents(node->as_eventset) == 1) + if (areq->callback_pending) + ExecAsyncConfigureWait(areq); + } + + /* + * No need for further processing if there are no configured events + * other than the postmaster death event. + */ + if (GetNumRegisteredWaitEvents(node->as_eventset) == 1) + { + FreeWaitEventSet(node->as_eventset); + node->as_eventset = NULL; + return; + } + + /* Return at most EVENT_BUFFER_SIZE events in one call. */ + if (nevents > EVENT_BUFFER_SIZE) + nevents = EVENT_BUFFER_SIZE; + + /* + * If the timeout is -1, wait until at least one event occurs. If the + * timeout is 0, poll for events, but do not wait at all. + */ + noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, + nevents, WAIT_EVENT_APPEND_READY); + } + PG_FINALLY(); { FreeWaitEventSet(node->as_eventset); node->as_eventset = NULL; - return; } - - /* We wait on at most EVENT_BUFFER_SIZE events. */ - if (nevents > EVENT_BUFFER_SIZE) - nevents = EVENT_BUFFER_SIZE; - - /* - * If the timeout is -1, wait until at least one event occurs. If the - * timeout is 0, poll for events, but do not wait at all. - */ - noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, - nevents, WAIT_EVENT_APPEND_READY); - FreeWaitEventSet(node->as_eventset); - node->as_eventset = NULL; + PG_END_TRY(); if (noccurred == 0) return; diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 59a720d..34dd9a2 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -1162,6 +1162,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) double dtuples; double dbuckets; int new_nbuckets; + uint32 max_buckets; /* * We probably also need a smaller bucket array. How many @@ -1174,9 +1175,17 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) * array. */ dtuples = (old_batch0->ntuples * 2.0) / new_nbatch; + + /* + * We need to calculate the maximum number of buckets to + * stay within the MaxAllocSize boundary. Round the + * maximum number to the previous power of 2 given that + * later we round the number to the next power of 2. + */ + max_buckets = pg_prevpower2_32((uint32) + (MaxAllocSize / sizeof(dsa_pointer_atomic))); dbuckets = ceil(dtuples / NTUP_PER_BUCKET); - dbuckets = Min(dbuckets, - MaxAllocSize / sizeof(dsa_pointer_atomic)); + dbuckets = Min(dbuckets, max_buckets); new_nbuckets = (int) dbuckets; new_nbuckets = Max(new_nbuckets, 1024); new_nbuckets = pg_nextpower2_32(new_nbuckets); diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 7f822ef..6fea82e 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -1404,6 +1404,7 @@ ExecDelete(ModifyTableContext *context, bool processReturning, bool changingPart, bool canSetTag, + TM_Result *tmresult, bool *tupleDeleted, TupleTableSlot **epqreturnslot) { @@ -1420,7 +1421,7 @@ ExecDelete(ModifyTableContext *context, * done if it says we are. */ if (!ExecDeletePrologue(context, resultRelInfo, tupleid, oldtuple, - epqreturnslot, NULL)) + epqreturnslot, tmresult)) return NULL; /* INSTEAD OF ROW DELETE Triggers */ @@ -1475,6 +1476,9 @@ ExecDelete(ModifyTableContext *context, ldelete:; result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart); + if (tmresult) + *tmresult = result; + switch (result) { case TM_SelfModified: @@ -1713,6 +1717,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context, TupleTableSlot *slot, bool canSetTag, UpdateContext *updateCxt, + TM_Result *tmresult, TupleTableSlot **retry_slot, TupleTableSlot **inserted_tuple, ResultRelInfo **insert_destrel) @@ -1776,7 +1781,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context, false, /* processReturning */ true, /* changingPart */ false, /* canSetTag */ - &tuple_deleted, &epqslot); + tmresult, &tuple_deleted, &epqslot); /* * For some reason if DELETE didn't happen (e.g. trigger prevented it, or @@ -1808,7 +1813,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context, * action entirely). */ if (context->relaction != NULL) - return false; + return *tmresult == TM_Ok; else if (TupIsNull(epqslot)) return true; else @@ -2013,6 +2018,7 @@ lreplace: if (ExecCrossPartitionUpdate(context, resultRelInfo, tupleid, oldtuple, slot, canSetTag, updateCxt, + &result, &retry_slot, &inserted_tuple, &insert_destrel)) @@ -2052,7 +2058,7 @@ lreplace: * here; instead let it handle that on its own rules. */ if (context->relaction != NULL) - return TM_Updated; + return result; /* * ExecCrossPartitionUpdate installed an updated version of the new @@ -2879,7 +2885,21 @@ lmerge_matched:; break; /* concurrent update/delete */ } result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL, - newslot, false, &updateCxt); + newslot, canSetTag, &updateCxt); + + /* + * As in ExecUpdate(), if ExecUpdateAct() reports that a + * cross-partition update was done, then there's nothing else + * for us to do --- the UPDATE has been turned into a DELETE + * and an INSERT, and we must not perform any of the usual + * post-update tasks. + */ + if (updateCxt.crossPartUpdate) + { + mtstate->mt_merge_updated += 1; + return true; + } + if (result == TM_Ok && updateCxt.updated) { ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, @@ -3808,7 +3828,7 @@ ExecModifyTable(PlanState *pstate) case CMD_DELETE: slot = ExecDelete(&context, resultRelInfo, tupleid, oldtuple, - true, false, node->canSetTag, NULL, NULL); + true, false, node->canSetTag, NULL, NULL, NULL); break; case CMD_MERGE: -- cgit v1.2.3