summaryrefslogtreecommitdiffstats
path: root/src/backend/utils/cache/catcache.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/backend/utils/cache/catcache.c294
1 files changed, 189 insertions, 105 deletions
diff --git a/src/backend/utils/cache/catcache.c b/src/backend/utils/cache/catcache.c
index 38e943f..d22bc07 100644
--- a/src/backend/utils/cache/catcache.c
+++ b/src/backend/utils/cache/catcache.c
@@ -25,6 +25,7 @@
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "common/hashfn.h"
+#include "common/pg_prng.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#ifdef CATCACHE_STATS
@@ -90,10 +91,10 @@ static void CatCachePrintStats(int code, Datum arg);
static void CatCacheRemoveCTup(CatCache *cache, CatCTup *ct);
static void CatCacheRemoveCList(CatCache *cache, CatCList *cl);
static void CatalogCacheInitializeCache(CatCache *cache);
-static CatCTup *CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp,
+static CatCTup *CatalogCacheCreateEntry(CatCache *cache,
+ HeapTuple ntp, SysScanDesc scandesc,
Datum *arguments,
- uint32 hashValue, Index hashIndex,
- bool negative);
+ uint32 hashValue, Index hashIndex);
static void CatCacheFreeKeys(TupleDesc tupdesc, int nkeys, int *attnos,
Datum *keys);
@@ -1318,6 +1319,7 @@ SearchCatCacheMiss(CatCache *cache,
SysScanDesc scandesc;
HeapTuple ntp;
CatCTup *ct;
+ bool stale;
Datum arguments[CATCACHE_MAXKEYS];
/* Initialize local parameter array */
@@ -1327,16 +1329,6 @@ SearchCatCacheMiss(CatCache *cache,
arguments[3] = v4;
/*
- * Ok, need to make a lookup in the relation, copy the scankey and fill
- * out any per-call fields.
- */
- memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * nkeys);
- cur_skey[0].sk_argument = v1;
- cur_skey[1].sk_argument = v2;
- cur_skey[2].sk_argument = v3;
- cur_skey[3].sk_argument = v4;
-
- /*
* Tuple was not found in cache, so we have to try to retrieve it directly
* from the relation. If found, we will add it to the cache; if not
* found, we will add a negative cache entry instead.
@@ -1350,31 +1342,57 @@ SearchCatCacheMiss(CatCache *cache,
* will eventually age out of the cache, so there's no functional problem.
* This case is rare enough that it's not worth expending extra cycles to
* detect.
+ *
+ * Another case, which we *must* handle, is that the tuple could become
+ * outdated during CatalogCacheCreateEntry's attempt to detoast it (since
+ * AcceptInvalidationMessages can run during TOAST table access). We do
+ * not want to return already-stale catcache entries, so we loop around
+ * and do the table scan again if that happens.
*/
relation = table_open(cache->cc_reloid, AccessShareLock);
- scandesc = systable_beginscan(relation,
- cache->cc_indexoid,
- IndexScanOK(cache, cur_skey),
- NULL,
- nkeys,
- cur_skey);
+ do
+ {
+ /*
+ * Ok, need to make a lookup in the relation, copy the scankey and
+ * fill out any per-call fields. (We must re-do this when retrying,
+ * because systable_beginscan scribbles on the scankey.)
+ */
+ memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * nkeys);
+ cur_skey[0].sk_argument = v1;
+ cur_skey[1].sk_argument = v2;
+ cur_skey[2].sk_argument = v3;
+ cur_skey[3].sk_argument = v4;
+
+ scandesc = systable_beginscan(relation,
+ cache->cc_indexoid,
+ IndexScanOK(cache, cur_skey),
+ NULL,
+ nkeys,
+ cur_skey);
- ct = NULL;
+ ct = NULL;
+ stale = false;
- while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
- {
- ct = CatalogCacheCreateEntry(cache, ntp, arguments,
- hashValue, hashIndex,
- false);
- /* immediately set the refcount to 1 */
- ResourceOwnerEnlargeCatCacheRefs(CurrentResourceOwner);
- ct->refcount++;
- ResourceOwnerRememberCatCacheRef(CurrentResourceOwner, &ct->tuple);
- break; /* assume only one match */
- }
+ while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
+ {
+ ct = CatalogCacheCreateEntry(cache, ntp, scandesc, NULL,
+ hashValue, hashIndex);
+ /* upon failure, we must start the scan over */
+ if (ct == NULL)
+ {
+ stale = true;
+ break;
+ }
+ /* immediately set the refcount to 1 */
+ ResourceOwnerEnlargeCatCacheRefs(CurrentResourceOwner);
+ ct->refcount++;
+ ResourceOwnerRememberCatCacheRef(CurrentResourceOwner, &ct->tuple);
+ break; /* assume only one match */
+ }
- systable_endscan(scandesc);
+ systable_endscan(scandesc);
+ } while (stale);
table_close(relation, AccessShareLock);
@@ -1393,9 +1411,11 @@ SearchCatCacheMiss(CatCache *cache,
if (IsBootstrapProcessingMode())
return NULL;
- ct = CatalogCacheCreateEntry(cache, NULL, arguments,
- hashValue, hashIndex,
- true);
+ ct = CatalogCacheCreateEntry(cache, NULL, NULL, arguments,
+ hashValue, hashIndex);
+
+ /* Creating a negative cache entry shouldn't fail */
+ Assert(ct != NULL);
CACHE_elog(DEBUG2, "SearchCatCache(%s): Contains %d/%d tuples",
cache->cc_relname, cache->cc_ntup, CacheHdr->ch_ntup);
@@ -1602,7 +1622,8 @@ SearchCatCacheList(CatCache *cache,
* We have to bump the member refcounts temporarily to ensure they won't
* get dropped from the cache while loading other members. We use a PG_TRY
* block to ensure we can undo those refcounts if we get an error before
- * we finish constructing the CatCList.
+ * we finish constructing the CatCList. ctlist must be valid throughout
+ * the PG_TRY block.
*/
ResourceOwnerEnlargeCatCacheListRefs(CurrentResourceOwner);
@@ -1613,83 +1634,113 @@ SearchCatCacheList(CatCache *cache,
ScanKeyData cur_skey[CATCACHE_MAXKEYS];
Relation relation;
SysScanDesc scandesc;
-
- /*
- * Ok, need to make a lookup in the relation, copy the scankey and
- * fill out any per-call fields.
- */
- memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * cache->cc_nkeys);
- cur_skey[0].sk_argument = v1;
- cur_skey[1].sk_argument = v2;
- cur_skey[2].sk_argument = v3;
- cur_skey[3].sk_argument = v4;
+ bool stale;
relation = table_open(cache->cc_reloid, AccessShareLock);
- scandesc = systable_beginscan(relation,
- cache->cc_indexoid,
- IndexScanOK(cache, cur_skey),
- NULL,
- nkeys,
- cur_skey);
-
- /* The list will be ordered iff we are doing an index scan */
- ordered = (scandesc->irel != NULL);
-
- while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
+ do
{
- uint32 hashValue;
- Index hashIndex;
- bool found = false;
- dlist_head *bucket;
-
/*
- * See if there's an entry for this tuple already.
+ * Ok, need to make a lookup in the relation, copy the scankey and
+ * fill out any per-call fields. (We must re-do this when
+ * retrying, because systable_beginscan scribbles on the scankey.)
*/
- ct = NULL;
- hashValue = CatalogCacheComputeTupleHashValue(cache, cache->cc_nkeys, ntp);
- hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
+ memcpy(cur_skey, cache->cc_skey, sizeof(ScanKeyData) * cache->cc_nkeys);
+ cur_skey[0].sk_argument = v1;
+ cur_skey[1].sk_argument = v2;
+ cur_skey[2].sk_argument = v3;
+ cur_skey[3].sk_argument = v4;
- bucket = &cache->cc_bucket[hashIndex];
- dlist_foreach(iter, bucket)
- {
- ct = dlist_container(CatCTup, cache_elem, iter.cur);
+ scandesc = systable_beginscan(relation,
+ cache->cc_indexoid,
+ IndexScanOK(cache, cur_skey),
+ NULL,
+ nkeys,
+ cur_skey);
- if (ct->dead || ct->negative)
- continue; /* ignore dead and negative entries */
+ /* The list will be ordered iff we are doing an index scan */
+ ordered = (scandesc->irel != NULL);
- if (ct->hash_value != hashValue)
- continue; /* quickly skip entry if wrong hash val */
+ stale = false;
- if (!ItemPointerEquals(&(ct->tuple.t_self), &(ntp->t_self)))
- continue; /* not same tuple */
+ while (HeapTupleIsValid(ntp = systable_getnext(scandesc)))
+ {
+ uint32 hashValue;
+ Index hashIndex;
+ bool found = false;
+ dlist_head *bucket;
/*
- * Found a match, but can't use it if it belongs to another
- * list already
+ * See if there's an entry for this tuple already.
*/
- if (ct->c_list)
- continue;
-
- found = true;
- break; /* A-OK */
- }
-
- if (!found)
- {
- /* We didn't find a usable entry, so make a new one */
- ct = CatalogCacheCreateEntry(cache, ntp, arguments,
- hashValue, hashIndex,
- false);
+ ct = NULL;
+ hashValue = CatalogCacheComputeTupleHashValue(cache, cache->cc_nkeys, ntp);
+ hashIndex = HASH_INDEX(hashValue, cache->cc_nbuckets);
+
+ bucket = &cache->cc_bucket[hashIndex];
+ dlist_foreach(iter, bucket)
+ {
+ ct = dlist_container(CatCTup, cache_elem, iter.cur);
+
+ if (ct->dead || ct->negative)
+ continue; /* ignore dead and negative entries */
+
+ if (ct->hash_value != hashValue)
+ continue; /* quickly skip entry if wrong hash val */
+
+ if (!ItemPointerEquals(&(ct->tuple.t_self), &(ntp->t_self)))
+ continue; /* not same tuple */
+
+ /*
+ * Found a match, but can't use it if it belongs to
+ * another list already
+ */
+ if (ct->c_list)
+ continue;
+
+ found = true;
+ break; /* A-OK */
+ }
+
+ if (!found)
+ {
+ /* We didn't find a usable entry, so make a new one */
+ ct = CatalogCacheCreateEntry(cache, ntp, scandesc, NULL,
+ hashValue, hashIndex);
+ /* upon failure, we must start the scan over */
+ if (ct == NULL)
+ {
+ /*
+ * Release refcounts on any items we already had. We
+ * dare not try to free them if they're now
+ * unreferenced, since an error while doing that would
+ * result in the PG_CATCH below doing extra refcount
+ * decrements. Besides, we'll likely re-adopt those
+ * items in the next iteration, so it's not worth
+ * complicating matters to try to get rid of them.
+ */
+ foreach(ctlist_item, ctlist)
+ {
+ ct = (CatCTup *) lfirst(ctlist_item);
+ Assert(ct->c_list == NULL);
+ Assert(ct->refcount > 0);
+ ct->refcount--;
+ }
+ /* Reset ctlist in preparation for new try */
+ ctlist = NIL;
+ stale = true;
+ break;
+ }
+ }
+
+ /* Careful here: add entry to ctlist, then bump its refcount */
+ /* This way leaves state correct if lappend runs out of memory */
+ ctlist = lappend(ctlist, ct);
+ ct->refcount++;
}
- /* Careful here: add entry to ctlist, then bump its refcount */
- /* This way leaves state correct if lappend runs out of memory */
- ctlist = lappend(ctlist, ct);
- ct->refcount++;
- }
-
- systable_endscan(scandesc);
+ systable_endscan(scandesc);
+ } while (stale);
table_close(relation, AccessShareLock);
@@ -1796,22 +1847,42 @@ ReleaseCatCacheList(CatCList *list)
* CatalogCacheCreateEntry
* Create a new CatCTup entry, copying the given HeapTuple and other
* supplied data into it. The new entry initially has refcount 0.
+ *
+ * To create a normal cache entry, ntp must be the HeapTuple just fetched
+ * from scandesc, and "arguments" is not used. To create a negative cache
+ * entry, pass NULL for ntp and scandesc; then "arguments" is the cache
+ * keys to use. In either case, hashValue/hashIndex are the hash values
+ * computed from the cache keys.
+ *
+ * Returns NULL if we attempt to detoast the tuple and observe that it
+ * became stale. (This cannot happen for a negative entry.) Caller must
+ * retry the tuple lookup in that case.
*/
static CatCTup *
-CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
- uint32 hashValue, Index hashIndex,
- bool negative)
+CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, SysScanDesc scandesc,
+ Datum *arguments,
+ uint32 hashValue, Index hashIndex)
{
CatCTup *ct;
HeapTuple dtp;
MemoryContext oldcxt;
- /* negative entries have no tuple associated */
if (ntp)
{
int i;
- Assert(!negative);
+ /*
+ * The visibility recheck below essentially never fails during our
+ * regression tests, and there's no easy way to force it to fail for
+ * testing purposes. To ensure we have test coverage for the retry
+ * paths in our callers, make debug builds randomly fail about 0.1% of
+ * the times through this code path, even when there's no toasted
+ * fields.
+ */
+#ifdef USE_ASSERT_CHECKING
+ if (pg_prng_uint32(&pg_global_prng_state) <= (PG_UINT32_MAX / 1000))
+ return NULL;
+#endif
/*
* If there are any out-of-line toasted fields in the tuple, expand
@@ -1821,7 +1892,20 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
* something using a slightly stale catcache entry.
*/
if (HeapTupleHasExternal(ntp))
+ {
dtp = toast_flatten_tuple(ntp, cache->cc_tupdesc);
+
+ /*
+ * The tuple could become stale while we are doing toast table
+ * access (since AcceptInvalidationMessages can run then), so we
+ * must recheck its visibility afterwards.
+ */
+ if (!systable_recheck_tuple(scandesc, ntp))
+ {
+ heap_freetuple(dtp);
+ return NULL;
+ }
+ }
else
dtp = ntp;
@@ -1860,7 +1944,7 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
}
else
{
- Assert(negative);
+ /* Set up keys for a negative cache entry */
oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
ct = (CatCTup *) palloc(sizeof(CatCTup));
@@ -1882,7 +1966,7 @@ CatalogCacheCreateEntry(CatCache *cache, HeapTuple ntp, Datum *arguments,
ct->c_list = NULL;
ct->refcount = 0; /* for the moment */
ct->dead = false;
- ct->negative = negative;
+ ct->negative = (ntp == NULL);
ct->hash_value = hashValue;
dlist_push_head(&cache->cc_bucket[hashIndex], &ct->cache_elem);