diff options
Diffstat (limited to 'src/backend/executor/nodeWindowAgg.c')
-rw-r--r-- | src/backend/executor/nodeWindowAgg.c | 3463 |
1 files changed, 3463 insertions, 0 deletions
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c new file mode 100644 index 0000000..f8ea9e9 --- /dev/null +++ b/src/backend/executor/nodeWindowAgg.c @@ -0,0 +1,3463 @@ +/*------------------------------------------------------------------------- + * + * nodeWindowAgg.c + * routines to handle WindowAgg nodes. + * + * A WindowAgg node evaluates "window functions" across suitable partitions + * of the input tuple set. Any one WindowAgg works for just a single window + * specification, though it can evaluate multiple window functions sharing + * identical window specifications. The input tuples are required to be + * delivered in sorted order, with the PARTITION BY columns (if any) as + * major sort keys and the ORDER BY columns (if any) as minor sort keys. + * (The planner generates a stack of WindowAggs with intervening Sort nodes + * as needed, if a query involves more than one window specification.) + * + * Since window functions can require access to any or all of the rows in + * the current partition, we accumulate rows of the partition into a + * tuplestore. The window functions are called using the WindowObject API + * so that they can access those rows as needed. + * + * We also support using plain aggregate functions as window functions. + * For these, the regular Agg-node environment is emulated for each partition. + * As required by the SQL spec, the output represents the value of the + * aggregate function over all rows in the current row's window frame. + * + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/nodeWindowAgg.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/htup_details.h" +#include "catalog/objectaccess.h" +#include "catalog/pg_aggregate.h" +#include "catalog/pg_proc.h" +#include "executor/executor.h" +#include "executor/nodeWindowAgg.h" +#include "miscadmin.h" +#include "nodes/nodeFuncs.h" +#include "optimizer/optimizer.h" +#include "parser/parse_agg.h" +#include "parser/parse_coerce.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/expandeddatum.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/regproc.h" +#include "utils/syscache.h" +#include "windowapi.h" + +/* + * All the window function APIs are called with this object, which is passed + * to window functions as fcinfo->context. + */ +typedef struct WindowObjectData +{ + NodeTag type; + WindowAggState *winstate; /* parent WindowAggState */ + List *argstates; /* ExprState trees for fn's arguments */ + void *localmem; /* WinGetPartitionLocalMemory's chunk */ + int markptr; /* tuplestore mark pointer for this fn */ + int readptr; /* tuplestore read pointer for this fn */ + int64 markpos; /* row that markptr is positioned on */ + int64 seekpos; /* row that readptr is positioned on */ +} WindowObjectData; + +/* + * We have one WindowStatePerFunc struct for each window function and + * window aggregate handled by this node. + */ +typedef struct WindowStatePerFuncData +{ + /* Links to WindowFunc expr and state nodes this working state is for */ + WindowFuncExprState *wfuncstate; + WindowFunc *wfunc; + + int numArguments; /* number of arguments */ + + FmgrInfo flinfo; /* fmgr lookup data for window function */ + + Oid winCollation; /* collation derived for window function */ + + /* + * We need the len and byval info for the result of each function in order + * to know how to copy/delete values. + */ + int16 resulttypeLen; + bool resulttypeByVal; + + bool plain_agg; /* is it just a plain aggregate function? */ + int aggno; /* if so, index of its WindowStatePerAggData */ + + WindowObject winobj; /* object used in window function API */ +} WindowStatePerFuncData; + +/* + * For plain aggregate window functions, we also have one of these. + */ +typedef struct WindowStatePerAggData +{ + /* Oids of transition functions */ + Oid transfn_oid; + Oid invtransfn_oid; /* may be InvalidOid */ + Oid finalfn_oid; /* may be InvalidOid */ + + /* + * fmgr lookup data for transition functions --- only valid when + * corresponding oid is not InvalidOid. Note in particular that fn_strict + * flags are kept here. + */ + FmgrInfo transfn; + FmgrInfo invtransfn; + FmgrInfo finalfn; + + int numFinalArgs; /* number of arguments to pass to finalfn */ + + /* + * initial value from pg_aggregate entry + */ + Datum initValue; + bool initValueIsNull; + + /* + * cached value for current frame boundaries + */ + Datum resultValue; + bool resultValueIsNull; + + /* + * We need the len and byval info for the agg's input, result, and + * transition data types in order to know how to copy/delete values. + */ + int16 inputtypeLen, + resulttypeLen, + transtypeLen; + bool inputtypeByVal, + resulttypeByVal, + transtypeByVal; + + int wfuncno; /* index of associated WindowStatePerFuncData */ + + /* Context holding transition value and possibly other subsidiary data */ + MemoryContext aggcontext; /* may be private, or winstate->aggcontext */ + + /* Current transition value */ + Datum transValue; /* current transition value */ + bool transValueIsNull; + + int64 transValueCount; /* number of currently-aggregated rows */ + + /* Data local to eval_windowaggregates() */ + bool restart; /* need to restart this agg in this cycle? */ +} WindowStatePerAggData; + +static void initialize_windowaggregate(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate); +static void advance_windowaggregate(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate); +static bool advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate); +static void finalize_windowaggregate(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate, + Datum *result, bool *isnull); + +static void eval_windowaggregates(WindowAggState *winstate); +static void eval_windowfunction(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + Datum *result, bool *isnull); + +static void begin_partition(WindowAggState *winstate); +static void spool_tuples(WindowAggState *winstate, int64 pos); +static void release_partition(WindowAggState *winstate); + +static int row_is_in_frame(WindowAggState *winstate, int64 pos, + TupleTableSlot *slot); +static void update_frameheadpos(WindowAggState *winstate); +static void update_frametailpos(WindowAggState *winstate); +static void update_grouptailpos(WindowAggState *winstate); + +static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate, + WindowFunc *wfunc, + WindowStatePerAgg peraggstate); +static Datum GetAggInitVal(Datum textInitVal, Oid transtype); + +static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1, + TupleTableSlot *slot2); +static bool window_gettupleslot(WindowObject winobj, int64 pos, + TupleTableSlot *slot); + + +/* + * initialize_windowaggregate + * parallel to initialize_aggregates in nodeAgg.c + */ +static void +initialize_windowaggregate(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate) +{ + MemoryContext oldContext; + + /* + * If we're using a private aggcontext, we may reset it here. But if the + * context is shared, we don't know which other aggregates may still need + * it, so we must leave it to the caller to reset at an appropriate time. + */ + if (peraggstate->aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(peraggstate->aggcontext); + + if (peraggstate->initValueIsNull) + peraggstate->transValue = peraggstate->initValue; + else + { + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); + peraggstate->transValue = datumCopy(peraggstate->initValue, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + MemoryContextSwitchTo(oldContext); + } + peraggstate->transValueIsNull = peraggstate->initValueIsNull; + peraggstate->transValueCount = 0; + peraggstate->resultValue = (Datum) 0; + peraggstate->resultValueIsNull = true; +} + +/* + * advance_windowaggregate + * parallel to advance_aggregates in nodeAgg.c + */ +static void +advance_windowaggregate(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate) +{ + LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); + WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; + int numArguments = perfuncstate->numArguments; + Datum newVal; + ListCell *arg; + int i; + MemoryContext oldContext; + ExprContext *econtext = winstate->tmpcontext; + ExprState *filter = wfuncstate->aggfilter; + + oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + /* Skip anything FILTERed out */ + if (filter) + { + bool isnull; + Datum res = ExecEvalExpr(filter, econtext, &isnull); + + if (isnull || !DatumGetBool(res)) + { + MemoryContextSwitchTo(oldContext); + return; + } + } + + /* We start from 1, since the 0th arg will be the transition value */ + i = 1; + foreach(arg, wfuncstate->args) + { + ExprState *argstate = (ExprState *) lfirst(arg); + + fcinfo->args[i].value = ExecEvalExpr(argstate, econtext, + &fcinfo->args[i].isnull); + i++; + } + + if (peraggstate->transfn.fn_strict) + { + /* + * For a strict transfn, nothing happens when there's a NULL input; we + * just keep the prior transValue. Note transValueCount doesn't + * change either. + */ + for (i = 1; i <= numArguments; i++) + { + if (fcinfo->args[i].isnull) + { + MemoryContextSwitchTo(oldContext); + return; + } + } + + /* + * For strict transition functions with initial value NULL we use the + * first non-NULL input as the initial state. (We already checked + * that the agg's input type is binary-compatible with its transtype, + * so straight copy here is OK.) + * + * We must copy the datum into aggcontext if it is pass-by-ref. We do + * not need to pfree the old transValue, since it's NULL. + */ + if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull) + { + MemoryContextSwitchTo(peraggstate->aggcontext); + peraggstate->transValue = datumCopy(fcinfo->args[1].value, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + peraggstate->transValueIsNull = false; + peraggstate->transValueCount = 1; + MemoryContextSwitchTo(oldContext); + return; + } + + if (peraggstate->transValueIsNull) + { + /* + * Don't call a strict function with NULL inputs. Note it is + * possible to get here despite the above tests, if the transfn is + * strict *and* returned a NULL on a prior cycle. If that happens + * we will propagate the NULL all the way to the end. That can + * only happen if there's no inverse transition function, though, + * since we disallow transitions back to NULL when there is one. + */ + MemoryContextSwitchTo(oldContext); + Assert(!OidIsValid(peraggstate->invtransfn_oid)); + return; + } + } + + /* + * OK to call the transition function. Set winstate->curaggcontext while + * calling it, for possible use by AggCheckCallContext. + */ + InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn), + numArguments + 1, + perfuncstate->winCollation, + (void *) winstate, NULL); + fcinfo->args[0].value = peraggstate->transValue; + fcinfo->args[0].isnull = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; + newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * Moving-aggregate transition functions must not return null, see + * advance_windowaggregate_base(). + */ + if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid)) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("moving-aggregate transition function must not return null"))); + + /* + * We must track the number of rows included in transValue, since to + * remove the last input, advance_windowaggregate_base() mustn't call the + * inverse transition function, but simply reset transValue back to its + * initial value. + */ + peraggstate->transValueCount++; + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * free the prior transValue. But if transfn returned a pointer to its + * first input, we don't need to do anything. Also, if transfn returned a + * pointer to a R/W expanded object that is already a child of the + * aggcontext, assume we can adopt that value without copying it. + */ + if (!peraggstate->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) + { + if (!fcinfo->isnull) + { + MemoryContextSwitchTo(peraggstate->aggcontext); + if (DatumIsReadWriteExpandedObject(newVal, + false, + peraggstate->transtypeLen) && + MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext) + /* do nothing */ ; + else + newVal = datumCopy(newVal, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + } + if (!peraggstate->transValueIsNull) + { + if (DatumIsReadWriteExpandedObject(peraggstate->transValue, + false, + peraggstate->transtypeLen)) + DeleteExpandedObject(peraggstate->transValue); + else + pfree(DatumGetPointer(peraggstate->transValue)); + } + } + + MemoryContextSwitchTo(oldContext); + peraggstate->transValue = newVal; + peraggstate->transValueIsNull = fcinfo->isnull; +} + +/* + * advance_windowaggregate_base + * Remove the oldest tuple from an aggregation. + * + * This is very much like advance_windowaggregate, except that we will call + * the inverse transition function (which caller must have checked is + * available). + * + * Returns true if we successfully removed the current row from this + * aggregate, false if not (in the latter case, caller is responsible + * for cleaning up by restarting the aggregation). + */ +static bool +advance_windowaggregate_base(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate) +{ + LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); + WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate; + int numArguments = perfuncstate->numArguments; + Datum newVal; + ListCell *arg; + int i; + MemoryContext oldContext; + ExprContext *econtext = winstate->tmpcontext; + ExprState *filter = wfuncstate->aggfilter; + + oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + /* Skip anything FILTERed out */ + if (filter) + { + bool isnull; + Datum res = ExecEvalExpr(filter, econtext, &isnull); + + if (isnull || !DatumGetBool(res)) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + + /* We start from 1, since the 0th arg will be the transition value */ + i = 1; + foreach(arg, wfuncstate->args) + { + ExprState *argstate = (ExprState *) lfirst(arg); + + fcinfo->args[i].value = ExecEvalExpr(argstate, econtext, + &fcinfo->args[i].isnull); + i++; + } + + if (peraggstate->invtransfn.fn_strict) + { + /* + * For a strict (inv)transfn, nothing happens when there's a NULL + * input; we just keep the prior transValue. Note transValueCount + * doesn't change either. + */ + for (i = 1; i <= numArguments; i++) + { + if (fcinfo->args[i].isnull) + { + MemoryContextSwitchTo(oldContext); + return true; + } + } + } + + /* There should still be an added but not yet removed value */ + Assert(peraggstate->transValueCount > 0); + + /* + * In moving-aggregate mode, the state must never be NULL, except possibly + * before any rows have been aggregated (which is surely not the case at + * this point). This restriction allows us to interpret a NULL result + * from the inverse function as meaning "sorry, can't do an inverse + * transition in this case". We already checked this in + * advance_windowaggregate, but just for safety, check again. + */ + if (peraggstate->transValueIsNull) + elog(ERROR, "aggregate transition value is NULL before inverse transition"); + + /* + * We mustn't use the inverse transition function to remove the last + * input. Doing so would yield a non-NULL state, whereas we should be in + * the initial state afterwards which may very well be NULL. So instead, + * we simply re-initialize the aggregate in this case. + */ + if (peraggstate->transValueCount == 1) + { + MemoryContextSwitchTo(oldContext); + initialize_windowaggregate(winstate, + &winstate->perfunc[peraggstate->wfuncno], + peraggstate); + return true; + } + + /* + * OK to call the inverse transition function. Set + * winstate->curaggcontext while calling it, for possible use by + * AggCheckCallContext. + */ + InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn), + numArguments + 1, + perfuncstate->winCollation, + (void *) winstate, NULL); + fcinfo->args[0].value = peraggstate->transValue; + fcinfo->args[0].isnull = peraggstate->transValueIsNull; + winstate->curaggcontext = peraggstate->aggcontext; + newVal = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + + /* + * If the function returns NULL, report failure, forcing a restart. + */ + if (fcinfo->isnull) + { + MemoryContextSwitchTo(oldContext); + return false; + } + + /* Update number of rows included in transValue */ + peraggstate->transValueCount--; + + /* + * If pass-by-ref datatype, must copy the new value into aggcontext and + * free the prior transValue. But if invtransfn returned a pointer to its + * first input, we don't need to do anything. Also, if invtransfn + * returned a pointer to a R/W expanded object that is already a child of + * the aggcontext, assume we can adopt that value without copying it. + * + * Note: the checks for null values here will never fire, but it seems + * best to have this stanza look just like advance_windowaggregate. + */ + if (!peraggstate->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue)) + { + if (!fcinfo->isnull) + { + MemoryContextSwitchTo(peraggstate->aggcontext); + if (DatumIsReadWriteExpandedObject(newVal, + false, + peraggstate->transtypeLen) && + MemoryContextGetParent(DatumGetEOHP(newVal)->eoh_context) == CurrentMemoryContext) + /* do nothing */ ; + else + newVal = datumCopy(newVal, + peraggstate->transtypeByVal, + peraggstate->transtypeLen); + } + if (!peraggstate->transValueIsNull) + { + if (DatumIsReadWriteExpandedObject(peraggstate->transValue, + false, + peraggstate->transtypeLen)) + DeleteExpandedObject(peraggstate->transValue); + else + pfree(DatumGetPointer(peraggstate->transValue)); + } + } + + MemoryContextSwitchTo(oldContext); + peraggstate->transValue = newVal; + peraggstate->transValueIsNull = fcinfo->isnull; + + return true; +} + +/* + * finalize_windowaggregate + * parallel to finalize_aggregate in nodeAgg.c + */ +static void +finalize_windowaggregate(WindowAggState *winstate, + WindowStatePerFunc perfuncstate, + WindowStatePerAgg peraggstate, + Datum *result, bool *isnull) +{ + MemoryContext oldContext; + + oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + + /* + * Apply the agg's finalfn if one is provided, else return transValue. + */ + if (OidIsValid(peraggstate->finalfn_oid)) + { + LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); + int numFinalArgs = peraggstate->numFinalArgs; + bool anynull; + int i; + + InitFunctionCallInfoData(fcinfodata.fcinfo, &(peraggstate->finalfn), + numFinalArgs, + perfuncstate->winCollation, + (void *) winstate, NULL); + fcinfo->args[0].value = + MakeExpandedObjectReadOnly(peraggstate->transValue, + peraggstate->transValueIsNull, + peraggstate->transtypeLen); + fcinfo->args[0].isnull = peraggstate->transValueIsNull; + anynull = peraggstate->transValueIsNull; + + /* Fill any remaining argument positions with nulls */ + for (i = 1; i < numFinalArgs; i++) + { + fcinfo->args[i].value = (Datum) 0; + fcinfo->args[i].isnull = true; + anynull = true; + } + + if (fcinfo->flinfo->fn_strict && anynull) + { + /* don't call a strict function with NULL inputs */ + *result = (Datum) 0; + *isnull = true; + } + else + { + winstate->curaggcontext = peraggstate->aggcontext; + *result = FunctionCallInvoke(fcinfo); + winstate->curaggcontext = NULL; + *isnull = fcinfo->isnull; + } + } + else + { + /* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */ + *result = peraggstate->transValue; + *isnull = peraggstate->transValueIsNull; + } + + /* + * If result is pass-by-ref, make sure it is in the right context. + */ + if (!peraggstate->resulttypeByVal && !*isnull && + !MemoryContextContains(CurrentMemoryContext, + DatumGetPointer(*result))) + *result = datumCopy(*result, + peraggstate->resulttypeByVal, + peraggstate->resulttypeLen); + MemoryContextSwitchTo(oldContext); +} + +/* + * eval_windowaggregates + * evaluate plain aggregates being used as window functions + * + * This differs from nodeAgg.c in two ways. First, if the window's frame + * start position moves, we use the inverse transition function (if it exists) + * to remove rows from the transition value. And second, we expect to be + * able to call aggregate final functions repeatedly after aggregating more + * data onto the same transition value. This is not a behavior required by + * nodeAgg.c. + */ +static void +eval_windowaggregates(WindowAggState *winstate) +{ + WindowStatePerAgg peraggstate; + int wfuncno, + numaggs, + numaggs_restart, + i; + int64 aggregatedupto_nonrestarted; + MemoryContext oldContext; + ExprContext *econtext; + WindowObject agg_winobj; + TupleTableSlot *agg_row_slot; + TupleTableSlot *temp_slot; + + numaggs = winstate->numaggs; + if (numaggs == 0) + return; /* nothing to do */ + + /* final output execution is in ps_ExprContext */ + econtext = winstate->ss.ps.ps_ExprContext; + agg_winobj = winstate->agg_winobj; + agg_row_slot = winstate->agg_row_slot; + temp_slot = winstate->temp_slot_1; + + /* + * If the window's frame start clause is UNBOUNDED_PRECEDING and no + * exclusion clause is specified, then the window frame consists of a + * contiguous group of rows extending forward from the start of the + * partition, and rows only enter the frame, never exit it, as the current + * row advances forward. This makes it possible to use an incremental + * strategy for evaluating aggregates: we run the transition function for + * each row added to the frame, and run the final function whenever we + * need the current aggregate value. This is considerably more efficient + * than the naive approach of re-running the entire aggregate calculation + * for each current row. It does assume that the final function doesn't + * damage the running transition value, but we have the same assumption in + * nodeAgg.c too (when it rescans an existing hash table). + * + * If the frame start does sometimes move, we can still optimize as above + * whenever successive rows share the same frame head, but if the frame + * head moves beyond the previous head we try to remove those rows using + * the aggregate's inverse transition function. This function restores + * the aggregate's current state to what it would be if the removed row + * had never been aggregated in the first place. Inverse transition + * functions may optionally return NULL, indicating that the function was + * unable to remove the tuple from aggregation. If this happens, or if + * the aggregate doesn't have an inverse transition function at all, we + * must perform the aggregation all over again for all tuples within the + * new frame boundaries. + * + * If there's any exclusion clause, then we may have to aggregate over a + * non-contiguous set of rows, so we punt and recalculate for every row. + * (For some frame end choices, it might be that the frame is always + * contiguous anyway, but that's an optimization to investigate later.) + * + * In many common cases, multiple rows share the same frame and hence the + * same aggregate value. (In particular, if there's no ORDER BY in a RANGE + * window, then all rows are peers and so they all have window frame equal + * to the whole partition.) We optimize such cases by calculating the + * aggregate value once when we reach the first row of a peer group, and + * then returning the saved value for all subsequent rows. + * + * 'aggregatedupto' keeps track of the first row that has not yet been + * accumulated into the aggregate transition values. Whenever we start a + * new peer group, we accumulate forward to the end of the peer group. + */ + + /* + * First, update the frame head position. + * + * The frame head should never move backwards, and the code below wouldn't + * cope if it did, so for safety we complain if it does. + */ + update_frameheadpos(winstate); + if (winstate->frameheadpos < winstate->aggregatedbase) + elog(ERROR, "window frame head moved backward"); + + /* + * If the frame didn't change compared to the previous row, we can re-use + * the result values that were previously saved at the bottom of this + * function. Since we don't know the current frame's end yet, this is not + * possible to check for fully. But if the frame end mode is UNBOUNDED + * FOLLOWING or CURRENT ROW, no exclusion clause is specified, and the + * current row lies within the previous row's frame, then the two frames' + * ends must coincide. Note that on the first row aggregatedbase == + * aggregatedupto, meaning this test must fail, so we don't need to check + * the "there was no previous row" case explicitly here. + */ + if (winstate->aggregatedbase == winstate->frameheadpos && + (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING | + FRAMEOPTION_END_CURRENT_ROW)) && + !(winstate->frameOptions & FRAMEOPTION_EXCLUSION) && + winstate->aggregatedbase <= winstate->currentpos && + winstate->aggregatedupto > winstate->currentpos) + { + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + wfuncno = peraggstate->wfuncno; + econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue; + econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull; + } + return; + } + + /*---------- + * Initialize restart flags. + * + * We restart the aggregation: + * - if we're processing the first row in the partition, or + * - if the frame's head moved and we cannot use an inverse + * transition function, or + * - we have an EXCLUSION clause, or + * - if the new frame doesn't overlap the old one + * + * Note that we don't strictly need to restart in the last case, but if + * we're going to remove all rows from the aggregation anyway, a restart + * surely is faster. + *---------- + */ + numaggs_restart = 0; + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + if (winstate->currentpos == 0 || + (winstate->aggregatedbase != winstate->frameheadpos && + !OidIsValid(peraggstate->invtransfn_oid)) || + (winstate->frameOptions & FRAMEOPTION_EXCLUSION) || + winstate->aggregatedupto <= winstate->frameheadpos) + { + peraggstate->restart = true; + numaggs_restart++; + } + else + peraggstate->restart = false; + } + + /* + * If we have any possibly-moving aggregates, attempt to advance + * aggregatedbase to match the frame's head by removing input rows that + * fell off the top of the frame from the aggregations. This can fail, + * i.e. advance_windowaggregate_base() can return false, in which case + * we'll restart that aggregate below. + */ + while (numaggs_restart < numaggs && + winstate->aggregatedbase < winstate->frameheadpos) + { + /* + * Fetch the next tuple of those being removed. This should never fail + * as we should have been here before. + */ + if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase, + temp_slot)) + elog(ERROR, "could not re-fetch previously fetched frame row"); + + /* Set tuple context for evaluation of aggregate arguments */ + winstate->tmpcontext->ecxt_outertuple = temp_slot; + + /* + * Perform the inverse transition for each aggregate function in the + * window, unless it has already been marked as needing a restart. + */ + for (i = 0; i < numaggs; i++) + { + bool ok; + + peraggstate = &winstate->peragg[i]; + if (peraggstate->restart) + continue; + + wfuncno = peraggstate->wfuncno; + ok = advance_windowaggregate_base(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + if (!ok) + { + /* Inverse transition function has failed, must restart */ + peraggstate->restart = true; + numaggs_restart++; + } + } + + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(winstate->tmpcontext); + + /* And advance the aggregated-row state */ + winstate->aggregatedbase++; + ExecClearTuple(temp_slot); + } + + /* + * If we successfully advanced the base rows of all the aggregates, + * aggregatedbase now equals frameheadpos; but if we failed for any, we + * must forcibly update aggregatedbase. + */ + winstate->aggregatedbase = winstate->frameheadpos; + + /* + * If we created a mark pointer for aggregates, keep it pushed up to frame + * head, so that tuplestore can discard unnecessary rows. + */ + if (agg_winobj->markptr >= 0) + WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + + /* + * Now restart the aggregates that require it. + * + * We assume that aggregates using the shared context always restart if + * *any* aggregate restarts, and we may thus clean up the shared + * aggcontext if that is the case. Private aggcontexts are reset by + * initialize_windowaggregate() if their owning aggregate restarts. If we + * aren't restarting an aggregate, we need to free any previously saved + * result for it, else we'll leak memory. + */ + if (numaggs_restart > 0) + MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + + /* Aggregates using the shared ctx must restart if *any* agg does */ + Assert(peraggstate->aggcontext != winstate->aggcontext || + numaggs_restart == 0 || + peraggstate->restart); + + if (peraggstate->restart) + { + wfuncno = peraggstate->wfuncno; + initialize_windowaggregate(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + } + else if (!peraggstate->resultValueIsNull) + { + if (!peraggstate->resulttypeByVal) + pfree(DatumGetPointer(peraggstate->resultValue)); + peraggstate->resultValue = (Datum) 0; + peraggstate->resultValueIsNull = true; + } + } + + /* + * Non-restarted aggregates now contain the rows between aggregatedbase + * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates + * contain no rows. If there are any restarted aggregates, we must thus + * begin aggregating anew at frameheadpos, otherwise we may simply + * continue at aggregatedupto. We must remember the old value of + * aggregatedupto to know how long to skip advancing non-restarted + * aggregates. If we modify aggregatedupto, we must also clear + * agg_row_slot, per the loop invariant below. + */ + aggregatedupto_nonrestarted = winstate->aggregatedupto; + if (numaggs_restart > 0 && + winstate->aggregatedupto != winstate->frameheadpos) + { + winstate->aggregatedupto = winstate->frameheadpos; + ExecClearTuple(agg_row_slot); + } + + /* + * Advance until we reach a row not in frame (or end of partition). + * + * Note the loop invariant: agg_row_slot is either empty or holds the row + * at position aggregatedupto. We advance aggregatedupto after processing + * a row. + */ + for (;;) + { + int ret; + + /* Fetch next row if we didn't already */ + if (TupIsNull(agg_row_slot)) + { + if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto, + agg_row_slot)) + break; /* must be end of partition */ + } + + /* + * Exit loop if no more rows can be in frame. Skip aggregation if + * current row is not in frame but there might be more in the frame. + */ + ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot); + if (ret < 0) + break; + if (ret == 0) + goto next_tuple; + + /* Set tuple context for evaluation of aggregate arguments */ + winstate->tmpcontext->ecxt_outertuple = agg_row_slot; + + /* Accumulate row into the aggregates */ + for (i = 0; i < numaggs; i++) + { + peraggstate = &winstate->peragg[i]; + + /* Non-restarted aggs skip until aggregatedupto_nonrestarted */ + if (!peraggstate->restart && + winstate->aggregatedupto < aggregatedupto_nonrestarted) + continue; + + wfuncno = peraggstate->wfuncno; + advance_windowaggregate(winstate, + &winstate->perfunc[wfuncno], + peraggstate); + } + +next_tuple: + /* Reset per-input-tuple context after each tuple */ + ResetExprContext(winstate->tmpcontext); + + /* And advance the aggregated-row state */ + winstate->aggregatedupto++; + ExecClearTuple(agg_row_slot); + } + + /* The frame's end is not supposed to move backwards, ever */ + Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); + + /* + * finalize aggregates and fill result/isnull fields. + */ + for (i = 0; i < numaggs; i++) + { + Datum *result; + bool *isnull; + + peraggstate = &winstate->peragg[i]; + wfuncno = peraggstate->wfuncno; + result = &econtext->ecxt_aggvalues[wfuncno]; + isnull = &econtext->ecxt_aggnulls[wfuncno]; + finalize_windowaggregate(winstate, + &winstate->perfunc[wfuncno], + peraggstate, + result, isnull); + + /* + * save the result in case next row shares the same frame. + * + * XXX in some framing modes, eg ROWS/END_CURRENT_ROW, we can know in + * advance that the next row can't possibly share the same frame. Is + * it worth detecting that and skipping this code? + */ + if (!peraggstate->resulttypeByVal && !*isnull) + { + oldContext = MemoryContextSwitchTo(peraggstate->aggcontext); + peraggstate->resultValue = + datumCopy(*result, + peraggstate->resulttypeByVal, + peraggstate->resulttypeLen); + MemoryContextSwitchTo(oldContext); + } + else + { + peraggstate->resultValue = *result; + } + peraggstate->resultValueIsNull = *isnull; + } +} + +/* + * eval_windowfunction + * + * Arguments of window functions are not evaluated here, because a window + * function can need random access to arbitrary rows in the partition. + * The window function uses the special WinGetFuncArgInPartition and + * WinGetFuncArgInFrame functions to evaluate the arguments for the rows + * it wants. + */ +static void +eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate, + Datum *result, bool *isnull) +{ + LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); + MemoryContext oldContext; + + oldContext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory); + + /* + * We don't pass any normal arguments to a window function, but we do pass + * it the number of arguments, in order to permit window function + * implementations to support varying numbers of arguments. The real info + * goes through the WindowObject, which is passed via fcinfo->context. + */ + InitFunctionCallInfoData(*fcinfo, &(perfuncstate->flinfo), + perfuncstate->numArguments, + perfuncstate->winCollation, + (void *) perfuncstate->winobj, NULL); + /* Just in case, make all the regular argument slots be null */ + for (int argno = 0; argno < perfuncstate->numArguments; argno++) + fcinfo->args[argno].isnull = true; + /* Window functions don't have a current aggregate context, either */ + winstate->curaggcontext = NULL; + + *result = FunctionCallInvoke(fcinfo); + *isnull = fcinfo->isnull; + + /* + * Make sure pass-by-ref data is allocated in the appropriate context. (We + * need this in case the function returns a pointer into some short-lived + * tuple, as is entirely possible.) + */ + if (!perfuncstate->resulttypeByVal && !fcinfo->isnull && + !MemoryContextContains(CurrentMemoryContext, + DatumGetPointer(*result))) + *result = datumCopy(*result, + perfuncstate->resulttypeByVal, + perfuncstate->resulttypeLen); + + MemoryContextSwitchTo(oldContext); +} + +/* + * begin_partition + * Start buffering rows of the next partition. + */ +static void +begin_partition(WindowAggState *winstate) +{ + WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; + PlanState *outerPlan = outerPlanState(winstate); + int frameOptions = winstate->frameOptions; + int numfuncs = winstate->numfuncs; + int i; + + winstate->partition_spooled = false; + winstate->framehead_valid = false; + winstate->frametail_valid = false; + winstate->grouptail_valid = false; + winstate->spooled_rows = 0; + winstate->currentpos = 0; + winstate->frameheadpos = 0; + winstate->frametailpos = 0; + winstate->currentgroup = 0; + winstate->frameheadgroup = 0; + winstate->frametailgroup = 0; + winstate->groupheadpos = 0; + winstate->grouptailpos = -1; /* see update_grouptailpos */ + ExecClearTuple(winstate->agg_row_slot); + if (winstate->framehead_slot) + ExecClearTuple(winstate->framehead_slot); + if (winstate->frametail_slot) + ExecClearTuple(winstate->frametail_slot); + + /* + * If this is the very first partition, we need to fetch the first input + * row to store in first_part_slot. + */ + if (TupIsNull(winstate->first_part_slot)) + { + TupleTableSlot *outerslot = ExecProcNode(outerPlan); + + if (!TupIsNull(outerslot)) + ExecCopySlot(winstate->first_part_slot, outerslot); + else + { + /* outer plan is empty, so we have nothing to do */ + winstate->partition_spooled = true; + winstate->more_partitions = false; + return; + } + } + + /* Create new tuplestore for this partition */ + winstate->buffer = tuplestore_begin_heap(false, false, work_mem); + + /* + * Set up read pointers for the tuplestore. The current pointer doesn't + * need BACKWARD capability, but the per-window-function read pointers do, + * and the aggregate pointer does if we might need to restart aggregation. + */ + winstate->current_ptr = 0; /* read pointer 0 is pre-allocated */ + + /* reset default REWIND capability bit for current ptr */ + tuplestore_set_eflags(winstate->buffer, 0); + + /* create read pointers for aggregates, if needed */ + if (winstate->numaggs > 0) + { + WindowObject agg_winobj = winstate->agg_winobj; + int readptr_flags = 0; + + /* + * If the frame head is potentially movable, or we have an EXCLUSION + * clause, we might need to restart aggregation ... + */ + if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) || + (frameOptions & FRAMEOPTION_EXCLUSION)) + { + /* ... so create a mark pointer to track the frame head */ + agg_winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, 0); + /* and the read pointer will need BACKWARD capability */ + readptr_flags |= EXEC_FLAG_BACKWARD; + } + + agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer, + readptr_flags); + agg_winobj->markpos = -1; + agg_winobj->seekpos = -1; + + /* Also reset the row counters for aggregates */ + winstate->aggregatedbase = 0; + winstate->aggregatedupto = 0; + } + + /* create mark and read pointers for each real window function */ + for (i = 0; i < numfuncs; i++) + { + WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]); + + if (!perfuncstate->plain_agg) + { + WindowObject winobj = perfuncstate->winobj; + + winobj->markptr = tuplestore_alloc_read_pointer(winstate->buffer, + 0); + winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer, + EXEC_FLAG_BACKWARD); + winobj->markpos = -1; + winobj->seekpos = -1; + } + } + + /* + * If we are in RANGE or GROUPS mode, then determining frame boundaries + * requires physical access to the frame endpoint rows, except in certain + * degenerate cases. We create read pointers to point to those rows, to + * simplify access and ensure that the tuplestore doesn't discard the + * endpoint rows prematurely. (Must create pointers in exactly the same + * cases that update_frameheadpos and update_frametailpos need them.) + */ + winstate->framehead_ptr = winstate->frametail_ptr = -1; /* if not used */ + + if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) + { + if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) && + node->ordNumCols != 0) || + (frameOptions & FRAMEOPTION_START_OFFSET)) + winstate->framehead_ptr = + tuplestore_alloc_read_pointer(winstate->buffer, 0); + if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) && + node->ordNumCols != 0) || + (frameOptions & FRAMEOPTION_END_OFFSET)) + winstate->frametail_ptr = + tuplestore_alloc_read_pointer(winstate->buffer, 0); + } + + /* + * If we have an exclusion clause that requires knowing the boundaries of + * the current row's peer group, we create a read pointer to track the + * tail position of the peer group (i.e., first row of the next peer + * group). The head position does not require its own pointer because we + * maintain that as a side effect of advancing the current row. + */ + winstate->grouptail_ptr = -1; + + if ((frameOptions & (FRAMEOPTION_EXCLUDE_GROUP | + FRAMEOPTION_EXCLUDE_TIES)) && + node->ordNumCols != 0) + { + winstate->grouptail_ptr = + tuplestore_alloc_read_pointer(winstate->buffer, 0); + } + + /* + * Store the first tuple into the tuplestore (it's always available now; + * we either read it above, or saved it at the end of previous partition) + */ + tuplestore_puttupleslot(winstate->buffer, winstate->first_part_slot); + winstate->spooled_rows++; +} + +/* + * Read tuples from the outer node, up to and including position 'pos', and + * store them into the tuplestore. If pos is -1, reads the whole partition. + */ +static void +spool_tuples(WindowAggState *winstate, int64 pos) +{ + WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; + PlanState *outerPlan; + TupleTableSlot *outerslot; + MemoryContext oldcontext; + + if (!winstate->buffer) + return; /* just a safety check */ + if (winstate->partition_spooled) + return; /* whole partition done already */ + + /* + * If the tuplestore has spilled to disk, alternate reading and writing + * becomes quite expensive due to frequent buffer flushes. It's cheaper + * to force the entire partition to get spooled in one go. + * + * XXX this is a horrid kluge --- it'd be better to fix the performance + * problem inside tuplestore. FIXME + */ + if (!tuplestore_in_memory(winstate->buffer)) + pos = -1; + + outerPlan = outerPlanState(winstate); + + /* Must be in query context to call outerplan */ + oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); + + while (winstate->spooled_rows <= pos || pos == -1) + { + outerslot = ExecProcNode(outerPlan); + if (TupIsNull(outerslot)) + { + /* reached the end of the last partition */ + winstate->partition_spooled = true; + winstate->more_partitions = false; + break; + } + + if (node->partNumCols > 0) + { + ExprContext *econtext = winstate->tmpcontext; + + econtext->ecxt_innertuple = winstate->first_part_slot; + econtext->ecxt_outertuple = outerslot; + + /* Check if this tuple still belongs to the current partition */ + if (!ExecQualAndReset(winstate->partEqfunction, econtext)) + { + /* + * end of partition; copy the tuple for the next cycle. + */ + ExecCopySlot(winstate->first_part_slot, outerslot); + winstate->partition_spooled = true; + winstate->more_partitions = true; + break; + } + } + + /* Still in partition, so save it into the tuplestore */ + tuplestore_puttupleslot(winstate->buffer, outerslot); + winstate->spooled_rows++; + } + + MemoryContextSwitchTo(oldcontext); +} + +/* + * release_partition + * clear information kept within a partition, including + * tuplestore and aggregate results. + */ +static void +release_partition(WindowAggState *winstate) +{ + int i; + + for (i = 0; i < winstate->numfuncs; i++) + { + WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]); + + /* Release any partition-local state of this window function */ + if (perfuncstate->winobj) + perfuncstate->winobj->localmem = NULL; + } + + /* + * Release all partition-local memory (in particular, any partition-local + * state that we might have trashed our pointers to in the above loop, and + * any aggregate temp data). We don't rely on retail pfree because some + * aggregates might have allocated data we don't have direct pointers to. + */ + MemoryContextResetAndDeleteChildren(winstate->partcontext); + MemoryContextResetAndDeleteChildren(winstate->aggcontext); + for (i = 0; i < winstate->numaggs; i++) + { + if (winstate->peragg[i].aggcontext != winstate->aggcontext) + MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext); + } + + if (winstate->buffer) + tuplestore_end(winstate->buffer); + winstate->buffer = NULL; + winstate->partition_spooled = false; +} + +/* + * row_is_in_frame + * Determine whether a row is in the current row's window frame according + * to our window framing rule + * + * The caller must have already determined that the row is in the partition + * and fetched it into a slot. This function just encapsulates the framing + * rules. + * + * Returns: + * -1, if the row is out of frame and no succeeding rows can be in frame + * 0, if the row is out of frame but succeeding rows might be in frame + * 1, if the row is in frame + * + * May clobber winstate->temp_slot_2. + */ +static int +row_is_in_frame(WindowAggState *winstate, int64 pos, TupleTableSlot *slot) +{ + int frameOptions = winstate->frameOptions; + + Assert(pos >= 0); /* else caller error */ + + /* + * First, check frame starting conditions. We might as well delegate this + * to update_frameheadpos always; it doesn't add any notable cost. + */ + update_frameheadpos(winstate); + if (pos < winstate->frameheadpos) + return 0; + + /* + * Okay so far, now check frame ending conditions. Here, we avoid calling + * update_frametailpos in simple cases, so as not to spool tuples further + * ahead than necessary. + */ + if (frameOptions & FRAMEOPTION_END_CURRENT_ROW) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + /* rows after current row are out of frame */ + if (pos > winstate->currentpos) + return -1; + } + else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) + { + /* following row that is not peer is out of frame */ + if (pos > winstate->currentpos && + !are_peers(winstate, slot, winstate->ss.ss_ScanTupleSlot)) + return -1; + } + else + Assert(false); + } + else if (frameOptions & FRAMEOPTION_END_OFFSET) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + int64 offset = DatumGetInt64(winstate->endOffsetValue); + + /* rows after current row + offset are out of frame */ + if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) + offset = -offset; + + if (pos > winstate->currentpos + offset) + return -1; + } + else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) + { + /* hard cases, so delegate to update_frametailpos */ + update_frametailpos(winstate); + if (pos >= winstate->frametailpos) + return -1; + } + else + Assert(false); + } + + /* Check exclusion clause */ + if (frameOptions & FRAMEOPTION_EXCLUDE_CURRENT_ROW) + { + if (pos == winstate->currentpos) + return 0; + } + else if ((frameOptions & FRAMEOPTION_EXCLUDE_GROUP) || + ((frameOptions & FRAMEOPTION_EXCLUDE_TIES) && + pos != winstate->currentpos)) + { + WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; + + /* If no ORDER BY, all rows are peers with each other */ + if (node->ordNumCols == 0) + return 0; + /* Otherwise, check the group boundaries */ + if (pos >= winstate->groupheadpos) + { + update_grouptailpos(winstate); + if (pos < winstate->grouptailpos) + return 0; + } + } + + /* If we get here, it's in frame */ + return 1; +} + +/* + * update_frameheadpos + * make frameheadpos valid for the current row + * + * Note that frameheadpos is computed without regard for any window exclusion + * clause; the current row and/or its peers are considered part of the frame + * for this purpose even if they must be excluded later. + * + * May clobber winstate->temp_slot_2. + */ +static void +update_frameheadpos(WindowAggState *winstate) +{ + WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; + int frameOptions = winstate->frameOptions; + MemoryContext oldcontext; + + if (winstate->framehead_valid) + return; /* already known for current row */ + + /* We may be called in a short-lived context */ + oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); + + if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) + { + /* In UNBOUNDED PRECEDING mode, frame head is always row 0 */ + winstate->frameheadpos = 0; + winstate->framehead_valid = true; + } + else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + /* In ROWS mode, frame head is the same as current */ + winstate->frameheadpos = winstate->currentpos; + winstate->framehead_valid = true; + } + else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) + { + /* If no ORDER BY, all rows are peers with each other */ + if (node->ordNumCols == 0) + { + winstate->frameheadpos = 0; + winstate->framehead_valid = true; + MemoryContextSwitchTo(oldcontext); + return; + } + + /* + * In RANGE or GROUPS START_CURRENT_ROW mode, frame head is the + * first row that is a peer of current row. We keep a copy of the + * last-known frame head row in framehead_slot, and advance as + * necessary. Note that if we reach end of partition, we will + * leave frameheadpos = end+1 and framehead_slot empty. + */ + tuplestore_select_read_pointer(winstate->buffer, + winstate->framehead_ptr); + if (winstate->frameheadpos == 0 && + TupIsNull(winstate->framehead_slot)) + { + /* fetch first row into framehead_slot, if we didn't already */ + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->framehead_slot)) + elog(ERROR, "unexpected end of tuplestore"); + } + + while (!TupIsNull(winstate->framehead_slot)) + { + if (are_peers(winstate, winstate->framehead_slot, + winstate->ss.ss_ScanTupleSlot)) + break; /* this row is the correct frame head */ + /* Note we advance frameheadpos even if the fetch fails */ + winstate->frameheadpos++; + spool_tuples(winstate, winstate->frameheadpos); + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->framehead_slot)) + break; /* end of partition */ + } + winstate->framehead_valid = true; + } + else + Assert(false); + } + else if (frameOptions & FRAMEOPTION_START_OFFSET) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + /* In ROWS mode, bound is physically n before/after current */ + int64 offset = DatumGetInt64(winstate->startOffsetValue); + + if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) + offset = -offset; + + winstate->frameheadpos = winstate->currentpos + offset; + /* frame head can't go before first row */ + if (winstate->frameheadpos < 0) + winstate->frameheadpos = 0; + else if (winstate->frameheadpos > winstate->currentpos + 1) + { + /* make sure frameheadpos is not past end of partition */ + spool_tuples(winstate, winstate->frameheadpos - 1); + if (winstate->frameheadpos > winstate->spooled_rows) + winstate->frameheadpos = winstate->spooled_rows; + } + winstate->framehead_valid = true; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + /* + * In RANGE START_OFFSET mode, frame head is the first row that + * satisfies the in_range constraint relative to the current row. + * We keep a copy of the last-known frame head row in + * framehead_slot, and advance as necessary. Note that if we + * reach end of partition, we will leave frameheadpos = end+1 and + * framehead_slot empty. + */ + int sortCol = node->ordColIdx[0]; + bool sub, + less; + + /* We must have an ordering column */ + Assert(node->ordNumCols == 1); + + /* Precompute flags for in_range checks */ + if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) + sub = true; /* subtract startOffset from current row */ + else + sub = false; /* add it */ + less = false; /* normally, we want frame head >= sum */ + /* If sort order is descending, flip both flags */ + if (!winstate->inRangeAsc) + { + sub = !sub; + less = true; + } + + tuplestore_select_read_pointer(winstate->buffer, + winstate->framehead_ptr); + if (winstate->frameheadpos == 0 && + TupIsNull(winstate->framehead_slot)) + { + /* fetch first row into framehead_slot, if we didn't already */ + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->framehead_slot)) + elog(ERROR, "unexpected end of tuplestore"); + } + + while (!TupIsNull(winstate->framehead_slot)) + { + Datum headval, + currval; + bool headisnull, + currisnull; + + headval = slot_getattr(winstate->framehead_slot, sortCol, + &headisnull); + currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol, + &currisnull); + if (headisnull || currisnull) + { + /* order of the rows depends only on nulls_first */ + if (winstate->inRangeNullsFirst) + { + /* advance head if head is null and curr is not */ + if (!headisnull || currisnull) + break; + } + else + { + /* advance head if head is not null and curr is null */ + if (headisnull || !currisnull) + break; + } + } + else + { + if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc, + winstate->inRangeColl, + headval, + currval, + winstate->startOffsetValue, + BoolGetDatum(sub), + BoolGetDatum(less)))) + break; /* this row is the correct frame head */ + } + /* Note we advance frameheadpos even if the fetch fails */ + winstate->frameheadpos++; + spool_tuples(winstate, winstate->frameheadpos); + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->framehead_slot)) + break; /* end of partition */ + } + winstate->framehead_valid = true; + } + else if (frameOptions & FRAMEOPTION_GROUPS) + { + /* + * In GROUPS START_OFFSET mode, frame head is the first row of the + * first peer group whose number satisfies the offset constraint. + * We keep a copy of the last-known frame head row in + * framehead_slot, and advance as necessary. Note that if we + * reach end of partition, we will leave frameheadpos = end+1 and + * framehead_slot empty. + */ + int64 offset = DatumGetInt64(winstate->startOffsetValue); + int64 minheadgroup; + + if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING) + minheadgroup = winstate->currentgroup - offset; + else + minheadgroup = winstate->currentgroup + offset; + + tuplestore_select_read_pointer(winstate->buffer, + winstate->framehead_ptr); + if (winstate->frameheadpos == 0 && + TupIsNull(winstate->framehead_slot)) + { + /* fetch first row into framehead_slot, if we didn't already */ + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->framehead_slot)) + elog(ERROR, "unexpected end of tuplestore"); + } + + while (!TupIsNull(winstate->framehead_slot)) + { + if (winstate->frameheadgroup >= minheadgroup) + break; /* this row is the correct frame head */ + ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot); + /* Note we advance frameheadpos even if the fetch fails */ + winstate->frameheadpos++; + spool_tuples(winstate, winstate->frameheadpos); + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->framehead_slot)) + break; /* end of partition */ + if (!are_peers(winstate, winstate->temp_slot_2, + winstate->framehead_slot)) + winstate->frameheadgroup++; + } + ExecClearTuple(winstate->temp_slot_2); + winstate->framehead_valid = true; + } + else + Assert(false); + } + else + Assert(false); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * update_frametailpos + * make frametailpos valid for the current row + * + * Note that frametailpos is computed without regard for any window exclusion + * clause; the current row and/or its peers are considered part of the frame + * for this purpose even if they must be excluded later. + * + * May clobber winstate->temp_slot_2. + */ +static void +update_frametailpos(WindowAggState *winstate) +{ + WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; + int frameOptions = winstate->frameOptions; + MemoryContext oldcontext; + + if (winstate->frametail_valid) + return; /* already known for current row */ + + /* We may be called in a short-lived context */ + oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); + + if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING) + { + /* In UNBOUNDED FOLLOWING mode, all partition rows are in frame */ + spool_tuples(winstate, -1); + winstate->frametailpos = winstate->spooled_rows; + winstate->frametail_valid = true; + } + else if (frameOptions & FRAMEOPTION_END_CURRENT_ROW) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + /* In ROWS mode, exactly the rows up to current are in frame */ + winstate->frametailpos = winstate->currentpos + 1; + winstate->frametail_valid = true; + } + else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) + { + /* If no ORDER BY, all rows are peers with each other */ + if (node->ordNumCols == 0) + { + spool_tuples(winstate, -1); + winstate->frametailpos = winstate->spooled_rows; + winstate->frametail_valid = true; + MemoryContextSwitchTo(oldcontext); + return; + } + + /* + * In RANGE or GROUPS END_CURRENT_ROW mode, frame end is the last + * row that is a peer of current row, frame tail is the row after + * that (if any). We keep a copy of the last-known frame tail row + * in frametail_slot, and advance as necessary. Note that if we + * reach end of partition, we will leave frametailpos = end+1 and + * frametail_slot empty. + */ + tuplestore_select_read_pointer(winstate->buffer, + winstate->frametail_ptr); + if (winstate->frametailpos == 0 && + TupIsNull(winstate->frametail_slot)) + { + /* fetch first row into frametail_slot, if we didn't already */ + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->frametail_slot)) + elog(ERROR, "unexpected end of tuplestore"); + } + + while (!TupIsNull(winstate->frametail_slot)) + { + if (winstate->frametailpos > winstate->currentpos && + !are_peers(winstate, winstate->frametail_slot, + winstate->ss.ss_ScanTupleSlot)) + break; /* this row is the frame tail */ + /* Note we advance frametailpos even if the fetch fails */ + winstate->frametailpos++; + spool_tuples(winstate, winstate->frametailpos); + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->frametail_slot)) + break; /* end of partition */ + } + winstate->frametail_valid = true; + } + else + Assert(false); + } + else if (frameOptions & FRAMEOPTION_END_OFFSET) + { + if (frameOptions & FRAMEOPTION_ROWS) + { + /* In ROWS mode, bound is physically n before/after current */ + int64 offset = DatumGetInt64(winstate->endOffsetValue); + + if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) + offset = -offset; + + winstate->frametailpos = winstate->currentpos + offset + 1; + /* smallest allowable value of frametailpos is 0 */ + if (winstate->frametailpos < 0) + winstate->frametailpos = 0; + else if (winstate->frametailpos > winstate->currentpos + 1) + { + /* make sure frametailpos is not past end of partition */ + spool_tuples(winstate, winstate->frametailpos - 1); + if (winstate->frametailpos > winstate->spooled_rows) + winstate->frametailpos = winstate->spooled_rows; + } + winstate->frametail_valid = true; + } + else if (frameOptions & FRAMEOPTION_RANGE) + { + /* + * In RANGE END_OFFSET mode, frame end is the last row that + * satisfies the in_range constraint relative to the current row, + * frame tail is the row after that (if any). We keep a copy of + * the last-known frame tail row in frametail_slot, and advance as + * necessary. Note that if we reach end of partition, we will + * leave frametailpos = end+1 and frametail_slot empty. + */ + int sortCol = node->ordColIdx[0]; + bool sub, + less; + + /* We must have an ordering column */ + Assert(node->ordNumCols == 1); + + /* Precompute flags for in_range checks */ + if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) + sub = true; /* subtract endOffset from current row */ + else + sub = false; /* add it */ + less = true; /* normally, we want frame tail <= sum */ + /* If sort order is descending, flip both flags */ + if (!winstate->inRangeAsc) + { + sub = !sub; + less = false; + } + + tuplestore_select_read_pointer(winstate->buffer, + winstate->frametail_ptr); + if (winstate->frametailpos == 0 && + TupIsNull(winstate->frametail_slot)) + { + /* fetch first row into frametail_slot, if we didn't already */ + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->frametail_slot)) + elog(ERROR, "unexpected end of tuplestore"); + } + + while (!TupIsNull(winstate->frametail_slot)) + { + Datum tailval, + currval; + bool tailisnull, + currisnull; + + tailval = slot_getattr(winstate->frametail_slot, sortCol, + &tailisnull); + currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol, + &currisnull); + if (tailisnull || currisnull) + { + /* order of the rows depends only on nulls_first */ + if (winstate->inRangeNullsFirst) + { + /* advance tail if tail is null or curr is not */ + if (!tailisnull) + break; + } + else + { + /* advance tail if tail is not null or curr is null */ + if (!currisnull) + break; + } + } + else + { + if (!DatumGetBool(FunctionCall5Coll(&winstate->endInRangeFunc, + winstate->inRangeColl, + tailval, + currval, + winstate->endOffsetValue, + BoolGetDatum(sub), + BoolGetDatum(less)))) + break; /* this row is the correct frame tail */ + } + /* Note we advance frametailpos even if the fetch fails */ + winstate->frametailpos++; + spool_tuples(winstate, winstate->frametailpos); + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->frametail_slot)) + break; /* end of partition */ + } + winstate->frametail_valid = true; + } + else if (frameOptions & FRAMEOPTION_GROUPS) + { + /* + * In GROUPS END_OFFSET mode, frame end is the last row of the + * last peer group whose number satisfies the offset constraint, + * and frame tail is the row after that (if any). We keep a copy + * of the last-known frame tail row in frametail_slot, and advance + * as necessary. Note that if we reach end of partition, we will + * leave frametailpos = end+1 and frametail_slot empty. + */ + int64 offset = DatumGetInt64(winstate->endOffsetValue); + int64 maxtailgroup; + + if (frameOptions & FRAMEOPTION_END_OFFSET_PRECEDING) + maxtailgroup = winstate->currentgroup - offset; + else + maxtailgroup = winstate->currentgroup + offset; + + tuplestore_select_read_pointer(winstate->buffer, + winstate->frametail_ptr); + if (winstate->frametailpos == 0 && + TupIsNull(winstate->frametail_slot)) + { + /* fetch first row into frametail_slot, if we didn't already */ + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->frametail_slot)) + elog(ERROR, "unexpected end of tuplestore"); + } + + while (!TupIsNull(winstate->frametail_slot)) + { + if (winstate->frametailgroup > maxtailgroup) + break; /* this row is the correct frame tail */ + ExecCopySlot(winstate->temp_slot_2, winstate->frametail_slot); + /* Note we advance frametailpos even if the fetch fails */ + winstate->frametailpos++; + spool_tuples(winstate, winstate->frametailpos); + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->frametail_slot)) + break; /* end of partition */ + if (!are_peers(winstate, winstate->temp_slot_2, + winstate->frametail_slot)) + winstate->frametailgroup++; + } + ExecClearTuple(winstate->temp_slot_2); + winstate->frametail_valid = true; + } + else + Assert(false); + } + else + Assert(false); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * update_grouptailpos + * make grouptailpos valid for the current row + * + * May clobber winstate->temp_slot_2. + */ +static void +update_grouptailpos(WindowAggState *winstate) +{ + WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; + MemoryContext oldcontext; + + if (winstate->grouptail_valid) + return; /* already known for current row */ + + /* We may be called in a short-lived context */ + oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); + + /* If no ORDER BY, all rows are peers with each other */ + if (node->ordNumCols == 0) + { + spool_tuples(winstate, -1); + winstate->grouptailpos = winstate->spooled_rows; + winstate->grouptail_valid = true; + MemoryContextSwitchTo(oldcontext); + return; + } + + /* + * Because grouptail_valid is reset only when current row advances into a + * new peer group, we always reach here knowing that grouptailpos needs to + * be advanced by at least one row. Hence, unlike the otherwise similar + * case for frame tail tracking, we do not need persistent storage of the + * group tail row. + */ + Assert(winstate->grouptailpos <= winstate->currentpos); + tuplestore_select_read_pointer(winstate->buffer, + winstate->grouptail_ptr); + for (;;) + { + /* Note we advance grouptailpos even if the fetch fails */ + winstate->grouptailpos++; + spool_tuples(winstate, winstate->grouptailpos); + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->temp_slot_2)) + break; /* end of partition */ + if (winstate->grouptailpos > winstate->currentpos && + !are_peers(winstate, winstate->temp_slot_2, + winstate->ss.ss_ScanTupleSlot)) + break; /* this row is the group tail */ + } + ExecClearTuple(winstate->temp_slot_2); + winstate->grouptail_valid = true; + + MemoryContextSwitchTo(oldcontext); +} + + +/* ----------------- + * ExecWindowAgg + * + * ExecWindowAgg receives tuples from its outer subplan and + * stores them into a tuplestore, then processes window functions. + * This node doesn't reduce nor qualify any row so the number of + * returned rows is exactly the same as its outer subplan's result. + * ----------------- + */ +static TupleTableSlot * +ExecWindowAgg(PlanState *pstate) +{ + WindowAggState *winstate = castNode(WindowAggState, pstate); + ExprContext *econtext; + int i; + int numfuncs; + + CHECK_FOR_INTERRUPTS(); + + if (winstate->all_done) + return NULL; + + /* + * Compute frame offset values, if any, during first call (or after a + * rescan). These are assumed to hold constant throughout the scan; if + * user gives us a volatile expression, we'll only use its initial value. + */ + if (winstate->all_first) + { + int frameOptions = winstate->frameOptions; + ExprContext *econtext = winstate->ss.ps.ps_ExprContext; + Datum value; + bool isnull; + int16 len; + bool byval; + + if (frameOptions & FRAMEOPTION_START_OFFSET) + { + Assert(winstate->startOffset != NULL); + value = ExecEvalExprSwitchContext(winstate->startOffset, + econtext, + &isnull); + if (isnull) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("frame starting offset must not be null"))); + /* copy value into query-lifespan context */ + get_typlenbyval(exprType((Node *) winstate->startOffset->expr), + &len, &byval); + winstate->startOffsetValue = datumCopy(value, byval, len); + if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS)) + { + /* value is known to be int8 */ + int64 offset = DatumGetInt64(value); + + if (offset < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE), + errmsg("frame starting offset must not be negative"))); + } + } + if (frameOptions & FRAMEOPTION_END_OFFSET) + { + Assert(winstate->endOffset != NULL); + value = ExecEvalExprSwitchContext(winstate->endOffset, + econtext, + &isnull); + if (isnull) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("frame ending offset must not be null"))); + /* copy value into query-lifespan context */ + get_typlenbyval(exprType((Node *) winstate->endOffset->expr), + &len, &byval); + winstate->endOffsetValue = datumCopy(value, byval, len); + if (frameOptions & (FRAMEOPTION_ROWS | FRAMEOPTION_GROUPS)) + { + /* value is known to be int8 */ + int64 offset = DatumGetInt64(value); + + if (offset < 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PRECEDING_OR_FOLLOWING_SIZE), + errmsg("frame ending offset must not be negative"))); + } + } + winstate->all_first = false; + } + + if (winstate->buffer == NULL) + { + /* Initialize for first partition and set current row = 0 */ + begin_partition(winstate); + /* If there are no input rows, we'll detect that and exit below */ + } + else + { + /* Advance current row within partition */ + winstate->currentpos++; + /* This might mean that the frame moves, too */ + winstate->framehead_valid = false; + winstate->frametail_valid = false; + /* we don't need to invalidate grouptail here; see below */ + } + + /* + * Spool all tuples up to and including the current row, if we haven't + * already + */ + spool_tuples(winstate, winstate->currentpos); + + /* Move to the next partition if we reached the end of this partition */ + if (winstate->partition_spooled && + winstate->currentpos >= winstate->spooled_rows) + { + release_partition(winstate); + + if (winstate->more_partitions) + { + begin_partition(winstate); + Assert(winstate->spooled_rows > 0); + } + else + { + winstate->all_done = true; + return NULL; + } + } + + /* final output execution is in ps_ExprContext */ + econtext = winstate->ss.ps.ps_ExprContext; + + /* Clear the per-output-tuple context for current row */ + ResetExprContext(econtext); + + /* + * Read the current row from the tuplestore, and save in ScanTupleSlot. + * (We can't rely on the outerplan's output slot because we may have to + * read beyond the current row. Also, we have to actually copy the row + * out of the tuplestore, since window function evaluation might cause the + * tuplestore to dump its state to disk.) + * + * In GROUPS mode, or when tracking a group-oriented exclusion clause, we + * must also detect entering a new peer group and update associated state + * when that happens. We use temp_slot_2 to temporarily hold the previous + * row for this purpose. + * + * Current row must be in the tuplestore, since we spooled it above. + */ + tuplestore_select_read_pointer(winstate->buffer, winstate->current_ptr); + if ((winstate->frameOptions & (FRAMEOPTION_GROUPS | + FRAMEOPTION_EXCLUDE_GROUP | + FRAMEOPTION_EXCLUDE_TIES)) && + winstate->currentpos > 0) + { + ExecCopySlot(winstate->temp_slot_2, winstate->ss.ss_ScanTupleSlot); + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->ss.ss_ScanTupleSlot)) + elog(ERROR, "unexpected end of tuplestore"); + if (!are_peers(winstate, winstate->temp_slot_2, + winstate->ss.ss_ScanTupleSlot)) + { + winstate->currentgroup++; + winstate->groupheadpos = winstate->currentpos; + winstate->grouptail_valid = false; + } + ExecClearTuple(winstate->temp_slot_2); + } + else + { + if (!tuplestore_gettupleslot(winstate->buffer, true, true, + winstate->ss.ss_ScanTupleSlot)) + elog(ERROR, "unexpected end of tuplestore"); + } + + /* + * Evaluate true window functions + */ + numfuncs = winstate->numfuncs; + for (i = 0; i < numfuncs; i++) + { + WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]); + + if (perfuncstate->plain_agg) + continue; + eval_windowfunction(winstate, perfuncstate, + &(econtext->ecxt_aggvalues[perfuncstate->wfuncstate->wfuncno]), + &(econtext->ecxt_aggnulls[perfuncstate->wfuncstate->wfuncno])); + } + + /* + * Evaluate aggregates + */ + if (winstate->numaggs > 0) + eval_windowaggregates(winstate); + + /* + * If we have created auxiliary read pointers for the frame or group + * boundaries, force them to be kept up-to-date, because we don't know + * whether the window function(s) will do anything that requires that. + * Failing to advance the pointers would result in being unable to trim + * data from the tuplestore, which is bad. (If we could know in advance + * whether the window functions will use frame boundary info, we could + * skip creating these pointers in the first place ... but unfortunately + * the window function API doesn't require that.) + */ + if (winstate->framehead_ptr >= 0) + update_frameheadpos(winstate); + if (winstate->frametail_ptr >= 0) + update_frametailpos(winstate); + if (winstate->grouptail_ptr >= 0) + update_grouptailpos(winstate); + + /* + * Truncate any no-longer-needed rows from the tuplestore. + */ + tuplestore_trim(winstate->buffer); + + /* + * Form and return a projection tuple using the windowfunc results and the + * current row. Setting ecxt_outertuple arranges that any Vars will be + * evaluated with respect to that row. + */ + econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot; + + return ExecProject(winstate->ss.ps.ps_ProjInfo); +} + +/* ----------------- + * ExecInitWindowAgg + * + * Creates the run-time information for the WindowAgg node produced by the + * planner and initializes its outer subtree + * ----------------- + */ +WindowAggState * +ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) +{ + WindowAggState *winstate; + Plan *outerPlan; + ExprContext *econtext; + ExprContext *tmpcontext; + WindowStatePerFunc perfunc; + WindowStatePerAgg peragg; + int frameOptions = node->frameOptions; + int numfuncs, + wfuncno, + numaggs, + aggno; + TupleDesc scanDesc; + ListCell *l; + + /* check for unsupported flags */ + Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); + + /* + * create state structure + */ + winstate = makeNode(WindowAggState); + winstate->ss.ps.plan = (Plan *) node; + winstate->ss.ps.state = estate; + winstate->ss.ps.ExecProcNode = ExecWindowAgg; + + /* + * Create expression contexts. We need two, one for per-input-tuple + * processing and one for per-output-tuple processing. We cheat a little + * by using ExecAssignExprContext() to build both. + */ + ExecAssignExprContext(estate, &winstate->ss.ps); + tmpcontext = winstate->ss.ps.ps_ExprContext; + winstate->tmpcontext = tmpcontext; + ExecAssignExprContext(estate, &winstate->ss.ps); + + /* Create long-lived context for storage of partition-local memory etc */ + winstate->partcontext = + AllocSetContextCreate(CurrentMemoryContext, + "WindowAgg Partition", + ALLOCSET_DEFAULT_SIZES); + + /* + * Create mid-lived context for aggregate trans values etc. + * + * Note that moving aggregates each use their own private context, not + * this one. + */ + winstate->aggcontext = + AllocSetContextCreate(CurrentMemoryContext, + "WindowAgg Aggregates", + ALLOCSET_DEFAULT_SIZES); + + /* + * WindowAgg nodes never have quals, since they can only occur at the + * logical top level of a query (ie, after any WHERE or HAVING filters) + */ + Assert(node->plan.qual == NIL); + winstate->ss.ps.qual = NULL; + + /* + * initialize child nodes + */ + outerPlan = outerPlan(node); + outerPlanState(winstate) = ExecInitNode(outerPlan, estate, eflags); + + /* + * initialize source tuple type (which is also the tuple type that we'll + * store in the tuplestore and use in all our working slots). + */ + ExecCreateScanSlotFromOuterPlan(estate, &winstate->ss, &TTSOpsMinimalTuple); + scanDesc = winstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; + + /* the outer tuple isn't the child's tuple, but always a minimal tuple */ + winstate->ss.ps.outeropsset = true; + winstate->ss.ps.outerops = &TTSOpsMinimalTuple; + winstate->ss.ps.outeropsfixed = true; + + /* + * tuple table initialization + */ + winstate->first_part_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + winstate->agg_row_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + winstate->temp_slot_1 = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + + /* + * create frame head and tail slots only if needed (must create slots in + * exactly the same cases that update_frameheadpos and update_frametailpos + * need them) + */ + winstate->framehead_slot = winstate->frametail_slot = NULL; + + if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS)) + { + if (((frameOptions & FRAMEOPTION_START_CURRENT_ROW) && + node->ordNumCols != 0) || + (frameOptions & FRAMEOPTION_START_OFFSET)) + winstate->framehead_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + if (((frameOptions & FRAMEOPTION_END_CURRENT_ROW) && + node->ordNumCols != 0) || + (frameOptions & FRAMEOPTION_END_OFFSET)) + winstate->frametail_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + } + + /* + * Initialize result slot, type and projection. + */ + ExecInitResultTupleSlotTL(&winstate->ss.ps, &TTSOpsVirtual); + ExecAssignProjectionInfo(&winstate->ss.ps, NULL); + + /* Set up data for comparing tuples */ + if (node->partNumCols > 0) + winstate->partEqfunction = + execTuplesMatchPrepare(scanDesc, + node->partNumCols, + node->partColIdx, + node->partOperators, + node->partCollations, + &winstate->ss.ps); + + if (node->ordNumCols > 0) + winstate->ordEqfunction = + execTuplesMatchPrepare(scanDesc, + node->ordNumCols, + node->ordColIdx, + node->ordOperators, + node->ordCollations, + &winstate->ss.ps); + + /* + * WindowAgg nodes use aggvalues and aggnulls as well as Agg nodes. + */ + numfuncs = winstate->numfuncs; + numaggs = winstate->numaggs; + econtext = winstate->ss.ps.ps_ExprContext; + econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numfuncs); + econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numfuncs); + + /* + * allocate per-wfunc/per-agg state information. + */ + perfunc = (WindowStatePerFunc) palloc0(sizeof(WindowStatePerFuncData) * numfuncs); + peragg = (WindowStatePerAgg) palloc0(sizeof(WindowStatePerAggData) * numaggs); + winstate->perfunc = perfunc; + winstate->peragg = peragg; + + wfuncno = -1; + aggno = -1; + foreach(l, winstate->funcs) + { + WindowFuncExprState *wfuncstate = (WindowFuncExprState *) lfirst(l); + WindowFunc *wfunc = wfuncstate->wfunc; + WindowStatePerFunc perfuncstate; + AclResult aclresult; + int i; + + if (wfunc->winref != node->winref) /* planner screwed up? */ + elog(ERROR, "WindowFunc with winref %u assigned to WindowAgg with winref %u", + wfunc->winref, node->winref); + + /* Look for a previous duplicate window function */ + for (i = 0; i <= wfuncno; i++) + { + if (equal(wfunc, perfunc[i].wfunc) && + !contain_volatile_functions((Node *) wfunc)) + break; + } + if (i <= wfuncno) + { + /* Found a match to an existing entry, so just mark it */ + wfuncstate->wfuncno = i; + continue; + } + + /* Nope, so assign a new PerAgg record */ + perfuncstate = &perfunc[++wfuncno]; + + /* Mark WindowFunc state node with assigned index in the result array */ + wfuncstate->wfuncno = wfuncno; + + /* Check permission to call window function */ + aclresult = pg_proc_aclcheck(wfunc->winfnoid, GetUserId(), + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(wfunc->winfnoid)); + InvokeFunctionExecuteHook(wfunc->winfnoid); + + /* Fill in the perfuncstate data */ + perfuncstate->wfuncstate = wfuncstate; + perfuncstate->wfunc = wfunc; + perfuncstate->numArguments = list_length(wfuncstate->args); + perfuncstate->winCollation = wfunc->inputcollid; + + get_typlenbyval(wfunc->wintype, + &perfuncstate->resulttypeLen, + &perfuncstate->resulttypeByVal); + + /* + * If it's really just a plain aggregate function, we'll emulate the + * Agg environment for it. + */ + perfuncstate->plain_agg = wfunc->winagg; + if (wfunc->winagg) + { + WindowStatePerAgg peraggstate; + + perfuncstate->aggno = ++aggno; + peraggstate = &winstate->peragg[aggno]; + initialize_peragg(winstate, wfunc, peraggstate); + peraggstate->wfuncno = wfuncno; + } + else + { + WindowObject winobj = makeNode(WindowObjectData); + + winobj->winstate = winstate; + winobj->argstates = wfuncstate->args; + winobj->localmem = NULL; + perfuncstate->winobj = winobj; + + /* It's a real window function, so set up to call it. */ + fmgr_info_cxt(wfunc->winfnoid, &perfuncstate->flinfo, + econtext->ecxt_per_query_memory); + fmgr_info_set_expr((Node *) wfunc, &perfuncstate->flinfo); + } + } + + /* Update numfuncs, numaggs to match number of unique functions found */ + winstate->numfuncs = wfuncno + 1; + winstate->numaggs = aggno + 1; + + /* Set up WindowObject for aggregates, if needed */ + if (winstate->numaggs > 0) + { + WindowObject agg_winobj = makeNode(WindowObjectData); + + agg_winobj->winstate = winstate; + agg_winobj->argstates = NIL; + agg_winobj->localmem = NULL; + /* make sure markptr = -1 to invalidate. It may not get used */ + agg_winobj->markptr = -1; + agg_winobj->readptr = -1; + winstate->agg_winobj = agg_winobj; + } + + /* copy frame options to state node for easy access */ + winstate->frameOptions = frameOptions; + + /* initialize frame bound offset expressions */ + winstate->startOffset = ExecInitExpr((Expr *) node->startOffset, + (PlanState *) winstate); + winstate->endOffset = ExecInitExpr((Expr *) node->endOffset, + (PlanState *) winstate); + + /* Lookup in_range support functions if needed */ + if (OidIsValid(node->startInRangeFunc)) + fmgr_info(node->startInRangeFunc, &winstate->startInRangeFunc); + if (OidIsValid(node->endInRangeFunc)) + fmgr_info(node->endInRangeFunc, &winstate->endInRangeFunc); + winstate->inRangeColl = node->inRangeColl; + winstate->inRangeAsc = node->inRangeAsc; + winstate->inRangeNullsFirst = node->inRangeNullsFirst; + + winstate->all_first = true; + winstate->partition_spooled = false; + winstate->more_partitions = false; + + return winstate; +} + +/* ----------------- + * ExecEndWindowAgg + * ----------------- + */ +void +ExecEndWindowAgg(WindowAggState *node) +{ + PlanState *outerPlan; + int i; + + release_partition(node); + + ExecClearTuple(node->ss.ss_ScanTupleSlot); + ExecClearTuple(node->first_part_slot); + ExecClearTuple(node->agg_row_slot); + ExecClearTuple(node->temp_slot_1); + ExecClearTuple(node->temp_slot_2); + if (node->framehead_slot) + ExecClearTuple(node->framehead_slot); + if (node->frametail_slot) + ExecClearTuple(node->frametail_slot); + + /* + * Free both the expr contexts. + */ + ExecFreeExprContext(&node->ss.ps); + node->ss.ps.ps_ExprContext = node->tmpcontext; + ExecFreeExprContext(&node->ss.ps); + + for (i = 0; i < node->numaggs; i++) + { + if (node->peragg[i].aggcontext != node->aggcontext) + MemoryContextDelete(node->peragg[i].aggcontext); + } + MemoryContextDelete(node->partcontext); + MemoryContextDelete(node->aggcontext); + + pfree(node->perfunc); + pfree(node->peragg); + + outerPlan = outerPlanState(node); + ExecEndNode(outerPlan); +} + +/* ----------------- + * ExecReScanWindowAgg + * ----------------- + */ +void +ExecReScanWindowAgg(WindowAggState *node) +{ + PlanState *outerPlan = outerPlanState(node); + ExprContext *econtext = node->ss.ps.ps_ExprContext; + + node->all_done = false; + node->all_first = true; + + /* release tuplestore et al */ + release_partition(node); + + /* release all temp tuples, but especially first_part_slot */ + ExecClearTuple(node->ss.ss_ScanTupleSlot); + ExecClearTuple(node->first_part_slot); + ExecClearTuple(node->agg_row_slot); + ExecClearTuple(node->temp_slot_1); + ExecClearTuple(node->temp_slot_2); + if (node->framehead_slot) + ExecClearTuple(node->framehead_slot); + if (node->frametail_slot) + ExecClearTuple(node->frametail_slot); + + /* Forget current wfunc values */ + MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numfuncs); + MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numfuncs); + + /* + * if chgParam of subnode is not null then plan will be re-scanned by + * first ExecProcNode. + */ + if (outerPlan->chgParam == NULL) + ExecReScan(outerPlan); +} + +/* + * initialize_peragg + * + * Almost same as in nodeAgg.c, except we don't support DISTINCT currently. + */ +static WindowStatePerAggData * +initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc, + WindowStatePerAgg peraggstate) +{ + Oid inputTypes[FUNC_MAX_ARGS]; + int numArguments; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + Oid aggtranstype; + AttrNumber initvalAttNo; + AclResult aclresult; + bool use_ma_code; + Oid transfn_oid, + invtransfn_oid, + finalfn_oid; + bool finalextra; + char finalmodify; + Expr *transfnexpr, + *invtransfnexpr, + *finalfnexpr; + Datum textInitVal; + int i; + ListCell *lc; + + numArguments = list_length(wfunc->args); + + i = 0; + foreach(lc, wfunc->args) + { + inputTypes[i++] = exprType((Node *) lfirst(lc)); + } + + aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(wfunc->winfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + wfunc->winfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + /* + * Figure out whether we want to use the moving-aggregate implementation, + * and collect the right set of fields from the pg_attribute entry. + * + * It's possible that an aggregate would supply a safe moving-aggregate + * implementation and an unsafe normal one, in which case our hand is + * forced. Otherwise, if the frame head can't move, we don't need + * moving-aggregate code. Even if we'd like to use it, don't do so if the + * aggregate's arguments (and FILTER clause if any) contain any calls to + * volatile functions. Otherwise, the difference between restarting and + * not restarting the aggregation would be user-visible. + */ + if (!OidIsValid(aggform->aggminvtransfn)) + use_ma_code = false; /* sine qua non */ + else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY && + aggform->aggfinalmodify != AGGMODIFY_READ_ONLY) + use_ma_code = true; /* decision forced by safety */ + else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) + use_ma_code = false; /* non-moving frame head */ + else if (contain_volatile_functions((Node *) wfunc)) + use_ma_code = false; /* avoid possible behavioral change */ + else + use_ma_code = true; /* yes, let's use it */ + if (use_ma_code) + { + peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn; + finalextra = aggform->aggmfinalextra; + finalmodify = aggform->aggmfinalmodify; + aggtranstype = aggform->aggmtranstype; + initvalAttNo = Anum_pg_aggregate_aggminitval; + } + else + { + peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn; + peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid; + peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn; + finalextra = aggform->aggfinalextra; + finalmodify = aggform->aggfinalmodify; + aggtranstype = aggform->aggtranstype; + initvalAttNo = Anum_pg_aggregate_agginitval; + } + + /* + * ExecInitWindowAgg already checked permission to call aggregate function + * ... but we still need to check the component functions + */ + + /* Check that aggregate owner has permission to call component fns */ + { + HeapTuple procTuple; + Oid aggOwner; + + procTuple = SearchSysCache1(PROCOID, + ObjectIdGetDatum(wfunc->winfnoid)); + if (!HeapTupleIsValid(procTuple)) + elog(ERROR, "cache lookup failed for function %u", + wfunc->winfnoid); + aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner; + ReleaseSysCache(procTuple); + + aclresult = pg_proc_aclcheck(transfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(transfn_oid)); + InvokeFunctionExecuteHook(transfn_oid); + + if (OidIsValid(invtransfn_oid)) + { + aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(invtransfn_oid)); + InvokeFunctionExecuteHook(invtransfn_oid); + } + + if (OidIsValid(finalfn_oid)) + { + aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner, + ACL_EXECUTE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FUNCTION, + get_func_name(finalfn_oid)); + InvokeFunctionExecuteHook(finalfn_oid); + } + } + + /* + * If the selected finalfn isn't read-only, we can't run this aggregate as + * a window function. This is a user-facing error, so we take a bit more + * care with the error message than elsewhere in this function. + */ + if (finalmodify != AGGMODIFY_READ_ONLY) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("aggregate function %s does not support use as a window function", + format_procedure(wfunc->winfnoid)))); + + /* Detect how many arguments to pass to the finalfn */ + if (finalextra) + peraggstate->numFinalArgs = numArguments + 1; + else + peraggstate->numFinalArgs = 1; + + /* resolve actual type of transition state, if polymorphic */ + aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid, + aggtranstype, + inputTypes, + numArguments); + + /* build expression trees using actual argument & result types */ + build_aggregate_transfn_expr(inputTypes, + numArguments, + 0, /* no ordered-set window functions yet */ + false, /* no variadic window functions yet */ + aggtranstype, + wfunc->inputcollid, + transfn_oid, + invtransfn_oid, + &transfnexpr, + &invtransfnexpr); + + /* set up infrastructure for calling the transfn(s) and finalfn */ + fmgr_info(transfn_oid, &peraggstate->transfn); + fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn); + + if (OidIsValid(invtransfn_oid)) + { + fmgr_info(invtransfn_oid, &peraggstate->invtransfn); + fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn); + } + + if (OidIsValid(finalfn_oid)) + { + build_aggregate_finalfn_expr(inputTypes, + peraggstate->numFinalArgs, + aggtranstype, + wfunc->wintype, + wfunc->inputcollid, + finalfn_oid, + &finalfnexpr); + fmgr_info(finalfn_oid, &peraggstate->finalfn); + fmgr_info_set_expr((Node *) finalfnexpr, &peraggstate->finalfn); + } + + /* get info about relevant datatypes */ + get_typlenbyval(wfunc->wintype, + &peraggstate->resulttypeLen, + &peraggstate->resulttypeByVal); + get_typlenbyval(aggtranstype, + &peraggstate->transtypeLen, + &peraggstate->transtypeByVal); + + /* + * initval is potentially null, so don't try to access it as a struct + * field. Must do it the hard way with SysCacheGetAttr. + */ + textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo, + &peraggstate->initValueIsNull); + + if (peraggstate->initValueIsNull) + peraggstate->initValue = (Datum) 0; + else + peraggstate->initValue = GetAggInitVal(textInitVal, + aggtranstype); + + /* + * If the transfn is strict and the initval is NULL, make sure input type + * and transtype are the same (or at least binary-compatible), so that + * it's OK to use the first input value as the initial transValue. This + * should have been checked at agg definition time, but we must check + * again in case the transfn's strictness property has been changed. + */ + if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull) + { + if (numArguments < 1 || + !IsBinaryCoercible(inputTypes[0], aggtranstype)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("aggregate %u needs to have compatible input type and transition type", + wfunc->winfnoid))); + } + + /* + * Insist that forward and inverse transition functions have the same + * strictness setting. Allowing them to differ would require handling + * more special cases in advance_windowaggregate and + * advance_windowaggregate_base, for no discernible benefit. This should + * have been checked at agg definition time, but we must check again in + * case either function's strictness property has been changed. + */ + if (OidIsValid(invtransfn_oid) && + peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), + errmsg("strictness of aggregate's forward and inverse transition functions must match"))); + + /* + * Moving aggregates use their own aggcontext. + * + * This is necessary because they might restart at different times, so we + * might never be able to reset the shared context otherwise. We can't + * make it the aggregates' responsibility to clean up after themselves, + * because strict aggregates must be restarted whenever we remove their + * last non-NULL input, which the aggregate won't be aware is happening. + * Also, just pfree()ing the transValue upon restarting wouldn't help, + * since we'd miss any indirectly referenced data. We could, in theory, + * make the memory allocation rules for moving aggregates different than + * they have historically been for plain aggregates, but that seems grotty + * and likely to lead to memory leaks. + */ + if (OidIsValid(invtransfn_oid)) + peraggstate->aggcontext = + AllocSetContextCreate(CurrentMemoryContext, + "WindowAgg Per Aggregate", + ALLOCSET_DEFAULT_SIZES); + else + peraggstate->aggcontext = winstate->aggcontext; + + ReleaseSysCache(aggTuple); + + return peraggstate; +} + +static Datum +GetAggInitVal(Datum textInitVal, Oid transtype) +{ + Oid typinput, + typioparam; + char *strInitVal; + Datum initVal; + + getTypeInputInfo(transtype, &typinput, &typioparam); + strInitVal = TextDatumGetCString(textInitVal); + initVal = OidInputFunctionCall(typinput, strInitVal, + typioparam, -1); + pfree(strInitVal); + return initVal; +} + +/* + * are_peers + * compare two rows to see if they are equal according to the ORDER BY clause + * + * NB: this does not consider the window frame mode. + */ +static bool +are_peers(WindowAggState *winstate, TupleTableSlot *slot1, + TupleTableSlot *slot2) +{ + WindowAgg *node = (WindowAgg *) winstate->ss.ps.plan; + ExprContext *econtext = winstate->tmpcontext; + + /* If no ORDER BY, all rows are peers with each other */ + if (node->ordNumCols == 0) + return true; + + econtext->ecxt_outertuple = slot1; + econtext->ecxt_innertuple = slot2; + return ExecQualAndReset(winstate->ordEqfunction, econtext); +} + +/* + * window_gettupleslot + * Fetch the pos'th tuple of the current partition into the slot, + * using the winobj's read pointer + * + * Returns true if successful, false if no such row + */ +static bool +window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot) +{ + WindowAggState *winstate = winobj->winstate; + MemoryContext oldcontext; + + /* often called repeatedly in a row */ + CHECK_FOR_INTERRUPTS(); + + /* Don't allow passing -1 to spool_tuples here */ + if (pos < 0) + return false; + + /* If necessary, fetch the tuple into the spool */ + spool_tuples(winstate, pos); + + if (pos >= winstate->spooled_rows) + return false; + + if (pos < winobj->markpos) + elog(ERROR, "cannot fetch row before WindowObject's mark position"); + + oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); + + tuplestore_select_read_pointer(winstate->buffer, winobj->readptr); + + /* + * Advance or rewind until we are within one tuple of the one we want. + */ + if (winobj->seekpos < pos - 1) + { + if (!tuplestore_skiptuples(winstate->buffer, + pos - 1 - winobj->seekpos, + true)) + elog(ERROR, "unexpected end of tuplestore"); + winobj->seekpos = pos - 1; + } + else if (winobj->seekpos > pos + 1) + { + if (!tuplestore_skiptuples(winstate->buffer, + winobj->seekpos - (pos + 1), + false)) + elog(ERROR, "unexpected end of tuplestore"); + winobj->seekpos = pos + 1; + } + else if (winobj->seekpos == pos) + { + /* + * There's no API to refetch the tuple at the current position. We + * have to move one tuple forward, and then one backward. (We don't + * do it the other way because we might try to fetch the row before + * our mark, which isn't allowed.) XXX this case could stand to be + * optimized. + */ + tuplestore_advance(winstate->buffer, true); + winobj->seekpos++; + } + + /* + * Now we should be on the tuple immediately before or after the one we + * want, so just fetch forwards or backwards as appropriate. + */ + if (winobj->seekpos > pos) + { + if (!tuplestore_gettupleslot(winstate->buffer, false, true, slot)) + elog(ERROR, "unexpected end of tuplestore"); + winobj->seekpos--; + } + else + { + if (!tuplestore_gettupleslot(winstate->buffer, true, true, slot)) + elog(ERROR, "unexpected end of tuplestore"); + winobj->seekpos++; + } + + Assert(winobj->seekpos == pos); + + MemoryContextSwitchTo(oldcontext); + + return true; +} + + +/*********************************************************************** + * API exposed to window functions + ***********************************************************************/ + + +/* + * WinGetPartitionLocalMemory + * Get working memory that lives till end of partition processing + * + * On first call within a given partition, this allocates and zeroes the + * requested amount of space. Subsequent calls just return the same chunk. + * + * Memory obtained this way is normally used to hold state that should be + * automatically reset for each new partition. If a window function wants + * to hold state across the whole query, fcinfo->fn_extra can be used in the + * usual way for that. + */ +void * +WinGetPartitionLocalMemory(WindowObject winobj, Size sz) +{ + Assert(WindowObjectIsValid(winobj)); + if (winobj->localmem == NULL) + winobj->localmem = + MemoryContextAllocZero(winobj->winstate->partcontext, sz); + return winobj->localmem; +} + +/* + * WinGetCurrentPosition + * Return the current row's position (counting from 0) within the current + * partition. + */ +int64 +WinGetCurrentPosition(WindowObject winobj) +{ + Assert(WindowObjectIsValid(winobj)); + return winobj->winstate->currentpos; +} + +/* + * WinGetPartitionRowCount + * Return total number of rows contained in the current partition. + * + * Note: this is a relatively expensive operation because it forces the + * whole partition to be "spooled" into the tuplestore at once. Once + * executed, however, additional calls within the same partition are cheap. + */ +int64 +WinGetPartitionRowCount(WindowObject winobj) +{ + Assert(WindowObjectIsValid(winobj)); + spool_tuples(winobj->winstate, -1); + return winobj->winstate->spooled_rows; +} + +/* + * WinSetMarkPosition + * Set the "mark" position for the window object, which is the oldest row + * number (counting from 0) it is allowed to fetch during all subsequent + * operations within the current partition. + * + * Window functions do not have to call this, but are encouraged to move the + * mark forward when possible to keep the tuplestore size down and prevent + * having to spill rows to disk. + */ +void +WinSetMarkPosition(WindowObject winobj, int64 markpos) +{ + WindowAggState *winstate; + + Assert(WindowObjectIsValid(winobj)); + winstate = winobj->winstate; + + if (markpos < winobj->markpos) + elog(ERROR, "cannot move WindowObject's mark position backward"); + tuplestore_select_read_pointer(winstate->buffer, winobj->markptr); + if (markpos > winobj->markpos) + { + tuplestore_skiptuples(winstate->buffer, + markpos - winobj->markpos, + true); + winobj->markpos = markpos; + } + tuplestore_select_read_pointer(winstate->buffer, winobj->readptr); + if (markpos > winobj->seekpos) + { + tuplestore_skiptuples(winstate->buffer, + markpos - winobj->seekpos, + true); + winobj->seekpos = markpos; + } +} + +/* + * WinRowsArePeers + * Compare two rows (specified by absolute position in partition) to see + * if they are equal according to the ORDER BY clause. + * + * NB: this does not consider the window frame mode. + */ +bool +WinRowsArePeers(WindowObject winobj, int64 pos1, int64 pos2) +{ + WindowAggState *winstate; + WindowAgg *node; + TupleTableSlot *slot1; + TupleTableSlot *slot2; + bool res; + + Assert(WindowObjectIsValid(winobj)); + winstate = winobj->winstate; + node = (WindowAgg *) winstate->ss.ps.plan; + + /* If no ORDER BY, all rows are peers; don't bother to fetch them */ + if (node->ordNumCols == 0) + return true; + + /* + * Note: OK to use temp_slot_2 here because we aren't calling any + * frame-related functions (those tend to clobber temp_slot_2). + */ + slot1 = winstate->temp_slot_1; + slot2 = winstate->temp_slot_2; + + if (!window_gettupleslot(winobj, pos1, slot1)) + elog(ERROR, "specified position is out of window: " INT64_FORMAT, + pos1); + if (!window_gettupleslot(winobj, pos2, slot2)) + elog(ERROR, "specified position is out of window: " INT64_FORMAT, + pos2); + + res = are_peers(winstate, slot1, slot2); + + ExecClearTuple(slot1); + ExecClearTuple(slot2); + + return res; +} + +/* + * WinGetFuncArgInPartition + * Evaluate a window function's argument expression on a specified + * row of the partition. The row is identified in lseek(2) style, + * i.e. relative to the current, first, or last row. + * + * argno: argument number to evaluate (counted from 0) + * relpos: signed rowcount offset from the seek position + * seektype: WINDOW_SEEK_CURRENT, WINDOW_SEEK_HEAD, or WINDOW_SEEK_TAIL + * set_mark: If the row is found and set_mark is true, the mark is moved to + * the row as a side-effect. + * isnull: output argument, receives isnull status of result + * isout: output argument, set to indicate whether target row position + * is out of partition (can pass NULL if caller doesn't care about this) + * + * Specifying a nonexistent row is not an error, it just causes a null result + * (plus setting *isout true, if isout isn't NULL). + */ +Datum +WinGetFuncArgInPartition(WindowObject winobj, int argno, + int relpos, int seektype, bool set_mark, + bool *isnull, bool *isout) +{ + WindowAggState *winstate; + ExprContext *econtext; + TupleTableSlot *slot; + bool gottuple; + int64 abs_pos; + + Assert(WindowObjectIsValid(winobj)); + winstate = winobj->winstate; + econtext = winstate->ss.ps.ps_ExprContext; + slot = winstate->temp_slot_1; + + switch (seektype) + { + case WINDOW_SEEK_CURRENT: + abs_pos = winstate->currentpos + relpos; + break; + case WINDOW_SEEK_HEAD: + abs_pos = relpos; + break; + case WINDOW_SEEK_TAIL: + spool_tuples(winstate, -1); + abs_pos = winstate->spooled_rows - 1 + relpos; + break; + default: + elog(ERROR, "unrecognized window seek type: %d", seektype); + abs_pos = 0; /* keep compiler quiet */ + break; + } + + gottuple = window_gettupleslot(winobj, abs_pos, slot); + + if (!gottuple) + { + if (isout) + *isout = true; + *isnull = true; + return (Datum) 0; + } + else + { + if (isout) + *isout = false; + if (set_mark) + WinSetMarkPosition(winobj, abs_pos); + econtext->ecxt_outertuple = slot; + return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), + econtext, isnull); + } +} + +/* + * WinGetFuncArgInFrame + * Evaluate a window function's argument expression on a specified + * row of the window frame. The row is identified in lseek(2) style, + * i.e. relative to the first or last row of the frame. (We do not + * support WINDOW_SEEK_CURRENT here, because it's not very clear what + * that should mean if the current row isn't part of the frame.) + * + * argno: argument number to evaluate (counted from 0) + * relpos: signed rowcount offset from the seek position + * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL + * set_mark: If the row is found/in frame and set_mark is true, the mark is + * moved to the row as a side-effect. + * isnull: output argument, receives isnull status of result + * isout: output argument, set to indicate whether target row position + * is out of frame (can pass NULL if caller doesn't care about this) + * + * Specifying a nonexistent or not-in-frame row is not an error, it just + * causes a null result (plus setting *isout true, if isout isn't NULL). + * + * Note that some exclusion-clause options lead to situations where the + * rows that are in-frame are not consecutive in the partition. But we + * count only in-frame rows when measuring relpos. + * + * The set_mark flag is interpreted as meaning that the caller will specify + * a constant (or, perhaps, monotonically increasing) relpos in successive + * calls, so that *if there is no exclusion clause* there will be no need + * to fetch a row before the previously fetched row. But we do not expect + * the caller to know how to account for exclusion clauses. Therefore, + * if there is an exclusion clause we take responsibility for adjusting the + * mark request to something that will be safe given the above assumption + * about relpos. + */ +Datum +WinGetFuncArgInFrame(WindowObject winobj, int argno, + int relpos, int seektype, bool set_mark, + bool *isnull, bool *isout) +{ + WindowAggState *winstate; + ExprContext *econtext; + TupleTableSlot *slot; + int64 abs_pos; + int64 mark_pos; + + Assert(WindowObjectIsValid(winobj)); + winstate = winobj->winstate; + econtext = winstate->ss.ps.ps_ExprContext; + slot = winstate->temp_slot_1; + + switch (seektype) + { + case WINDOW_SEEK_CURRENT: + elog(ERROR, "WINDOW_SEEK_CURRENT is not supported for WinGetFuncArgInFrame"); + abs_pos = mark_pos = 0; /* keep compiler quiet */ + break; + case WINDOW_SEEK_HEAD: + /* rejecting relpos < 0 is easy and simplifies code below */ + if (relpos < 0) + goto out_of_frame; + update_frameheadpos(winstate); + abs_pos = winstate->frameheadpos + relpos; + mark_pos = abs_pos; + + /* + * Account for exclusion option if one is active, but advance only + * abs_pos not mark_pos. This prevents changes of the current + * row's peer group from resulting in trying to fetch a row before + * some previous mark position. + * + * Note that in some corner cases such as current row being + * outside frame, these calculations are theoretically too simple, + * but it doesn't matter because we'll end up deciding the row is + * out of frame. We do not attempt to avoid fetching rows past + * end of frame; that would happen in some cases anyway. + */ + switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION) + { + case 0: + /* no adjustment needed */ + break; + case FRAMEOPTION_EXCLUDE_CURRENT_ROW: + if (abs_pos >= winstate->currentpos && + winstate->currentpos >= winstate->frameheadpos) + abs_pos++; + break; + case FRAMEOPTION_EXCLUDE_GROUP: + update_grouptailpos(winstate); + if (abs_pos >= winstate->groupheadpos && + winstate->grouptailpos > winstate->frameheadpos) + { + int64 overlapstart = Max(winstate->groupheadpos, + winstate->frameheadpos); + + abs_pos += winstate->grouptailpos - overlapstart; + } + break; + case FRAMEOPTION_EXCLUDE_TIES: + update_grouptailpos(winstate); + if (abs_pos >= winstate->groupheadpos && + winstate->grouptailpos > winstate->frameheadpos) + { + int64 overlapstart = Max(winstate->groupheadpos, + winstate->frameheadpos); + + if (abs_pos == overlapstart) + abs_pos = winstate->currentpos; + else + abs_pos += winstate->grouptailpos - overlapstart - 1; + } + break; + default: + elog(ERROR, "unrecognized frame option state: 0x%x", + winstate->frameOptions); + break; + } + break; + case WINDOW_SEEK_TAIL: + /* rejecting relpos > 0 is easy and simplifies code below */ + if (relpos > 0) + goto out_of_frame; + update_frametailpos(winstate); + abs_pos = winstate->frametailpos - 1 + relpos; + + /* + * Account for exclusion option if one is active. If there is no + * exclusion, we can safely set the mark at the accessed row. But + * if there is, we can only mark the frame start, because we can't + * be sure how far back in the frame the exclusion might cause us + * to fetch in future. Furthermore, we have to actually check + * against frameheadpos here, since it's unsafe to try to fetch a + * row before frame start if the mark might be there already. + */ + switch (winstate->frameOptions & FRAMEOPTION_EXCLUSION) + { + case 0: + /* no adjustment needed */ + mark_pos = abs_pos; + break; + case FRAMEOPTION_EXCLUDE_CURRENT_ROW: + if (abs_pos <= winstate->currentpos && + winstate->currentpos < winstate->frametailpos) + abs_pos--; + update_frameheadpos(winstate); + if (abs_pos < winstate->frameheadpos) + goto out_of_frame; + mark_pos = winstate->frameheadpos; + break; + case FRAMEOPTION_EXCLUDE_GROUP: + update_grouptailpos(winstate); + if (abs_pos < winstate->grouptailpos && + winstate->groupheadpos < winstate->frametailpos) + { + int64 overlapend = Min(winstate->grouptailpos, + winstate->frametailpos); + + abs_pos -= overlapend - winstate->groupheadpos; + } + update_frameheadpos(winstate); + if (abs_pos < winstate->frameheadpos) + goto out_of_frame; + mark_pos = winstate->frameheadpos; + break; + case FRAMEOPTION_EXCLUDE_TIES: + update_grouptailpos(winstate); + if (abs_pos < winstate->grouptailpos && + winstate->groupheadpos < winstate->frametailpos) + { + int64 overlapend = Min(winstate->grouptailpos, + winstate->frametailpos); + + if (abs_pos == overlapend - 1) + abs_pos = winstate->currentpos; + else + abs_pos -= overlapend - 1 - winstate->groupheadpos; + } + update_frameheadpos(winstate); + if (abs_pos < winstate->frameheadpos) + goto out_of_frame; + mark_pos = winstate->frameheadpos; + break; + default: + elog(ERROR, "unrecognized frame option state: 0x%x", + winstate->frameOptions); + mark_pos = 0; /* keep compiler quiet */ + break; + } + break; + default: + elog(ERROR, "unrecognized window seek type: %d", seektype); + abs_pos = mark_pos = 0; /* keep compiler quiet */ + break; + } + + if (!window_gettupleslot(winobj, abs_pos, slot)) + goto out_of_frame; + + /* The code above does not detect all out-of-frame cases, so check */ + if (row_is_in_frame(winstate, abs_pos, slot) <= 0) + goto out_of_frame; + + if (isout) + *isout = false; + if (set_mark) + WinSetMarkPosition(winobj, mark_pos); + econtext->ecxt_outertuple = slot; + return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), + econtext, isnull); + +out_of_frame: + if (isout) + *isout = true; + *isnull = true; + return (Datum) 0; +} + +/* + * WinGetFuncArgCurrent + * Evaluate a window function's argument expression on the current row. + * + * argno: argument number to evaluate (counted from 0) + * isnull: output argument, receives isnull status of result + * + * Note: this isn't quite equivalent to WinGetFuncArgInPartition or + * WinGetFuncArgInFrame targeting the current row, because it will succeed + * even if the WindowObject's mark has been set beyond the current row. + * This should generally be used for "ordinary" arguments of a window + * function, such as the offset argument of lead() or lag(). + */ +Datum +WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull) +{ + WindowAggState *winstate; + ExprContext *econtext; + + Assert(WindowObjectIsValid(winobj)); + winstate = winobj->winstate; + + econtext = winstate->ss.ps.ps_ExprContext; + + econtext->ecxt_outertuple = winstate->ss.ss_ScanTupleSlot; + return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), + econtext, isnull); +} |