diff options
Diffstat (limited to 'src/backend/executor/nodeAgg.c')
-rw-r--r-- | src/backend/executor/nodeAgg.c | 4829 |
1 files changed, 4829 insertions, 0 deletions
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c new file mode 100644 index 0000000..31609c6 --- /dev/null +++ b/src/backend/executor/nodeAgg.c @@ -0,0 +1,4829 @@ +/*------------------------------------------------------------------------- + * + * nodeAgg.c + * Routines to handle aggregate nodes. + * + * ExecAgg normally evaluates each aggregate in the following steps: + * + * transvalue = initcond + * foreach input_tuple do + * transvalue = transfunc(transvalue, input_value(s)) + * result = finalfunc(transvalue, direct_argument(s)) + * + * If a finalfunc is not supplied then the result is just the ending + * value of transvalue. + * + * Other behaviors can be selected by the "aggsplit" mode, which exists + * to support partial aggregation. It is possible to: + * * Skip running the finalfunc, so that the output is always the + * final transvalue state. + * * Substitute the combinefunc for the transfunc, so that transvalue + * states (propagated up from a child partial-aggregation step) are merged + * rather than processing raw input rows. (The statements below about + * the transfunc apply equally to the combinefunc, when it's selected.) + * * Apply the serializefunc to the output values (this only makes sense + * when skipping the finalfunc, since the serializefunc works on the + * transvalue data type). + * * Apply the deserializefunc to the input values (this only makes sense + * when using the combinefunc, for similar reasons). + * It is the planner's responsibility to connect up Agg nodes using these + * alternate behaviors in a way that makes sense, with partial aggregation + * results being fed to nodes that expect them. + * + * If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the + * input tuples and eliminate duplicates (if required) before performing + * the above-depicted process. (However, we don't do that for ordered-set + * aggregates; their "ORDER BY" inputs are ordinary aggregate arguments + * so far as this module is concerned.) Note that partial aggregation + * is not supported in these cases, since we couldn't ensure global + * ordering or distinctness of the inputs. + * + * If transfunc is marked "strict" in pg_proc and initcond is NULL, + * then the first non-NULL input_value is assigned directly to transvalue, + * and transfunc isn't applied until the second non-NULL input_value. + * The agg's first input type and transtype must be the same in this case! + * + * If transfunc is marked "strict" then NULL input_values are skipped, + * keeping the previous transvalue. If transfunc is not strict then it + * is called for every input tuple and must deal with NULL initcond + * or NULL input_values for itself. + * + * If finalfunc is marked "strict" then it is not called when the + * ending transvalue is NULL, instead a NULL result is created + * automatically (this is just the usual handling of strict functions, + * of course). A non-strict finalfunc can make its own choice of + * what to return for a NULL ending transvalue. + * + * Ordered-set aggregates are treated specially in one other way: we + * evaluate any "direct" arguments and pass them to the finalfunc along + * with the transition value. + * + * A finalfunc can have additional arguments beyond the transvalue and + * any "direct" arguments, corresponding to the input arguments of the + * aggregate. These are always just passed as NULL. Such arguments may be + * needed to allow resolution of a polymorphic aggregate's result type. + * + * We compute aggregate input expressions and run the transition functions + * in a temporary econtext (aggstate->tmpcontext). This is reset at least + * once per input tuple, so when the transvalue datatype is + * pass-by-reference, we have to be careful to copy it into a longer-lived + * memory context, and free the prior value to avoid memory leakage. We + * store transvalues in another set of econtexts, aggstate->aggcontexts + * (one per grouping set, see below), which are also used for the hashtable + * structures in AGG_HASHED mode. These econtexts are rescanned, not just + * reset, at group boundaries so that aggregate transition functions can + * register shutdown callbacks via AggRegisterCallback. + * + * The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to + * run finalize functions and compute the output tuple; this context can be + * reset once per output tuple. + * + * The executor's AggState node is passed as the fmgr "context" value in + * all transfunc and finalfunc calls. It is not recommended that the + * transition functions look at the AggState node directly, but they can + * use AggCheckCallContext() to verify that they are being called by + * nodeAgg.c (and not as ordinary SQL functions). The main reason a + * transition function might want to know this is so that it can avoid + * palloc'ing a fixed-size pass-by-ref transition value on every call: + * it can instead just scribble on and return its left input. Ordinarily + * it is completely forbidden for functions to modify pass-by-ref inputs, + * but in the aggregate case we know the left input is either the initial + * transition value or a previous function result, and in either case its + * value need not be preserved. See int8inc() for an example. Notice that + * the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when + * the previous transition value pointer is returned. It is also possible + * to avoid repeated data copying when the transition value is an expanded + * object: to do that, the transition function must take care to return + * an expanded object that is in a child context of the memory context + * returned by AggCheckCallContext(). Also, some transition functions want + * to store working state in addition to the nominal transition value; they + * can use the memory context returned by AggCheckCallContext() to do that. + * + * Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The + * AggState is available as context in earlier releases (back to 8.1), + * but direct examination of the node is needed to use it before 9.0. + * + * As of 9.4, aggregate transition functions can also use AggGetAggref() + * to get hold of the Aggref expression node for their aggregate call. + * This is mainly intended for ordered-set aggregates, which are not + * supported as window functions. (A regular aggregate function would + * need some fallback logic to use this, since there's no Aggref node + * for a window function.) + * + * Grouping sets: + * + * A list of grouping sets which is structurally equivalent to a ROLLUP + * clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over + * ordered data. We do this by keeping a separate set of transition values + * for each grouping set being concurrently processed; for each input tuple + * we update them all, and on group boundaries we reset those states + * (starting at the front of the list) whose grouping values have changed + * (the list of grouping sets is ordered from most specific to least + * specific). + * + * Where more complex grouping sets are used, we break them down into + * "phases", where each phase has a different sort order (except phase 0 + * which is reserved for hashing). During each phase but the last, the + * input tuples are additionally stored in a tuplesort which is keyed to the + * next phase's sort order; during each phase but the first, the input + * tuples are drawn from the previously sorted data. (The sorting of the + * data for the first phase is handled by the planner, as it might be + * satisfied by underlying nodes.) + * + * Hashing can be mixed with sorted grouping. To do this, we have an + * AGG_MIXED strategy that populates the hashtables during the first sorted + * phase, and switches to reading them out after completing all sort phases. + * We can also support AGG_HASHED with multiple hash tables and no sorting + * at all. + * + * From the perspective of aggregate transition and final functions, the + * only issue regarding grouping sets is this: a single call site (flinfo) + * of an aggregate function may be used for updating several different + * transition values in turn. So the function must not cache in the flinfo + * anything which logically belongs as part of the transition value (most + * importantly, the memory context in which the transition value exists). + * The support API functions (AggCheckCallContext, AggRegisterCallback) are + * sensitive to the grouping set for which the aggregate function is + * currently being called. + * + * Plan structure: + * + * What we get from the planner is actually one "real" Agg node which is + * part of the plan tree proper, but which optionally has an additional list + * of Agg nodes hung off the side via the "chain" field. This is because an + * Agg node happens to be a convenient representation of all the data we + * need for grouping sets. + * + * For many purposes, we treat the "real" node as if it were just the first + * node in the chain. The chain must be ordered such that hashed entries + * come before sorted/plain entries; the real node is marked AGG_MIXED if + * there are both types present (in which case the real node describes one + * of the hashed groupings, other AGG_HASHED nodes may optionally follow in + * the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node). If + * the real node is marked AGG_HASHED or AGG_SORTED, then all the chained + * nodes must be of the same type; if it is AGG_PLAIN, there can be no + * chained nodes. + * + * We collect all hashed nodes into a single "phase", numbered 0, and create + * a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node. + * Phase 0 is allocated even if there are no hashes, but remains unused in + * that case. + * + * AGG_HASHED nodes actually refer to only a single grouping set each, + * because for each hashed grouping we need a separate grpColIdx and + * numGroups estimate. AGG_SORTED nodes represent a "rollup", a list of + * grouping sets that share a sort order. Each AGG_SORTED node other than + * the first one has an associated Sort node which describes the sort order + * to be used; the first sorted node takes its input from the outer subtree, + * which the planner has already arranged to provide ordered data. + * + * Memory and ExprContext usage: + * + * Because we're accumulating aggregate values across input rows, we need to + * use more memory contexts than just simple input/output tuple contexts. + * In fact, for a rollup, we need a separate context for each grouping set + * so that we can reset the inner (finer-grained) aggregates on their group + * boundaries while continuing to accumulate values for outer + * (coarser-grained) groupings. On top of this, we might be simultaneously + * populating hashtables; however, we only need one context for all the + * hashtables. + * + * So we create an array, aggcontexts, with an ExprContext for each grouping + * set in the largest rollup that we're going to process, and use the + * per-tuple memory context of those ExprContexts to store the aggregate + * transition values. hashcontext is the single context created to support + * all hash tables. + * + * Spilling To Disk + * + * When performing hash aggregation, if the hash table memory exceeds the + * limit (see hash_agg_check_limits()), we enter "spill mode". In spill + * mode, we advance the transition states only for groups already in the + * hash table. For tuples that would need to create a new hash table + * entries (and initialize new transition states), we instead spill them to + * disk to be processed later. The tuples are spilled in a partitioned + * manner, so that subsequent batches are smaller and less likely to exceed + * hash_mem (if a batch does exceed hash_mem, it must be spilled + * recursively). + * + * Spilled data is written to logical tapes. These provide better control + * over memory usage, disk space, and the number of files than if we were + * to use a BufFile for each spill. + * + * Note that it's possible for transition states to start small but then + * grow very large; for instance in the case of ARRAY_AGG. In such cases, + * it's still possible to significantly exceed hash_mem. We try to avoid + * this situation by estimating what will fit in the available memory, and + * imposing a limit on the number of groups separately from the amount of + * memory consumed. + * + * Transition / Combine function invocation: + * + * For performance reasons transition functions, including combine + * functions, aren't invoked one-by-one from nodeAgg.c after computing + * arguments using the expression evaluation engine. Instead + * ExecBuildAggTrans() builds one large expression that does both argument + * evaluation and transition function invocation. That avoids performance + * issues due to repeated uses of expression evaluation, complications due + * to filter expressions having to be evaluated early, and allows to JIT + * the entire expression into one native function. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/nodeAgg.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/parallel.h" +#include "catalog/objectaccess.h" +#include "catalog/pg_aggregate.h" +#include "catalog/pg_proc.h" +#include "catalog/pg_type.h" +#include "common/hashfn.h" +#include "executor/execExpr.h" +#include "executor/executor.h" +#include "executor/nodeAgg.h" +#include "lib/hyperloglog.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "optimizer/optimizer.h" +#include "parser/parse_agg.h" +#include "parser/parse_coerce.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/dynahash.h" +#include "utils/expandeddatum.h" +#include "utils/logtape.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/syscache.h" +#include "utils/tuplesort.h" + +/* + * Control how many partitions are created when spilling HashAgg to + * disk. + * + * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of + * partitions needed such that each partition will fit in memory. The factor + * is set higher than one because there's not a high cost to having a few too + * many partitions, and it makes it less likely that a partition will need to + * be spilled recursively. Another benefit of having more, smaller partitions + * is that small hash tables may perform better than large ones due to memory + * caching effects. + * + * We also specify a min and max number of partitions per spill. Too few might + * mean a lot of wasted I/O from repeated spilling of the same tuples. Too + * many will result in lots of memory wasted buffering the spill files (which + * could instead be spent on a larger hash table). + */ +#define HASHAGG_PARTITION_FACTOR 1.50 +#define HASHAGG_MIN_PARTITIONS 4 +#define HASHAGG_MAX_PARTITIONS 1024 + +/* + * For reading from tapes, the buffer size must be a multiple of + * BLCKSZ. Larger values help when reading from multiple tapes concurrently, + * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a + * tape always uses a buffer of size BLCKSZ. + */ +#define HASHAGG_READ_BUFFER_SIZE BLCKSZ +#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ + +/* + * HyperLogLog is used for estimating the cardinality of the spilled tuples in + * a given partition. 5 bits corresponds to a size of about 32 bytes and a + * worst-case error of around 18%. That's effective enough to choose a + * reasonable number of partitions when recursing. + */ +#define HASHAGG_HLL_BIT_WIDTH 5 + +/* + * Estimate chunk overhead as a constant 16 bytes. XXX: should this be + * improved? + */ +#define CHUNKHDRSZ 16 + +/* + * Track all tapes needed for a HashAgg that spills. We don't know the maximum + * number of tapes needed at the start of the algorithm (because it can + * recurse), so one tape set is allocated and extended as needed for new + * tapes. When a particular tape is already read, rewind it for write mode and + * put it in the free list. + * + * Tapes' buffers can take up substantial memory when many tapes are open at + * once. We only need one tape open at a time in read mode (using a buffer + * that's a multiple of BLCKSZ); but we need one tape open in write mode (each + * requiring a buffer of size BLCKSZ) for each partition. + */ +typedef struct HashTapeInfo +{ + LogicalTapeSet *tapeset; + int ntapes; + int *freetapes; + int nfreetapes; + int freetapes_alloc; +} HashTapeInfo; + +/* + * Represents partitioned spill data for a single hashtable. Contains the + * necessary information to route tuples to the correct partition, and to + * transform the spilled data into new batches. + * + * The high bits are used for partition selection (when recursing, we ignore + * the bits that have already been used for partition selection at an earlier + * level). + */ +typedef struct HashAggSpill +{ + LogicalTapeSet *tapeset; /* borrowed reference to tape set */ + int npartitions; /* number of partitions */ + int *partitions; /* spill partition tape numbers */ + int64 *ntuples; /* number of tuples in each partition */ + uint32 mask; /* mask to find partition from hash value */ + int shift; /* after masking, shift by this amount */ + hyperLogLogState *hll_card; /* cardinality estimate for contents */ +} HashAggSpill; + +/* + * Represents work to be done for one pass of hash aggregation (with only one + * grouping set). + * + * Also tracks the bits of the hash already used for partition selection by + * earlier iterations, so that this batch can use new bits. If all bits have + * already been used, no partitioning will be done (any spilled data will go + * to a single output tape). + */ +typedef struct HashAggBatch +{ + int setno; /* grouping set */ + int used_bits; /* number of bits of hash already used */ + LogicalTapeSet *tapeset; /* borrowed reference to tape set */ + int input_tapenum; /* input partition tape */ + int64 input_tuples; /* number of tuples in this batch */ + double input_card; /* estimated group cardinality */ +} HashAggBatch; + +/* used to find referenced colnos */ +typedef struct FindColsContext +{ + bool is_aggref; /* is under an aggref */ + Bitmapset *aggregated; /* column references under an aggref */ + Bitmapset *unaggregated; /* other column references */ +} FindColsContext; + +static void select_current_set(AggState *aggstate, int setno, bool is_hash); +static void initialize_phase(AggState *aggstate, int newphase); +static TupleTableSlot *fetch_input_tuple(AggState *aggstate); +static void initialize_aggregates(AggState *aggstate, + AggStatePerGroup *pergroups, + int numReset); +static void advance_transition_function(AggState *aggstate, + AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate); +static void advance_aggregates(AggState *aggstate); +static void process_ordered_aggregate_single(AggState *aggstate, + AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate); +static void process_ordered_aggregate_multi(AggState *aggstate, + AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate); +static void finalize_aggregate(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroupstate, + Datum *resultVal, bool *resultIsNull); +static void finalize_partialaggregate(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroupstate, + Datum *resultVal, bool *resultIsNull); +static inline void prepare_hash_slot(AggStatePerHash perhash, + TupleTableSlot *inputslot, + TupleTableSlot *hashslot); +static void prepare_projection_slot(AggState *aggstate, + TupleTableSlot *slot, + int currentSet); +static void finalize_aggregates(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroup); +static TupleTableSlot *project_aggregates(AggState *aggstate); +static void find_cols(AggState *aggstate, Bitmapset **aggregated, + Bitmapset **unaggregated); +static bool find_cols_walker(Node *node, FindColsContext *context); +static void build_hash_tables(AggState *aggstate); +static void build_hash_table(AggState *aggstate, int setno, long nbuckets); +static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, + bool nullcheck); +static long hash_choose_num_buckets(double hashentrysize, + long estimated_nbuckets, + Size memory); +static int hash_choose_num_partitions(double input_groups, + double hashentrysize, + int used_bits, + int *log2_npartittions); +static void initialize_hash_entry(AggState *aggstate, + TupleHashTable hashtable, + TupleHashEntry entry); +static void lookup_hash_entries(AggState *aggstate); +static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); +static void agg_fill_hash_table(AggState *aggstate); +static bool agg_refill_hash_table(AggState *aggstate); +static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); +static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate); +static void hash_agg_check_limits(AggState *aggstate); +static void hash_agg_enter_spill_mode(AggState *aggstate); +static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, + int npartitions); +static void hashagg_finish_initial_spills(AggState *aggstate); +static void hashagg_reset_spill_state(AggState *aggstate); +static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset, + int input_tapenum, int setno, + int64 input_tuples, double input_card, + int used_bits); +static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); +static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, + int used_bits, double input_groups, + double hashentrysize); +static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, + TupleTableSlot *slot, uint32 hash); +static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, + int setno); +static void hashagg_tapeinfo_init(AggState *aggstate); +static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, + int ndest); +static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum); +static Datum GetAggInitVal(Datum textInitVal, Oid transtype); +static void build_pertrans_for_aggref(AggStatePerTrans pertrans, + AggState *aggstate, EState *estate, + Aggref *aggref, Oid aggtransfn, Oid aggtranstype, + Oid aggserialfn, Oid aggdeserialfn, + Datum initValue, bool initValueIsNull, + Oid *inputTypes, int numArguments); + + +/* + * Select the current grouping set; affects current_set and + * curaggcontext. + */ +static void +select_current_set(AggState *aggstate, int setno, bool is_hash) +{ + /* + * When changing this, also adapt ExecAggPlainTransByVal() and + * ExecAggPlainTransByRef(). + */ + if (is_hash) + aggstate->curaggcontext = aggstate->hashcontext; + else + aggstate->curaggcontext = aggstate->aggcontexts[setno]; + + aggstate->current_set = setno; +} + +/* + * Switch to phase "newphase", which must either be 0 or 1 (to reset) or + * current_phase + 1. Juggle the tuplesorts accordingly. + * + * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED + * case, so when entering phase 0, all we need to do is drop open sorts. + */ +static void +initialize_phase(AggState *aggstate, int newphase) +{ + Assert(newphase <= 1 || newphase == aggstate->current_phase + 1); + + /* + * Whatever the previous state, we're now done with whatever input + * tuplesort was in use. + */ + if (aggstate->sort_in) + { + tuplesort_end(aggstate->sort_in); + aggstate->sort_in = NULL; + } + + if (newphase <= 1) + { + /* + * Discard any existing output tuplesort. + */ + if (aggstate->sort_out) + { + tuplesort_end(aggstate->sort_out); + aggstate->sort_out = NULL; + } + } + else + { + /* + * The old output tuplesort becomes the new input one, and this is the + * right time to actually sort it. + */ + aggstate->sort_in = aggstate->sort_out; + aggstate->sort_out = NULL; + Assert(aggstate->sort_in); + tuplesort_performsort(aggstate->sort_in); + } + + /* + * If this isn't the last phase, we need to sort appropriately for the + * next phase in sequence. + */ + if (newphase > 0 && newphase < aggstate->numphases - 1) + { + Sort *sortnode = aggstate->phases[newphase + 1].sortnode; + PlanState *outerNode = outerPlanState(aggstate); + TupleDesc tupDesc = ExecGetResultType(outerNode); + + aggstate->sort_out = tuplesort_begin_heap(tupDesc, + sortnode->numCols, + sortnode->sortColIdx, + sortnode->sortOperators, + sortnode->collations, + sortnode->nullsFirst, + work_mem, + NULL, false); + } + + aggstate->current_phase = newphase; + aggstate->phase = &aggstate->phases[newphase]; +} + +/* + * Fetch a tuple from either the outer plan (for phase 1) or from the sorter + * populated by the previous phase. Copy it to the sorter for the next phase + * if any. + * + * Callers cannot rely on memory for tuple in returned slot remaining valid + * past any subsequently fetched tuple. + */ +static TupleTableSlot * +fetch_input_tuple(AggState *aggstate) +{ + TupleTableSlot *slot; + + if (aggstate->sort_in) + { + /* make sure we check for interrupts in either path through here */ + CHECK_FOR_INTERRUPTS(); + if (!tuplesort_gettupleslot(aggstate->sort_in, true, false, + aggstate->sort_slot, NULL)) + return NULL; + slot = aggstate->sort_slot; + } + else + slot = ExecProcNode(outerPlanState(aggstate)); + + if (!TupIsNull(slot) && aggstate->sort_out) + tuplesort_puttupleslot(aggstate->sort_out, slot); + + return slot; +} + +/* + * (Re)Initialize an individual aggregate. + * + * This function handles only one grouping set, already set in + * aggstate->current_set. + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate) +{ + /* + * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate. + */ + if (pertrans->numSortCols > 0) + { + /* + * In case of rescan, maybe there could be an uncompleted sort + * operation? Clean it up if so. + */ + if (pertrans->sortstates[aggstate->current_set]) + tuplesort_end(pertrans->sortstates[aggstate->current_set]); + + + /* + * We use a plain Datum sorter when there's a single input column; + * otherwise sort the full tuple. (See comments for + * process_ordered_aggregate_single.) + */ + if (pertrans->numInputs == 1) + { + Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0); + + pertrans->sortstates[aggstate->current_set] = + tuplesort_begin_datum(attr->atttypid, + pertrans->sortOperators[0], + pertrans->sortCollations[0], + pertrans->sortNullsFirst[0], + work_mem, NULL, false); + } + else + pertrans->sortstates[aggstate->current_set] = + tuplesort_begin_heap(pertrans->sortdesc, + pertrans->numSortCols, + pertrans->sortColIdx, + pertrans->sortOperators, + pertrans->sortCollations, + pertrans->sortNullsFirst, + work_mem, NULL, false); + } + + /* + * (Re)set transValue to the initial value. + * + * Note that when the initial value is pass-by-ref, we must copy it (into + * the aggcontext) since we will pfree the transValue later. + */ + if (pertrans->initValueIsNull) + pergroupstate->transValue = pertrans->initValue; + else + { + MemoryContext oldContext; + + oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory); + pergroupstate->transValue = datumCopy(pertrans->initValue, + pertrans->transtypeByVal, + pertrans->transtypeLen); + MemoryContextSwitchTo(oldContext); + } + pergroupstate->transValueIsNull = pertrans->initValueIsNull; + + /* + * If the initial value for the transition state doesn't exist in the + * pg_aggregate table then we will let the first non-NULL value returned + * from the outer procNode become the initial value. (This is useful for + * aggregates like max() and min().) The noTransValue flag signals that we + * still need to do this. + */ + pergroupstate->noTransValue = pertrans->initValueIsNull; +} + +/* + * Initialize all aggregate transition states for a new group of input values. + * + * If there are multiple grouping sets, we initialize only the first numReset + * of them (the grouping sets are ordered so that the most specific one, which + * is reset most often, is first). As a convenience, if numReset is 0, we + * reinitialize all sets. + * + * NB: This cannot be used for hash aggregates, as for those the grouping set + * number has to be specified from further up. + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +initialize_aggregates(AggState *aggstate, + AggStatePerGroup *pergroups, + int numReset) +{ + int transno; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int setno = 0; + int numTrans = aggstate->numtrans; + AggStatePerTrans transstates = aggstate->pertrans; + + if (numReset == 0) + numReset = numGroupingSets; + + for (setno = 0; setno < numReset; setno++) + { + AggStatePerGroup pergroup = pergroups[setno]; + + select_current_set(aggstate, setno, false); + + for (transno = 0; transno < numTrans; transno++) + { + AggStatePerTrans pertrans = &transstates[transno]; + AggStatePerGroup pergroupstate = &pergroup[transno]; + + initialize_aggregate(aggstate, pertrans, pergroupstate); + } + } +} + +/* + * Given new input value(s), advance the transition function of one aggregate + * state within one grouping set only (already set in aggstate->current_set) + * + * The new values (and null flags) have been preloaded into argument positions + * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to + * pass to the transition function. We also expect that the static fields of + * the fcinfo are already initialized; that was done by ExecInitAgg(). + * + * It doesn't matter which memory context this is called in. + */ +static void +advance_transition_function(AggState *aggstate, + AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate) +{ + FunctionCallInfo fcinfo = pertrans->transfn_fcinfo; + MemoryContext oldContext; + Datum newVal; + + if (pertrans->transfn.fn_strict) + { + /* + * For a strict transfn, nothing happens when there's a NULL input; we + * just keep the prior transValue. + */ + int numTransInputs = pertrans->numTransInputs; + int i; + + for (i = 1; i <= numTransInputs; i++) + { + if (fcinfo->args[i].isnull) + return; + } + if (pergroupstate->noTransValue) + { + /* + * transValue has not been initialized. This is the first non-NULL + * input value. We use it as the initial value for transValue. (We + * already checked that the agg's input type is binary-compatible + * with its transtype, so straight copy here is OK.) + * + * We must copy the datum into aggcontext if it is pass-by-ref. We + * do not need to pfree the old transValue, since it's NULL. + */ + oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory); + pergroupstate->transValue = datumCopy(fcinfo->args[1].value, + pertrans->transtypeByVal, + pertrans->transtypeLen); + pergroupstate->transValueIsNull = false; + pergroupstate->noTransValue = false; + MemoryContextSwitchTo(oldContext); + return; + } + if (pergroupstate->transValueIsNull) + { + /* + * Don't call a strict function with NULL inputs. Note it is + * possible to get here despite the above tests, if the transfn is + * strict *and* returned a NULL on a prior cycle. If that happens + * we will propagate the NULL all the way to the end. + */ + return; + } + } + + /* We run the transition functions in per-input-tuple memory context */ + oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory); + + /* set up aggstate->curpertrans for AggGetAggref() */ + aggstate->curpertrans = pertrans; + + /* + * OK to call the transition function + */ + fcinfo->args[0].value = pergroupstate->transValue; + fcinfo->args[0].isnull = pergroupstate->transValueIsNull; + fcinfo->isnull = false; /* just in case transfn doesn't set it */ + + newVal = FunctionCallInvoke(fcinfo); + + aggstate->curpertrans = NULL; + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * free the prior transValue. But if transfn returned a pointer to its + * first input, we don't need to do anything. Also, if transfn returned a + * pointer to a R/W expanded object that is already a child of the + * aggcontext, assume we can adopt that value without copying it. + * + * It's safe to compare newVal with pergroup->transValue without regard + * for either being NULL, because ExecAggTransReparent() takes care to set + * transValue to 0 when NULL. Otherwise we could end up accidentally not + * reparenting, when the transValue has the same numerical value as + * newValue, despite being NULL. This is a somewhat hot path, making it + * undesirable to instead solve this with another branch for the common + * case of the transition function returning its (modified) input + * argument. + */ + if (!pertrans->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) + newVal = ExecAggTransReparent(aggstate, pertrans, + newVal, fcinfo->isnull, + pergroupstate->transValue, + pergroupstate->transValueIsNull); + + pergroupstate->transValue = newVal; + pergroupstate->transValueIsNull = fcinfo->isnull; + + MemoryContextSwitchTo(oldContext); +} + +/* + * Advance each aggregate transition state for one input tuple. The input + * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is + * accessible to ExecEvalExpr. + * + * We have two sets of transition states to handle: one for sorted aggregation + * and one for hashed; we do them both here, to avoid multiple evaluation of + * the inputs. + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +advance_aggregates(AggState *aggstate) +{ + bool dummynull; + + ExecEvalExprSwitchContext(aggstate->phase->evaltrans, + aggstate->tmpcontext, + &dummynull); +} + +/* + * Run the transition function for a DISTINCT or ORDER BY aggregate + * with only one input. This is called after we have completed + * entering all the input values into the sort object. We complete the + * sort, read out the values in sorted order, and run the transition + * function on each value (applying DISTINCT if appropriate). + * + * Note that the strictness of the transition function was checked when + * entering the values into the sort, so we don't check it again here; + * we just apply standard SQL DISTINCT logic. + * + * The one-input case is handled separately from the multi-input case + * for performance reasons: for single by-value inputs, such as the + * common case of count(distinct id), the tuplesort_getdatum code path + * is around 300% faster. (The speedup for by-reference types is less + * but still noticeable.) + * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +process_ordered_aggregate_single(AggState *aggstate, + AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate) +{ + Datum oldVal = (Datum) 0; + bool oldIsNull = true; + bool haveOldVal = false; + MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory; + MemoryContext oldContext; + bool isDistinct = (pertrans->numDistinctCols > 0); + Datum newAbbrevVal = (Datum) 0; + Datum oldAbbrevVal = (Datum) 0; + FunctionCallInfo fcinfo = pertrans->transfn_fcinfo; + Datum *newVal; + bool *isNull; + + Assert(pertrans->numDistinctCols < 2); + + tuplesort_performsort(pertrans->sortstates[aggstate->current_set]); + + /* Load the column into argument 1 (arg 0 will be transition value) */ + newVal = &fcinfo->args[1].value; + isNull = &fcinfo->args[1].isnull; + + /* + * Note: if input type is pass-by-ref, the datums returned by the sort are + * freshly palloc'd in the per-query context, so we must be careful to + * pfree them when they are no longer needed. + */ + + while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set], + true, newVal, isNull, &newAbbrevVal)) + { + /* + * Clear and select the working context for evaluation of the equality + * function and transition function. + */ + MemoryContextReset(workcontext); + oldContext = MemoryContextSwitchTo(workcontext); + + /* + * If DISTINCT mode, and not distinct from prior, skip it. + */ + if (isDistinct && + haveOldVal && + ((oldIsNull && *isNull) || + (!oldIsNull && !*isNull && + oldAbbrevVal == newAbbrevVal && + DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne, + pertrans->aggCollation, + oldVal, *newVal))))) + { + /* equal to prior, so forget this one */ + if (!pertrans->inputtypeByVal && !*isNull) + pfree(DatumGetPointer(*newVal)); + } + else + { + advance_transition_function(aggstate, pertrans, pergroupstate); + /* forget the old value, if any */ + if (!oldIsNull && !pertrans->inputtypeByVal) + pfree(DatumGetPointer(oldVal)); + /* and remember the new one for subsequent equality checks */ + oldVal = *newVal; + oldAbbrevVal = newAbbrevVal; + oldIsNull = *isNull; + haveOldVal = true; + } + + MemoryContextSwitchTo(oldContext); + } + + if (!oldIsNull && !pertrans->inputtypeByVal) + pfree(DatumGetPointer(oldVal)); + + tuplesort_end(pertrans->sortstates[aggstate->current_set]); + pertrans->sortstates[aggstate->current_set] = NULL; +} + +/* + * Run the transition function for a DISTINCT or ORDER BY aggregate + * with more than one input. This is called after we have completed + * entering all the input values into the sort object. We complete the + * sort, read out the values in sorted order, and run the transition + * function on each value (applying DISTINCT if appropriate). + * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * + * When called, CurrentMemoryContext should be the per-query context. + */ +static void +process_ordered_aggregate_multi(AggState *aggstate, + AggStatePerTrans pertrans, + AggStatePerGroup pergroupstate) +{ + ExprContext *tmpcontext = aggstate->tmpcontext; + FunctionCallInfo fcinfo = pertrans->transfn_fcinfo; + TupleTableSlot *slot1 = pertrans->sortslot; + TupleTableSlot *slot2 = pertrans->uniqslot; + int numTransInputs = pertrans->numTransInputs; + int numDistinctCols = pertrans->numDistinctCols; + Datum newAbbrevVal = (Datum) 0; + Datum oldAbbrevVal = (Datum) 0; + bool haveOldValue = false; + TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple; + int i; + + tuplesort_performsort(pertrans->sortstates[aggstate->current_set]); + + ExecClearTuple(slot1); + if (slot2) + ExecClearTuple(slot2); + + while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set], + true, true, slot1, &newAbbrevVal)) + { + CHECK_FOR_INTERRUPTS(); + + tmpcontext->ecxt_outertuple = slot1; + tmpcontext->ecxt_innertuple = slot2; + + if (numDistinctCols == 0 || + !haveOldValue || + newAbbrevVal != oldAbbrevVal || + !ExecQual(pertrans->equalfnMulti, tmpcontext)) + { + /* + * Extract the first numTransInputs columns as datums to pass to + * the transfn. + */ + slot_getsomeattrs(slot1, numTransInputs); + + /* Load values into fcinfo */ + /* Start from 1, since the 0th arg will be the transition value */ + for (i = 0; i < numTransInputs; i++) + { + fcinfo->args[i + 1].value = slot1->tts_values[i]; + fcinfo->args[i + 1].isnull = slot1->tts_isnull[i]; + } + + advance_transition_function(aggstate, pertrans, pergroupstate); + + if (numDistinctCols > 0) + { + /* swap the slot pointers to retain the current tuple */ + TupleTableSlot *tmpslot = slot2; + + slot2 = slot1; + slot1 = tmpslot; + /* avoid ExecQual() calls by reusing abbreviated keys */ + oldAbbrevVal = newAbbrevVal; + haveOldValue = true; + } + } + + /* Reset context each time */ + ResetExprContext(tmpcontext); + + ExecClearTuple(slot1); + } + + if (slot2) + ExecClearTuple(slot2); + + tuplesort_end(pertrans->sortstates[aggstate->current_set]); + pertrans->sortstates[aggstate->current_set] = NULL; + + /* restore previous slot, potentially in use for grouping sets */ + tmpcontext->ecxt_outertuple = save; +} + +/* + * Compute the final value of one aggregate. + * + * This function handles only one grouping set (already set in + * aggstate->current_set). + * + * The finalfn will be run, and the result delivered, in the + * output-tuple context; caller's CurrentMemoryContext does not matter. + * + * The finalfn uses the state as set in the transno. This also might be + * being used by another aggregate function, so it's important that we do + * nothing destructive here. + */ +static void +finalize_aggregate(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroupstate, + Datum *resultVal, bool *resultIsNull) +{ + LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); + bool anynull = false; + MemoryContext oldContext; + int i; + ListCell *lc; + AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno]; + + oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + + /* + * Evaluate any direct arguments. We do this even if there's no finalfn + * (which is unlikely anyway), so that side-effects happen as expected. + * The direct arguments go into arg positions 1 and up, leaving position 0 + * for the transition state value. + */ + i = 1; + foreach(lc, peragg->aggdirectargs) + { + ExprState *expr = (ExprState *) lfirst(lc); + + fcinfo->args[i].value = ExecEvalExpr(expr, + aggstate->ss.ps.ps_ExprContext, + &fcinfo->args[i].isnull); + anynull |= fcinfo->args[i].isnull; + i++; + } + + /* + * Apply the agg's finalfn if one is provided, else return transValue. + */ + if (OidIsValid(peragg->finalfn_oid)) + { + int numFinalArgs = peragg->numFinalArgs; + + /* set up aggstate->curperagg for AggGetAggref() */ + aggstate->curperagg = peragg; + + InitFunctionCallInfoData(*fcinfo, &peragg->finalfn, + numFinalArgs, + pertrans->aggCollation, + (void *) aggstate, NULL); + + /* Fill in the transition state value */ + fcinfo->args[0].value = + MakeExpandedObjectReadOnly(pergroupstate->transValue, + pergroupstate->transValueIsNull, + pertrans->transtypeLen); + fcinfo->args[0].isnull = pergroupstate->transValueIsNull; + anynull |= pergroupstate->transValueIsNull; + + /* Fill any remaining argument positions with nulls */ + for (; i < numFinalArgs; i++) + { + fcinfo->args[i].value = (Datum) 0; + fcinfo->args[i].isnull = true; + anynull = true; + } + + if (fcinfo->flinfo->fn_strict && anynull) + { + /* don't call a strict function with NULL inputs */ + *resultVal = (Datum) 0; + *resultIsNull = true; + } + else + { + *resultVal = FunctionCallInvoke(fcinfo); + *resultIsNull = fcinfo->isnull; + } + aggstate->curperagg = NULL; + } + else + { + /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */ + *resultVal = pergroupstate->transValue; + *resultIsNull = pergroupstate->transValueIsNull; + } + + /* + * If result is pass-by-ref, make sure it is in the right context. + */ + if (!peragg->resulttypeByVal && !*resultIsNull && + !MemoryContextContains(CurrentMemoryContext, + DatumGetPointer(*resultVal))) + *resultVal = datumCopy(*resultVal, + peragg->resulttypeByVal, + peragg->resulttypeLen); + + MemoryContextSwitchTo(oldContext); +} + +/* + * Compute the output value of one partial aggregate. + * + * The serialization function will be run, and the result delivered, in the + * output-tuple context; caller's CurrentMemoryContext does not matter. + */ +static void +finalize_partialaggregate(AggState *aggstate, + AggStatePerAgg peragg, + AggStatePerGroup pergroupstate, + Datum *resultVal, bool *resultIsNull) +{ + AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno]; + MemoryContext oldContext; + + oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + + /* + * serialfn_oid will be set if we must serialize the transvalue before + * returning it + */ + if (OidIsValid(pertrans->serialfn_oid)) + { + /* Don't call a strict serialization function with NULL input. */ + if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull) + { + *resultVal = (Datum) 0; + *resultIsNull = true; + } + else + { + FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo; + + fcinfo->args[0].value = + MakeExpandedObjectReadOnly(pergroupstate->transValue, + pergroupstate->transValueIsNull, + pertrans->transtypeLen); + fcinfo->args[0].isnull = pergroupstate->transValueIsNull; + fcinfo->isnull = false; + + *resultVal = FunctionCallInvoke(fcinfo); + *resultIsNull = fcinfo->isnull; + } + } + else + { + /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */ + *resultVal = pergroupstate->transValue; + *resultIsNull = pergroupstate->transValueIsNull; + } + + /* If result is pass-by-ref, make sure it is in the right context. */ + if (!peragg->resulttypeByVal && !*resultIsNull && + !MemoryContextContains(CurrentMemoryContext, + DatumGetPointer(*resultVal))) + *resultVal = datumCopy(*resultVal, + peragg->resulttypeByVal, + peragg->resulttypeLen); + + MemoryContextSwitchTo(oldContext); +} + +/* + * Extract the attributes that make up the grouping key into the + * hashslot. This is necessary to compute the hash or perform a lookup. + */ +static inline void +prepare_hash_slot(AggStatePerHash perhash, + TupleTableSlot *inputslot, + TupleTableSlot *hashslot) +{ + int i; + + /* transfer just the needed columns into hashslot */ + slot_getsomeattrs(inputslot, perhash->largestGrpColIdx); + ExecClearTuple(hashslot); + + for (i = 0; i < perhash->numhashGrpCols; i++) + { + int varNumber = perhash->hashGrpColIdxInput[i] - 1; + + hashslot->tts_values[i] = inputslot->tts_values[varNumber]; + hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber]; + } + ExecStoreVirtualTuple(hashslot); +} + +/* + * Prepare to finalize and project based on the specified representative tuple + * slot and grouping set. + * + * In the specified tuple slot, force to null all attributes that should be + * read as null in the context of the current grouping set. Also stash the + * current group bitmap where GroupingExpr can get at it. + * + * This relies on three conditions: + * + * 1) Nothing is ever going to try and extract the whole tuple from this slot, + * only reference it in evaluations, which will only access individual + * attributes. + * + * 2) No system columns are going to need to be nulled. (If a system column is + * referenced in a group clause, it is actually projected in the outer plan + * tlist.) + * + * 3) Within a given phase, we never need to recover the value of an attribute + * once it has been set to null. + * + * Poking into the slot this way is a bit ugly, but the consensus is that the + * alternative was worse. + */ +static void +prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet) +{ + if (aggstate->phase->grouped_cols) + { + Bitmapset *grouped_cols = aggstate->phase->grouped_cols[currentSet]; + + aggstate->grouped_cols = grouped_cols; + + if (TTS_EMPTY(slot)) + { + /* + * Force all values to be NULL if working on an empty input tuple + * (i.e. an empty grouping set for which no input rows were + * supplied). + */ + ExecStoreAllNullTuple(slot); + } + else if (aggstate->all_grouped_cols) + { + ListCell *lc; + + /* all_grouped_cols is arranged in desc order */ + slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols)); + + foreach(lc, aggstate->all_grouped_cols) + { + int attnum = lfirst_int(lc); + + if (!bms_is_member(attnum, grouped_cols)) + slot->tts_isnull[attnum - 1] = true; + } + } + } +} + +/* + * Compute the final value of all aggregates for one group. + * + * This function handles only one grouping set at a time, which the caller must + * have selected. It's also the caller's responsibility to adjust the supplied + * pergroup parameter to point to the current set's transvalues. + * + * Results are stored in the output econtext aggvalues/aggnulls. + */ +static void +finalize_aggregates(AggState *aggstate, + AggStatePerAgg peraggs, + AggStatePerGroup pergroup) +{ + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + Datum *aggvalues = econtext->ecxt_aggvalues; + bool *aggnulls = econtext->ecxt_aggnulls; + int aggno; + int transno; + + /* + * If there were any DISTINCT and/or ORDER BY aggregates, sort their + * inputs and run the transition functions. + */ + for (transno = 0; transno < aggstate->numtrans; transno++) + { + AggStatePerTrans pertrans = &aggstate->pertrans[transno]; + AggStatePerGroup pergroupstate; + + pergroupstate = &pergroup[transno]; + + if (pertrans->numSortCols > 0) + { + Assert(aggstate->aggstrategy != AGG_HASHED && + aggstate->aggstrategy != AGG_MIXED); + + if (pertrans->numInputs == 1) + process_ordered_aggregate_single(aggstate, + pertrans, + pergroupstate); + else + process_ordered_aggregate_multi(aggstate, + pertrans, + pergroupstate); + } + } + + /* + * Run the final functions. + */ + for (aggno = 0; aggno < aggstate->numaggs; aggno++) + { + AggStatePerAgg peragg = &peraggs[aggno]; + int transno = peragg->transno; + AggStatePerGroup pergroupstate; + + pergroupstate = &pergroup[transno]; + + if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) + finalize_partialaggregate(aggstate, peragg, pergroupstate, + &aggvalues[aggno], &aggnulls[aggno]); + else + finalize_aggregate(aggstate, peragg, pergroupstate, + &aggvalues[aggno], &aggnulls[aggno]); + } +} + +/* + * Project the result of a group (whose aggs have already been calculated by + * finalize_aggregates). Returns the result slot, or NULL if no row is + * projected (suppressed by qual). + */ +static TupleTableSlot * +project_aggregates(AggState *aggstate) +{ + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + + /* + * Check the qual (HAVING clause); if the group does not match, ignore it. + */ + if (ExecQual(aggstate->ss.ps.qual, econtext)) + { + /* + * Form and return projection tuple using the aggregate results and + * the representative input tuple. + */ + return ExecProject(aggstate->ss.ps.ps_ProjInfo); + } + else + InstrCountFiltered1(aggstate, 1); + + return NULL; +} + +/* + * Find input-tuple columns that are needed, dividing them into + * aggregated and unaggregated sets. + */ +static void +find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated) +{ + Agg *agg = (Agg *) aggstate->ss.ps.plan; + FindColsContext context; + + context.is_aggref = false; + context.aggregated = NULL; + context.unaggregated = NULL; + + /* Examine tlist and quals */ + (void) find_cols_walker((Node *) agg->plan.targetlist, &context); + (void) find_cols_walker((Node *) agg->plan.qual, &context); + + /* In some cases, grouping columns will not appear in the tlist */ + for (int i = 0; i < agg->numCols; i++) + context.unaggregated = bms_add_member(context.unaggregated, + agg->grpColIdx[i]); + + *aggregated = context.aggregated; + *unaggregated = context.unaggregated; +} + +static bool +find_cols_walker(Node *node, FindColsContext *context) +{ + if (node == NULL) + return false; + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + /* setrefs.c should have set the varno to OUTER_VAR */ + Assert(var->varno == OUTER_VAR); + Assert(var->varlevelsup == 0); + if (context->is_aggref) + context->aggregated = bms_add_member(context->aggregated, + var->varattno); + else + context->unaggregated = bms_add_member(context->unaggregated, + var->varattno); + return false; + } + if (IsA(node, Aggref)) + { + Assert(!context->is_aggref); + context->is_aggref = true; + expression_tree_walker(node, find_cols_walker, (void *) context); + context->is_aggref = false; + return false; + } + return expression_tree_walker(node, find_cols_walker, + (void *) context); +} + +/* + * (Re-)initialize the hash table(s) to empty. + * + * To implement hashed aggregation, we need a hashtable that stores a + * representative tuple and an array of AggStatePerGroup structs for each + * distinct set of GROUP BY column values. We compute the hash key from the + * GROUP BY columns. The per-group data is allocated in lookup_hash_entry(), + * for each entry. + * + * We have a separate hashtable and associated perhash data structure for each + * grouping set for which we're doing hashing. + * + * The contents of the hash tables always live in the hashcontext's per-tuple + * memory context (there is only one of these for all tables together, since + * they are all reset at the same time). + */ +static void +build_hash_tables(AggState *aggstate) +{ + int setno; + + for (setno = 0; setno < aggstate->num_hashes; ++setno) + { + AggStatePerHash perhash = &aggstate->perhash[setno]; + long nbuckets; + Size memory; + + if (perhash->hashtable != NULL) + { + ResetTupleHashTable(perhash->hashtable); + continue; + } + + Assert(perhash->aggnode->numGroups > 0); + + memory = aggstate->hash_mem_limit / aggstate->num_hashes; + + /* choose reasonable number of buckets per hashtable */ + nbuckets = hash_choose_num_buckets(aggstate->hashentrysize, + perhash->aggnode->numGroups, + memory); + + build_hash_table(aggstate, setno, nbuckets); + } + + aggstate->hash_ngroups_current = 0; +} + +/* + * Build a single hashtable for this grouping set. + */ +static void +build_hash_table(AggState *aggstate, int setno, long nbuckets) +{ + AggStatePerHash perhash = &aggstate->perhash[setno]; + MemoryContext metacxt = aggstate->hash_metacxt; + MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory; + MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory; + Size additionalsize; + + Assert(aggstate->aggstrategy == AGG_HASHED || + aggstate->aggstrategy == AGG_MIXED); + + /* + * Used to make sure initial hash table allocation does not exceed + * hash_mem. Note that the estimate does not include space for + * pass-by-reference transition data values, nor for the representative + * tuple of each group. + */ + additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData); + + perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps, + perhash->hashslot->tts_tupleDescriptor, + perhash->numCols, + perhash->hashGrpColIdxHash, + perhash->eqfuncoids, + perhash->hashfunctions, + perhash->aggnode->grpCollations, + nbuckets, + additionalsize, + metacxt, + hashcxt, + tmpcxt, + DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); +} + +/* + * Compute columns that actually need to be stored in hashtable entries. The + * incoming tuples from the child plan node will contain grouping columns, + * other columns referenced in our targetlist and qual, columns used to + * compute the aggregate functions, and perhaps just junk columns we don't use + * at all. Only columns of the first two types need to be stored in the + * hashtable, and getting rid of the others can make the table entries + * significantly smaller. The hashtable only contains the relevant columns, + * and is packed/unpacked in lookup_hash_entry() / agg_retrieve_hash_table() + * into the format of the normal input descriptor. + * + * Additional columns, in addition to the columns grouped by, come from two + * sources: Firstly functionally dependent columns that we don't need to group + * by themselves, and secondly ctids for row-marks. + * + * To eliminate duplicates, we build a bitmapset of the needed columns, and + * then build an array of the columns included in the hashtable. We might + * still have duplicates if the passed-in grpColIdx has them, which can happen + * in edge cases from semijoins/distinct; these can't always be removed, + * because it's not certain that the duplicate cols will be using the same + * hash function. + * + * Note that the array is preserved over ExecReScanAgg, so we allocate it in + * the per-query context (unlike the hash table itself). + */ +static void +find_hash_columns(AggState *aggstate) +{ + Bitmapset *base_colnos; + Bitmapset *aggregated_colnos; + TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; + List *outerTlist = outerPlanState(aggstate)->plan->targetlist; + int numHashes = aggstate->num_hashes; + EState *estate = aggstate->ss.ps.state; + int j; + + /* Find Vars that will be needed in tlist and qual */ + find_cols(aggstate, &aggregated_colnos, &base_colnos); + aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos); + aggstate->max_colno_needed = 0; + aggstate->all_cols_needed = true; + + for (int i = 0; i < scanDesc->natts; i++) + { + int colno = i + 1; + + if (bms_is_member(colno, aggstate->colnos_needed)) + aggstate->max_colno_needed = colno; + else + aggstate->all_cols_needed = false; + } + + for (j = 0; j < numHashes; ++j) + { + AggStatePerHash perhash = &aggstate->perhash[j]; + Bitmapset *colnos = bms_copy(base_colnos); + AttrNumber *grpColIdx = perhash->aggnode->grpColIdx; + List *hashTlist = NIL; + TupleDesc hashDesc; + int maxCols; + int i; + + perhash->largestGrpColIdx = 0; + + /* + * If we're doing grouping sets, then some Vars might be referenced in + * tlist/qual for the benefit of other grouping sets, but not needed + * when hashing; i.e. prepare_projection_slot will null them out, so + * there'd be no point storing them. Use prepare_projection_slot's + * logic to determine which. + */ + if (aggstate->phases[0].grouped_cols) + { + Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j]; + ListCell *lc; + + foreach(lc, aggstate->all_grouped_cols) + { + int attnum = lfirst_int(lc); + + if (!bms_is_member(attnum, grouped_cols)) + colnos = bms_del_member(colnos, attnum); + } + } + + /* + * Compute maximum number of input columns accounting for possible + * duplications in the grpColIdx array, which can happen in some edge + * cases where HashAggregate was generated as part of a semijoin or a + * DISTINCT. + */ + maxCols = bms_num_members(colnos) + perhash->numCols; + + perhash->hashGrpColIdxInput = + palloc(maxCols * sizeof(AttrNumber)); + perhash->hashGrpColIdxHash = + palloc(perhash->numCols * sizeof(AttrNumber)); + + /* Add all the grouping columns to colnos */ + for (i = 0; i < perhash->numCols; i++) + colnos = bms_add_member(colnos, grpColIdx[i]); + + /* + * First build mapping for columns directly hashed. These are the + * first, because they'll be accessed when computing hash values and + * comparing tuples for exact matches. We also build simple mapping + * for execGrouping, so it knows where to find the to-be-hashed / + * compared columns in the input. + */ + for (i = 0; i < perhash->numCols; i++) + { + perhash->hashGrpColIdxInput[i] = grpColIdx[i]; + perhash->hashGrpColIdxHash[i] = i + 1; + perhash->numhashGrpCols++; + /* delete already mapped columns */ + bms_del_member(colnos, grpColIdx[i]); + } + + /* and add the remaining columns */ + while ((i = bms_first_member(colnos)) >= 0) + { + perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i; + perhash->numhashGrpCols++; + } + + /* and build a tuple descriptor for the hashtable */ + for (i = 0; i < perhash->numhashGrpCols; i++) + { + int varNumber = perhash->hashGrpColIdxInput[i] - 1; + + hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber)); + perhash->largestGrpColIdx = + Max(varNumber + 1, perhash->largestGrpColIdx); + } + + hashDesc = ExecTypeFromTL(hashTlist); + + execTuplesHashPrepare(perhash->numCols, + perhash->aggnode->grpOperators, + &perhash->eqfuncoids, + &perhash->hashfunctions); + perhash->hashslot = + ExecAllocTableSlot(&estate->es_tupleTable, hashDesc, + &TTSOpsMinimalTuple); + + list_free(hashTlist); + bms_free(colnos); + } + + bms_free(base_colnos); +} + +/* + * Estimate per-hash-table-entry overhead. + */ +Size +hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace) +{ + Size tupleChunkSize; + Size pergroupChunkSize; + Size transitionChunkSize; + Size tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) + + tupleWidth); + Size pergroupSize = numTrans * sizeof(AggStatePerGroupData); + + tupleChunkSize = CHUNKHDRSZ + tupleSize; + + if (pergroupSize > 0) + pergroupChunkSize = CHUNKHDRSZ + pergroupSize; + else + pergroupChunkSize = 0; + + if (transitionSpace > 0) + transitionChunkSize = CHUNKHDRSZ + transitionSpace; + else + transitionChunkSize = 0; + + return + sizeof(TupleHashEntryData) + + tupleChunkSize + + pergroupChunkSize + + transitionChunkSize; +} + +/* + * hashagg_recompile_expressions() + * + * Identifies the right phase, compiles the right expression given the + * arguments, and then sets phase->evalfunc to that expression. + * + * Different versions of the compiled expression are needed depending on + * whether hash aggregation has spilled or not, and whether it's reading from + * the outer plan or a tape. Before spilling to disk, the expression reads + * from the outer plan and does not need to perform a NULL check. After + * HashAgg begins to spill, new groups will not be created in the hash table, + * and the AggStatePerGroup array may be NULL; therefore we need to add a null + * pointer check to the expression. Then, when reading spilled data from a + * tape, we change the outer slot type to be a fixed minimal tuple slot. + * + * It would be wasteful to recompile every time, so cache the compiled + * expressions in the AggStatePerPhase, and reuse when appropriate. + */ +static void +hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) +{ + AggStatePerPhase phase; + int i = minslot ? 1 : 0; + int j = nullcheck ? 1 : 0; + + Assert(aggstate->aggstrategy == AGG_HASHED || + aggstate->aggstrategy == AGG_MIXED); + + if (aggstate->aggstrategy == AGG_HASHED) + phase = &aggstate->phases[0]; + else /* AGG_MIXED */ + phase = &aggstate->phases[1]; + + if (phase->evaltrans_cache[i][j] == NULL) + { + const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops; + bool outerfixed = aggstate->ss.ps.outeropsfixed; + bool dohash = true; + bool dosort = false; + + /* + * If minslot is true, that means we are processing a spilled batch + * (inside agg_refill_hash_table()), and we must not advance the + * sorted grouping sets. + */ + if (aggstate->aggstrategy == AGG_MIXED && !minslot) + dosort = true; + + /* temporarily change the outerops while compiling the expression */ + if (minslot) + { + aggstate->ss.ps.outerops = &TTSOpsMinimalTuple; + aggstate->ss.ps.outeropsfixed = true; + } + + phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase, + dosort, dohash, + nullcheck); + + /* change back */ + aggstate->ss.ps.outerops = outerops; + aggstate->ss.ps.outeropsfixed = outerfixed; + } + + phase->evaltrans = phase->evaltrans_cache[i][j]; +} + +/* + * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the + * number of partitions we expect to create (if we do spill). + * + * There are two limits: a memory limit, and also an ngroups limit. The + * ngroups limit becomes important when we expect transition values to grow + * substantially larger than the initial value. + */ +void +hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, + Size *mem_limit, uint64 *ngroups_limit, + int *num_partitions) +{ + int npartitions; + Size partition_mem; + Size hash_mem_limit = get_hash_memory_limit(); + + /* if not expected to spill, use all of hash_mem */ + if (input_groups * hashentrysize <= hash_mem_limit) + { + if (num_partitions != NULL) + *num_partitions = 0; + *mem_limit = hash_mem_limit; + *ngroups_limit = hash_mem_limit / hashentrysize; + return; + } + + /* + * Calculate expected memory requirements for spilling, which is the size + * of the buffers needed for all the tapes that need to be open at once. + * Then, subtract that from the memory available for holding hash tables. + */ + npartitions = hash_choose_num_partitions(input_groups, + hashentrysize, + used_bits, + NULL); + if (num_partitions != NULL) + *num_partitions = npartitions; + + partition_mem = + HASHAGG_READ_BUFFER_SIZE + + HASHAGG_WRITE_BUFFER_SIZE * npartitions; + + /* + * Don't set the limit below 3/4 of hash_mem. In that case, we are at the + * minimum number of partitions, so we aren't going to dramatically exceed + * work mem anyway. + */ + if (hash_mem_limit > 4 * partition_mem) + *mem_limit = hash_mem_limit - partition_mem; + else + *mem_limit = hash_mem_limit * 0.75; + + if (*mem_limit > hashentrysize) + *ngroups_limit = *mem_limit / hashentrysize; + else + *ngroups_limit = 1; +} + +/* + * hash_agg_check_limits + * + * After adding a new group to the hash table, check whether we need to enter + * spill mode. Allocations may happen without adding new groups (for instance, + * if the transition state size grows), so this check is imperfect. + */ +static void +hash_agg_check_limits(AggState *aggstate) +{ + uint64 ngroups = aggstate->hash_ngroups_current; + Size meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, + true); + Size hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, + true); + + /* + * Don't spill unless there's at least one group in the hash table so we + * can be sure to make progress even in edge cases. + */ + if (aggstate->hash_ngroups_current > 0 && + (meta_mem + hashkey_mem > aggstate->hash_mem_limit || + ngroups > aggstate->hash_ngroups_limit)) + { + hash_agg_enter_spill_mode(aggstate); + } +} + +/* + * Enter "spill mode", meaning that no new groups are added to any of the hash + * tables. Tuples that would create a new group are instead spilled, and + * processed later. + */ +static void +hash_agg_enter_spill_mode(AggState *aggstate) +{ + aggstate->hash_spill_mode = true; + hashagg_recompile_expressions(aggstate, aggstate->table_filled, true); + + if (!aggstate->hash_ever_spilled) + { + Assert(aggstate->hash_tapeinfo == NULL); + Assert(aggstate->hash_spills == NULL); + + aggstate->hash_ever_spilled = true; + + hashagg_tapeinfo_init(aggstate); + + aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes); + + for (int setno = 0; setno < aggstate->num_hashes; setno++) + { + AggStatePerHash perhash = &aggstate->perhash[setno]; + HashAggSpill *spill = &aggstate->hash_spills[setno]; + + hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, + perhash->aggnode->numGroups, + aggstate->hashentrysize); + } + } +} + +/* + * Update metrics after filling the hash table. + * + * If reading from the outer plan, from_tape should be false; if reading from + * another tape, from_tape should be true. + */ +static void +hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions) +{ + Size meta_mem; + Size hashkey_mem; + Size buffer_mem; + Size total_mem; + + if (aggstate->aggstrategy != AGG_MIXED && + aggstate->aggstrategy != AGG_HASHED) + return; + + /* memory for the hash table itself */ + meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true); + + /* memory for the group keys and transition states */ + hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true); + + /* memory for read/write tape buffers, if spilled */ + buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE; + if (from_tape) + buffer_mem += HASHAGG_READ_BUFFER_SIZE; + + /* update peak mem */ + total_mem = meta_mem + hashkey_mem + buffer_mem; + if (total_mem > aggstate->hash_mem_peak) + aggstate->hash_mem_peak = total_mem; + + /* update disk usage */ + if (aggstate->hash_tapeinfo != NULL) + { + uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024); + + if (aggstate->hash_disk_used < disk_used) + aggstate->hash_disk_used = disk_used; + } + + /* update hashentrysize estimate based on contents */ + if (aggstate->hash_ngroups_current > 0) + { + aggstate->hashentrysize = + sizeof(TupleHashEntryData) + + (hashkey_mem / (double) aggstate->hash_ngroups_current); + } +} + +/* + * Choose a reasonable number of buckets for the initial hash table size. + */ +static long +hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory) +{ + long max_nbuckets; + long nbuckets = ngroups; + + max_nbuckets = memory / hashentrysize; + + /* + * Underestimating is better than overestimating. Too many buckets crowd + * out space for group keys and transition state values. + */ + max_nbuckets >>= 1; + + if (nbuckets > max_nbuckets) + nbuckets = max_nbuckets; + + return Max(nbuckets, 1); +} + +/* + * Determine the number of partitions to create when spilling, which will + * always be a power of two. If log2_npartitions is non-NULL, set + * *log2_npartitions to the log2() of the number of partitions. + */ +static int +hash_choose_num_partitions(double input_groups, double hashentrysize, + int used_bits, int *log2_npartitions) +{ + Size hash_mem_limit = get_hash_memory_limit(); + double partition_limit; + double mem_wanted; + double dpartitions; + int npartitions; + int partition_bits; + + /* + * Avoid creating so many partitions that the memory requirements of the + * open partition files are greater than 1/4 of hash_mem. + */ + partition_limit = + (hash_mem_limit * 0.25 - HASHAGG_READ_BUFFER_SIZE) / + HASHAGG_WRITE_BUFFER_SIZE; + + mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize; + + /* make enough partitions so that each one is likely to fit in memory */ + dpartitions = 1 + (mem_wanted / hash_mem_limit); + + if (dpartitions > partition_limit) + dpartitions = partition_limit; + + if (dpartitions < HASHAGG_MIN_PARTITIONS) + dpartitions = HASHAGG_MIN_PARTITIONS; + if (dpartitions > HASHAGG_MAX_PARTITIONS) + dpartitions = HASHAGG_MAX_PARTITIONS; + + /* HASHAGG_MAX_PARTITIONS limit makes this safe */ + npartitions = (int) dpartitions; + + /* ceil(log2(npartitions)) */ + partition_bits = my_log2(npartitions); + + /* make sure that we don't exhaust the hash bits */ + if (partition_bits + used_bits >= 32) + partition_bits = 32 - used_bits; + + if (log2_npartitions != NULL) + *log2_npartitions = partition_bits; + + /* number of partitions will be a power of two */ + npartitions = 1 << partition_bits; + + return npartitions; +} + +/* + * Initialize a freshly-created TupleHashEntry. + */ +static void +initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable, + TupleHashEntry entry) +{ + AggStatePerGroup pergroup; + int transno; + + aggstate->hash_ngroups_current++; + hash_agg_check_limits(aggstate); + + /* no need to allocate or initialize per-group state */ + if (aggstate->numtrans == 0) + return; + + pergroup = (AggStatePerGroup) + MemoryContextAlloc(hashtable->tablecxt, + sizeof(AggStatePerGroupData) * aggstate->numtrans); + + entry->additional = pergroup; + + /* + * Initialize aggregates for new tuple group, lookup_hash_entries() + * already has selected the relevant grouping set. + */ + for (transno = 0; transno < aggstate->numtrans; transno++) + { + AggStatePerTrans pertrans = &aggstate->pertrans[transno]; + AggStatePerGroup pergroupstate = &pergroup[transno]; + + initialize_aggregate(aggstate, pertrans, pergroupstate); + } +} + +/* + * Look up hash entries for the current tuple in all hashed grouping sets. + * + * Be aware that lookup_hash_entry can reset the tmpcontext. + * + * Some entries may be left NULL if we are in "spill mode". The same tuple + * will belong to different groups for each grouping set, so may match a group + * already in memory for one set and match a group not in memory for another + * set. When in "spill mode", the tuple will be spilled for each grouping set + * where it doesn't match a group in memory. + * + * NB: It's possible to spill the same tuple for several different grouping + * sets. This may seem wasteful, but it's actually a trade-off: if we spill + * the tuple multiple times for multiple grouping sets, it can be partitioned + * for each grouping set, making the refilling of the hash table very + * efficient. + */ +static void +lookup_hash_entries(AggState *aggstate) +{ + AggStatePerGroup *pergroup = aggstate->hash_pergroup; + TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple; + int setno; + + for (setno = 0; setno < aggstate->num_hashes; setno++) + { + AggStatePerHash perhash = &aggstate->perhash[setno]; + TupleHashTable hashtable = perhash->hashtable; + TupleTableSlot *hashslot = perhash->hashslot; + TupleHashEntry entry; + uint32 hash; + bool isnew = false; + bool *p_isnew; + + /* if hash table already spilled, don't create new entries */ + p_isnew = aggstate->hash_spill_mode ? NULL : &isnew; + + select_current_set(aggstate, setno, true); + prepare_hash_slot(perhash, + outerslot, + hashslot); + + entry = LookupTupleHashEntry(hashtable, hashslot, + p_isnew, &hash); + + if (entry != NULL) + { + if (isnew) + initialize_hash_entry(aggstate, hashtable, entry); + pergroup[setno] = entry->additional; + } + else + { + HashAggSpill *spill = &aggstate->hash_spills[setno]; + TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple; + + if (spill->partitions == NULL) + hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, + perhash->aggnode->numGroups, + aggstate->hashentrysize); + + hashagg_spill_tuple(aggstate, spill, slot, hash); + pergroup[setno] = NULL; + } + } +} + +/* + * ExecAgg - + * + * ExecAgg receives tuples from its outer subplan and aggregates over + * the appropriate attribute for each aggregate function use (Aggref + * node) appearing in the targetlist or qual of the node. The number + * of tuples to aggregate over depends on whether grouped or plain + * aggregation is selected. In grouped aggregation, we produce a result + * row for each group; in plain aggregation there's a single result row + * for the whole query. In either case, the value of each aggregate is + * stored in the expression context to be used when ExecProject evaluates + * the result tuple. + */ +static TupleTableSlot * +ExecAgg(PlanState *pstate) +{ + AggState *node = castNode(AggState, pstate); + TupleTableSlot *result = NULL; + + CHECK_FOR_INTERRUPTS(); + + if (!node->agg_done) + { + /* Dispatch based on strategy */ + switch (node->phase->aggstrategy) + { + case AGG_HASHED: + if (!node->table_filled) + agg_fill_hash_table(node); + /* FALLTHROUGH */ + case AGG_MIXED: + result = agg_retrieve_hash_table(node); + break; + case AGG_PLAIN: + case AGG_SORTED: + result = agg_retrieve_direct(node); + break; + } + + if (!TupIsNull(result)) + return result; + } + + return NULL; +} + +/* + * ExecAgg for non-hashed case + */ +static TupleTableSlot * +agg_retrieve_direct(AggState *aggstate) +{ + Agg *node = aggstate->phase->aggnode; + ExprContext *econtext; + ExprContext *tmpcontext; + AggStatePerAgg peragg; + AggStatePerGroup *pergroups; + TupleTableSlot *outerslot; + TupleTableSlot *firstSlot; + TupleTableSlot *result; + bool hasGroupingSets = aggstate->phase->numsets > 0; + int numGroupingSets = Max(aggstate->phase->numsets, 1); + int currentSet; + int nextSetSize; + int numReset; + int i; + + /* + * get state info from node + * + * econtext is the per-output-tuple expression context + * + * tmpcontext is the per-input-tuple expression context + */ + econtext = aggstate->ss.ps.ps_ExprContext; + tmpcontext = aggstate->tmpcontext; + + peragg = aggstate->peragg; + pergroups = aggstate->pergroups; + firstSlot = aggstate->ss.ss_ScanTupleSlot; + + /* + * We loop retrieving groups until we find one matching + * aggstate->ss.ps.qual + * + * For grouping sets, we have the invariant that aggstate->projected_set + * is either -1 (initial call) or the index (starting from 0) in + * gset_lengths for the group we just completed (either by projecting a + * row or by discarding it in the qual). + */ + while (!aggstate->agg_done) + { + /* + * Clear the per-output-tuple context for each group, as well as + * aggcontext (which contains any pass-by-ref transvalues of the old + * group). Some aggregate functions store working state in child + * contexts; those now get reset automatically without us needing to + * do anything special. + * + * We use ReScanExprContext not just ResetExprContext because we want + * any registered shutdown callbacks to be called. That allows + * aggregate functions to ensure they've cleaned up any non-memory + * resources. + */ + ReScanExprContext(econtext); + + /* + * Determine how many grouping sets need to be reset at this boundary. + */ + if (aggstate->projected_set >= 0 && + aggstate->projected_set < numGroupingSets) + numReset = aggstate->projected_set + 1; + else + numReset = numGroupingSets; + + /* + * numReset can change on a phase boundary, but that's OK; we want to + * reset the contexts used in _this_ phase, and later, after possibly + * changing phase, initialize the right number of aggregates for the + * _new_ phase. + */ + + for (i = 0; i < numReset; i++) + { + ReScanExprContext(aggstate->aggcontexts[i]); + } + + /* + * Check if input is complete and there are no more groups to project + * in this phase; move to next phase or mark as done. + */ + if (aggstate->input_done == true && + aggstate->projected_set >= (numGroupingSets - 1)) + { + if (aggstate->current_phase < aggstate->numphases - 1) + { + initialize_phase(aggstate, aggstate->current_phase + 1); + aggstate->input_done = false; + aggstate->projected_set = -1; + numGroupingSets = Max(aggstate->phase->numsets, 1); + node = aggstate->phase->aggnode; + numReset = numGroupingSets; + } + else if (aggstate->aggstrategy == AGG_MIXED) + { + /* + * Mixed mode; we've output all the grouped stuff and have + * full hashtables, so switch to outputting those. + */ + initialize_phase(aggstate, 0); + aggstate->table_filled = true; + ResetTupleHashIterator(aggstate->perhash[0].hashtable, + &aggstate->perhash[0].hashiter); + select_current_set(aggstate, 0, true); + return agg_retrieve_hash_table(aggstate); + } + else + { + aggstate->agg_done = true; + break; + } + } + + /* + * Get the number of columns in the next grouping set after the last + * projected one (if any). This is the number of columns to compare to + * see if we reached the boundary of that set too. + */ + if (aggstate->projected_set >= 0 && + aggstate->projected_set < (numGroupingSets - 1)) + nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1]; + else + nextSetSize = 0; + + /*---------- + * If a subgroup for the current grouping set is present, project it. + * + * We have a new group if: + * - we're out of input but haven't projected all grouping sets + * (checked above) + * OR + * - we already projected a row that wasn't from the last grouping + * set + * AND + * - the next grouping set has at least one grouping column (since + * empty grouping sets project only once input is exhausted) + * AND + * - the previous and pending rows differ on the grouping columns + * of the next grouping set + *---------- + */ + tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple; + if (aggstate->input_done || + (node->aggstrategy != AGG_PLAIN && + aggstate->projected_set != -1 && + aggstate->projected_set < (numGroupingSets - 1) && + nextSetSize > 0 && + !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1], + tmpcontext))) + { + aggstate->projected_set += 1; + + Assert(aggstate->projected_set < numGroupingSets); + Assert(nextSetSize > 0 || aggstate->input_done); + } + else + { + /* + * We no longer care what group we just projected, the next + * projection will always be the first (or only) grouping set + * (unless the input proves to be empty). + */ + aggstate->projected_set = 0; + + /* + * If we don't already have the first tuple of the new group, + * fetch it from the outer plan. + */ + if (aggstate->grp_firstTuple == NULL) + { + outerslot = fetch_input_tuple(aggstate); + if (!TupIsNull(outerslot)) + { + /* + * Make a copy of the first input tuple; we will use this + * for comparisons (in group mode) and for projection. + */ + aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot); + } + else + { + /* outer plan produced no tuples at all */ + if (hasGroupingSets) + { + /* + * If there was no input at all, we need to project + * rows only if there are grouping sets of size 0. + * Note that this implies that there can't be any + * references to ungrouped Vars, which would otherwise + * cause issues with the empty output slot. + * + * XXX: This is no longer true, we currently deal with + * this in finalize_aggregates(). + */ + aggstate->input_done = true; + + while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0) + { + aggstate->projected_set += 1; + if (aggstate->projected_set >= numGroupingSets) + { + /* + * We can't set agg_done here because we might + * have more phases to do, even though the + * input is empty. So we need to restart the + * whole outer loop. + */ + break; + } + } + + if (aggstate->projected_set >= numGroupingSets) + continue; + } + else + { + aggstate->agg_done = true; + /* If we are grouping, we should produce no tuples too */ + if (node->aggstrategy != AGG_PLAIN) + return NULL; + } + } + } + + /* + * Initialize working state for a new input tuple group. + */ + initialize_aggregates(aggstate, pergroups, numReset); + + if (aggstate->grp_firstTuple != NULL) + { + /* + * Store the copied first input tuple in the tuple table slot + * reserved for it. The tuple will be deleted when it is + * cleared from the slot. + */ + ExecForceStoreHeapTuple(aggstate->grp_firstTuple, + firstSlot, true); + aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ + + /* set up for first advance_aggregates call */ + tmpcontext->ecxt_outertuple = firstSlot; + + /* + * Process each outer-plan tuple, and then fetch the next one, + * until we exhaust the outer plan or cross a group boundary. + */ + for (;;) + { + /* + * During phase 1 only of a mixed agg, we need to update + * hashtables as well in advance_aggregates. + */ + if (aggstate->aggstrategy == AGG_MIXED && + aggstate->current_phase == 1) + { + lookup_hash_entries(aggstate); + } + + /* Advance the aggregates (or combine functions) */ + advance_aggregates(aggstate); + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(tmpcontext); + + outerslot = fetch_input_tuple(aggstate); + if (TupIsNull(outerslot)) + { + /* no more outer-plan tuples available */ + + /* if we built hash tables, finalize any spills */ + if (aggstate->aggstrategy == AGG_MIXED && + aggstate->current_phase == 1) + hashagg_finish_initial_spills(aggstate); + + if (hasGroupingSets) + { + aggstate->input_done = true; + break; + } + else + { + aggstate->agg_done = true; + break; + } + } + /* set up for next advance_aggregates call */ + tmpcontext->ecxt_outertuple = outerslot; + + /* + * If we are grouping, check whether we've crossed a group + * boundary. + */ + if (node->aggstrategy != AGG_PLAIN) + { + tmpcontext->ecxt_innertuple = firstSlot; + if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1], + tmpcontext)) + { + aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot); + break; + } + } + } + } + + /* + * Use the representative input tuple for any references to + * non-aggregated input columns in aggregate direct args, the node + * qual, and the tlist. (If we are not grouping, and there are no + * input rows at all, we will come here with an empty firstSlot + * ... but if not grouping, there can't be any references to + * non-aggregated input columns, so no problem.) + */ + econtext->ecxt_outertuple = firstSlot; + } + + Assert(aggstate->projected_set >= 0); + + currentSet = aggstate->projected_set; + + prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet); + + select_current_set(aggstate, currentSet, false); + + finalize_aggregates(aggstate, + peragg, + pergroups[currentSet]); + + /* + * If there's no row to project right now, we must continue rather + * than returning a null since there might be more groups. + */ + result = project_aggregates(aggstate); + if (result) + return result; + } + + /* No more groups */ + return NULL; +} + +/* + * ExecAgg for hashed case: read input and build hash table + */ +static void +agg_fill_hash_table(AggState *aggstate) +{ + TupleTableSlot *outerslot; + ExprContext *tmpcontext = aggstate->tmpcontext; + + /* + * Process each outer-plan tuple, and then fetch the next one, until we + * exhaust the outer plan. + */ + for (;;) + { + outerslot = fetch_input_tuple(aggstate); + if (TupIsNull(outerslot)) + break; + + /* set up for lookup_hash_entries and advance_aggregates */ + tmpcontext->ecxt_outertuple = outerslot; + + /* Find or build hashtable entries */ + lookup_hash_entries(aggstate); + + /* Advance the aggregates (or combine functions) */ + advance_aggregates(aggstate); + + /* + * Reset per-input-tuple context after each tuple, but note that the + * hash lookups do this too + */ + ResetExprContext(aggstate->tmpcontext); + } + + /* finalize spills, if any */ + hashagg_finish_initial_spills(aggstate); + + aggstate->table_filled = true; + /* Initialize to walk the first hash table */ + select_current_set(aggstate, 0, true); + ResetTupleHashIterator(aggstate->perhash[0].hashtable, + &aggstate->perhash[0].hashiter); +} + +/* + * If any data was spilled during hash aggregation, reset the hash table and + * reprocess one batch of spilled data. After reprocessing a batch, the hash + * table will again contain data, ready to be consumed by + * agg_retrieve_hash_table_in_memory(). + * + * Should only be called after all in memory hash table entries have been + * finalized and emitted. + * + * Return false when input is exhausted and there's no more work to be done; + * otherwise return true. + */ +static bool +agg_refill_hash_table(AggState *aggstate) +{ + HashAggBatch *batch; + AggStatePerHash perhash; + HashAggSpill spill; + HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; + bool spill_initialized = false; + + if (aggstate->hash_batches == NIL) + return false; + + /* hash_batches is a stack, with the top item at the end of the list */ + batch = llast(aggstate->hash_batches); + aggstate->hash_batches = list_delete_last(aggstate->hash_batches); + + hash_agg_set_limits(aggstate->hashentrysize, batch->input_card, + batch->used_bits, &aggstate->hash_mem_limit, + &aggstate->hash_ngroups_limit, NULL); + + /* + * Each batch only processes one grouping set; set the rest to NULL so + * that advance_aggregates() knows to ignore them. We don't touch + * pergroups for sorted grouping sets here, because they will be needed if + * we rescan later. The expressions for sorted grouping sets will not be + * evaluated after we recompile anyway. + */ + MemSet(aggstate->hash_pergroup, 0, + sizeof(AggStatePerGroup) * aggstate->num_hashes); + + /* free memory and reset hash tables */ + ReScanExprContext(aggstate->hashcontext); + for (int setno = 0; setno < aggstate->num_hashes; setno++) + ResetTupleHashTable(aggstate->perhash[setno].hashtable); + + aggstate->hash_ngroups_current = 0; + + /* + * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output + * happens in phase 0. So, we switch to phase 1 when processing a batch, + * and back to phase 0 after the batch is done. + */ + Assert(aggstate->current_phase == 0); + if (aggstate->phase->aggstrategy == AGG_MIXED) + { + aggstate->current_phase = 1; + aggstate->phase = &aggstate->phases[aggstate->current_phase]; + } + + select_current_set(aggstate, batch->setno, true); + + perhash = &aggstate->perhash[aggstate->current_set]; + + /* + * Spilled tuples are always read back as MinimalTuples, which may be + * different from the outer plan, so recompile the aggregate expressions. + * + * We still need the NULL check, because we are only processing one + * grouping set at a time and the rest will be NULL. + */ + hashagg_recompile_expressions(aggstate, true, true); + + for (;;) + { + TupleTableSlot *spillslot = aggstate->hash_spill_rslot; + TupleTableSlot *hashslot = perhash->hashslot; + TupleHashEntry entry; + MinimalTuple tuple; + uint32 hash; + bool isnew = false; + bool *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew; + + CHECK_FOR_INTERRUPTS(); + + tuple = hashagg_batch_read(batch, &hash); + if (tuple == NULL) + break; + + ExecStoreMinimalTuple(tuple, spillslot, true); + aggstate->tmpcontext->ecxt_outertuple = spillslot; + + prepare_hash_slot(perhash, + aggstate->tmpcontext->ecxt_outertuple, + hashslot); + entry = LookupTupleHashEntryHash( + perhash->hashtable, hashslot, p_isnew, hash); + + if (entry != NULL) + { + if (isnew) + initialize_hash_entry(aggstate, perhash->hashtable, entry); + aggstate->hash_pergroup[batch->setno] = entry->additional; + advance_aggregates(aggstate); + } + else + { + if (!spill_initialized) + { + /* + * Avoid initializing the spill until we actually need it so + * that we don't assign tapes that will never be used. + */ + spill_initialized = true; + hashagg_spill_init(&spill, tapeinfo, batch->used_bits, + batch->input_card, aggstate->hashentrysize); + } + /* no memory for a new group, spill */ + hashagg_spill_tuple(aggstate, &spill, spillslot, hash); + + aggstate->hash_pergroup[batch->setno] = NULL; + } + + /* + * Reset per-input-tuple context after each tuple, but note that the + * hash lookups do this too + */ + ResetExprContext(aggstate->tmpcontext); + } + + hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum); + + /* change back to phase 0 */ + aggstate->current_phase = 0; + aggstate->phase = &aggstate->phases[aggstate->current_phase]; + + if (spill_initialized) + { + hashagg_spill_finish(aggstate, &spill, batch->setno); + hash_agg_update_metrics(aggstate, true, spill.npartitions); + } + else + hash_agg_update_metrics(aggstate, true, 0); + + aggstate->hash_spill_mode = false; + + /* prepare to walk the first hash table */ + select_current_set(aggstate, batch->setno, true); + ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable, + &aggstate->perhash[batch->setno].hashiter); + + pfree(batch); + + return true; +} + +/* + * ExecAgg for hashed case: retrieving groups from hash table + * + * After exhausting in-memory tuples, also try refilling the hash table using + * previously-spilled tuples. Only returns NULL after all in-memory and + * spilled tuples are exhausted. + */ +static TupleTableSlot * +agg_retrieve_hash_table(AggState *aggstate) +{ + TupleTableSlot *result = NULL; + + while (result == NULL) + { + result = agg_retrieve_hash_table_in_memory(aggstate); + if (result == NULL) + { + if (!agg_refill_hash_table(aggstate)) + { + aggstate->agg_done = true; + break; + } + } + } + + return result; +} + +/* + * Retrieve the groups from the in-memory hash tables without considering any + * spilled tuples. + */ +static TupleTableSlot * +agg_retrieve_hash_table_in_memory(AggState *aggstate) +{ + ExprContext *econtext; + AggStatePerAgg peragg; + AggStatePerGroup pergroup; + TupleHashEntryData *entry; + TupleTableSlot *firstSlot; + TupleTableSlot *result; + AggStatePerHash perhash; + + /* + * get state info from node. + * + * econtext is the per-output-tuple expression context. + */ + econtext = aggstate->ss.ps.ps_ExprContext; + peragg = aggstate->peragg; + firstSlot = aggstate->ss.ss_ScanTupleSlot; + + /* + * Note that perhash (and therefore anything accessed through it) can + * change inside the loop, as we change between grouping sets. + */ + perhash = &aggstate->perhash[aggstate->current_set]; + + /* + * We loop retrieving groups until we find one satisfying + * aggstate->ss.ps.qual + */ + for (;;) + { + TupleTableSlot *hashslot = perhash->hashslot; + int i; + + CHECK_FOR_INTERRUPTS(); + + /* + * Find the next entry in the hash table + */ + entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter); + if (entry == NULL) + { + int nextset = aggstate->current_set + 1; + + if (nextset < aggstate->num_hashes) + { + /* + * Switch to next grouping set, reinitialize, and restart the + * loop. + */ + select_current_set(aggstate, nextset, true); + + perhash = &aggstate->perhash[aggstate->current_set]; + + ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter); + + continue; + } + else + { + return NULL; + } + } + + /* + * Clear the per-output-tuple context for each group + * + * We intentionally don't use ReScanExprContext here; if any aggs have + * registered shutdown callbacks, they mustn't be called yet, since we + * might not be done with that agg. + */ + ResetExprContext(econtext); + + /* + * Transform representative tuple back into one with the right + * columns. + */ + ExecStoreMinimalTuple(entry->firstTuple, hashslot, false); + slot_getallattrs(hashslot); + + ExecClearTuple(firstSlot); + memset(firstSlot->tts_isnull, true, + firstSlot->tts_tupleDescriptor->natts * sizeof(bool)); + + for (i = 0; i < perhash->numhashGrpCols; i++) + { + int varNumber = perhash->hashGrpColIdxInput[i] - 1; + + firstSlot->tts_values[varNumber] = hashslot->tts_values[i]; + firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i]; + } + ExecStoreVirtualTuple(firstSlot); + + pergroup = (AggStatePerGroup) entry->additional; + + /* + * Use the representative input tuple for any references to + * non-aggregated input columns in the qual and tlist. + */ + econtext->ecxt_outertuple = firstSlot; + + prepare_projection_slot(aggstate, + econtext->ecxt_outertuple, + aggstate->current_set); + + finalize_aggregates(aggstate, peragg, pergroup); + + result = project_aggregates(aggstate); + if (result) + return result; + } + + /* No more groups */ + return NULL; +} + +/* + * Initialize HashTapeInfo + */ +static void +hashagg_tapeinfo_init(AggState *aggstate) +{ + HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo)); + int init_tapes = 16; /* expanded dynamically */ + + tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1); + tapeinfo->ntapes = init_tapes; + tapeinfo->nfreetapes = init_tapes; + tapeinfo->freetapes_alloc = init_tapes; + tapeinfo->freetapes = palloc(init_tapes * sizeof(int)); + for (int i = 0; i < init_tapes; i++) + tapeinfo->freetapes[i] = i; + + aggstate->hash_tapeinfo = tapeinfo; +} + +/* + * Assign unused tapes to spill partitions, extending the tape set if + * necessary. + */ +static void +hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions, + int npartitions) +{ + int partidx = 0; + + /* use free tapes if available */ + while (partidx < npartitions && tapeinfo->nfreetapes > 0) + partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes]; + + if (partidx < npartitions) + { + LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx); + + while (partidx < npartitions) + partitions[partidx++] = tapeinfo->ntapes++; + } +} + +/* + * After a tape has already been written to and then read, this function + * rewinds it for writing and adds it to the free list. + */ +static void +hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum) +{ + /* rewinding frees the buffer while not in use */ + LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum); + if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes) + { + tapeinfo->freetapes_alloc <<= 1; + tapeinfo->freetapes = repalloc(tapeinfo->freetapes, + tapeinfo->freetapes_alloc * sizeof(int)); + } + tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum; +} + +/* + * hashagg_spill_init + * + * Called after we determined that spilling is necessary. Chooses the number + * of partitions to create, and initializes them. + */ +static void +hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, + double input_groups, double hashentrysize) +{ + int npartitions; + int partition_bits; + + npartitions = hash_choose_num_partitions(input_groups, hashentrysize, + used_bits, &partition_bits); + + spill->partitions = palloc0(sizeof(int) * npartitions); + spill->ntuples = palloc0(sizeof(int64) * npartitions); + spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions); + + hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions); + + spill->tapeset = tapeinfo->tapeset; + spill->shift = 32 - used_bits - partition_bits; + spill->mask = (npartitions - 1) << spill->shift; + spill->npartitions = npartitions; + + for (int i = 0; i < npartitions; i++) + initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH); +} + +/* + * hashagg_spill_tuple + * + * No room for new groups in the hash table. Save for later in the appropriate + * partition. + */ +static Size +hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, + TupleTableSlot *inputslot, uint32 hash) +{ + LogicalTapeSet *tapeset = spill->tapeset; + TupleTableSlot *spillslot; + int partition; + MinimalTuple tuple; + int tapenum; + int total_written = 0; + bool shouldFree; + + Assert(spill->partitions != NULL); + + /* spill only attributes that we actually need */ + if (!aggstate->all_cols_needed) + { + spillslot = aggstate->hash_spill_wslot; + slot_getsomeattrs(inputslot, aggstate->max_colno_needed); + ExecClearTuple(spillslot); + for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++) + { + if (bms_is_member(i + 1, aggstate->colnos_needed)) + { + spillslot->tts_values[i] = inputslot->tts_values[i]; + spillslot->tts_isnull[i] = inputslot->tts_isnull[i]; + } + else + spillslot->tts_isnull[i] = true; + } + ExecStoreVirtualTuple(spillslot); + } + else + spillslot = inputslot; + + tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree); + + partition = (hash & spill->mask) >> spill->shift; + spill->ntuples[partition]++; + + /* + * All hash values destined for a given partition have some bits in + * common, which causes bad HLL cardinality estimates. Hash the hash to + * get a more uniform distribution. + */ + addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash)); + + tapenum = spill->partitions[partition]; + + LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32)); + total_written += sizeof(uint32); + + LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len); + total_written += tuple->t_len; + + if (shouldFree) + pfree(tuple); + + return total_written; +} + +/* + * hashagg_batch_new + * + * Construct a HashAggBatch item, which represents one iteration of HashAgg to + * be done. + */ +static HashAggBatch * +hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, + int64 input_tuples, double input_card, int used_bits) +{ + HashAggBatch *batch = palloc0(sizeof(HashAggBatch)); + + batch->setno = setno; + batch->used_bits = used_bits; + batch->tapeset = tapeset; + batch->input_tapenum = tapenum; + batch->input_tuples = input_tuples; + batch->input_card = input_card; + + return batch; +} + +/* + * read_spilled_tuple + * read the next tuple from a batch's tape. Return NULL if no more. + */ +static MinimalTuple +hashagg_batch_read(HashAggBatch *batch, uint32 *hashp) +{ + LogicalTapeSet *tapeset = batch->tapeset; + int tapenum = batch->input_tapenum; + MinimalTuple tuple; + uint32 t_len; + size_t nread; + uint32 hash; + + nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32)); + if (nread == 0) + return NULL; + if (nread != sizeof(uint32)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", + tapenum, sizeof(uint32), nread))); + if (hashp != NULL) + *hashp = hash; + + nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len)); + if (nread != sizeof(uint32)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", + tapenum, sizeof(uint32), nread))); + + tuple = (MinimalTuple) palloc(t_len); + tuple->t_len = t_len; + + nread = LogicalTapeRead(tapeset, tapenum, + (void *) ((char *) tuple + sizeof(uint32)), + t_len - sizeof(uint32)); + if (nread != t_len - sizeof(uint32)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes", + tapenum, t_len - sizeof(uint32), nread))); + + return tuple; +} + +/* + * hashagg_finish_initial_spills + * + * After a HashAggBatch has been processed, it may have spilled tuples to + * disk. If so, turn the spilled partitions into new batches that must later + * be executed. + */ +static void +hashagg_finish_initial_spills(AggState *aggstate) +{ + int setno; + int total_npartitions = 0; + + if (aggstate->hash_spills != NULL) + { + for (setno = 0; setno < aggstate->num_hashes; setno++) + { + HashAggSpill *spill = &aggstate->hash_spills[setno]; + + total_npartitions += spill->npartitions; + hashagg_spill_finish(aggstate, spill, setno); + } + + /* + * We're not processing tuples from outer plan any more; only + * processing batches of spilled tuples. The initial spill structures + * are no longer needed. + */ + pfree(aggstate->hash_spills); + aggstate->hash_spills = NULL; + } + + hash_agg_update_metrics(aggstate, false, total_npartitions); + aggstate->hash_spill_mode = false; +} + +/* + * hashagg_spill_finish + * + * Transform spill partitions into new batches. + */ +static void +hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) +{ + int i; + int used_bits = 32 - spill->shift; + + if (spill->npartitions == 0) + return; /* didn't spill */ + + for (i = 0; i < spill->npartitions; i++) + { + LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset; + int tapenum = spill->partitions[i]; + HashAggBatch *new_batch; + double cardinality; + + /* if the partition is empty, don't create a new batch of work */ + if (spill->ntuples[i] == 0) + continue; + + cardinality = estimateHyperLogLog(&spill->hll_card[i]); + freeHyperLogLog(&spill->hll_card[i]); + + /* rewinding frees the buffer while not in use */ + LogicalTapeRewindForRead(tapeset, tapenum, + HASHAGG_READ_BUFFER_SIZE); + + new_batch = hashagg_batch_new(tapeset, tapenum, setno, + spill->ntuples[i], cardinality, + used_bits); + aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch); + aggstate->hash_batches_used++; + } + + pfree(spill->ntuples); + pfree(spill->hll_card); + pfree(spill->partitions); +} + +/* + * Free resources related to a spilled HashAgg. + */ +static void +hashagg_reset_spill_state(AggState *aggstate) +{ + /* free spills from initial pass */ + if (aggstate->hash_spills != NULL) + { + int setno; + + for (setno = 0; setno < aggstate->num_hashes; setno++) + { + HashAggSpill *spill = &aggstate->hash_spills[setno]; + + pfree(spill->ntuples); + pfree(spill->partitions); + } + pfree(aggstate->hash_spills); + aggstate->hash_spills = NULL; + } + + /* free batches */ + list_free_deep(aggstate->hash_batches); + aggstate->hash_batches = NIL; + + /* close tape set */ + if (aggstate->hash_tapeinfo != NULL) + { + HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; + + LogicalTapeSetClose(tapeinfo->tapeset); + pfree(tapeinfo->freetapes); + pfree(tapeinfo); + aggstate->hash_tapeinfo = NULL; + } +} + + +/* ----------------- + * ExecInitAgg + * + * Creates the run-time information for the agg node produced by the + * planner and initializes its outer subtree. + * + * ----------------- + */ +AggState * +ExecInitAgg(Agg *node, EState *estate, int eflags) +{ + AggState *aggstate; + AggStatePerAgg peraggs; + AggStatePerTrans pertransstates; + AggStatePerGroup *pergroups; + Plan *outerPlan; + ExprContext *econtext; + TupleDesc scanDesc; + int max_aggno; + int max_transno; + int numaggrefs; + int numaggs; + int numtrans; + int phase; + int phaseidx; + ListCell *l; + Bitmapset *all_grouped_cols = NULL; + int numGroupingSets = 1; + int numPhases; + int numHashes; + int i = 0; + int j = 0; + bool use_hashing = (node->aggstrategy == AGG_HASHED || + node->aggstrategy == AGG_MIXED); + + /* check for unsupported flags */ + Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); + + /* + * create state structure + */ + aggstate = makeNode(AggState); + aggstate->ss.ps.plan = (Plan *) node; + aggstate->ss.ps.state = estate; + aggstate->ss.ps.ExecProcNode = ExecAgg; + + aggstate->aggs = NIL; + aggstate->numaggs = 0; + aggstate->numtrans = 0; + aggstate->aggstrategy = node->aggstrategy; + aggstate->aggsplit = node->aggsplit; + aggstate->maxsets = 0; + aggstate->projected_set = -1; + aggstate->current_set = 0; + aggstate->peragg = NULL; + aggstate->pertrans = NULL; + aggstate->curperagg = NULL; + aggstate->curpertrans = NULL; + aggstate->input_done = false; + aggstate->agg_done = false; + aggstate->pergroups = NULL; + aggstate->grp_firstTuple = NULL; + aggstate->sort_in = NULL; + aggstate->sort_out = NULL; + + /* + * phases[0] always exists, but is dummy in sorted/plain mode + */ + numPhases = (use_hashing ? 1 : 2); + numHashes = (use_hashing ? 1 : 0); + + /* + * Calculate the maximum number of grouping sets in any phase; this + * determines the size of some allocations. Also calculate the number of + * phases, since all hashed/mixed nodes contribute to only a single phase. + */ + if (node->groupingSets) + { + numGroupingSets = list_length(node->groupingSets); + + foreach(l, node->chain) + { + Agg *agg = lfirst(l); + + numGroupingSets = Max(numGroupingSets, + list_length(agg->groupingSets)); + + /* + * additional AGG_HASHED aggs become part of phase 0, but all + * others add an extra phase. + */ + if (agg->aggstrategy != AGG_HASHED) + ++numPhases; + else + ++numHashes; + } + } + + aggstate->maxsets = numGroupingSets; + aggstate->numphases = numPhases; + + aggstate->aggcontexts = (ExprContext **) + palloc0(sizeof(ExprContext *) * numGroupingSets); + + /* + * Create expression contexts. We need three or more, one for + * per-input-tuple processing, one for per-output-tuple processing, one + * for all the hashtables, and one for each grouping set. The per-tuple + * memory context of the per-grouping-set ExprContexts (aggcontexts) + * replaces the standalone memory context formerly used to hold transition + * values. We cheat a little by using ExecAssignExprContext() to build + * all of them. + * + * NOTE: the details of what is stored in aggcontexts and what is stored + * in the regular per-query memory context are driven by a simple + * decision: we want to reset the aggcontext at group boundaries (if not + * hashing) and in ExecReScanAgg to recover no-longer-wanted space. + */ + ExecAssignExprContext(estate, &aggstate->ss.ps); + aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext; + + for (i = 0; i < numGroupingSets; ++i) + { + ExecAssignExprContext(estate, &aggstate->ss.ps); + aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext; + } + + if (use_hashing) + aggstate->hashcontext = CreateWorkExprContext(estate); + + ExecAssignExprContext(estate, &aggstate->ss.ps); + + /* + * Initialize child nodes. + * + * If we are doing a hashed aggregation then the child plan does not need + * to handle REWIND efficiently; see ExecReScanAgg. + */ + if (node->aggstrategy == AGG_HASHED) + eflags &= ~EXEC_FLAG_REWIND; + outerPlan = outerPlan(node); + outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags); + + /* + * initialize source tuple type. + */ + aggstate->ss.ps.outerops = + ExecGetResultSlotOps(outerPlanState(&aggstate->ss), + &aggstate->ss.ps.outeropsfixed); + aggstate->ss.ps.outeropsset = true; + + ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss, + aggstate->ss.ps.outerops); + scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; + + /* + * If there are more than two phases (including a potential dummy phase + * 0), input will be resorted using tuplesort. Need a slot for that. + */ + if (numPhases > 2) + { + aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + + /* + * The output of the tuplesort, and the output from the outer child + * might not use the same type of slot. In most cases the child will + * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the + * input can also be presorted due an index, in which case it could be + * a different type of slot. + * + * XXX: For efficiency it would be good to instead/additionally + * generate expressions with corresponding settings of outerops* for + * the individual phases - deforming is often a bottleneck for + * aggregations with lots of rows per group. If there's multiple + * sorts, we know that all but the first use TTSOpsMinimalTuple (via + * the nodeAgg.c internal tuplesort). + */ + if (aggstate->ss.ps.outeropsfixed && + aggstate->ss.ps.outerops != &TTSOpsMinimalTuple) + aggstate->ss.ps.outeropsfixed = false; + } + + /* + * Initialize result type, slot and projection. + */ + ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual); + ExecAssignProjectionInfo(&aggstate->ss.ps, NULL); + + /* + * initialize child expressions + * + * We expect the parser to have checked that no aggs contain other agg + * calls in their arguments (and just to be sure, we verify it again while + * initializing the plan node). This would make no sense under SQL + * semantics, and it's forbidden by the spec. Because it is true, we + * don't need to worry about evaluating the aggs in any particular order. + * + * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs. + * Aggrefs in the qual are found here; Aggrefs in the targetlist are found + * during ExecAssignProjectionInfo, above. + */ + aggstate->ss.ps.qual = + ExecInitQual(node->plan.qual, (PlanState *) aggstate); + + /* + * We should now have found all Aggrefs in the targetlist and quals. + */ + numaggrefs = list_length(aggstate->aggs); + max_aggno = -1; + max_transno = -1; + foreach(l, aggstate->aggs) + { + Aggref *aggref = (Aggref *) lfirst(l); + + max_aggno = Max(max_aggno, aggref->aggno); + max_transno = Max(max_transno, aggref->aggtransno); + } + numaggs = max_aggno + 1; + numtrans = max_transno + 1; + + /* + * For each phase, prepare grouping set data and fmgr lookup data for + * compare functions. Accumulate all_grouped_cols in passing. + */ + aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData)); + + aggstate->num_hashes = numHashes; + if (numHashes) + { + aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes); + aggstate->phases[0].numsets = 0; + aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int)); + aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *)); + } + + phase = 0; + for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx) + { + Agg *aggnode; + Sort *sortnode; + + if (phaseidx > 0) + { + aggnode = list_nth_node(Agg, node->chain, phaseidx - 1); + sortnode = castNode(Sort, aggnode->plan.lefttree); + } + else + { + aggnode = node; + sortnode = NULL; + } + + Assert(phase <= 1 || sortnode); + + if (aggnode->aggstrategy == AGG_HASHED + || aggnode->aggstrategy == AGG_MIXED) + { + AggStatePerPhase phasedata = &aggstate->phases[0]; + AggStatePerHash perhash; + Bitmapset *cols = NULL; + + Assert(phase == 0); + i = phasedata->numsets++; + perhash = &aggstate->perhash[i]; + + /* phase 0 always points to the "real" Agg in the hash case */ + phasedata->aggnode = node; + phasedata->aggstrategy = node->aggstrategy; + + /* but the actual Agg node representing this hash is saved here */ + perhash->aggnode = aggnode; + + phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols; + + for (j = 0; j < aggnode->numCols; ++j) + cols = bms_add_member(cols, aggnode->grpColIdx[j]); + + phasedata->grouped_cols[i] = cols; + + all_grouped_cols = bms_add_members(all_grouped_cols, cols); + continue; + } + else + { + AggStatePerPhase phasedata = &aggstate->phases[++phase]; + int num_sets; + + phasedata->numsets = num_sets = list_length(aggnode->groupingSets); + + if (num_sets) + { + phasedata->gset_lengths = palloc(num_sets * sizeof(int)); + phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *)); + + i = 0; + foreach(l, aggnode->groupingSets) + { + int current_length = list_length(lfirst(l)); + Bitmapset *cols = NULL; + + /* planner forces this to be correct */ + for (j = 0; j < current_length; ++j) + cols = bms_add_member(cols, aggnode->grpColIdx[j]); + + phasedata->grouped_cols[i] = cols; + phasedata->gset_lengths[i] = current_length; + + ++i; + } + + all_grouped_cols = bms_add_members(all_grouped_cols, + phasedata->grouped_cols[0]); + } + else + { + Assert(phaseidx == 0); + + phasedata->gset_lengths = NULL; + phasedata->grouped_cols = NULL; + } + + /* + * If we are grouping, precompute fmgr lookup data for inner loop. + */ + if (aggnode->aggstrategy == AGG_SORTED) + { + int i = 0; + + Assert(aggnode->numCols > 0); + + /* + * Build a separate function for each subset of columns that + * need to be compared. + */ + phasedata->eqfunctions = + (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *)); + + /* for each grouping set */ + for (i = 0; i < phasedata->numsets; i++) + { + int length = phasedata->gset_lengths[i]; + + if (phasedata->eqfunctions[length - 1] != NULL) + continue; + + phasedata->eqfunctions[length - 1] = + execTuplesMatchPrepare(scanDesc, + length, + aggnode->grpColIdx, + aggnode->grpOperators, + aggnode->grpCollations, + (PlanState *) aggstate); + } + + /* and for all grouped columns, unless already computed */ + if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL) + { + phasedata->eqfunctions[aggnode->numCols - 1] = + execTuplesMatchPrepare(scanDesc, + aggnode->numCols, + aggnode->grpColIdx, + aggnode->grpOperators, + aggnode->grpCollations, + (PlanState *) aggstate); + } + } + + phasedata->aggnode = aggnode; + phasedata->aggstrategy = aggnode->aggstrategy; + phasedata->sortnode = sortnode; + } + } + + /* + * Convert all_grouped_cols to a descending-order list. + */ + i = -1; + while ((i = bms_next_member(all_grouped_cols, i)) >= 0) + aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols); + + /* + * Set up aggregate-result storage in the output expr context, and also + * allocate my private per-agg working storage + */ + econtext = aggstate->ss.ps.ps_ExprContext; + econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs); + econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs); + + peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs); + pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans); + + aggstate->peragg = peraggs; + aggstate->pertrans = pertransstates; + + + aggstate->all_pergroups = + (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup) + * (numGroupingSets + numHashes)); + pergroups = aggstate->all_pergroups; + + if (node->aggstrategy != AGG_HASHED) + { + for (i = 0; i < numGroupingSets; i++) + { + pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) + * numaggs); + } + + aggstate->pergroups = pergroups; + pergroups += numGroupingSets; + } + + /* + * Hashing can only appear in the initial phase. + */ + if (use_hashing) + { + Plan *outerplan = outerPlan(node); + uint64 totalGroups = 0; + int i; + + aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt, + "HashAgg meta context", + ALLOCSET_DEFAULT_SIZES); + aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsVirtual); + + /* this is an array of pointers, not structures */ + aggstate->hash_pergroup = pergroups; + + aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans, + outerplan->plan_width, + node->transitionSpace); + + /* + * Consider all of the grouping sets together when setting the limits + * and estimating the number of partitions. This can be inaccurate + * when there is more than one grouping set, but should still be + * reasonable. + */ + for (i = 0; i < aggstate->num_hashes; i++) + totalGroups += aggstate->perhash[i].aggnode->numGroups; + + hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0, + &aggstate->hash_mem_limit, + &aggstate->hash_ngroups_limit, + &aggstate->hash_planned_partitions); + find_hash_columns(aggstate); + + /* Skip massive memory allocation if we are just doing EXPLAIN */ + if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY)) + build_hash_tables(aggstate); + + aggstate->table_filled = false; + + /* Initialize this to 1, meaning nothing spilled, yet */ + aggstate->hash_batches_used = 1; + } + + /* + * Initialize current phase-dependent values to initial phase. The initial + * phase is 1 (first sort pass) for all strategies that use sorting (if + * hashing is being done too, then phase 0 is processed last); but if only + * hashing is being done, then phase 0 is all there is. + */ + if (node->aggstrategy == AGG_HASHED) + { + aggstate->current_phase = 0; + initialize_phase(aggstate, 0); + select_current_set(aggstate, 0, true); + } + else + { + aggstate->current_phase = 1; + initialize_phase(aggstate, 1); + select_current_set(aggstate, 0, false); + } + + /* + * Perform lookups of aggregate function info, and initialize the + * unchanging fields of the per-agg and per-trans data. + */ + foreach(l, aggstate->aggs) + { + Aggref *aggref = lfirst(l); + AggStatePerAgg peragg; + AggStatePerTrans pertrans; + Oid inputTypes[FUNC_MAX_ARGS]; + int numArguments; + int numDirectArgs; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + AclResult aclresult; + Oid finalfn_oid; + Oid serialfn_oid, + deserialfn_oid; + Oid aggOwner; + Expr *finalfnexpr; + Oid aggtranstype; + + /* Planner should have assigned aggregate to correct level */ + Assert(aggref->agglevelsup == 0); + /* ... and the split mode should match */ + Assert(aggref->aggsplit == aggstate->aggsplit); + + peragg = &peraggs[aggref->aggno]; + + /* Check if we initialized the state for this aggregate already. */ + if (peragg->aggref != NULL) + continue; + + peragg->aggref = aggref; + peragg->transno = aggref->aggtransno; + + /* Fetch the pg_aggregate row */ + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + /* Check permission to call aggregate function */ + aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_AGGREGATE, + get_func_name(aggref->aggfnoid)); + InvokeFunctionExecuteHook(aggref->aggfnoid); + + /* planner recorded transition state type in the Aggref itself */ + aggtranstype = aggref->aggtranstype; + Assert(OidIsValid(aggtranstype)); + + /* Final function only required if we're finalizing the aggregates */ + if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)) + peragg->finalfn_oid = finalfn_oid = InvalidOid; + else + peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + + serialfn_oid = InvalidOid; + deserialfn_oid = InvalidOid; + + /* + * Check if serialization/deserialization is required. We only do it + * for aggregates that have transtype INTERNAL. + */ + if (aggtranstype == INTERNALOID) + { + /* + * The planner should only have generated a serialize agg node if + * every aggregate with an INTERNAL state has a serialization + * function. Verify that. + */ + if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit)) + { + /* serialization only valid when not running finalfn */ + Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit)); + + if (!OidIsValid(aggform->aggserialfn)) + elog(ERROR, "serialfunc not provided for serialization aggregation"); + serialfn_oid = aggform->aggserialfn; + } + + /* Likewise for deserialization functions */ + if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit)) + { + /* deserialization only valid when combining states */ + Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit)); + + if (!OidIsValid(aggform->aggdeserialfn)) + elog(ERROR, "deserialfunc not provided for deserialization aggregation"); + deserialfn_oid = aggform->aggdeserialfn; + } + } + + /* Check that aggregate owner has permission to call component fns */ + { + HeapTuple procTuple; + + procTuple = SearchSysCache1(PROCOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(procTuple)) + elog(ERROR, "cache lookup failed for function %u", + aggref->aggfnoid); + aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner; + ReleaseSysCache(procTuple); + + if (OidIsValid(finalfn_oid)) + { + aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(finalfn_oid)); + InvokeFunctionExecuteHook(finalfn_oid); + } + if (OidIsValid(serialfn_oid)) + { + aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(serialfn_oid)); + InvokeFunctionExecuteHook(serialfn_oid); + } + if (OidIsValid(deserialfn_oid)) + { + aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(deserialfn_oid)); + InvokeFunctionExecuteHook(deserialfn_oid); + } + } + + /* + * Get actual datatypes of the (nominal) aggregate inputs. These + * could be different from the agg's declared input types, when the + * agg accepts ANY or a polymorphic type. + */ + numArguments = get_aggregate_argtypes(aggref, inputTypes); + + /* Count the "direct" arguments, if any */ + numDirectArgs = list_length(aggref->aggdirectargs); + + /* Detect how many arguments to pass to the finalfn */ + if (aggform->aggfinalextra) + peragg->numFinalArgs = numArguments + 1; + else + peragg->numFinalArgs = numDirectArgs + 1; + + /* Initialize any direct-argument expressions */ + peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs, + (PlanState *) aggstate); + + /* + * build expression trees using actual argument & result types for the + * finalfn, if it exists and is required. + */ + if (OidIsValid(finalfn_oid)) + { + build_aggregate_finalfn_expr(inputTypes, + peragg->numFinalArgs, + aggtranstype, + aggref->aggtype, + aggref->inputcollid, + finalfn_oid, + &finalfnexpr); + fmgr_info(finalfn_oid, &peragg->finalfn); + fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn); + } + + /* get info about the output value's datatype */ + get_typlenbyval(aggref->aggtype, + &peragg->resulttypeLen, + &peragg->resulttypeByVal); + + /* + * Build working state for invoking the transition function, if we + * haven't done it already. + */ + pertrans = &pertransstates[aggref->aggtransno]; + if (pertrans->aggref == NULL) + { + Datum textInitVal; + Datum initValue; + bool initValueIsNull; + Oid transfn_oid; + + /* + * If this aggregation is performing state combines, then instead + * of using the transition function, we'll use the combine + * function + */ + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) + { + transfn_oid = aggform->aggcombinefn; + + /* If not set then the planner messed up */ + if (!OidIsValid(transfn_oid)) + elog(ERROR, "combinefn not set for aggregate function"); + } + else + transfn_oid = aggform->aggtransfn; + + aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(transfn_oid)); + InvokeFunctionExecuteHook(transfn_oid); + + /* + * initval is potentially null, so don't try to access it as a + * struct field. Must do it the hard way with SysCacheGetAttr. + */ + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, + Anum_pg_aggregate_agginitval, + &initValueIsNull); + if (initValueIsNull) + initValue = (Datum) 0; + else + initValue = GetAggInitVal(textInitVal, aggtranstype); + + build_pertrans_for_aggref(pertrans, aggstate, estate, + aggref, transfn_oid, aggtranstype, + serialfn_oid, deserialfn_oid, + initValue, initValueIsNull, + inputTypes, numArguments); + } + else + pertrans->aggshared = true; + ReleaseSysCache(aggTuple); + } + + /* + * Update aggstate->numaggs to be the number of unique aggregates found. + * Also set numstates to the number of unique transition states found. + */ + aggstate->numaggs = numaggs; + aggstate->numtrans = numtrans; + + /* + * Last, check whether any more aggregates got added onto the node while + * we processed the expressions for the aggregate arguments (including not + * only the regular arguments and FILTER expressions handled immediately + * above, but any direct arguments we might've handled earlier). If so, + * we have nested aggregate functions, which is semantically nonsensical, + * so complain. (This should have been caught by the parser, so we don't + * need to work hard on a helpful error message; but we defend against it + * here anyway, just to be sure.) + */ + if (numaggrefs != list_length(aggstate->aggs)) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("aggregate function calls cannot be nested"))); + + /* + * Build expressions doing all the transition work at once. We build a + * different one for each phase, as the number of transition function + * invocation can differ between phases. Note this'll work both for + * transition and combination functions (although there'll only be one + * phase in the latter case). + */ + for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) + { + AggStatePerPhase phase = &aggstate->phases[phaseidx]; + bool dohash = false; + bool dosort = false; + + /* phase 0 doesn't necessarily exist */ + if (!phase->aggnode) + continue; + + if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1) + { + /* + * Phase one, and only phase one, in a mixed agg performs both + * sorting and aggregation. + */ + dohash = true; + dosort = true; + } + else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0) + { + /* + * No need to compute a transition function for an AGG_MIXED phase + * 0 - the contents of the hashtables will have been computed + * during phase 1. + */ + continue; + } + else if (phase->aggstrategy == AGG_PLAIN || + phase->aggstrategy == AGG_SORTED) + { + dohash = false; + dosort = true; + } + else if (phase->aggstrategy == AGG_HASHED) + { + dohash = true; + dosort = false; + } + else + Assert(false); + + phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash, + false); + + /* cache compiled expression for outer slot without NULL check */ + phase->evaltrans_cache[0][0] = phase->evaltrans; + } + + return aggstate; +} + +/* + * Build the state needed to calculate a state value for an aggregate. + * + * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate + * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest + * of the arguments could be calculated from 'aggref', but the caller has + * calculated them already, so might as well pass them. + */ +static void +build_pertrans_for_aggref(AggStatePerTrans pertrans, + AggState *aggstate, EState *estate, + Aggref *aggref, + Oid aggtransfn, Oid aggtranstype, + Oid aggserialfn, Oid aggdeserialfn, + Datum initValue, bool initValueIsNull, + Oid *inputTypes, int numArguments) +{ + int numGroupingSets = Max(aggstate->maxsets, 1); + Expr *serialfnexpr = NULL; + Expr *deserialfnexpr = NULL; + ListCell *lc; + int numInputs; + int numDirectArgs; + List *sortlist; + int numSortCols; + int numDistinctCols; + int i; + + /* Begin filling in the pertrans data */ + pertrans->aggref = aggref; + pertrans->aggshared = false; + pertrans->aggCollation = aggref->inputcollid; + pertrans->transfn_oid = aggtransfn; + pertrans->serialfn_oid = aggserialfn; + pertrans->deserialfn_oid = aggdeserialfn; + pertrans->initValue = initValue; + pertrans->initValueIsNull = initValueIsNull; + + /* Count the "direct" arguments, if any */ + numDirectArgs = list_length(aggref->aggdirectargs); + + /* Count the number of aggregated input columns */ + pertrans->numInputs = numInputs = list_length(aggref->args); + + pertrans->aggtranstype = aggtranstype; + + /* + * When combining states, we have no use at all for the aggregate + * function's transfn. Instead we use the combinefn. In this case, the + * transfn and transfn_oid fields of pertrans refer to the combine + * function rather than the transition function. + */ + if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) + { + Expr *combinefnexpr; + size_t numTransArgs; + + /* + * When combining there's only one input, the to-be-combined added + * transition value from below (this node's transition value is + * counted separately). + */ + pertrans->numTransInputs = 1; + + /* account for the current transition state */ + numTransArgs = pertrans->numTransInputs + 1; + + build_aggregate_combinefn_expr(aggtranstype, + aggref->inputcollid, + aggtransfn, + &combinefnexpr); + fmgr_info(aggtransfn, &pertrans->transfn); + fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn); + + pertrans->transfn_fcinfo = + (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2)); + InitFunctionCallInfoData(*pertrans->transfn_fcinfo, + &pertrans->transfn, + numTransArgs, + pertrans->aggCollation, + (void *) aggstate, NULL); + + /* + * Ensure that a combine function to combine INTERNAL states is not + * strict. This should have been checked during CREATE AGGREGATE, but + * the strict property could have been changed since then. + */ + if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("combine function with transition type %s must not be declared STRICT", + format_type_be(aggtranstype)))); + } + else + { + Expr *transfnexpr; + size_t numTransArgs; + + /* Detect how many arguments to pass to the transfn */ + if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) + pertrans->numTransInputs = numInputs; + else + pertrans->numTransInputs = numArguments; + + /* account for the current transition state */ + numTransArgs = pertrans->numTransInputs + 1; + + /* + * Set up infrastructure for calling the transfn. Note that + * invtransfn is not needed here. + */ + build_aggregate_transfn_expr(inputTypes, + numArguments, + numDirectArgs, + aggref->aggvariadic, + aggtranstype, + aggref->inputcollid, + aggtransfn, + InvalidOid, + &transfnexpr, + NULL); + fmgr_info(aggtransfn, &pertrans->transfn); + fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn); + + pertrans->transfn_fcinfo = + (FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs)); + InitFunctionCallInfoData(*pertrans->transfn_fcinfo, + &pertrans->transfn, + numTransArgs, + pertrans->aggCollation, + (void *) aggstate, NULL); + + /* + * If the transfn is strict and the initval is NULL, make sure input + * type and transtype are the same (or at least binary-compatible), so + * that it's OK to use the first aggregated input value as the initial + * transValue. This should have been checked at agg definition time, + * but we must check again in case the transfn's strictness property + * has been changed. + */ + if (pertrans->transfn.fn_strict && pertrans->initValueIsNull) + { + if (numArguments <= numDirectArgs || + !IsBinaryCoercible(inputTypes[numDirectArgs], + aggtranstype)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate %u needs to have compatible input type and transition type", + aggref->aggfnoid))); + } + } + + /* get info about the state value's datatype */ + get_typlenbyval(aggtranstype, + &pertrans->transtypeLen, + &pertrans->transtypeByVal); + + if (OidIsValid(aggserialfn)) + { + build_aggregate_serialfn_expr(aggserialfn, + &serialfnexpr); + fmgr_info(aggserialfn, &pertrans->serialfn); + fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn); + + pertrans->serialfn_fcinfo = + (FunctionCallInfo) palloc(SizeForFunctionCallInfo(1)); + InitFunctionCallInfoData(*pertrans->serialfn_fcinfo, + &pertrans->serialfn, + 1, + InvalidOid, + (void *) aggstate, NULL); + } + + if (OidIsValid(aggdeserialfn)) + { + build_aggregate_deserialfn_expr(aggdeserialfn, + &deserialfnexpr); + fmgr_info(aggdeserialfn, &pertrans->deserialfn); + fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn); + + pertrans->deserialfn_fcinfo = + (FunctionCallInfo) palloc(SizeForFunctionCallInfo(2)); + InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo, + &pertrans->deserialfn, + 2, + InvalidOid, + (void *) aggstate, NULL); + + } + + /* + * If we're doing either DISTINCT or ORDER BY for a plain agg, then we + * have a list of SortGroupClause nodes; fish out the data in them and + * stick them into arrays. We ignore ORDER BY for an ordered-set agg, + * however; the agg's transfn and finalfn are responsible for that. + * + * Note that by construction, if there is a DISTINCT clause then the ORDER + * BY clause is a prefix of it (see transformDistinctClause). + */ + if (AGGKIND_IS_ORDERED_SET(aggref->aggkind)) + { + sortlist = NIL; + numSortCols = numDistinctCols = 0; + } + else if (aggref->aggdistinct) + { + sortlist = aggref->aggdistinct; + numSortCols = numDistinctCols = list_length(sortlist); + Assert(numSortCols >= list_length(aggref->aggorder)); + } + else + { + sortlist = aggref->aggorder; + numSortCols = list_length(sortlist); + numDistinctCols = 0; + } + + pertrans->numSortCols = numSortCols; + pertrans->numDistinctCols = numDistinctCols; + + /* + * If we have either sorting or filtering to do, create a tupledesc and + * slot corresponding to the aggregated inputs (including sort + * expressions) of the agg. + */ + if (numSortCols > 0 || aggref->aggfilter) + { + pertrans->sortdesc = ExecTypeFromTL(aggref->args); + pertrans->sortslot = + ExecInitExtraTupleSlot(estate, pertrans->sortdesc, + &TTSOpsMinimalTuple); + } + + if (numSortCols > 0) + { + /* + * We don't implement DISTINCT or ORDER BY aggs in the HASHED case + * (yet) + */ + Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED); + + /* If we have only one input, we need its len/byval info. */ + if (numInputs == 1) + { + get_typlenbyval(inputTypes[numDirectArgs], + &pertrans->inputtypeLen, + &pertrans->inputtypeByVal); + } + else if (numDistinctCols > 0) + { + /* we will need an extra slot to store prior values */ + pertrans->uniqslot = + ExecInitExtraTupleSlot(estate, pertrans->sortdesc, + &TTSOpsMinimalTuple); + } + + /* Extract the sort information for use later */ + pertrans->sortColIdx = + (AttrNumber *) palloc(numSortCols * sizeof(AttrNumber)); + pertrans->sortOperators = + (Oid *) palloc(numSortCols * sizeof(Oid)); + pertrans->sortCollations = + (Oid *) palloc(numSortCols * sizeof(Oid)); + pertrans->sortNullsFirst = + (bool *) palloc(numSortCols * sizeof(bool)); + + i = 0; + foreach(lc, sortlist) + { + SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args); + + /* the parser should have made sure of this */ + Assert(OidIsValid(sortcl->sortop)); + + pertrans->sortColIdx[i] = tle->resno; + pertrans->sortOperators[i] = sortcl->sortop; + pertrans->sortCollations[i] = exprCollation((Node *) tle->expr); + pertrans->sortNullsFirst[i] = sortcl->nulls_first; + i++; + } + Assert(i == numSortCols); + } + + if (aggref->aggdistinct) + { + Oid *ops; + + Assert(numArguments > 0); + Assert(list_length(aggref->aggdistinct) == numDistinctCols); + + ops = palloc(numDistinctCols * sizeof(Oid)); + + i = 0; + foreach(lc, aggref->aggdistinct) + ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop; + + /* lookup / build the necessary comparators */ + if (numDistinctCols == 1) + fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne); + else + pertrans->equalfnMulti = + execTuplesMatchPrepare(pertrans->sortdesc, + numDistinctCols, + pertrans->sortColIdx, + ops, + pertrans->sortCollations, + &aggstate->ss.ps); + pfree(ops); + } + + pertrans->sortstates = (Tuplesortstate **) + palloc0(sizeof(Tuplesortstate *) * numGroupingSets); +} + + +static Datum +GetAggInitVal(Datum textInitVal, Oid transtype) +{ + Oid typinput, + typioparam; + char *strInitVal; + Datum initVal; + + getTypeInputInfo(transtype, &typinput, &typioparam); + strInitVal = TextDatumGetCString(textInitVal); + initVal = OidInputFunctionCall(typinput, strInitVal, + typioparam, -1); + pfree(strInitVal); + return initVal; +} + +void +ExecEndAgg(AggState *node) +{ + PlanState *outerPlan; + int transno; + int numGroupingSets = Max(node->maxsets, 1); + int setno; + + /* + * When ending a parallel worker, copy the statistics gathered by the + * worker back into shared memory so that it can be picked up by the main + * process to report in EXPLAIN ANALYZE. + */ + if (node->shared_info && IsParallelWorker()) + { + AggregateInstrumentation *si; + + Assert(ParallelWorkerNumber <= node->shared_info->num_workers); + si = &node->shared_info->sinstrument[ParallelWorkerNumber]; + si->hash_batches_used = node->hash_batches_used; + si->hash_disk_used = node->hash_disk_used; + si->hash_mem_peak = node->hash_mem_peak; + } + + /* Make sure we have closed any open tuplesorts */ + + if (node->sort_in) + tuplesort_end(node->sort_in); + if (node->sort_out) + tuplesort_end(node->sort_out); + + hashagg_reset_spill_state(node); + + if (node->hash_metacxt != NULL) + { + MemoryContextDelete(node->hash_metacxt); + node->hash_metacxt = NULL; + } + + for (transno = 0; transno < node->numtrans; transno++) + { + AggStatePerTrans pertrans = &node->pertrans[transno]; + + for (setno = 0; setno < numGroupingSets; setno++) + { + if (pertrans->sortstates[setno]) + tuplesort_end(pertrans->sortstates[setno]); + } + } + + /* And ensure any agg shutdown callbacks have been called */ + for (setno = 0; setno < numGroupingSets; setno++) + ReScanExprContext(node->aggcontexts[setno]); + if (node->hashcontext) + ReScanExprContext(node->hashcontext); + + /* + * We don't actually free any ExprContexts here (see comment in + * ExecFreeExprContext), just unlinking the output one from the plan node + * suffices. + */ + ExecFreeExprContext(&node->ss.ps); + + /* clean up tuple table */ + ExecClearTuple(node->ss.ss_ScanTupleSlot); + + outerPlan = outerPlanState(node); + ExecEndNode(outerPlan); +} + +void +ExecReScanAgg(AggState *node) +{ + ExprContext *econtext = node->ss.ps.ps_ExprContext; + PlanState *outerPlan = outerPlanState(node); + Agg *aggnode = (Agg *) node->ss.ps.plan; + int transno; + int numGroupingSets = Max(node->maxsets, 1); + int setno; + + node->agg_done = false; + + if (node->aggstrategy == AGG_HASHED) + { + /* + * In the hashed case, if we haven't yet built the hash table then we + * can just return; nothing done yet, so nothing to undo. If subnode's + * chgParam is not NULL then it will be re-scanned by ExecProcNode, + * else no reason to re-scan it at all. + */ + if (!node->table_filled) + return; + + /* + * If we do have the hash table, and it never spilled, and the subplan + * does not have any parameter changes, and none of our own parameter + * changes affect input expressions of the aggregated functions, then + * we can just rescan the existing hash table; no need to build it + * again. + */ + if (outerPlan->chgParam == NULL && !node->hash_ever_spilled && + !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams)) + { + ResetTupleHashIterator(node->perhash[0].hashtable, + &node->perhash[0].hashiter); + select_current_set(node, 0, true); + return; + } + } + + /* Make sure we have closed any open tuplesorts */ + for (transno = 0; transno < node->numtrans; transno++) + { + for (setno = 0; setno < numGroupingSets; setno++) + { + AggStatePerTrans pertrans = &node->pertrans[transno]; + + if (pertrans->sortstates[setno]) + { + tuplesort_end(pertrans->sortstates[setno]); + pertrans->sortstates[setno] = NULL; + } + } + } + + /* + * We don't need to ReScanExprContext the output tuple context here; + * ExecReScan already did it. But we do need to reset our per-grouping-set + * contexts, which may have transvalues stored in them. (We use rescan + * rather than just reset because transfns may have registered callbacks + * that need to be run now.) For the AGG_HASHED case, see below. + */ + + for (setno = 0; setno < numGroupingSets; setno++) + { + ReScanExprContext(node->aggcontexts[setno]); + } + + /* Release first tuple of group, if we have made a copy */ + if (node->grp_firstTuple != NULL) + { + heap_freetuple(node->grp_firstTuple); + node->grp_firstTuple = NULL; + } + ExecClearTuple(node->ss.ss_ScanTupleSlot); + + /* Forget current agg values */ + MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs); + MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs); + + /* + * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of + * the hashcontext. This used to be an issue, but now, resetting a context + * automatically deletes sub-contexts too. + */ + if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED) + { + hashagg_reset_spill_state(node); + + node->hash_ever_spilled = false; + node->hash_spill_mode = false; + node->hash_ngroups_current = 0; + + ReScanExprContext(node->hashcontext); + /* Rebuild an empty hash table */ + build_hash_tables(node); + node->table_filled = false; + /* iterator will be reset when the table is filled */ + + hashagg_recompile_expressions(node, false, false); + } + + if (node->aggstrategy != AGG_HASHED) + { + /* + * Reset the per-group state (in particular, mark transvalues null) + */ + for (setno = 0; setno < numGroupingSets; setno++) + { + MemSet(node->pergroups[setno], 0, + sizeof(AggStatePerGroupData) * node->numaggs); + } + + /* reset to phase 1 */ + initialize_phase(node, 1); + + node->input_done = false; + node->projected_set = -1; + } + + if (outerPlan->chgParam == NULL) + ExecReScan(outerPlan); +} + + +/*********************************************************************** + * API exposed to aggregate functions + ***********************************************************************/ + + +/* + * AggCheckCallContext - test if a SQL function is being called as an aggregate + * + * The transition and/or final functions of an aggregate may want to verify + * that they are being called as aggregates, rather than as plain SQL + * functions. They should use this function to do so. The return value + * is nonzero if being called as an aggregate, or zero if not. (Specific + * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more + * values could conceivably appear in future.) + * + * If aggcontext isn't NULL, the function also stores at *aggcontext the + * identity of the memory context that aggregate transition values are being + * stored in. Note that the same aggregate call site (flinfo) may be called + * interleaved on different transition values in different contexts, so it's + * not kosher to cache aggcontext under fn_extra. It is, however, kosher to + * cache it in the transvalue itself (for internal-type transvalues). + */ +int +AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext) +{ + if (fcinfo->context && IsA(fcinfo->context, AggState)) + { + if (aggcontext) + { + AggState *aggstate = ((AggState *) fcinfo->context); + ExprContext *cxt = aggstate->curaggcontext; + + *aggcontext = cxt->ecxt_per_tuple_memory; + } + return AGG_CONTEXT_AGGREGATE; + } + if (fcinfo->context && IsA(fcinfo->context, WindowAggState)) + { + if (aggcontext) + *aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext; + return AGG_CONTEXT_WINDOW; + } + + /* this is just to prevent "uninitialized variable" warnings */ + if (aggcontext) + *aggcontext = NULL; + return 0; +} + +/* + * AggGetAggref - allow an aggregate support function to get its Aggref + * + * If the function is being called as an aggregate support function, + * return the Aggref node for the aggregate call. Otherwise, return NULL. + * + * Aggregates sharing the same inputs and transition functions can get + * merged into a single transition calculation. If the transition function + * calls AggGetAggref, it will get some one of the Aggrefs for which it is + * executing. It must therefore not pay attention to the Aggref fields that + * relate to the final function, as those are indeterminate. But if a final + * function calls AggGetAggref, it will get a precise result. + * + * Note that if an aggregate is being used as a window function, this will + * return NULL. We could provide a similar function to return the relevant + * WindowFunc node in such cases, but it's not needed yet. + */ +Aggref * +AggGetAggref(FunctionCallInfo fcinfo) +{ + if (fcinfo->context && IsA(fcinfo->context, AggState)) + { + AggState *aggstate = (AggState *) fcinfo->context; + AggStatePerAgg curperagg; + AggStatePerTrans curpertrans; + + /* check curperagg (valid when in a final function) */ + curperagg = aggstate->curperagg; + + if (curperagg) + return curperagg->aggref; + + /* check curpertrans (valid when in a transition function) */ + curpertrans = aggstate->curpertrans; + + if (curpertrans) + return curpertrans->aggref; + } + return NULL; +} + +/* + * AggGetTempMemoryContext - fetch short-term memory context for aggregates + * + * This is useful in agg final functions; the context returned is one that + * the final function can safely reset as desired. This isn't useful for + * transition functions, since the context returned MAY (we don't promise) + * be the same as the context those are called in. + * + * As above, this is currently not useful for aggs called as window functions. + */ +MemoryContext +AggGetTempMemoryContext(FunctionCallInfo fcinfo) +{ + if (fcinfo->context && IsA(fcinfo->context, AggState)) + { + AggState *aggstate = (AggState *) fcinfo->context; + + return aggstate->tmpcontext->ecxt_per_tuple_memory; + } + return NULL; +} + +/* + * AggStateIsShared - find out whether transition state is shared + * + * If the function is being called as an aggregate support function, + * return true if the aggregate's transition state is shared across + * multiple aggregates, false if it is not. + * + * Returns true if not called as an aggregate support function. + * This is intended as a conservative answer, ie "no you'd better not + * scribble on your input". In particular, will return true if the + * aggregate is being used as a window function, which is a scenario + * in which changing the transition state is a bad idea. We might + * want to refine the behavior for the window case in future. + */ +bool +AggStateIsShared(FunctionCallInfo fcinfo) +{ + if (fcinfo->context && IsA(fcinfo->context, AggState)) + { + AggState *aggstate = (AggState *) fcinfo->context; + AggStatePerAgg curperagg; + AggStatePerTrans curpertrans; + + /* check curperagg (valid when in a final function) */ + curperagg = aggstate->curperagg; + + if (curperagg) + return aggstate->pertrans[curperagg->transno].aggshared; + + /* check curpertrans (valid when in a transition function) */ + curpertrans = aggstate->curpertrans; + + if (curpertrans) + return curpertrans->aggshared; + } + return true; +} + +/* + * AggRegisterCallback - register a cleanup callback for an aggregate + * + * This is useful for aggs to register shutdown callbacks, which will ensure + * that non-memory resources are freed. The callback will occur just before + * the associated aggcontext (as returned by AggCheckCallContext) is reset, + * either between groups or as a result of rescanning the query. The callback + * will NOT be called on error paths. The typical use-case is for freeing of + * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots + * created by the agg functions. (The callback will not be called until after + * the result of the finalfn is no longer needed, so it's safe for the finalfn + * to return data that will be freed by the callback.) + * + * As above, this is currently not useful for aggs called as window functions. + */ +void +AggRegisterCallback(FunctionCallInfo fcinfo, + ExprContextCallbackFunction func, + Datum arg) +{ + if (fcinfo->context && IsA(fcinfo->context, AggState)) + { + AggState *aggstate = (AggState *) fcinfo->context; + ExprContext *cxt = aggstate->curaggcontext; + + RegisterExprContextCallback(cxt, func, arg); + + return; + } + elog(ERROR, "aggregate function cannot register a callback in this context"); +} + + +/* ---------------------------------------------------------------- + * Parallel Query Support + * ---------------------------------------------------------------- + */ + + /* ---------------------------------------------------------------- + * ExecAggEstimate + * + * Estimate space required to propagate aggregate statistics. + * ---------------------------------------------------------------- + */ +void +ExecAggEstimate(AggState *node, ParallelContext *pcxt) +{ + Size size; + + /* don't need this if not instrumenting or no workers */ + if (!node->ss.ps.instrument || pcxt->nworkers == 0) + return; + + size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation)); + size = add_size(size, offsetof(SharedAggInfo, sinstrument)); + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* ---------------------------------------------------------------- + * ExecAggInitializeDSM + * + * Initialize DSM space for aggregate statistics. + * ---------------------------------------------------------------- + */ +void +ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt) +{ + Size size; + + /* don't need this if not instrumenting or no workers */ + if (!node->ss.ps.instrument || pcxt->nworkers == 0) + return; + + size = offsetof(SharedAggInfo, sinstrument) + + pcxt->nworkers * sizeof(AggregateInstrumentation); + node->shared_info = shm_toc_allocate(pcxt->toc, size); + /* ensure any unfilled slots will contain zeroes */ + memset(node->shared_info, 0, size); + node->shared_info->num_workers = pcxt->nworkers; + shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, + node->shared_info); +} + +/* ---------------------------------------------------------------- + * ExecAggInitializeWorker + * + * Attach worker to DSM space for aggregate statistics. + * ---------------------------------------------------------------- + */ +void +ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt) +{ + node->shared_info = + shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true); +} + +/* ---------------------------------------------------------------- + * ExecAggRetrieveInstrumentation + * + * Transfer aggregate statistics from DSM to private memory. + * ---------------------------------------------------------------- + */ +void +ExecAggRetrieveInstrumentation(AggState *node) +{ + Size size; + SharedAggInfo *si; + + if (node->shared_info == NULL) + return; + + size = offsetof(SharedAggInfo, sinstrument) + + node->shared_info->num_workers * sizeof(AggregateInstrumentation); + si = palloc(size); + memcpy(si, node->shared_info, size); + node->shared_info = si; +} |