summaryrefslogtreecommitdiffstats
path: root/src/backend/executor
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/backend/executor/nodeAppend.c66
-rw-r--r--src/backend/executor/nodeHash.c13
-rw-r--r--src/backend/executor/nodeModifyTable.c32
3 files changed, 74 insertions, 37 deletions
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 357e10a..68f13f2 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1016,43 +1016,51 @@ ExecAppendAsyncEventWait(AppendState *node)
/* We should never be called when there are no valid async subplans. */
Assert(node->as_nasyncremain > 0);
+ Assert(node->as_eventset == NULL);
node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
- AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
-
- /* Give each waiting subplan a chance to add an event. */
- i = -1;
- while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ PG_TRY();
{
- AsyncRequest *areq = node->as_asyncrequests[i];
+ AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
- if (areq->callback_pending)
- ExecAsyncConfigureWait(areq);
- }
+ /* Give each waiting subplan a chance to add an event. */
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
- /*
- * No need for further processing if there are no configured events other
- * than the postmaster death event.
- */
- if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
+ if (areq->callback_pending)
+ ExecAsyncConfigureWait(areq);
+ }
+
+ /*
+ * No need for further processing if there are no configured events
+ * other than the postmaster death event.
+ */
+ if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
+ {
+ FreeWaitEventSet(node->as_eventset);
+ node->as_eventset = NULL;
+ return;
+ }
+
+ /* Return at most EVENT_BUFFER_SIZE events in one call. */
+ if (nevents > EVENT_BUFFER_SIZE)
+ nevents = EVENT_BUFFER_SIZE;
+
+ /*
+ * If the timeout is -1, wait until at least one event occurs. If the
+ * timeout is 0, poll for events, but do not wait at all.
+ */
+ noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+ nevents, WAIT_EVENT_APPEND_READY);
+ }
+ PG_FINALLY();
{
FreeWaitEventSet(node->as_eventset);
node->as_eventset = NULL;
- return;
}
-
- /* We wait on at most EVENT_BUFFER_SIZE events. */
- if (nevents > EVENT_BUFFER_SIZE)
- nevents = EVENT_BUFFER_SIZE;
-
- /*
- * If the timeout is -1, wait until at least one event occurs. If the
- * timeout is 0, poll for events, but do not wait at all.
- */
- noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
- nevents, WAIT_EVENT_APPEND_READY);
- FreeWaitEventSet(node->as_eventset);
- node->as_eventset = NULL;
+ PG_END_TRY();
if (noccurred == 0)
return;
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 59a720d..34dd9a2 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -1162,6 +1162,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
double dtuples;
double dbuckets;
int new_nbuckets;
+ uint32 max_buckets;
/*
* We probably also need a smaller bucket array. How many
@@ -1174,9 +1175,17 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
* array.
*/
dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
+
+ /*
+ * We need to calculate the maximum number of buckets to
+ * stay within the MaxAllocSize boundary. Round the
+ * maximum number to the previous power of 2 given that
+ * later we round the number to the next power of 2.
+ */
+ max_buckets = pg_prevpower2_32((uint32)
+ (MaxAllocSize / sizeof(dsa_pointer_atomic)));
dbuckets = ceil(dtuples / NTUP_PER_BUCKET);
- dbuckets = Min(dbuckets,
- MaxAllocSize / sizeof(dsa_pointer_atomic));
+ dbuckets = Min(dbuckets, max_buckets);
new_nbuckets = (int) dbuckets;
new_nbuckets = Max(new_nbuckets, 1024);
new_nbuckets = pg_nextpower2_32(new_nbuckets);
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 7f822ef..6fea82e 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1404,6 +1404,7 @@ ExecDelete(ModifyTableContext *context,
bool processReturning,
bool changingPart,
bool canSetTag,
+ TM_Result *tmresult,
bool *tupleDeleted,
TupleTableSlot **epqreturnslot)
{
@@ -1420,7 +1421,7 @@ ExecDelete(ModifyTableContext *context,
* done if it says we are.
*/
if (!ExecDeletePrologue(context, resultRelInfo, tupleid, oldtuple,
- epqreturnslot, NULL))
+ epqreturnslot, tmresult))
return NULL;
/* INSTEAD OF ROW DELETE Triggers */
@@ -1475,6 +1476,9 @@ ExecDelete(ModifyTableContext *context,
ldelete:;
result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart);
+ if (tmresult)
+ *tmresult = result;
+
switch (result)
{
case TM_SelfModified:
@@ -1713,6 +1717,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
TupleTableSlot *slot,
bool canSetTag,
UpdateContext *updateCxt,
+ TM_Result *tmresult,
TupleTableSlot **retry_slot,
TupleTableSlot **inserted_tuple,
ResultRelInfo **insert_destrel)
@@ -1776,7 +1781,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
false, /* processReturning */
true, /* changingPart */
false, /* canSetTag */
- &tuple_deleted, &epqslot);
+ tmresult, &tuple_deleted, &epqslot);
/*
* For some reason if DELETE didn't happen (e.g. trigger prevented it, or
@@ -1808,7 +1813,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context,
* action entirely).
*/
if (context->relaction != NULL)
- return false;
+ return *tmresult == TM_Ok;
else if (TupIsNull(epqslot))
return true;
else
@@ -2013,6 +2018,7 @@ lreplace:
if (ExecCrossPartitionUpdate(context, resultRelInfo,
tupleid, oldtuple, slot,
canSetTag, updateCxt,
+ &result,
&retry_slot,
&inserted_tuple,
&insert_destrel))
@@ -2052,7 +2058,7 @@ lreplace:
* here; instead let it handle that on its own rules.
*/
if (context->relaction != NULL)
- return TM_Updated;
+ return result;
/*
* ExecCrossPartitionUpdate installed an updated version of the new
@@ -2879,7 +2885,21 @@ lmerge_matched:;
break; /* concurrent update/delete */
}
result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL,
- newslot, false, &updateCxt);
+ newslot, canSetTag, &updateCxt);
+
+ /*
+ * As in ExecUpdate(), if ExecUpdateAct() reports that a
+ * cross-partition update was done, then there's nothing else
+ * for us to do --- the UPDATE has been turned into a DELETE
+ * and an INSERT, and we must not perform any of the usual
+ * post-update tasks.
+ */
+ if (updateCxt.crossPartUpdate)
+ {
+ mtstate->mt_merge_updated += 1;
+ return true;
+ }
+
if (result == TM_Ok && updateCxt.updated)
{
ExecUpdateEpilogue(context, &updateCxt, resultRelInfo,
@@ -3808,7 +3828,7 @@ ExecModifyTable(PlanState *pstate)
case CMD_DELETE:
slot = ExecDelete(&context, resultRelInfo, tupleid, oldtuple,
- true, false, node->canSetTag, NULL, NULL);
+ true, false, node->canSetTag, NULL, NULL, NULL);
break;
case CMD_MERGE: