summaryrefslogtreecommitdiffstats
path: root/src/backend/executor/nodeMergeAppend.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeMergeAppend.c')
-rw-r--r--src/backend/executor/nodeMergeAppend.c389
1 files changed, 389 insertions, 0 deletions
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
new file mode 100644
index 0000000..617bffb
--- /dev/null
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -0,0 +1,389 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeMergeAppend.c
+ * routines to handle MergeAppend nodes.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/executor/nodeMergeAppend.c
+ *
+ *-------------------------------------------------------------------------
+ */
+/* INTERFACE ROUTINES
+ * ExecInitMergeAppend - initialize the MergeAppend node
+ * ExecMergeAppend - retrieve the next tuple from the node
+ * ExecEndMergeAppend - shut down the MergeAppend node
+ * ExecReScanMergeAppend - rescan the MergeAppend node
+ *
+ * NOTES
+ * A MergeAppend node contains a list of one or more subplans.
+ * These are each expected to deliver tuples that are sorted according
+ * to a common sort key. The MergeAppend node merges these streams
+ * to produce output sorted the same way.
+ *
+ * MergeAppend nodes don't make use of their left and right
+ * subtrees, rather they maintain a list of subplans so
+ * a typical MergeAppend node looks like this in the plan tree:
+ *
+ * ...
+ * /
+ * MergeAppend---+------+------+--- nil
+ * / \ | | |
+ * nil nil ... ... ...
+ * subplans
+ */
+
+#include "postgres.h"
+
+#include "executor/execdebug.h"
+#include "executor/execPartition.h"
+#include "executor/nodeMergeAppend.h"
+#include "lib/binaryheap.h"
+#include "miscadmin.h"
+
+/*
+ * We have one slot for each item in the heap array. We use SlotNumber
+ * to store slot indexes. This doesn't actually provide any formal
+ * type-safety, but it makes the code more self-documenting.
+ */
+typedef int32 SlotNumber;
+
+static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
+static int heap_compare_slots(Datum a, Datum b, void *arg);
+
+
+/* ----------------------------------------------------------------
+ * ExecInitMergeAppend
+ *
+ * Begin all of the subscans of the MergeAppend node.
+ * ----------------------------------------------------------------
+ */
+MergeAppendState *
+ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
+{
+ MergeAppendState *mergestate = makeNode(MergeAppendState);
+ PlanState **mergeplanstates;
+ Bitmapset *validsubplans;
+ int nplans;
+ int i,
+ j;
+
+ /* check for unsupported flags */
+ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
+
+ /*
+ * create new MergeAppendState for our node
+ */
+ mergestate->ps.plan = (Plan *) node;
+ mergestate->ps.state = estate;
+ mergestate->ps.ExecProcNode = ExecMergeAppend;
+
+ /* If run-time partition pruning is enabled, then set that up now */
+ if (node->part_prune_info != NULL)
+ {
+ PartitionPruneState *prunestate;
+
+ /* We may need an expression context to evaluate partition exprs */
+ ExecAssignExprContext(estate, &mergestate->ps);
+
+ prunestate = ExecCreatePartitionPruneState(&mergestate->ps,
+ node->part_prune_info);
+ mergestate->ms_prune_state = prunestate;
+
+ /* Perform an initial partition prune, if required. */
+ if (prunestate->do_initial_prune)
+ {
+ /* Determine which subplans survive initial pruning */
+ validsubplans = ExecFindInitialMatchingSubPlans(prunestate,
+ list_length(node->mergeplans));
+
+ nplans = bms_num_members(validsubplans);
+ }
+ else
+ {
+ /* We'll need to initialize all subplans */
+ nplans = list_length(node->mergeplans);
+ Assert(nplans > 0);
+ validsubplans = bms_add_range(NULL, 0, nplans - 1);
+ }
+
+ /*
+ * When no run-time pruning is required and there's at least one
+ * subplan, we can fill as_valid_subplans immediately, preventing
+ * later calls to ExecFindMatchingSubPlans.
+ */
+ if (!prunestate->do_exec_prune && nplans > 0)
+ mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+ }
+ else
+ {
+ nplans = list_length(node->mergeplans);
+
+ /*
+ * When run-time partition pruning is not enabled we can just mark all
+ * subplans as valid; they must also all be initialized.
+ */
+ Assert(nplans > 0);
+ mergestate->ms_valid_subplans = validsubplans =
+ bms_add_range(NULL, 0, nplans - 1);
+ mergestate->ms_prune_state = NULL;
+ }
+
+ mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
+ mergestate->mergeplans = mergeplanstates;
+ mergestate->ms_nplans = nplans;
+
+ mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
+ mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
+ mergestate);
+
+ /*
+ * Miscellaneous initialization
+ *
+ * MergeAppend nodes do have Result slots, which hold pointers to tuples,
+ * so we have to initialize them. FIXME
+ */
+ ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
+
+ /* node returns slots from each of its subnodes, therefore not fixed */
+ mergestate->ps.resultopsset = true;
+ mergestate->ps.resultopsfixed = false;
+
+ /*
+ * call ExecInitNode on each of the valid plans to be executed and save
+ * the results into the mergeplanstates array.
+ */
+ j = 0;
+ i = -1;
+ while ((i = bms_next_member(validsubplans, i)) >= 0)
+ {
+ Plan *initNode = (Plan *) list_nth(node->mergeplans, i);
+
+ mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+ }
+
+ mergestate->ps.ps_ProjInfo = NULL;
+
+ /*
+ * initialize sort-key information
+ */
+ mergestate->ms_nkeys = node->numCols;
+ mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
+
+ for (i = 0; i < node->numCols; i++)
+ {
+ SortSupport sortKey = mergestate->ms_sortkeys + i;
+
+ sortKey->ssup_cxt = CurrentMemoryContext;
+ sortKey->ssup_collation = node->collations[i];
+ sortKey->ssup_nulls_first = node->nullsFirst[i];
+ sortKey->ssup_attno = node->sortColIdx[i];
+
+ /*
+ * It isn't feasible to perform abbreviated key conversion, since
+ * tuples are pulled into mergestate's binary heap as needed. It
+ * would likely be counter-productive to convert tuples into an
+ * abbreviated representation as they're pulled up, so opt out of that
+ * additional optimization entirely.
+ */
+ sortKey->abbreviate = false;
+
+ PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
+ }
+
+ /*
+ * initialize to show we have not run the subplans yet
+ */
+ mergestate->ms_initialized = false;
+
+ return mergestate;
+}
+
+/* ----------------------------------------------------------------
+ * ExecMergeAppend
+ *
+ * Handles iteration over multiple subplans.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+ExecMergeAppend(PlanState *pstate)
+{
+ MergeAppendState *node = castNode(MergeAppendState, pstate);
+ TupleTableSlot *result;
+ SlotNumber i;
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (!node->ms_initialized)
+ {
+ /* Nothing to do if all subplans were pruned */
+ if (node->ms_nplans == 0)
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
+ /*
+ * If we've yet to determine the valid subplans then do so now. If
+ * run-time pruning is disabled then the valid subplans will always be
+ * set to all subplans.
+ */
+ if (node->ms_valid_subplans == NULL)
+ node->ms_valid_subplans =
+ ExecFindMatchingSubPlans(node->ms_prune_state);
+
+ /*
+ * First time through: pull the first tuple from each valid subplan,
+ * and set up the heap.
+ */
+ i = -1;
+ while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
+ {
+ node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+ if (!TupIsNull(node->ms_slots[i]))
+ binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
+ }
+ binaryheap_build(node->ms_heap);
+ node->ms_initialized = true;
+ }
+ else
+ {
+ /*
+ * Otherwise, pull the next tuple from whichever subplan we returned
+ * from last time, and reinsert the subplan index into the heap,
+ * because it might now compare differently against the existing
+ * elements of the heap. (We could perhaps simplify the logic a bit
+ * by doing this before returning from the prior call, but it's better
+ * to not pull tuples until necessary.)
+ */
+ i = DatumGetInt32(binaryheap_first(node->ms_heap));
+ node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+ if (!TupIsNull(node->ms_slots[i]))
+ binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
+ else
+ (void) binaryheap_remove_first(node->ms_heap);
+ }
+
+ if (binaryheap_empty(node->ms_heap))
+ {
+ /* All the subplans are exhausted, and so is the heap */
+ result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+ else
+ {
+ i = DatumGetInt32(binaryheap_first(node->ms_heap));
+ result = node->ms_slots[i];
+ }
+
+ return result;
+}
+
+/*
+ * Compare the tuples in the two given slots.
+ */
+static int32
+heap_compare_slots(Datum a, Datum b, void *arg)
+{
+ MergeAppendState *node = (MergeAppendState *) arg;
+ SlotNumber slot1 = DatumGetInt32(a);
+ SlotNumber slot2 = DatumGetInt32(b);
+
+ TupleTableSlot *s1 = node->ms_slots[slot1];
+ TupleTableSlot *s2 = node->ms_slots[slot2];
+ int nkey;
+
+ Assert(!TupIsNull(s1));
+ Assert(!TupIsNull(s2));
+
+ for (nkey = 0; nkey < node->ms_nkeys; nkey++)
+ {
+ SortSupport sortKey = node->ms_sortkeys + nkey;
+ AttrNumber attno = sortKey->ssup_attno;
+ Datum datum1,
+ datum2;
+ bool isNull1,
+ isNull2;
+ int compare;
+
+ datum1 = slot_getattr(s1, attno, &isNull1);
+ datum2 = slot_getattr(s2, attno, &isNull2);
+
+ compare = ApplySortComparator(datum1, isNull1,
+ datum2, isNull2,
+ sortKey);
+ if (compare != 0)
+ {
+ INVERT_COMPARE_RESULT(compare);
+ return compare;
+ }
+ }
+ return 0;
+}
+
+/* ----------------------------------------------------------------
+ * ExecEndMergeAppend
+ *
+ * Shuts down the subscans of the MergeAppend node.
+ *
+ * Returns nothing of interest.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndMergeAppend(MergeAppendState *node)
+{
+ PlanState **mergeplans;
+ int nplans;
+ int i;
+
+ /*
+ * get information from the node
+ */
+ mergeplans = node->mergeplans;
+ nplans = node->ms_nplans;
+
+ /*
+ * shut down each of the subscans
+ */
+ for (i = 0; i < nplans; i++)
+ ExecEndNode(mergeplans[i]);
+}
+
+void
+ExecReScanMergeAppend(MergeAppendState *node)
+{
+ int i;
+
+ /*
+ * If any PARAM_EXEC Params used in pruning expressions have changed, then
+ * we'd better unset the valid subplans so that they are reselected for
+ * the new parameter values.
+ */
+ if (node->ms_prune_state &&
+ bms_overlap(node->ps.chgParam,
+ node->ms_prune_state->execparamids))
+ {
+ bms_free(node->ms_valid_subplans);
+ node->ms_valid_subplans = NULL;
+ }
+
+ for (i = 0; i < node->ms_nplans; i++)
+ {
+ PlanState *subnode = node->mergeplans[i];
+
+ /*
+ * ExecReScan doesn't know about my subplans, so I have to do
+ * changed-parameter signaling myself.
+ */
+ if (node->ps.chgParam != NULL)
+ UpdateChangedParamSet(subnode, node->ps.chgParam);
+
+ /*
+ * If chgParam of subnode is not null then plan will be re-scanned by
+ * first ExecProcNode.
+ */
+ if (subnode->chgParam == NULL)
+ ExecReScan(subnode);
+ }
+ binaryheap_reset(node->ms_heap);
+ node->ms_initialized = false;
+}