Diffstat (limited to 'src/backend/executor/nodeAppend.c')
-rw-r--r-- | src/backend/executor/nodeAppend.c | 1186
1 file changed, 1186 insertions, 0 deletions
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
new file mode 100644
index 0000000..6a2daa6
--- /dev/null
+++ b/src/backend/executor/nodeAppend.c
@@ -0,0 +1,1186 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeAppend.c
+ *	  routines to handle append nodes.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/nodeAppend.c
+ *
+ *-------------------------------------------------------------------------
+ */
+/* INTERFACE ROUTINES
+ *		ExecInitAppend	- initialize the append node
+ *		ExecAppend		- retrieve the next tuple from the node
+ *		ExecEndAppend	- shut down the append node
+ *		ExecReScanAppend - rescan the append node
+ *
+ *	 NOTES
+ *		Each append node contains a list of one or more subplans which
+ *		must be iteratively processed (forwards or backwards).
+ *		Tuples are retrieved by executing the 'whichplan'th subplan
+ *		until the subplan stops returning tuples, at which point that
+ *		plan is shut down and the next started up.
+ *
+ *		Append nodes don't make use of their left and right
+ *		subtrees, rather they maintain a list of subplans so
+ *		a typical append node looks like this in the plan tree:
+ *
+ *				   ...
+ *				   /
+ *				Append -------+------+------+--- nil
+ *				/	\		  |		 |		|
+ *			  nil	nil		 ...    ...    ...
+ *								 subplans
+ *
+ *		Append nodes are currently used for unions, and to support
+ *		inheritance queries, where several relations need to be scanned.
+ *		For example, in our standard person/student/employee/student-emp
+ *		example, where student and employee inherit from person
+ *		and student-emp inherits from student and employee, the
+ *		query:
+ *
+ *				select name from person
+ *
+ *		generates the plan:
+ *
+ *				  |
+ *				Append -------+-------+--------+--------+
+ *				/	\		  |		  |		   |		|
+ *			  nil	nil		 Scan	 Scan	  Scan	   Scan
+ *							  |		  |		   |		|
+ *							person employee student student-emp
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/execdebug.h"
+#include "executor/execPartition.h"
+#include "executor/nodeAppend.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+
+/* Shared state for parallel-aware Append. */
+struct ParallelAppendState
+{
+	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */
+	int			pa_next_plan;	/* next plan to choose by any worker */
+
+	/*
+	 * pa_finished[i] should be true if no more workers should select subplan
+	 * i.  For a non-partial plan, this should be set to true as soon as a
+	 * worker selects the plan; for a partial plan, it remains false until
+	 * some worker executes the plan to completion.
+	 */
+	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
+};
+
+#define INVALID_SUBPLAN_INDEX		-1
+#define EVENT_BUFFER_SIZE			16
+
+static TupleTableSlot *ExecAppend(PlanState *pstate);
+static bool choose_next_subplan_locally(AppendState *node);
+static bool choose_next_subplan_for_leader(AppendState *node);
+static bool choose_next_subplan_for_worker(AppendState *node);
+static void mark_invalid_subplans_as_finished(AppendState *node);
+static void ExecAppendAsyncBegin(AppendState *node);
+static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
+static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
+static void ExecAppendAsyncEventWait(AppendState *node);
+static void classify_matching_subplans(AppendState *node);
+
+/* ----------------------------------------------------------------
+ *		ExecInitAppend
+ *
+ *		Begin all of the subscans of the append node.
+ *
+ *	   (This is potentially wasteful, since the entire result of the
+ *		append node may not be scanned, but this way all of the
+ *		structures get allocated in the executor's top level memory
+ *		block instead of that of the call to ExecAppend.)
+ * ----------------------------------------------------------------
+ */
+AppendState *
+ExecInitAppend(Append *node, EState *estate, int eflags)
+{
+	AppendState *appendstate = makeNode(AppendState);
+	PlanState **appendplanstates;
+	Bitmapset  *validsubplans;
+	Bitmapset  *asyncplans;
+	int			nplans;
+	int			nasyncplans;
+	int			firstvalid;
+	int			i,
+				j;
+
+	/* check for unsupported flags */
+	Assert(!(eflags & EXEC_FLAG_MARK));
+
+	/*
+	 * create new AppendState for our append node
+	 */
+	appendstate->ps.plan = (Plan *) node;
+	appendstate->ps.state = estate;
+	appendstate->ps.ExecProcNode = ExecAppend;
+
+	/* Let choose_next_subplan_* function handle setting the first subplan */
+	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+	appendstate->as_syncdone = false;
+	appendstate->as_begun = false;
+
+	/* If run-time partition pruning is enabled, then set that up now */
+	if (node->part_prune_info != NULL)
+	{
+		PartitionPruneState *prunestate;
+
+		/* We may need an expression context to evaluate partition exprs */
+		ExecAssignExprContext(estate, &appendstate->ps);
+
+		/* Create the working data structure for pruning. */
+		prunestate = ExecCreatePartitionPruneState(&appendstate->ps,
+												   node->part_prune_info);
+		appendstate->as_prune_state = prunestate;
+
+		/* Perform an initial partition prune, if required. */
+		if (prunestate->do_initial_prune)
+		{
+			/* Determine which subplans survive initial pruning */
+			validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
+															list_length(node->appendplans));
+
+			nplans = bms_num_members(validsubplans);
+		}
+		else
+		{
+			/* We'll need to initialize all subplans */
+			nplans = list_length(node->appendplans);
+			Assert(nplans > 0);
+			validsubplans = bms_add_range(NULL, 0, nplans - 1);
+		}
+
+		/*
+		 * When no run-time pruning is required and there's at least one
+		 * subplan, we can fill as_valid_subplans immediately, preventing
+		 * later calls to ExecFindMatchingSubPlans.
+		 */
+		if (!prunestate->do_exec_prune && nplans > 0)
+			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+	}
+	else
+	{
+		nplans = list_length(node->appendplans);
+
+		/*
+		 * When run-time partition pruning is not enabled we can just mark all
+		 * subplans as valid; they must also all be initialized.
+		 */
+		Assert(nplans > 0);
+		appendstate->as_valid_subplans = validsubplans =
+			bms_add_range(NULL, 0, nplans - 1);
+		appendstate->as_prune_state = NULL;
+	}
+
+	/*
+	 * Initialize result tuple type and slot.
+	 */
+	ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
+
+	/* node returns slots from each of its subnodes, therefore not fixed */
+	appendstate->ps.resultopsset = true;
+	appendstate->ps.resultopsfixed = false;
+
+	appendplanstates = (PlanState **) palloc(nplans *
+											 sizeof(PlanState *));
+
+	/*
+	 * call ExecInitNode on each of the valid plans to be executed and save
+	 * the results into the appendplanstates array.
+	 *
+	 * While at it, find out the first valid partial plan.
+	 */
+	j = 0;
+	asyncplans = NULL;
+	nasyncplans = 0;
+	firstvalid = nplans;
+	i = -1;
+	while ((i = bms_next_member(validsubplans, i)) >= 0)
+	{
+		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);
+
+		/*
+		 * Record async subplans.  When executing EvalPlanQual, we treat them
+		 * as sync ones; don't do this when initializing an EvalPlanQual plan
+		 * tree.
+		 */
+		if (initNode->async_capable && estate->es_epq_active == NULL)
+		{
+			asyncplans = bms_add_member(asyncplans, j);
+			nasyncplans++;
+		}
+
+		/*
+		 * Record the lowest appendplans index which is a valid partial plan.
+		 */
+		if (i >= node->first_partial_plan && j < firstvalid)
+			firstvalid = j;
+
+		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+	}
+
+	appendstate->as_first_partial_plan = firstvalid;
+	appendstate->appendplans = appendplanstates;
+	appendstate->as_nplans = nplans;
+
+	/* Initialize async state */
+	appendstate->as_asyncplans = asyncplans;
+	appendstate->as_nasyncplans = nasyncplans;
+	appendstate->as_asyncrequests = NULL;
+	appendstate->as_asyncresults = NULL;
+	appendstate->as_nasyncresults = 0;
+	appendstate->as_nasyncremain = 0;
+	appendstate->as_needrequest = NULL;
+	appendstate->as_eventset = NULL;
+	appendstate->as_valid_asyncplans = NULL;
+
+	if (nasyncplans > 0)
+	{
+		appendstate->as_asyncrequests = (AsyncRequest **)
+			palloc0(nplans * sizeof(AsyncRequest *));
+
+		i = -1;
+		while ((i = bms_next_member(asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq;
+
+			areq = palloc(sizeof(AsyncRequest));
+			areq->requestor = (PlanState *) appendstate;
+			areq->requestee = appendplanstates[i];
+			areq->request_index = i;
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+
+			appendstate->as_asyncrequests[i] = areq;
+		}
+
+		appendstate->as_asyncresults = (TupleTableSlot **)
+			palloc0(nasyncplans * sizeof(TupleTableSlot *));
+
+		if (appendstate->as_valid_subplans != NULL)
+			classify_matching_subplans(appendstate);
+	}
+
+	/*
+	 * Miscellaneous initialization
+	 */
+
+	appendstate->ps.ps_ProjInfo = NULL;
+
+	/* For parallel query, this will be overridden later. */
+	appendstate->choose_next_subplan = choose_next_subplan_locally;
+
+	return appendstate;
+}
+
+/* ----------------------------------------------------------------
+ *	   ExecAppend
+ *
+ *		Handles iteration over multiple subplans.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+ExecAppend(PlanState *pstate)
+{
+	AppendState *node = castNode(AppendState, pstate);
+	TupleTableSlot *result;
+
+	/*
+	 * If this is the first call after Init or ReScan, we need to do the
+	 * initialization work.
+	 */
+	if (!node->as_begun)
+	{
+		Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
+		Assert(!node->as_syncdone);
+
+		/* Nothing to do if there are no subplans */
+		if (node->as_nplans == 0)
+			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
+		/* If there are any async subplans, begin executing them. */
+		if (node->as_nasyncplans > 0)
+			ExecAppendAsyncBegin(node);
+
+		/*
+		 * If no sync subplan has been chosen, we must choose one before
+		 * proceeding.
+		 */
+		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
+			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
+		Assert(node->as_syncdone ||
+			   (node->as_whichplan >= 0 &&
+				node->as_whichplan < node->as_nplans));
+
+		/* And we're initialized. */
+		node->as_begun = true;
+	}
+
+	for (;;)
+	{
+		PlanState  *subnode;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * try to get a tuple from an async subplan if any
+		 */
+		if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
+		{
+			if (ExecAppendAsyncGetNext(node, &result))
+				return result;
+			Assert(!node->as_syncdone);
+			Assert(bms_is_empty(node->as_needrequest));
+		}
+
+		/*
+		 * figure out which sync subplan we are currently processing
+		 */
+		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
+		subnode = node->appendplans[node->as_whichplan];
+
+		/*
+		 * get a tuple from the subplan
+		 */
+		result = ExecProcNode(subnode);
+
+		if (!TupIsNull(result))
+		{
+			/*
+			 * If the subplan gave us something then return it as-is. We do
+			 * NOT make use of the result slot that was set up in
+			 * ExecInitAppend; there's no need for it.
+			 */
+			return result;
+		}
+
+		/*
+		 * wait or poll for async events if any. We do this before checking
+		 * for the end of iteration, because it might drain the remaining
+		 * async subplans.
+		 */
+		if (node->as_nasyncremain > 0)
+			ExecAppendAsyncEventWait(node);
+
+		/* choose new sync subplan; if no sync/async subplans, we're done */
+		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
+			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecEndAppend
+ *
+ *		Shuts down the subscans of the append node.
+ *
+ *		Returns nothing of interest.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndAppend(AppendState *node)
+{
+	PlanState **appendplans;
+	int			nplans;
+	int			i;
+
+	/*
+	 * get information from the node
+	 */
+	appendplans = node->appendplans;
+	nplans = node->as_nplans;
+
+	/*
+	 * shut down each of the subscans
+	 */
+	for (i = 0; i < nplans; i++)
+		ExecEndNode(appendplans[i]);
+}
+
+void
+ExecReScanAppend(AppendState *node)
+{
+	int			nasyncplans = node->as_nasyncplans;
+	int			i;
+
+	/*
+	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
+	 * we'd better unset the valid subplans so that they are reselected for
+	 * the new parameter values.
+	 */
+	if (node->as_prune_state &&
+		bms_overlap(node->ps.chgParam,
+					node->as_prune_state->execparamids))
+	{
+		bms_free(node->as_valid_subplans);
+		node->as_valid_subplans = NULL;
+		if (nasyncplans > 0)
+		{
+			bms_free(node->as_valid_asyncplans);
+			node->as_valid_asyncplans = NULL;
+		}
+	}
+
+	for (i = 0; i < node->as_nplans; i++)
+	{
+		PlanState  *subnode = node->appendplans[i];
+
+		/*
+		 * ExecReScan doesn't know about my subplans, so I have to do
+		 * changed-parameter signaling myself.
+		 */
+		if (node->ps.chgParam != NULL)
+			UpdateChangedParamSet(subnode, node->ps.chgParam);
+
+		/*
+		 * If chgParam of subnode is not null then plan will be re-scanned by
+		 * first ExecProcNode or by first ExecAsyncRequest.
+		 */
+		if (subnode->chgParam == NULL)
+			ExecReScan(subnode);
+	}
+
+	/* Reset async state */
+	if (nasyncplans > 0)
+	{
+		i = -1;
+		while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq = node->as_asyncrequests[i];
+
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+		}
+
+		node->as_nasyncresults = 0;
+		node->as_nasyncremain = 0;
+		bms_free(node->as_needrequest);
+		node->as_needrequest = NULL;
+	}
+
+	/* Let choose_next_subplan_* function handle setting the first subplan */
+	node->as_whichplan = INVALID_SUBPLAN_INDEX;
+	node->as_syncdone = false;
+	node->as_begun = false;
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecAppendEstimate
+ *
+ *		Compute the amount of space we'll need in the parallel
+ *		query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendEstimate(AppendState *node,
+				   ParallelContext *pcxt)
+{
+	node->pstate_len =
+		add_size(offsetof(ParallelAppendState, pa_finished),
+				 sizeof(bool) * node->as_nplans);
+
+	shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+
+/* ----------------------------------------------------------------
+ *		ExecAppendInitializeDSM
+ *
+ *		Set up shared state for Parallel Append.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeDSM(AppendState *node,
+						ParallelContext *pcxt)
+{
+	ParallelAppendState *pstate;
+
+	pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
+	memset(pstate, 0, node->pstate_len);
+	LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
+	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
+
+	node->as_pstate = pstate;
+	node->choose_next_subplan = choose_next_subplan_for_leader;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendReInitializeDSM
+ *
+ *		Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
+{
+	ParallelAppendState *pstate = node->as_pstate;
+
+	pstate->pa_next_plan = 0;
+	memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendInitializeWorker
+ *
+ *		Copy relevant information from TOC into planstate, and initialize
+ *		whatever is required to choose and execute the optimal subplan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
+{
+	node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
+	node->choose_next_subplan = choose_next_subplan_for_worker;
+}
+
+/* ----------------------------------------------------------------
+ *		choose_next_subplan_locally
+ *
+ *		Choose next sync subplan for a non-parallel-aware Append,
+ *		returning false if there are no more.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_locally(AppendState *node)
+{
+	int			whichplan = node->as_whichplan;
+	int			nextplan;
+
+	/* We should never be called when there are no subplans */
+	Assert(node->as_nplans > 0);
+
+	/* Nothing to do if syncdone */
+	if (node->as_syncdone)
+		return false;
+
+	/*
+	 * If first call then have the bms member function choose the first valid
+	 * sync subplan by initializing whichplan to -1.  If there happen to be no
+	 * valid sync subplans then the bms member function will handle that by
+	 * returning a negative number which will allow us to exit returning a
+	 * false value.
+	 */
+	if (whichplan == INVALID_SUBPLAN_INDEX)
+	{
+		if (node->as_nasyncplans > 0)
+		{
+			/* We'd have filled as_valid_subplans already */
+			Assert(node->as_valid_subplans);
+		}
+		else if (node->as_valid_subplans == NULL)
+			node->as_valid_subplans =
+				ExecFindMatchingSubPlans(node->as_prune_state);
+
+		whichplan = -1;
+	}
+
+	/* Ensure whichplan is within the expected range */
+	Assert(whichplan >= -1 && whichplan <= node->as_nplans);
+
+	if (ScanDirectionIsForward(node->ps.state->es_direction))
+		nextplan = bms_next_member(node->as_valid_subplans, whichplan);
+	else
+		nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
+
+	if (nextplan < 0)
+	{
+		/* Set as_syncdone if in async mode */
+		if (node->as_nasyncplans > 0)
+			node->as_syncdone = true;
+		return false;
+	}
+
+	node->as_whichplan = nextplan;
+
+	return true;
+}
+
+/* ----------------------------------------------------------------
+ *		choose_next_subplan_for_leader
+ *
+ *		Try to pick a plan which doesn't commit us to doing much
+ *		work locally, so that as much work as possible is done in
+ *		the workers.  Cheapest subplans are at the end.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_for_leader(AppendState *node)
+{
+	ParallelAppendState *pstate = node->as_pstate;
+
+	/* Backward scan is not supported by parallel-aware plans */
+	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+	/* We should never be called when there are no subplans */
+	Assert(node->as_nplans > 0);
+
+	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
+
+	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+	{
+		/* Mark just-completed subplan as finished. */
+		node->as_pstate->pa_finished[node->as_whichplan] = true;
+	}
+	else
+	{
+		/* Start with last subplan. */
+		node->as_whichplan = node->as_nplans - 1;
+
+		/*
+		 * If we've yet to determine the valid subplans then do so now.  If
+		 * run-time pruning is disabled then the valid subplans will always be
+		 * set to all subplans.
+		 */
+		if (node->as_valid_subplans == NULL)
+		{
+			node->as_valid_subplans =
+				ExecFindMatchingSubPlans(node->as_prune_state);
+
+			/*
+			 * Mark each invalid plan as finished to allow the loop below to
+			 * select the first valid subplan.
+			 */
+			mark_invalid_subplans_as_finished(node);
+		}
+	}
+
+	/* Loop until we find a subplan to execute. */
+	while (pstate->pa_finished[node->as_whichplan])
+	{
+		if (node->as_whichplan == 0)
+		{
+			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
+			node->as_whichplan = INVALID_SUBPLAN_INDEX;
+			LWLockRelease(&pstate->pa_lock);
+			return false;
+		}
+
+		/*
+		 * We needn't pay attention to as_valid_subplans here as all invalid
+		 * plans have been marked as finished.
+		 */
+		node->as_whichplan--;
+	}
+
+	/* If non-partial, immediately mark as finished. */
+	if (node->as_whichplan < node->as_first_partial_plan)
+		node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+	LWLockRelease(&pstate->pa_lock);
+
+	return true;
+}
+
+/* ----------------------------------------------------------------
+ *		choose_next_subplan_for_worker
+ *
+ *		Choose next subplan for a parallel-aware Append, returning
+ *		false if there are no more.
+ *
+ *		We start from the first plan and advance through the list;
+ *		when we get back to the end, we loop back to the first
+ *		partial plan.  This assigns the non-partial plans first in
+ *		order of descending cost and then spreads out the workers
+ *		as evenly as possible across the remaining partial plans.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_for_worker(AppendState *node)
+{
+	ParallelAppendState *pstate = node->as_pstate;
+
+	/* Backward scan is not supported by parallel-aware plans */
+	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+	/* We should never be called when there are no subplans */
+	Assert(node->as_nplans > 0);
+
+	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
+
+	/* Mark just-completed subplan as finished. */
+	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+		node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+	/*
+	 * If we've yet to determine the valid subplans then do so now.  If
+	 * run-time pruning is disabled then the valid subplans will always be set
+	 * to all subplans.
+	 */
+	else if (node->as_valid_subplans == NULL)
+	{
+		node->as_valid_subplans =
+			ExecFindMatchingSubPlans(node->as_prune_state);
+		mark_invalid_subplans_as_finished(node);
+	}
+
+	/* If all the plans are already done, we have nothing to do */
+	if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
+	{
+		LWLockRelease(&pstate->pa_lock);
+		return false;
+	}
+
+	/* Save the plan from which we are starting the search. */
+	node->as_whichplan = pstate->pa_next_plan;
+
+	/* Loop until we find a valid subplan to execute. */
+	while (pstate->pa_finished[pstate->pa_next_plan])
+	{
+		int			nextplan;
+
+		nextplan = bms_next_member(node->as_valid_subplans,
+								   pstate->pa_next_plan);
+		if (nextplan >= 0)
+		{
+			/* Advance to the next valid plan. */
+			pstate->pa_next_plan = nextplan;
+		}
+		else if (node->as_whichplan > node->as_first_partial_plan)
+		{
+			/*
+			 * Try looping back to the first valid partial plan, if there is
+			 * one.  If there isn't, arrange to bail out below.
+			 */
+			nextplan = bms_next_member(node->as_valid_subplans,
+									   node->as_first_partial_plan - 1);
+			pstate->pa_next_plan =
+				nextplan < 0 ? node->as_whichplan : nextplan;
+		}
+		else
+		{
+			/*
+			 * At last plan, and either there are no partial plans or we've
+			 * tried them all.  Arrange to bail out.
+			 */
+			pstate->pa_next_plan = node->as_whichplan;
+		}
+
+		if (pstate->pa_next_plan == node->as_whichplan)
+		{
+			/* We've tried everything! */
+			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
+			LWLockRelease(&pstate->pa_lock);
+			return false;
+		}
+	}
+
+	/* Pick the plan we found, and advance pa_next_plan one more time. */
+	node->as_whichplan = pstate->pa_next_plan;
+	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
+										   pstate->pa_next_plan);
+
+	/*
+	 * If there are no more valid plans then try setting the next plan to the
+	 * first valid partial plan.
+	 */
+	if (pstate->pa_next_plan < 0)
+	{
+		int			nextplan = bms_next_member(node->as_valid_subplans,
+											   node->as_first_partial_plan - 1);
+
+		if (nextplan >= 0)
+			pstate->pa_next_plan = nextplan;
+		else
+		{
+			/*
+			 * There are no valid partial plans, and we already chose the last
+			 * non-partial plan; so flag that there's nothing more for our
+			 * fellow workers to do.
+			 */
+			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
+		}
+	}
+
+	/* If non-partial, immediately mark as finished. */
+	if (node->as_whichplan < node->as_first_partial_plan)
+		node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+	LWLockRelease(&pstate->pa_lock);
+
+	return true;
+}
+
+/*
+ * mark_invalid_subplans_as_finished
+ *		Marks the ParallelAppendState's pa_finished as true for each invalid
+ *		subplan.
+ *
+ * This function should only be called for parallel Append with run-time
+ * pruning enabled.
+ */
+static void
+mark_invalid_subplans_as_finished(AppendState *node)
+{
+	int			i;
+
+	/* Only valid to call this while in parallel Append mode */
+	Assert(node->as_pstate);
+
+	/* Shouldn't have been called when run-time pruning is not enabled */
+	Assert(node->as_prune_state);
+
+	/* Nothing to do if all plans are valid */
+	if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
+		return;
+
+	/* Mark all non-valid plans as finished */
+	for (i = 0; i < node->as_nplans; i++)
+	{
+		if (!bms_is_member(i, node->as_valid_subplans))
+			node->as_pstate->pa_finished[i] = true;
+	}
+}
+
+/* ----------------------------------------------------------------
+ *						Asynchronous Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncBegin
+ *
+ *		Begin executing designated async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncBegin(AppendState *node)
+{
+	int			i;
+
+	/* Backward scan is not supported by async-aware Appends. */
+	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+	/* We should never be called when there are no subplans */
+	Assert(node->as_nplans > 0);
+
+	/* We should never be called when there are no async subplans. */
+	Assert(node->as_nasyncplans > 0);
+
+	/* If we've yet to determine the valid subplans then do so now. */
+	if (node->as_valid_subplans == NULL)
+	{
+		node->as_valid_subplans =
+			ExecFindMatchingSubPlans(node->as_prune_state);
+
+		classify_matching_subplans(node);
+	}
+
+	/* Initialize state variables. */
+	node->as_syncdone = bms_is_empty(node->as_valid_subplans);
+	node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);
+
+	/* Nothing to do if there are no valid async subplans. */
+	if (node->as_nasyncremain == 0)
+		return;
+
+	/* Make a request for each of the valid async subplans. */
+	i = -1;
+	while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->as_asyncrequests[i];
+
+		Assert(areq->request_index == i);
+		Assert(!areq->callback_pending);
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncGetNext
+ *
+ *		Get the next tuple from any of the asynchronous subplans.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
+{
+	*result = NULL;
+
+	/* We should never be called when there are no valid async subplans. */
+	Assert(node->as_nasyncremain > 0);
+
+	/* Request a tuple asynchronously. */
+	if (ExecAppendAsyncRequest(node, result))
+		return true;
+
+	while (node->as_nasyncremain > 0)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/* Wait or poll for async events. */
+		ExecAppendAsyncEventWait(node);
+
+		/* Request a tuple asynchronously. */
+		if (ExecAppendAsyncRequest(node, result))
+			return true;
+
+		/* Break from loop if there's any sync subplan that isn't complete. */
+		if (!node->as_syncdone)
+			break;
+	}
+
+	/*
+	 * If all sync subplans are complete, we're totally done scanning the
+	 * given node.  Otherwise, we're done with the asynchronous stuff but must
+	 * continue scanning the sync subplans.
+	 */
+	if (node->as_syncdone)
+	{
+		Assert(node->as_nasyncremain == 0);
+		*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncRequest
+ *
+ *		Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
+{
+	Bitmapset  *needrequest;
+	int			i;
+
+	/* Nothing to do if there are no async subplans needing a new request. */
+	if (bms_is_empty(node->as_needrequest))
+	{
+		Assert(node->as_nasyncresults == 0);
+		return false;
+	}
+
+	/*
+	 * If there are any asynchronously-generated results that have not yet
+	 * been returned, we have nothing to do; just return one of them.
+	 */
+	if (node->as_nasyncresults > 0)
+	{
+		--node->as_nasyncresults;
+		*result = node->as_asyncresults[node->as_nasyncresults];
+		return true;
+	}
+
+	/* Make a new request for each of the async subplans that need it. */
+	needrequest = node->as_needrequest;
+	node->as_needrequest = NULL;
+	i = -1;
+	while ((i = bms_next_member(needrequest, i)) >= 0)
+	{
+		AsyncRequest *areq = node->as_asyncrequests[i];
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+	bms_free(needrequest);
+
+	/* Return one of the asynchronously-generated results if any. */
+	if (node->as_nasyncresults > 0)
+	{
+		--node->as_nasyncresults;
+		*result = node->as_asyncresults[node->as_nasyncresults];
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncEventWait
+ *
+ *		Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncEventWait(AppendState *node)
+{
+	int			nevents = node->as_nasyncplans + 1;
+	long		timeout = node->as_syncdone ? -1 : 0;
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+	int			noccurred;
+	int			i;
+
+	/* We should never be called when there are no valid async subplans. */
+	Assert(node->as_nasyncremain > 0);
+
+	node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents);
+	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+
+	/* Give each waiting subplan a chance to add an event. */
+	i = -1;
+	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->as_asyncrequests[i];
+
+		if (areq->callback_pending)
+			ExecAsyncConfigureWait(areq);
+	}
+
+	/*
+	 * No need for further processing if there are no configured events other
+	 * than the postmaster death event.
+	 */
+	if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
+	{
+		FreeWaitEventSet(node->as_eventset);
+		node->as_eventset = NULL;
+		return;
+	}
+
+	/* We wait on at most EVENT_BUFFER_SIZE events. */
+	if (nevents > EVENT_BUFFER_SIZE)
+		nevents = EVENT_BUFFER_SIZE;
+
+	/*
+	 * If the timeout is -1, wait until at least one event occurs.  If the
+	 * timeout is 0, poll for events, but do not wait at all.
+	 */
+	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+								 nevents, WAIT_EVENT_APPEND_READY);
+	FreeWaitEventSet(node->as_eventset);
+	node->as_eventset = NULL;
+	if (noccurred == 0)
+		return;
+
+	/* Deliver notifications. */
+	for (i = 0; i < noccurred; i++)
+	{
+		WaitEvent  *w = &occurred_event[i];
+
+		/*
+		 * Each waiting subplan should have registered its wait event with
+		 * user_data pointing back to its AsyncRequest.
+		 */
+		if ((w->events & WL_SOCKET_READABLE) != 0)
+		{
+			AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+			if (areq->callback_pending)
+			{
+				/*
+				 * Mark it as no longer needing a callback.  We must do this
+				 * before dispatching the callback in case the callback resets
+				 * the flag.
+				 */
+				areq->callback_pending = false;
+
+				/* Do the actual work. */
+				ExecAsyncNotify(areq);
+			}
+		}
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncAppendResponse
+ *
+ *		Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(AsyncRequest *areq)
+{
+	AppendState *node = (AppendState *) areq->requestor;
+	TupleTableSlot *slot = areq->result;
+
+	/* The result should be a TupleTableSlot or NULL. */
+	Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+	/* Nothing to do if the request is pending. */
+	if (!areq->request_complete)
+	{
+		/* The request would have been pending for a callback. */
+		Assert(areq->callback_pending);
+		return;
+	}
+
+	/* If the result is NULL or an empty slot, there's nothing more to do. */
+	if (TupIsNull(slot))
+	{
+		/* The ending subplan wouldn't have been pending for a callback. */
+		Assert(!areq->callback_pending);
+		--node->as_nasyncremain;
+		return;
+	}
+
+	/* Save result so we can return it. */
+	Assert(node->as_nasyncresults < node->as_nasyncplans);
+	node->as_asyncresults[node->as_nasyncresults++] = slot;
+
+	/*
+	 * Mark the subplan that returned a result as ready for a new request.  We
+	 * don't launch another one here immediately because it might complete.
+	 */
+	node->as_needrequest = bms_add_member(node->as_needrequest,
+										  areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ *		classify_matching_subplans
+ *
+ *		Classify the node's as_valid_subplans into sync ones and
+ *		async ones, adjust it to contain sync ones only, and save
+ *		async ones in the node's as_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(AppendState *node)
+{
+	Bitmapset  *valid_asyncplans;
+
+	Assert(node->as_valid_asyncplans == NULL);
+
+	/* Nothing to do if there are no valid subplans. */
+	if (bms_is_empty(node->as_valid_subplans))
+	{
+		node->as_syncdone = true;
+		node->as_nasyncremain = 0;
+		return;
+	}
+
+	/* Nothing to do if there are no valid async subplans. */
+	if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+	{
+		node->as_nasyncremain = 0;
+		return;
+	}
+
+	/* Get valid async subplans. */
+	valid_asyncplans = bms_copy(node->as_asyncplans);
+	valid_asyncplans = bms_int_members(valid_asyncplans,
+									   node->as_valid_subplans);
+
+	/* Adjust the valid subplans to contain sync subplans only. */
+	node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+											  valid_asyncplans);
+
+	/* Save valid async subplans. */
+	node->as_valid_asyncplans = valid_asyncplans;
+}
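
Throughout this file, subplan sets are walked with the idiom "i = -1; while ((i = bms_next_member(set, i)) >= 0) ...", which visits members in ascending order: bms_next_member returns the smallest member greater than i, or -2 once the set is exhausted. A minimal stand-alone sketch of the same idiom over a toy single-word bitmap (toy_next_member is a hypothetical stand-in; the real Bitmapset type lives in nodes/bitmapset.h):

#include <stdint.h>
#include <stdio.h>

/* Toy single-word bitmapset; the real code uses PostgreSQL's Bitmapset. */
static int
toy_next_member(uint64_t set, int prev)
{
	for (int i = prev + 1; i < 64; i++)
		if (set & (UINT64_C(1) << i))
			return i;
	return -2;					/* mirrors bms_next_member's "no more" result */
}

int
main(void)
{
	uint64_t	valid = (UINT64_C(1) << 0) | (UINT64_C(1) << 3) | (UINT64_C(1) << 5);
	int			i = -1;

	/* Same loop shape as the walks over validsubplans in ExecInitAppend. */
	while ((i = toy_next_member(valid, i)) >= 0)
		printf("subplan %d is valid\n", i);
	return 0;
}

Starting the cursor at -1 makes the first call return the smallest member, which is why every loop in the file initializes i = -1 before entering the while condition.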
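ExecAppendEstimate sizes the shared state as offsetof(ParallelAppendState, pa_finished) plus one bool per subplan, which is the standard arithmetic for a struct ending in a flexible array member. A hedged, stand-alone illustration of that sizing (toy_shared is a hypothetical stand-in; the real allocation goes through shm_toc_allocate into the parallel query's DSM segment, and PostgreSQL spells the member with the FLEXIBLE_ARRAY_MEMBER macro):

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

/* Toy stand-in for ParallelAppendState's trailing flag array. */
typedef struct toy_shared
{
	int			next_plan;		/* models pa_next_plan */
	bool		finished[];		/* C99 flexible array member, one per plan */
} toy_shared;

int
main(void)
{
	int			nplans = 4;
	/* Same arithmetic as ExecAppendEstimate: header size + one flag per plan. */
	size_t		len = offsetof(toy_shared, finished) + sizeof(bool) * nplans;
	toy_shared *st = calloc(1, len);	/* zeroed, like the memset in InitializeDSM */

	printf("allocated %zu bytes for %d plans\n", len, nplans);
	free(st);
	return 0;
}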
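The scheduling policy of choose_next_subplan_for_worker can be hard to see through the locking and pruning details: non-partial plans (indexes below as_first_partial_plan) are claimed exclusively by whichever worker reaches them first, partial plans are shared, and the cursor wraps from the end of the list back into the partial range. The following single-threaded toy model sketches only that policy under simplified assumptions (toy_state, toy_pick_next, FIRST_PARTIAL, and the two-picks drain rule are all hypothetical; the real code coordinates multiple backends through ParallelAppendState under pa_lock and also consults as_valid_subplans):

#include <stdbool.h>
#include <stdio.h>

#define NPLANS			5
#define FIRST_PARTIAL	3		/* plans 0..2 non-partial, 3..4 partial */

typedef struct toy_state
{
	int			next_plan;		/* models pa_next_plan; -1 when all done */
	bool		finished[NPLANS];	/* models pa_finished[] */
} toy_state;

/* Pick the next plan for a caller, or -1 if nothing remains. */
static int
toy_pick_next(toy_state *st)
{
	int			plan = st->next_plan;

	if (plan < 0)
		return -1;
	/* Scan at most NPLANS steps, wrapping from the end into the partials. */
	for (int tries = 0; tries < NPLANS; tries++)
	{
		if (!st->finished[plan])
		{
			/* A non-partial plan is claimed exclusively on first pick. */
			if (plan < FIRST_PARTIAL)
				st->finished[plan] = true;
			/* Advance the shared cursor so callers spread out. */
			st->next_plan = (plan + 1 < NPLANS) ? plan + 1 : FIRST_PARTIAL;
			return plan;
		}
		plan = (plan + 1 < NPLANS) ? plan + 1 : FIRST_PARTIAL;
	}
	st->next_plan = -1;			/* every plan is finished */
	return -1;
}

int
main(void)
{
	toy_state	st = {0, {false}};
	int			picks[NPLANS] = {0};
	int			plan;

	while ((plan = toy_pick_next(&st)) >= 0)
	{
		printf("picked plan %d (%s)\n", plan,
			   plan < FIRST_PARTIAL ? "non-partial" : "partial");
		/* Pretend each partial plan is drained after two picks. */
		if (plan >= FIRST_PARTIAL && ++picks[plan] == 2)
			st.finished[plan] = true;
	}
	return 0;
}

Run as-is, this prints plans 0, 1, and 2 once each (claimed and immediately marked finished) and then alternates between the partial plans 3 and 4 until the simulated drain marks them finished, matching the "hand out non-partial plans once, share partial plans round-robin" behavior described in the function's header comment.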