/*------------------------------------------------------------------------- * * nodeMemoize.c * Routines to handle caching of results from parameterized nodes * * Portions Copyright (c) 2021, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * src/backend/executor/nodeMemoize.c * * Memoize nodes are intended to sit above parameterized nodes in the plan * tree in order to cache results from them. The intention here is that a * repeat scan with a parameter value that has already been seen by the node * can fetch tuples from the cache rather than having to re-scan the outer * node all over again. The query planner may choose to make use of one of * these when it thinks rescans for previously seen values are likely enough * to warrant adding the additional node. * * The method of cache we use is a hash table. When the cache fills, we never * spill tuples to disk, instead, we choose to evict the least recently used * cache entry from the cache. We remember the least recently used entry by * always pushing new entries and entries we look for onto the tail of a * doubly linked list. This means that older items always bubble to the top * of this LRU list. * * Sometimes our callers won't run their scans to completion. For example a * semi-join only needs to run until it finds a matching tuple, and once it * does, the join operator skips to the next outer tuple and does not execute * the inner side again on that scan. Because of this, we must keep track of * when a cache entry is complete, and by default, we know it is when we run * out of tuples to read during the scan. However, there are cases where we * can mark the cache entry as complete without exhausting the scan of all * tuples. One case is unique joins, where the join operator knows that there * will only be at most one match for any given outer tuple. In order to * support such cases we allow the "singlerow" option to be set for the cache. * This option marks the cache entry as complete after we read the first tuple * from the subnode. * * It's possible when we're filling the cache for a given set of parameters * that we're unable to free enough memory to store any more tuples. If this * happens then we'll have already evicted all other cache entries. When * caching another tuple would cause us to exceed our memory budget, we must * free the entry that we're currently populating and move the state machine * into MEMO_CACHE_BYPASS_MODE. This means that we'll not attempt to cache * any further tuples for this particular scan. We don't have the memory for * it. The state machine will be reset again on the next rescan. If the * memory requirements to cache the next parameter's tuples are less * demanding, then that may allow us to start putting useful entries back into * the cache again. * * * INTERFACE ROUTINES * ExecMemoize - lookup cache, exec subplan when not found * ExecInitMemoize - initialize node and subnodes * ExecEndMemoize - shutdown node and subnodes * ExecReScanMemoize - rescan the memoize node * * ExecMemoizeEstimate estimates DSM space needed for parallel plan * ExecMemoizeInitializeDSM initialize DSM for parallel plan * ExecMemoizeInitializeWorker attach to DSM info in parallel worker * ExecMemoizeRetrieveInstrumentation get instrumentation from worker *------------------------------------------------------------------------- */ #include "postgres.h" #include "common/hashfn.h" #include "executor/executor.h" #include "executor/nodeMemoize.h" #include "lib/ilist.h" #include "miscadmin.h" #include "utils/datum.h" #include "utils/lsyscache.h" /* States of the ExecMemoize state machine */ #define MEMO_CACHE_LOOKUP 1 /* Attempt to perform a cache lookup */ #define MEMO_CACHE_FETCH_NEXT_TUPLE 2 /* Get another tuple from the cache */ #define MEMO_FILLING_CACHE 3 /* Read outer node to fill cache */ #define MEMO_CACHE_BYPASS_MODE 4 /* Bypass mode. Just read from our * subplan without caching anything */ #define MEMO_END_OF_SCAN 5 /* Ready for rescan */ /* Helper macros for memory accounting */ #define EMPTY_ENTRY_MEMORY_BYTES(e) (sizeof(MemoizeEntry) + \ sizeof(MemoizeKey) + \ (e)->key->params->t_len); #define CACHE_TUPLE_BYTES(t) (sizeof(MemoizeTuple) + \ (t)->mintuple->t_len) /* MemoizeTuple Stores an individually cached tuple */ typedef struct MemoizeTuple { MinimalTuple mintuple; /* Cached tuple */ struct MemoizeTuple *next; /* The next tuple with the same parameter * values or NULL if it's the last one */ } MemoizeTuple; /* * MemoizeKey * The hash table key for cached entries plus the LRU list link */ typedef struct MemoizeKey { MinimalTuple params; dlist_node lru_node; /* Pointer to next/prev key in LRU list */ } MemoizeKey; /* * MemoizeEntry * The data struct that the cache hash table stores */ typedef struct MemoizeEntry { MemoizeKey *key; /* Hash key for hash table lookups */ MemoizeTuple *tuplehead; /* Pointer to the first tuple or NULL if * no tuples are cached for this entry */ uint32 hash; /* Hash value (cached) */ char status; /* Hash status */ bool complete; /* Did we read the outer plan to completion? */ } MemoizeEntry; #define SH_PREFIX memoize #define SH_ELEMENT_TYPE MemoizeEntry #define SH_KEY_TYPE MemoizeKey * #define SH_SCOPE static inline #define SH_DECLARE #include "lib/simplehash.h" static uint32 MemoizeHash_hash(struct memoize_hash *tb, const MemoizeKey *key); static bool MemoizeHash_equal(struct memoize_hash *tb, const MemoizeKey *params1, const MemoizeKey *params2); #define SH_PREFIX memoize #define SH_ELEMENT_TYPE MemoizeEntry #define SH_KEY_TYPE MemoizeKey * #define SH_KEY key #define SH_HASH_KEY(tb, key) MemoizeHash_hash(tb, key) #define SH_EQUAL(tb, a, b) MemoizeHash_equal(tb, a, b) #define SH_SCOPE static inline #define SH_STORE_HASH #define SH_GET_HASH(tb, a) a->hash #define SH_DEFINE #include "lib/simplehash.h" /* * MemoizeHash_hash * Hash function for simplehash hashtable. 'key' is unused here as we * require that all table lookups first populate the MemoizeState's * probeslot with the key values to be looked up. */ static uint32 MemoizeHash_hash(struct memoize_hash *tb, const MemoizeKey *key) { MemoizeState *mstate = (MemoizeState *) tb->private_data; TupleTableSlot *pslot = mstate->probeslot; uint32 hashkey = 0; int numkeys = mstate->nkeys; if (mstate->binary_mode) { for (int i = 0; i < numkeys; i++) { /* rotate hashkey left 1 bit at each step */ hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); if (!pslot->tts_isnull[i]) /* treat nulls as having hash key 0 */ { FormData_pg_attribute *attr; uint32 hkey; attr = &pslot->tts_tupleDescriptor->attrs[i]; hkey = datum_image_hash(pslot->tts_values[i], attr->attbyval, attr->attlen); hashkey ^= hkey; } } } else { FmgrInfo *hashfunctions = mstate->hashfunctions; Oid *collations = mstate->collations; for (int i = 0; i < numkeys; i++) { /* rotate hashkey left 1 bit at each step */ hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0); if (!pslot->tts_isnull[i]) /* treat nulls as having hash key 0 */ { uint32 hkey; hkey = DatumGetUInt32(FunctionCall1Coll(&hashfunctions[i], collations[i], pslot->tts_values[i])); hashkey ^= hkey; } } } return murmurhash32(hashkey); } /* * MemoizeHash_equal * Equality function for confirming hash value matches during a hash * table lookup. 'key2' is never used. Instead the MemoizeState's * probeslot is always populated with details of what's being looked up. */ static bool MemoizeHash_equal(struct memoize_hash *tb, const MemoizeKey *key1, const MemoizeKey *key2) { MemoizeState *mstate = (MemoizeState *) tb->private_data; ExprContext *econtext = mstate->ss.ps.ps_ExprContext; TupleTableSlot *tslot = mstate->tableslot; TupleTableSlot *pslot = mstate->probeslot; /* probeslot should have already been prepared by prepare_probe_slot() */ ExecStoreMinimalTuple(key1->params, tslot, false); if (mstate->binary_mode) { int numkeys = mstate->nkeys; slot_getallattrs(tslot); slot_getallattrs(pslot); for (int i = 0; i < numkeys; i++) { FormData_pg_attribute *attr; if (tslot->tts_isnull[i] != pslot->tts_isnull[i]) return false; /* both NULL? they're equal */ if (tslot->tts_isnull[i]) continue; /* perform binary comparison on the two datums */ attr = &tslot->tts_tupleDescriptor->attrs[i]; if (!datum_image_eq(tslot->tts_values[i], pslot->tts_values[i], attr->attbyval, attr->attlen)) return false; } return true; } else { econtext->ecxt_innertuple = tslot; econtext->ecxt_outertuple = pslot; return ExecQualAndReset(mstate->cache_eq_expr, econtext); } } /* * Initialize the hash table to empty. */ static void build_hash_table(MemoizeState *mstate, uint32 size) { /* Make a guess at a good size when we're not given a valid size. */ if (size == 0) size = 1024; /* memoize_create will convert the size to a power of 2 */ mstate->hashtable = memoize_create(mstate->tableContext, size, mstate); } /* * prepare_probe_slot * Populate mstate's probeslot with the values from the tuple stored * in 'key'. If 'key' is NULL, then perform the population by evaluating * mstate's param_exprs. */ static inline void prepare_probe_slot(MemoizeState *mstate, MemoizeKey *key) { TupleTableSlot *pslot = mstate->probeslot; TupleTableSlot *tslot = mstate->tableslot; int numKeys = mstate->nkeys; ExecClearTuple(pslot); if (key == NULL) { /* Set the probeslot's values based on the current parameter values */ for (int i = 0; i < numKeys; i++) pslot->tts_values[i] = ExecEvalExpr(mstate->param_exprs[i], mstate->ss.ps.ps_ExprContext, &pslot->tts_isnull[i]); } else { /* Process the key's MinimalTuple and store the values in probeslot */ ExecStoreMinimalTuple(key->params, tslot, false); slot_getallattrs(tslot); memcpy(pslot->tts_values, tslot->tts_values, sizeof(Datum) * numKeys); memcpy(pslot->tts_isnull, tslot->tts_isnull, sizeof(bool) * numKeys); } ExecStoreVirtualTuple(pslot); } /* * entry_purge_tuples * Remove all tuples from the cache entry pointed to by 'entry'. This * leaves an empty cache entry. Also, update the memory accounting to * reflect the removal of the tuples. */ static inline void entry_purge_tuples(MemoizeState *mstate, MemoizeEntry *entry) { MemoizeTuple *tuple = entry->tuplehead; uint64 freed_mem = 0; while (tuple != NULL) { MemoizeTuple *next = tuple->next; freed_mem += CACHE_TUPLE_BYTES(tuple); /* Free memory used for this tuple */ pfree(tuple->mintuple); pfree(tuple); tuple = next; } entry->complete = false; entry->tuplehead = NULL; /* Update the memory accounting */ mstate->mem_used -= freed_mem; } /* * remove_cache_entry * Remove 'entry' from the cache and free memory used by it. */ static void remove_cache_entry(MemoizeState *mstate, MemoizeEntry *entry) { MemoizeKey *key = entry->key; dlist_delete(&entry->key->lru_node); /* Remove all of the tuples from this entry */ entry_purge_tuples(mstate, entry); /* * Update memory accounting. entry_purge_tuples should have already * subtracted the memory used for each cached tuple. Here we just update * the amount used by the entry itself. */ mstate->mem_used -= EMPTY_ENTRY_MEMORY_BYTES(entry); /* Remove the entry from the cache */ memoize_delete_item(mstate->hashtable, entry); pfree(key->params); pfree(key); } /* * cache_purge_all * Remove all items from the cache */ static void cache_purge_all(MemoizeState *mstate) { uint64 evictions = mstate->hashtable->members; PlanState *pstate = (PlanState *) mstate; /* * Likely the most efficient way to remove all items is to just reset the * memory context for the cache and then rebuild a fresh hash table. This * saves having to remove each item one by one and pfree each cached tuple */ MemoryContextReset(mstate->tableContext); /* Make the hash table the same size as the original size */ build_hash_table(mstate, ((Memoize *) pstate->plan)->est_entries); /* reset the LRU list */ dlist_init(&mstate->lru_list); mstate->last_tuple = NULL; mstate->entry = NULL; mstate->mem_used = 0; /* XXX should we add something new to track these purges? */ mstate->stats.cache_evictions += evictions; /* Update Stats */ } /* * cache_reduce_memory * Evict older and less recently used items from the cache in order to * reduce the memory consumption back to something below the * MemoizeState's mem_limit. * * 'specialkey', if not NULL, causes the function to return false if the entry * which the key belongs to is removed from the cache. */ static bool cache_reduce_memory(MemoizeState *mstate, MemoizeKey *specialkey) { bool specialkey_intact = true; /* for now */ dlist_mutable_iter iter; uint64 evictions = 0; /* Update peak memory usage */ if (mstate->mem_used > mstate->stats.mem_peak) mstate->stats.mem_peak = mstate->mem_used; /* We expect only to be called when we've gone over budget on memory */ Assert(mstate->mem_used > mstate->mem_limit); /* Start the eviction process starting at the head of the LRU list. */ dlist_foreach_modify(iter, &mstate->lru_list) { MemoizeKey *key = dlist_container(MemoizeKey, lru_node, iter.cur); MemoizeEntry *entry; /* * Populate the hash probe slot in preparation for looking up this LRU * entry. */ prepare_probe_slot(mstate, key); /* * Ideally the LRU list pointers would be stored in the entry itself * rather than in the key. Unfortunately, we can't do that as the * simplehash.h code may resize the table and allocate new memory for * entries which would result in those pointers pointing to the old * buckets. However, it's fine to use the key to store this as that's * only referenced by a pointer in the entry, which of course follows * the entry whenever the hash table is resized. Since we only have a * pointer to the key here, we must perform a hash table lookup to * find the entry that the key belongs to. */ entry = memoize_lookup(mstate->hashtable, NULL); /* * Sanity check that we found the entry belonging to the LRU list * item. A misbehaving hash or equality function could cause the * entry not to be found or the wrong entry to be found. */ if (unlikely(entry == NULL || entry->key != key)) elog(ERROR, "could not find memoization table entry"); /* * If we're being called to free memory while the cache is being * populated with new tuples, then we'd better take some care as we * could end up freeing the entry which 'specialkey' belongs to. * Generally callers will pass 'specialkey' as the key for the cache * entry which is currently being populated, so we must set * 'specialkey_intact' to false to inform the caller the specialkey * entry has been removed. */ if (key == specialkey) specialkey_intact = false; /* * Finally remove the entry. This will remove from the LRU list too. */ remove_cache_entry(mstate, entry); evictions++; /* Exit if we've freed enough memory */ if (mstate->mem_used <= mstate->mem_limit) break; } mstate->stats.cache_evictions += evictions; /* Update Stats */ return specialkey_intact; } /* * cache_lookup * Perform a lookup to see if we've already cached tuples based on the * scan's current parameters. If we find an existing entry we move it to * the end of the LRU list, set *found to true then return it. If we * don't find an entry then we create a new one and add it to the end of * the LRU list. We also update cache memory accounting and remove older * entries if we go over the memory budget. If we managed to free enough * memory we return the new entry, else we return NULL. * * Callers can assume we'll never return NULL when *found is true. */ static MemoizeEntry * cache_lookup(MemoizeState *mstate, bool *found) { MemoizeKey *key; MemoizeEntry *entry; MemoryContext oldcontext; /* prepare the probe slot with the current scan parameters */ prepare_probe_slot(mstate, NULL); /* * Add the new entry to the cache. No need to pass a valid key since the * hash function uses mstate's probeslot, which we populated above. */ entry = memoize_insert(mstate->hashtable, NULL, found); if (*found) { /* * Move existing entry to the tail of the LRU list to mark it as the * most recently used item. */ dlist_move_tail(&mstate->lru_list, &entry->key->lru_node); return entry; } oldcontext = MemoryContextSwitchTo(mstate->tableContext); /* Allocate a new key */ entry->key = key = (MemoizeKey *) palloc(sizeof(MemoizeKey)); key->params = ExecCopySlotMinimalTuple(mstate->probeslot); /* Update the total cache memory utilization */ mstate->mem_used += EMPTY_ENTRY_MEMORY_BYTES(entry); /* Initialize this entry */ entry->complete = false; entry->tuplehead = NULL; /* * Since this is the most recently used entry, push this entry onto the * end of the LRU list. */ dlist_push_tail(&mstate->lru_list, &entry->key->lru_node); mstate->last_tuple = NULL; MemoryContextSwitchTo(oldcontext); /* * If we've gone over our memory budget, then we'll free up some space in * the cache. */ if (mstate->mem_used > mstate->mem_limit) { /* * Try to free up some memory. It's highly unlikely that we'll fail * to do so here since the entry we've just added is yet to contain * any tuples and we're able to remove any other entry to reduce the * memory consumption. */ if (unlikely(!cache_reduce_memory(mstate, key))) return NULL; /* * The process of removing entries from the cache may have caused the * code in simplehash.h to shuffle elements to earlier buckets in the * hash table. If it has, we'll need to find the entry again by * performing a lookup. Fortunately, we can detect if this has * happened by seeing if the entry is still in use and that the key * pointer matches our expected key. */ if (entry->status != memoize_SH_IN_USE || entry->key != key) { /* * We need to repopulate the probeslot as lookups performed during * the cache evictions above will have stored some other key. */ prepare_probe_slot(mstate, key); /* Re-find the newly added entry */ entry = memoize_lookup(mstate->hashtable, NULL); Assert(entry != NULL); } } return entry; } /* * cache_store_tuple * Add the tuple stored in 'slot' to the mstate's current cache entry. * The cache entry must have already been made with cache_lookup(). * mstate's last_tuple field must point to the tail of mstate->entry's * list of tuples. */ static bool cache_store_tuple(MemoizeState *mstate, TupleTableSlot *slot) { MemoizeTuple *tuple; MemoizeEntry *entry = mstate->entry; MemoryContext oldcontext; Assert(slot != NULL); Assert(entry != NULL); oldcontext = MemoryContextSwitchTo(mstate->tableContext); tuple = (MemoizeTuple *) palloc(sizeof(MemoizeTuple)); tuple->mintuple = ExecCopySlotMinimalTuple(slot); tuple->next = NULL; /* Account for the memory we just consumed */ mstate->mem_used += CACHE_TUPLE_BYTES(tuple); if (entry->tuplehead == NULL) { /* * This is the first tuple for this entry, so just point the list head * to it. */ entry->tuplehead = tuple; } else { /* push this tuple onto the tail of the list */ mstate->last_tuple->next = tuple; } mstate->last_tuple = tuple; MemoryContextSwitchTo(oldcontext); /* * If we've gone over our memory budget then free up some space in the * cache. */ if (mstate->mem_used > mstate->mem_limit) { MemoizeKey *key = entry->key; if (!cache_reduce_memory(mstate, key)) return false; /* * The process of removing entries from the cache may have caused the * code in simplehash.h to shuffle elements to earlier buckets in the * hash table. If it has, we'll need to find the entry again by * performing a lookup. Fortunately, we can detect if this has * happened by seeing if the entry is still in use and that the key * pointer matches our expected key. */ if (entry->status != memoize_SH_IN_USE || entry->key != key) { /* * We need to repopulate the probeslot as lookups performed during * the cache evictions above will have stored some other key. */ prepare_probe_slot(mstate, key); /* Re-find the entry */ mstate->entry = entry = memoize_lookup(mstate->hashtable, NULL); Assert(entry != NULL); } } return true; } static TupleTableSlot * ExecMemoize(PlanState *pstate) { MemoizeState *node = castNode(MemoizeState, pstate); PlanState *outerNode; TupleTableSlot *slot; switch (node->mstatus) { case MEMO_CACHE_LOOKUP: { MemoizeEntry *entry; TupleTableSlot *outerslot; bool found; Assert(node->entry == NULL); /* * We're only ever in this state for the first call of the * scan. Here we have a look to see if we've already seen the * current parameters before and if we have already cached a * complete set of records that the outer plan will return for * these parameters. * * When we find a valid cache entry, we'll return the first * tuple from it. If not found, we'll create a cache entry and * then try to fetch a tuple from the outer scan. If we find * one there, we'll try to cache it. */ /* see if we've got anything cached for the current parameters */ entry = cache_lookup(node, &found); if (found && entry->complete) { node->stats.cache_hits += 1; /* stats update */ /* * Set last_tuple and entry so that the state * MEMO_CACHE_FETCH_NEXT_TUPLE can easily find the next * tuple for these parameters. */ node->last_tuple = entry->tuplehead; node->entry = entry; /* Fetch the first cached tuple, if there is one */ if (entry->tuplehead) { node->mstatus = MEMO_CACHE_FETCH_NEXT_TUPLE; slot = node->ss.ps.ps_ResultTupleSlot; ExecStoreMinimalTuple(entry->tuplehead->mintuple, slot, false); return slot; } /* The cache entry is void of any tuples. */ node->mstatus = MEMO_END_OF_SCAN; return NULL; } /* Handle cache miss */ node->stats.cache_misses += 1; /* stats update */ if (found) { /* * A cache entry was found, but the scan for that entry * did not run to completion. We'll just remove all * tuples and start again. It might be tempting to * continue where we left off, but there's no guarantee * the outer node will produce the tuples in the same * order as it did last time. */ entry_purge_tuples(node, entry); } /* Scan the outer node for a tuple to cache */ outerNode = outerPlanState(node); outerslot = ExecProcNode(outerNode); if (TupIsNull(outerslot)) { /* * cache_lookup may have returned NULL due to failure to * free enough cache space, so ensure we don't do anything * here that assumes it worked. There's no need to go into * bypass mode here as we're setting mstatus to end of * scan. */ if (likely(entry)) entry->complete = true; node->mstatus = MEMO_END_OF_SCAN; return NULL; } node->entry = entry; /* * If we failed to create the entry or failed to store the * tuple in the entry, then go into bypass mode. */ if (unlikely(entry == NULL || !cache_store_tuple(node, outerslot))) { node->stats.cache_overflows += 1; /* stats update */ node->mstatus = MEMO_CACHE_BYPASS_MODE; /* * No need to clear out last_tuple as we'll stay in bypass * mode until the end of the scan. */ } else { /* * If we only expect a single row from this scan then we * can mark that we're not expecting more. This allows * cache lookups to work even when the scan has not been * executed to completion. */ entry->complete = node->singlerow; node->mstatus = MEMO_FILLING_CACHE; } slot = node->ss.ps.ps_ResultTupleSlot; ExecCopySlot(slot, outerslot); return slot; } case MEMO_CACHE_FETCH_NEXT_TUPLE: { /* We shouldn't be in this state if these are not set */ Assert(node->entry != NULL); Assert(node->last_tuple != NULL); /* Skip to the next tuple to output */ node->last_tuple = node->last_tuple->next; /* No more tuples in the cache */ if (node->last_tuple == NULL) { node->mstatus = MEMO_END_OF_SCAN; return NULL; } slot = node->ss.ps.ps_ResultTupleSlot; ExecStoreMinimalTuple(node->last_tuple->mintuple, slot, false); return slot; } case MEMO_FILLING_CACHE: { TupleTableSlot *outerslot; MemoizeEntry *entry = node->entry; /* entry should already have been set by MEMO_CACHE_LOOKUP */ Assert(entry != NULL); /* * When in the MEMO_FILLING_CACHE state, we've just had a * cache miss and are populating the cache with the current * scan tuples. */ outerNode = outerPlanState(node); outerslot = ExecProcNode(outerNode); if (TupIsNull(outerslot)) { /* No more tuples. Mark it as complete */ entry->complete = true; node->mstatus = MEMO_END_OF_SCAN; return NULL; } /* * Validate if the planner properly set the singlerow flag. It * should only set that if each cache entry can, at most, * return 1 row. */ if (unlikely(entry->complete)) elog(ERROR, "cache entry already complete"); /* Record the tuple in the current cache entry */ if (unlikely(!cache_store_tuple(node, outerslot))) { /* Couldn't store it? Handle overflow */ node->stats.cache_overflows += 1; /* stats update */ node->mstatus = MEMO_CACHE_BYPASS_MODE; /* * No need to clear out entry or last_tuple as we'll stay * in bypass mode until the end of the scan. */ } slot = node->ss.ps.ps_ResultTupleSlot; ExecCopySlot(slot, outerslot); return slot; } case MEMO_CACHE_BYPASS_MODE: { TupleTableSlot *outerslot; /* * When in bypass mode we just continue to read tuples without * caching. We need to wait until the next rescan before we * can come out of this mode. */ outerNode = outerPlanState(node); outerslot = ExecProcNode(outerNode); if (TupIsNull(outerslot)) { node->mstatus = MEMO_END_OF_SCAN; return NULL; } slot = node->ss.ps.ps_ResultTupleSlot; ExecCopySlot(slot, outerslot); return slot; } case MEMO_END_OF_SCAN: /* * We've already returned NULL for this scan, but just in case * something calls us again by mistake. */ return NULL; default: elog(ERROR, "unrecognized memoize state: %d", (int) node->mstatus); return NULL; } /* switch */ } MemoizeState * ExecInitMemoize(Memoize *node, EState *estate, int eflags) { MemoizeState *mstate = makeNode(MemoizeState); Plan *outerNode; int i; int nkeys; Oid *eqfuncoids; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); mstate->ss.ps.plan = (Plan *) node; mstate->ss.ps.state = estate; mstate->ss.ps.ExecProcNode = ExecMemoize; /* * Miscellaneous initialization * * create expression context for node */ ExecAssignExprContext(estate, &mstate->ss.ps); outerNode = outerPlan(node); outerPlanState(mstate) = ExecInitNode(outerNode, estate, eflags); /* * Initialize return slot and type. No need to initialize projection info * because this node doesn't do projections. */ ExecInitResultTupleSlotTL(&mstate->ss.ps, &TTSOpsMinimalTuple); mstate->ss.ps.ps_ProjInfo = NULL; /* * Initialize scan slot and type. */ ExecCreateScanSlotFromOuterPlan(estate, &mstate->ss, &TTSOpsMinimalTuple); /* * Set the state machine to lookup the cache. We won't find anything * until we cache something, but this saves a special case to create the * first entry. */ mstate->mstatus = MEMO_CACHE_LOOKUP; mstate->nkeys = nkeys = node->numKeys; mstate->hashkeydesc = ExecTypeFromExprList(node->param_exprs); mstate->tableslot = MakeSingleTupleTableSlot(mstate->hashkeydesc, &TTSOpsMinimalTuple); mstate->probeslot = MakeSingleTupleTableSlot(mstate->hashkeydesc, &TTSOpsVirtual); mstate->param_exprs = (ExprState **) palloc(nkeys * sizeof(ExprState *)); mstate->collations = node->collations; /* Just point directly to the plan * data */ mstate->hashfunctions = (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo)); eqfuncoids = palloc(nkeys * sizeof(Oid)); for (i = 0; i < nkeys; i++) { Oid hashop = node->hashOperators[i]; Oid left_hashfn; Oid right_hashfn; Expr *param_expr = (Expr *) list_nth(node->param_exprs, i); if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn)) elog(ERROR, "could not find hash function for hash operator %u", hashop); fmgr_info(left_hashfn, &mstate->hashfunctions[i]); mstate->param_exprs[i] = ExecInitExpr(param_expr, (PlanState *) mstate); eqfuncoids[i] = get_opcode(hashop); } mstate->cache_eq_expr = ExecBuildParamSetEqual(mstate->hashkeydesc, &TTSOpsMinimalTuple, &TTSOpsVirtual, eqfuncoids, node->collations, node->param_exprs, (PlanState *) mstate); pfree(eqfuncoids); mstate->mem_used = 0; /* Limit the total memory consumed by the cache to this */ mstate->mem_limit = get_hash_memory_limit(); /* A memory context dedicated for the cache */ mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext, "MemoizeHashTable", ALLOCSET_DEFAULT_SIZES); dlist_init(&mstate->lru_list); mstate->last_tuple = NULL; mstate->entry = NULL; /* * Mark if we can assume the cache entry is completed after we get the * first record for it. Some callers might not call us again after * getting the first match. e.g. A join operator performing a unique join * is able to skip to the next outer tuple after getting the first * matching inner tuple. In this case, the cache entry is complete after * getting the first tuple. This allows us to mark it as so. */ mstate->singlerow = node->singlerow; mstate->keyparamids = node->keyparamids; /* * Record if the cache keys should be compared bit by bit, or logically * using the type's hash equality operator */ mstate->binary_mode = node->binary_mode; /* Zero the statistics counters */ memset(&mstate->stats, 0, sizeof(MemoizeInstrumentation)); /* Allocate and set up the actual cache */ build_hash_table(mstate, node->est_entries); return mstate; } void ExecEndMemoize(MemoizeState *node) { #ifdef USE_ASSERT_CHECKING /* Validate the memory accounting code is correct in assert builds. */ { int count; uint64 mem = 0; memoize_iterator i; MemoizeEntry *entry; memoize_start_iterate(node->hashtable, &i); count = 0; while ((entry = memoize_iterate(node->hashtable, &i)) != NULL) { MemoizeTuple *tuple = entry->tuplehead; mem += EMPTY_ENTRY_MEMORY_BYTES(entry); while (tuple != NULL) { mem += CACHE_TUPLE_BYTES(tuple); tuple = tuple->next; } count++; } Assert(count == node->hashtable->members); Assert(mem == node->mem_used); } #endif /* * When ending a parallel worker, copy the statistics gathered by the * worker back into shared memory so that it can be picked up by the main * process to report in EXPLAIN ANALYZE. */ if (node->shared_info != NULL && IsParallelWorker()) { MemoizeInstrumentation *si; /* Make mem_peak available for EXPLAIN */ if (node->stats.mem_peak == 0) node->stats.mem_peak = node->mem_used; Assert(ParallelWorkerNumber <= node->shared_info->num_workers); si = &node->shared_info->sinstrument[ParallelWorkerNumber]; memcpy(si, &node->stats, sizeof(MemoizeInstrumentation)); } /* Remove the cache context */ MemoryContextDelete(node->tableContext); ExecClearTuple(node->ss.ss_ScanTupleSlot); /* must drop pointer to cache result tuple */ ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); /* * free exprcontext */ ExecFreeExprContext(&node->ss.ps); /* * shut down the subplan */ ExecEndNode(outerPlanState(node)); } void ExecReScanMemoize(MemoizeState *node) { PlanState *outerPlan = outerPlanState(node); /* Mark that we must lookup the cache for a new set of parameters */ node->mstatus = MEMO_CACHE_LOOKUP; /* nullify pointers used for the last scan */ node->entry = NULL; node->last_tuple = NULL; /* * if chgParam of subnode is not null then plan will be re-scanned by * first ExecProcNode. */ if (outerPlan->chgParam == NULL) ExecReScan(outerPlan); /* * Purge the entire cache if a parameter changed that is not part of the * cache key. */ if (bms_nonempty_difference(outerPlan->chgParam, node->keyparamids)) cache_purge_all(node); } /* * ExecEstimateCacheEntryOverheadBytes * For use in the query planner to help it estimate the amount of memory * required to store a single entry in the cache. */ double ExecEstimateCacheEntryOverheadBytes(double ntuples) { return sizeof(MemoizeEntry) + sizeof(MemoizeKey) + sizeof(MemoizeTuple) * ntuples; } /* ---------------------------------------------------------------- * Parallel Query Support * ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- * ExecMemoizeEstimate * * Estimate space required to propagate memoize statistics. * ---------------------------------------------------------------- */ void ExecMemoizeEstimate(MemoizeState *node, ParallelContext *pcxt) { Size size; /* don't need this if not instrumenting or no workers */ if (!node->ss.ps.instrument || pcxt->nworkers == 0) return; size = mul_size(pcxt->nworkers, sizeof(MemoizeInstrumentation)); size = add_size(size, offsetof(SharedMemoizeInfo, sinstrument)); shm_toc_estimate_chunk(&pcxt->estimator, size); shm_toc_estimate_keys(&pcxt->estimator, 1); } /* ---------------------------------------------------------------- * ExecMemoizeInitializeDSM * * Initialize DSM space for memoize statistics. * ---------------------------------------------------------------- */ void ExecMemoizeInitializeDSM(MemoizeState *node, ParallelContext *pcxt) { Size size; /* don't need this if not instrumenting or no workers */ if (!node->ss.ps.instrument || pcxt->nworkers == 0) return; size = offsetof(SharedMemoizeInfo, sinstrument) + pcxt->nworkers * sizeof(MemoizeInstrumentation); node->shared_info = shm_toc_allocate(pcxt->toc, size); /* ensure any unfilled slots will contain zeroes */ memset(node->shared_info, 0, size); node->shared_info->num_workers = pcxt->nworkers; shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, node->shared_info); } /* ---------------------------------------------------------------- * ExecMemoizeInitializeWorker * * Attach worker to DSM space for memoize statistics. * ---------------------------------------------------------------- */ void ExecMemoizeInitializeWorker(MemoizeState *node, ParallelWorkerContext *pwcxt) { node->shared_info = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true); } /* ---------------------------------------------------------------- * ExecMemoizeRetrieveInstrumentation * * Transfer memoize statistics from DSM to private memory. * ---------------------------------------------------------------- */ void ExecMemoizeRetrieveInstrumentation(MemoizeState *node) { Size size; SharedMemoizeInfo *si; if (node->shared_info == NULL) return; size = offsetof(SharedMemoizeInfo, sinstrument) + node->shared_info->num_workers * sizeof(MemoizeInstrumentation); si = palloc(size); memcpy(si, node->shared_info, size); node->shared_info = si; }