diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/storage/lmgr/lock.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-upstream.tar.xz postgresql-14-upstream.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 4738 |
1 files changed, 4738 insertions, 0 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c new file mode 100644 index 0000000..818666f --- /dev/null +++ b/src/backend/storage/lmgr/lock.c @@ -0,0 +1,4738 @@ +/*------------------------------------------------------------------------- + * + * lock.c + * POSTGRES primary lock mechanism + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/storage/lmgr/lock.c + * + * NOTES + * A lock table is a shared memory hash table. When + * a process tries to acquire a lock of a type that conflicts + * with existing locks, it is put to sleep using the routines + * in storage/lmgr/proc.c. + * + * For the most part, this code should be invoked via lmgr.c + * or another lock-management module, not directly. + * + * Interface: + * + * InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(), + * LockAcquire(), LockRelease(), LockReleaseAll(), + * LockCheckConflicts(), GrantLock() + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <signal.h> +#include <unistd.h> + +#include "access/transam.h" +#include "access/twophase.h" +#include "access/twophase_rmgr.h" +#include "access/xact.h" +#include "access/xlog.h" +#include "miscadmin.h" +#include "pg_trace.h" +#include "pgstat.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/sinvaladt.h" +#include "storage/spin.h" +#include "storage/standby.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" +#include "utils/resowner_private.h" + + +/* This configuration variable is used to set the lock table size */ +int max_locks_per_xact; /* set by guc.c */ + +#define NLOCKENTS() \ + mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts)) + + +/* + * Data structures defining the semantics of the standard lock methods. + * + * The conflict table defines the semantics of the various lock modes. + */ +static const LOCKMASK LockConflicts[] = { + 0, + + /* AccessShareLock */ + LOCKBIT_ON(AccessExclusiveLock), + + /* RowShareLock */ + LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock), + + /* RowExclusiveLock */ + LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) | + LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock), + + /* ShareUpdateExclusiveLock */ + LOCKBIT_ON(ShareUpdateExclusiveLock) | + LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) | + LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock), + + /* ShareLock */ + LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) | + LOCKBIT_ON(ShareRowExclusiveLock) | + LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock), + + /* ShareRowExclusiveLock */ + LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) | + LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) | + LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock), + + /* ExclusiveLock */ + LOCKBIT_ON(RowShareLock) | + LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) | + LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) | + LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock), + + /* AccessExclusiveLock */ + LOCKBIT_ON(AccessShareLock) | LOCKBIT_ON(RowShareLock) | + LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) | + LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) | + LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock) + +}; + +/* Names of lock modes, for debug printouts */ +static const char *const lock_mode_names[] = +{ + "INVALID", + "AccessShareLock", + "RowShareLock", + "RowExclusiveLock", + "ShareUpdateExclusiveLock", + "ShareLock", + "ShareRowExclusiveLock", + "ExclusiveLock", + "AccessExclusiveLock" +}; + +#ifndef LOCK_DEBUG +static bool Dummy_trace = false; +#endif + +static const LockMethodData default_lockmethod = { + AccessExclusiveLock, /* highest valid lock mode number */ + LockConflicts, + lock_mode_names, +#ifdef LOCK_DEBUG + &Trace_locks +#else + &Dummy_trace +#endif +}; + +static const LockMethodData user_lockmethod = { + AccessExclusiveLock, /* highest valid lock mode number */ + LockConflicts, + lock_mode_names, +#ifdef LOCK_DEBUG + &Trace_userlocks +#else + &Dummy_trace +#endif +}; + +/* + * map from lock method id to the lock table data structures + */ +static const LockMethod LockMethods[] = { + NULL, + &default_lockmethod, + &user_lockmethod +}; + + +/* Record that's written to 2PC state file when a lock is persisted */ +typedef struct TwoPhaseLockRecord +{ + LOCKTAG locktag; + LOCKMODE lockmode; +} TwoPhaseLockRecord; + + +/* + * Count of the number of fast path lock slots we believe to be used. This + * might be higher than the real number if another backend has transferred + * our locks to the primary lock table, but it can never be lower than the + * real value, since only we can acquire locks on our own behalf. + */ +static int FastPathLocalUseCount = 0; + +/* + * Flag to indicate if the relation extension lock is held by this backend. + * This flag is used to ensure that while holding the relation extension lock + * we don't try to acquire a heavyweight lock on any other object. This + * restriction implies that the relation extension lock won't ever participate + * in the deadlock cycle because we can never wait for any other heavyweight + * lock after acquiring this lock. + * + * Such a restriction is okay for relation extension locks as unlike other + * heavyweight locks these are not held till the transaction end. These are + * taken for a short duration to extend a particular relation and then + * released. + */ +static bool IsRelationExtensionLockHeld PG_USED_FOR_ASSERTS_ONLY = false; + +/* + * Flag to indicate if the page lock is held by this backend. We don't + * acquire any other heavyweight lock while holding the page lock except for + * relation extension. However, these locks are never taken in reverse order + * which implies that page locks will also never participate in the deadlock + * cycle. + * + * Similar to relation extension, page locks are also held for a short + * duration, so imposing such a restriction won't hurt. + */ +static bool IsPageLockHeld PG_USED_FOR_ASSERTS_ONLY = false; + +/* Macros for manipulating proc->fpLockBits */ +#define FAST_PATH_BITS_PER_SLOT 3 +#define FAST_PATH_LOCKNUMBER_OFFSET 1 +#define FAST_PATH_MASK ((1 << FAST_PATH_BITS_PER_SLOT) - 1) +#define FAST_PATH_GET_BITS(proc, n) \ + (((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * n)) & FAST_PATH_MASK) +#define FAST_PATH_BIT_POSITION(n, l) \ + (AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \ + AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \ + AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \ + ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n))) +#define FAST_PATH_SET_LOCKMODE(proc, n, l) \ + (proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l) +#define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \ + (proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)) +#define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \ + ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))) + +/* + * The fast-path lock mechanism is concerned only with relation locks on + * unshared relations by backends bound to a database. The fast-path + * mechanism exists mostly to accelerate acquisition and release of locks + * that rarely conflict. Because ShareUpdateExclusiveLock is + * self-conflicting, it can't use the fast-path mechanism; but it also does + * not conflict with any of the locks that do, so we can ignore it completely. + */ +#define EligibleForRelationFastPath(locktag, mode) \ + ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \ + (locktag)->locktag_type == LOCKTAG_RELATION && \ + (locktag)->locktag_field1 == MyDatabaseId && \ + MyDatabaseId != InvalidOid && \ + (mode) < ShareUpdateExclusiveLock) +#define ConflictsWithRelationFastPath(locktag, mode) \ + ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \ + (locktag)->locktag_type == LOCKTAG_RELATION && \ + (locktag)->locktag_field1 != InvalidOid && \ + (mode) > ShareUpdateExclusiveLock) + +static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode); +static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode); +static bool FastPathTransferRelationLocks(LockMethod lockMethodTable, + const LOCKTAG *locktag, uint32 hashcode); +static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock); + +/* + * To make the fast-path lock mechanism work, we must have some way of + * preventing the use of the fast-path when a conflicting lock might be present. + * We partition* the locktag space into FAST_PATH_STRONG_LOCK_HASH_PARTITIONS, + * and maintain an integer count of the number of "strong" lockers + * in each partition. When any "strong" lockers are present (which is + * hopefully not very often), the fast-path mechanism can't be used, and we + * must fall back to the slower method of pushing matching locks directly + * into the main lock tables. + * + * The deadlock detector does not know anything about the fast path mechanism, + * so any locks that might be involved in a deadlock must be transferred from + * the fast-path queues to the main lock table. + */ + +#define FAST_PATH_STRONG_LOCK_HASH_BITS 10 +#define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \ + (1 << FAST_PATH_STRONG_LOCK_HASH_BITS) +#define FastPathStrongLockHashPartition(hashcode) \ + ((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS) + +typedef struct +{ + slock_t mutex; + uint32 count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS]; +} FastPathStrongRelationLockData; + +static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks; + + +/* + * Pointers to hash tables containing lock state + * + * The LockMethodLockHash and LockMethodProcLockHash hash tables are in + * shared memory; LockMethodLocalHash is local to each backend. + */ +static HTAB *LockMethodLockHash; +static HTAB *LockMethodProcLockHash; +static HTAB *LockMethodLocalHash; + + +/* private state for error cleanup */ +static LOCALLOCK *StrongLockInProgress; +static LOCALLOCK *awaitedLock; +static ResourceOwner awaitedOwner; + + +#ifdef LOCK_DEBUG + +/*------ + * The following configuration options are available for lock debugging: + * + * TRACE_LOCKS -- give a bunch of output what's going on in this file + * TRACE_USERLOCKS -- same but for user locks + * TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid + * (use to avoid output on system tables) + * TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally + * DEBUG_DEADLOCKS -- currently dumps locks at untimely occasions ;) + * + * Furthermore, but in storage/lmgr/lwlock.c: + * TRACE_LWLOCKS -- trace lightweight locks (pretty useless) + * + * Define LOCK_DEBUG at compile time to get all these enabled. + * -------- + */ + +int Trace_lock_oidmin = FirstNormalObjectId; +bool Trace_locks = false; +bool Trace_userlocks = false; +int Trace_lock_table = 0; +bool Debug_deadlocks = false; + + +inline static bool +LOCK_DEBUG_ENABLED(const LOCKTAG *tag) +{ + return + (*(LockMethods[tag->locktag_lockmethodid]->trace_flag) && + ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin)) + || (Trace_lock_table && + (tag->locktag_field2 == Trace_lock_table)); +} + + +inline static void +LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type) +{ + if (LOCK_DEBUG_ENABLED(&lock->tag)) + elog(LOG, + "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) " + "req(%d,%d,%d,%d,%d,%d,%d)=%d " + "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", + where, lock, + lock->tag.locktag_field1, lock->tag.locktag_field2, + lock->tag.locktag_field3, lock->tag.locktag_field4, + lock->tag.locktag_type, lock->tag.locktag_lockmethodid, + lock->grantMask, + lock->requested[1], lock->requested[2], lock->requested[3], + lock->requested[4], lock->requested[5], lock->requested[6], + lock->requested[7], lock->nRequested, + lock->granted[1], lock->granted[2], lock->granted[3], + lock->granted[4], lock->granted[5], lock->granted[6], + lock->granted[7], lock->nGranted, + lock->waitProcs.size, + LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]); +} + + +inline static void +PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) +{ + if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag)) + elog(LOG, + "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)", + where, proclockP, proclockP->tag.myLock, + PROCLOCK_LOCKMETHOD(*(proclockP)), + proclockP->tag.myProc, (int) proclockP->holdMask); +} +#else /* not LOCK_DEBUG */ + +#define LOCK_PRINT(where, lock, type) ((void) 0) +#define PROCLOCK_PRINT(where, proclockP) ((void) 0) +#endif /* not LOCK_DEBUG */ + + +static uint32 proclock_hash(const void *key, Size keysize); +static void RemoveLocalLock(LOCALLOCK *locallock); +static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, + const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode); +static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); +static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode); +static void FinishStrongLockAcquire(void); +static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner); +static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock); +static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent); +static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, + PROCLOCK *proclock, LockMethod lockMethodTable); +static void CleanUpLock(LOCK *lock, PROCLOCK *proclock, + LockMethod lockMethodTable, uint32 hashcode, + bool wakeupNeeded); +static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc, + LOCKTAG *locktag, LOCKMODE lockmode, + bool decrement_strong_lock_count); +static void GetSingleProcBlockerStatusData(PGPROC *blocked_proc, + BlockedProcsData *data); + + +/* + * InitLocks -- Initialize the lock manager's data structures. + * + * This is called from CreateSharedMemoryAndSemaphores(), which see for + * more comments. In the normal postmaster case, the shared hash tables + * are created here, as well as a locallock hash table that will remain + * unused and empty in the postmaster itself. Backends inherit the pointers + * to the shared tables via fork(), and also inherit an image of the locallock + * hash table, which they proceed to use. In the EXEC_BACKEND case, each + * backend re-executes this code to obtain pointers to the already existing + * shared hash tables and to create its locallock hash table. + */ +void +InitLocks(void) +{ + HASHCTL info; + long init_table_size, + max_table_size; + bool found; + + /* + * Compute init/max size to request for lock hashtables. Note these + * calculations must agree with LockShmemSize! + */ + max_table_size = NLOCKENTS(); + init_table_size = max_table_size / 2; + + /* + * Allocate hash table for LOCK structs. This stores per-locked-object + * information. + */ + info.keysize = sizeof(LOCKTAG); + info.entrysize = sizeof(LOCK); + info.num_partitions = NUM_LOCK_PARTITIONS; + + LockMethodLockHash = ShmemInitHash("LOCK hash", + init_table_size, + max_table_size, + &info, + HASH_ELEM | HASH_BLOBS | HASH_PARTITION); + + /* Assume an average of 2 holders per lock */ + max_table_size *= 2; + init_table_size *= 2; + + /* + * Allocate hash table for PROCLOCK structs. This stores + * per-lock-per-holder information. + */ + info.keysize = sizeof(PROCLOCKTAG); + info.entrysize = sizeof(PROCLOCK); + info.hash = proclock_hash; + info.num_partitions = NUM_LOCK_PARTITIONS; + + LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash", + init_table_size, + max_table_size, + &info, + HASH_ELEM | HASH_FUNCTION | HASH_PARTITION); + + /* + * Allocate fast-path structures. + */ + FastPathStrongRelationLocks = + ShmemInitStruct("Fast Path Strong Relation Lock Data", + sizeof(FastPathStrongRelationLockData), &found); + if (!found) + SpinLockInit(&FastPathStrongRelationLocks->mutex); + + /* + * Allocate non-shared hash table for LOCALLOCK structs. This stores lock + * counts and resource owner information. + * + * The non-shared table could already exist in this process (this occurs + * when the postmaster is recreating shared memory after a backend crash). + * If so, delete and recreate it. (We could simply leave it, since it + * ought to be empty in the postmaster, but for safety let's zap it.) + */ + if (LockMethodLocalHash) + hash_destroy(LockMethodLocalHash); + + info.keysize = sizeof(LOCALLOCKTAG); + info.entrysize = sizeof(LOCALLOCK); + + LockMethodLocalHash = hash_create("LOCALLOCK hash", + 16, + &info, + HASH_ELEM | HASH_BLOBS); +} + + +/* + * Fetch the lock method table associated with a given lock + */ +LockMethod +GetLocksMethodTable(const LOCK *lock) +{ + LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock); + + Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods)); + return LockMethods[lockmethodid]; +} + +/* + * Fetch the lock method table associated with a given locktag + */ +LockMethod +GetLockTagsMethodTable(const LOCKTAG *locktag) +{ + LOCKMETHODID lockmethodid = (LOCKMETHODID) locktag->locktag_lockmethodid; + + Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods)); + return LockMethods[lockmethodid]; +} + + +/* + * Compute the hash code associated with a LOCKTAG. + * + * To avoid unnecessary recomputations of the hash code, we try to do this + * just once per function, and then pass it around as needed. Aside from + * passing the hashcode to hash_search_with_hash_value(), we can extract + * the lock partition number from the hashcode. + */ +uint32 +LockTagHashCode(const LOCKTAG *locktag) +{ + return get_hash_value(LockMethodLockHash, (const void *) locktag); +} + +/* + * Compute the hash code associated with a PROCLOCKTAG. + * + * Because we want to use just one set of partition locks for both the + * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs + * fall into the same partition number as their associated LOCKs. + * dynahash.c expects the partition number to be the low-order bits of + * the hash code, and therefore a PROCLOCKTAG's hash code must have the + * same low-order bits as the associated LOCKTAG's hash code. We achieve + * this with this specialized hash function. + */ +static uint32 +proclock_hash(const void *key, Size keysize) +{ + const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key; + uint32 lockhash; + Datum procptr; + + Assert(keysize == sizeof(PROCLOCKTAG)); + + /* Look into the associated LOCK object, and compute its hash code */ + lockhash = LockTagHashCode(&proclocktag->myLock->tag); + + /* + * To make the hash code also depend on the PGPROC, we xor the proc + * struct's address into the hash code, left-shifted so that the + * partition-number bits don't change. Since this is only a hash, we + * don't care if we lose high-order bits of the address; use an + * intermediate variable to suppress cast-pointer-to-int warnings. + */ + procptr = PointerGetDatum(proclocktag->myProc); + lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS; + + return lockhash; +} + +/* + * Compute the hash code associated with a PROCLOCKTAG, given the hashcode + * for its underlying LOCK. + * + * We use this just to avoid redundant calls of LockTagHashCode(). + */ +static inline uint32 +ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode) +{ + uint32 lockhash = hashcode; + Datum procptr; + + /* + * This must match proclock_hash()! + */ + procptr = PointerGetDatum(proclocktag->myProc); + lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS; + + return lockhash; +} + +/* + * Given two lock modes, return whether they would conflict. + */ +bool +DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2) +{ + LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD]; + + if (lockMethodTable->conflictTab[mode1] & LOCKBIT_ON(mode2)) + return true; + + return false; +} + +/* + * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode' + * by the current transaction + */ +bool +LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode) +{ + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; + + /* + * See if there is a LOCALLOCK entry for this lock and lockmode + */ + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash, + (void *) &localtag, + HASH_FIND, NULL); + + return (locallock && locallock->nLocks > 0); +} + +#ifdef USE_ASSERT_CHECKING +/* + * GetLockMethodLocalHash -- return the hash of local locks, for modules that + * evaluate assertions based on all locks held. + */ +HTAB * +GetLockMethodLocalHash(void) +{ + return LockMethodLocalHash; +} +#endif + +/* + * LockHasWaiters -- look up 'locktag' and check if releasing this + * lock would wake up other processes waiting for it. + */ +bool +LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) +{ + LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; + LockMethod lockMethodTable; + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; + LOCK *lock; + PROCLOCK *proclock; + LWLock *partitionLock; + bool hasWaiters = false; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + lockMethodTable = LockMethods[lockmethodid]; + if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) + elog(ERROR, "unrecognized lock mode: %d", lockmode); + +#ifdef LOCK_DEBUG + if (LOCK_DEBUG_ENABLED(locktag)) + elog(LOG, "LockHasWaiters: lock [%u,%u] %s", + locktag->locktag_field1, locktag->locktag_field2, + lockMethodTable->lockModeNames[lockmode]); +#endif + + /* + * Find the LOCALLOCK entry for this lock and lockmode + */ + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash, + (void *) &localtag, + HASH_FIND, NULL); + + /* + * let the caller print its own error message, too. Do not ereport(ERROR). + */ + if (!locallock || locallock->nLocks <= 0) + { + elog(WARNING, "you don't own a lock of type %s", + lockMethodTable->lockModeNames[lockmode]); + return false; + } + + /* + * Check the shared lock table. + */ + partitionLock = LockHashPartitionLock(locallock->hashcode); + + LWLockAcquire(partitionLock, LW_SHARED); + + /* + * We don't need to re-find the lock or proclock, since we kept their + * addresses in the locallock table, and they couldn't have been removed + * while we were holding a lock on them. + */ + lock = locallock->lock; + LOCK_PRINT("LockHasWaiters: found", lock, lockmode); + proclock = locallock->proclock; + PROCLOCK_PRINT("LockHasWaiters: found", proclock); + + /* + * Double-check that we are actually holding a lock of the type we want to + * release. + */ + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) + { + PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock); + LWLockRelease(partitionLock); + elog(WARNING, "you don't own a lock of type %s", + lockMethodTable->lockModeNames[lockmode]); + RemoveLocalLock(locallock); + return false; + } + + /* + * Do the checking. + */ + if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0) + hasWaiters = true; + + LWLockRelease(partitionLock); + + return hasWaiters; +} + +/* + * LockAcquire -- Check for lock conflicts, sleep if conflict found, + * set lock if/when no conflicts. + * + * Inputs: + * locktag: unique identifier for the lockable object + * lockmode: lock mode to acquire + * sessionLock: if true, acquire lock for session not current transaction + * dontWait: if true, don't wait to acquire lock + * + * Returns one of: + * LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true + * LOCKACQUIRE_OK lock successfully acquired + * LOCKACQUIRE_ALREADY_HELD incremented count for lock already held + * LOCKACQUIRE_ALREADY_CLEAR incremented count for lock already clear + * + * In the normal case where dontWait=false and the caller doesn't need to + * distinguish a freshly acquired lock from one already taken earlier in + * this same transaction, there is no need to examine the return value. + * + * Side Effects: The lock is acquired and recorded in lock tables. + * + * NOTE: if we wait for the lock, there is no way to abort the wait + * short of aborting the transaction. + */ +LockAcquireResult +LockAcquire(const LOCKTAG *locktag, + LOCKMODE lockmode, + bool sessionLock, + bool dontWait) +{ + return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, + true, NULL); +} + +/* + * LockAcquireExtended - allows us to specify additional options + * + * reportMemoryError specifies whether a lock request that fills the lock + * table should generate an ERROR or not. Passing "false" allows the caller + * to attempt to recover from lock-table-full situations, perhaps by forcibly + * canceling other lock holders and then retrying. Note, however, that the + * return code for that is LOCKACQUIRE_NOT_AVAIL, so that it's unsafe to use + * in combination with dontWait = true, as the cause of failure couldn't be + * distinguished. + * + * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK + * table entry if a lock is successfully acquired, or NULL if not. + */ +LockAcquireResult +LockAcquireExtended(const LOCKTAG *locktag, + LOCKMODE lockmode, + bool sessionLock, + bool dontWait, + bool reportMemoryError, + LOCALLOCK **locallockp) +{ + LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; + LockMethod lockMethodTable; + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; + LOCK *lock; + PROCLOCK *proclock; + bool found; + ResourceOwner owner; + uint32 hashcode; + LWLock *partitionLock; + bool found_conflict; + bool log_lock = false; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + lockMethodTable = LockMethods[lockmethodid]; + if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) + elog(ERROR, "unrecognized lock mode: %d", lockmode); + + if (RecoveryInProgress() && !InRecovery && + (locktag->locktag_type == LOCKTAG_OBJECT || + locktag->locktag_type == LOCKTAG_RELATION) && + lockmode > RowExclusiveLock) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot acquire lock mode %s on database objects while recovery is in progress", + lockMethodTable->lockModeNames[lockmode]), + errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery."))); + +#ifdef LOCK_DEBUG + if (LOCK_DEBUG_ENABLED(locktag)) + elog(LOG, "LockAcquire: lock [%u,%u] %s", + locktag->locktag_field1, locktag->locktag_field2, + lockMethodTable->lockModeNames[lockmode]); +#endif + + /* Identify owner for lock */ + if (sessionLock) + owner = NULL; + else + owner = CurrentResourceOwner; + + /* + * Find or create a LOCALLOCK entry for this lock and lockmode + */ + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash, + (void *) &localtag, + HASH_ENTER, &found); + + /* + * if it's a new locallock object, initialize it + */ + if (!found) + { + locallock->lock = NULL; + locallock->proclock = NULL; + locallock->hashcode = LockTagHashCode(&(localtag.lock)); + locallock->nLocks = 0; + locallock->holdsStrongLockCount = false; + locallock->lockCleared = false; + locallock->numLockOwners = 0; + locallock->maxLockOwners = 8; + locallock->lockOwners = NULL; /* in case next line fails */ + locallock->lockOwners = (LOCALLOCKOWNER *) + MemoryContextAlloc(TopMemoryContext, + locallock->maxLockOwners * sizeof(LOCALLOCKOWNER)); + } + else + { + /* Make sure there will be room to remember the lock */ + if (locallock->numLockOwners >= locallock->maxLockOwners) + { + int newsize = locallock->maxLockOwners * 2; + + locallock->lockOwners = (LOCALLOCKOWNER *) + repalloc(locallock->lockOwners, + newsize * sizeof(LOCALLOCKOWNER)); + locallock->maxLockOwners = newsize; + } + } + hashcode = locallock->hashcode; + + if (locallockp) + *locallockp = locallock; + + /* + * If we already hold the lock, we can just increase the count locally. + * + * If lockCleared is already set, caller need not worry about absorbing + * sinval messages related to the lock's object. + */ + if (locallock->nLocks > 0) + { + GrantLockLocal(locallock, owner); + if (locallock->lockCleared) + return LOCKACQUIRE_ALREADY_CLEAR; + else + return LOCKACQUIRE_ALREADY_HELD; + } + + /* + * We don't acquire any other heavyweight lock while holding the relation + * extension lock. We do allow to acquire the same relation extension + * lock more than once but that case won't reach here. + */ + Assert(!IsRelationExtensionLockHeld); + + /* + * We don't acquire any other heavyweight lock while holding the page lock + * except for relation extension. + */ + Assert(!IsPageLockHeld || + (locktag->locktag_type == LOCKTAG_RELATION_EXTEND)); + + /* + * Prepare to emit a WAL record if acquisition of this lock needs to be + * replayed in a standby server. + * + * Here we prepare to log; after lock is acquired we'll issue log record. + * This arrangement simplifies error recovery in case the preparation step + * fails. + * + * Only AccessExclusiveLocks can conflict with lock types that read-only + * transactions can acquire in a standby server. Make sure this definition + * matches the one in GetRunningTransactionLocks(). + */ + if (lockmode >= AccessExclusiveLock && + locktag->locktag_type == LOCKTAG_RELATION && + !RecoveryInProgress() && + XLogStandbyInfoActive()) + { + LogAccessExclusiveLockPrepare(); + log_lock = true; + } + + /* + * Attempt to take lock via fast path, if eligible. But if we remember + * having filled up the fast path array, we don't attempt to make any + * further use of it until we release some locks. It's possible that some + * other backend has transferred some of those locks to the shared hash + * table, leaving space free, but it's not worth acquiring the LWLock just + * to check. It's also possible that we're acquiring a second or third + * lock type on a relation we have already locked using the fast-path, but + * for now we don't worry about that case either. + */ + if (EligibleForRelationFastPath(locktag, lockmode) && + FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND) + { + uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode); + bool acquired; + + /* + * LWLockAcquire acts as a memory sequencing point, so it's safe to + * assume that any strong locker whose increment to + * FastPathStrongRelationLocks->counts becomes visible after we test + * it has yet to begin to transfer fast-path locks. + */ + LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); + if (FastPathStrongRelationLocks->count[fasthashcode] != 0) + acquired = false; + else + acquired = FastPathGrantRelationLock(locktag->locktag_field2, + lockmode); + LWLockRelease(&MyProc->fpInfoLock); + if (acquired) + { + /* + * The locallock might contain stale pointers to some old shared + * objects; we MUST reset these to null before considering the + * lock to be acquired via fast-path. + */ + locallock->lock = NULL; + locallock->proclock = NULL; + GrantLockLocal(locallock, owner); + return LOCKACQUIRE_OK; + } + } + + /* + * If this lock could potentially have been taken via the fast-path by + * some other backend, we must (temporarily) disable further use of the + * fast-path for this lock tag, and migrate any locks already taken via + * this method to the main lock table. + */ + if (ConflictsWithRelationFastPath(locktag, lockmode)) + { + uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode); + + BeginStrongLockAcquire(locallock, fasthashcode); + if (!FastPathTransferRelationLocks(lockMethodTable, locktag, + hashcode)) + { + AbortStrongLockAcquire(); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; + if (reportMemoryError) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You might need to increase max_locks_per_transaction."))); + else + return LOCKACQUIRE_NOT_AVAIL; + } + } + + /* + * We didn't find the lock in our LOCALLOCK table, and we didn't manage to + * take it via the fast-path, either, so we've got to mess with the shared + * lock table. + */ + partitionLock = LockHashPartitionLock(hashcode); + + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + + /* + * Find or create lock and proclock entries with this tag + * + * Note: if the locallock object already existed, it might have a pointer + * to the lock already ... but we should not assume that that pointer is + * valid, since a lock object with zero hold and request counts can go + * away anytime. So we have to use SetupLockInTable() to recompute the + * lock and proclock pointers, even if they're already set. + */ + proclock = SetupLockInTable(lockMethodTable, MyProc, locktag, + hashcode, lockmode); + if (!proclock) + { + AbortStrongLockAcquire(); + LWLockRelease(partitionLock); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; + if (reportMemoryError) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You might need to increase max_locks_per_transaction."))); + else + return LOCKACQUIRE_NOT_AVAIL; + } + locallock->proclock = proclock; + lock = proclock->tag.myLock; + locallock->lock = lock; + + /* + * If lock requested conflicts with locks requested by waiters, must join + * wait queue. Otherwise, check for conflict with already-held locks. + * (That's last because most complex check.) + */ + if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) + found_conflict = true; + else + found_conflict = LockCheckConflicts(lockMethodTable, lockmode, + lock, proclock); + + if (!found_conflict) + { + /* No conflict with held or previously requested locks */ + GrantLock(lock, proclock, lockmode); + GrantLockLocal(locallock, owner); + } + else + { + /* + * We can't acquire the lock immediately. If caller specified no + * blocking, remove useless table entries and return + * LOCKACQUIRE_NOT_AVAIL without waiting. + */ + if (dontWait) + { + AbortStrongLockAcquire(); + if (proclock->holdMask == 0) + { + uint32 proclock_hashcode; + + proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); + if (!hash_search_with_hash_value(LockMethodProcLockHash, + (void *) &(proclock->tag), + proclock_hashcode, + HASH_REMOVE, + NULL)) + elog(PANIC, "proclock table corrupted"); + } + else + PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock); + lock->nRequested--; + lock->requested[lockmode]--; + LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode); + Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); + Assert(lock->nGranted <= lock->nRequested); + LWLockRelease(partitionLock); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; + return LOCKACQUIRE_NOT_AVAIL; + } + + /* + * Set bitmask of locks this process already holds on this object. + */ + MyProc->heldLocks = proclock->holdMask; + + /* + * Sleep till someone wakes me up. + */ + + TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1, + locktag->locktag_field2, + locktag->locktag_field3, + locktag->locktag_field4, + locktag->locktag_type, + lockmode); + + WaitOnLock(locallock, owner); + + TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1, + locktag->locktag_field2, + locktag->locktag_field3, + locktag->locktag_field4, + locktag->locktag_type, + lockmode); + + /* + * NOTE: do not do any material change of state between here and + * return. All required changes in locktable state must have been + * done when the lock was granted to us --- see notes in WaitOnLock. + */ + + /* + * Check the proclock entry status, in case something in the ipc + * communication doesn't work correctly. + */ + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) + { + AbortStrongLockAcquire(); + PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); + LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); + /* Should we retry ? */ + LWLockRelease(partitionLock); + elog(ERROR, "LockAcquire failed"); + } + PROCLOCK_PRINT("LockAcquire: granted", proclock); + LOCK_PRINT("LockAcquire: granted", lock, lockmode); + } + + /* + * Lock state is fully up-to-date now; if we error out after this, no + * special error cleanup is required. + */ + FinishStrongLockAcquire(); + + LWLockRelease(partitionLock); + + /* + * Emit a WAL record if acquisition of this lock needs to be replayed in a + * standby server. + */ + if (log_lock) + { + /* + * Decode the locktag back to the original values, to avoid sending + * lots of empty bytes with every message. See lock.h to check how a + * locktag is defined for LOCKTAG_RELATION + */ + LogAccessExclusiveLock(locktag->locktag_field1, + locktag->locktag_field2); + } + + return LOCKACQUIRE_OK; +} + +/* + * Find or create LOCK and PROCLOCK objects as needed for a new lock + * request. + * + * Returns the PROCLOCK object, or NULL if we failed to create the objects + * for lack of shared memory. + * + * The appropriate partition lock must be held at entry, and will be + * held at exit. + */ +static PROCLOCK * +SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, + const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode) +{ + LOCK *lock; + PROCLOCK *proclock; + PROCLOCKTAG proclocktag; + uint32 proclock_hashcode; + bool found; + + /* + * Find or create a lock with this tag. + */ + lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, + (const void *) locktag, + hashcode, + HASH_ENTER_NULL, + &found); + if (!lock) + return NULL; + + /* + * if it's a new lock object, initialize it + */ + if (!found) + { + lock->grantMask = 0; + lock->waitMask = 0; + SHMQueueInit(&(lock->procLocks)); + ProcQueueInit(&(lock->waitProcs)); + lock->nRequested = 0; + lock->nGranted = 0; + MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES); + MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES); + LOCK_PRINT("LockAcquire: new", lock, lockmode); + } + else + { + LOCK_PRINT("LockAcquire: found", lock, lockmode); + Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0)); + Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); + Assert(lock->nGranted <= lock->nRequested); + } + + /* + * Create the hash key for the proclock table. + */ + proclocktag.myLock = lock; + proclocktag.myProc = proc; + + proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode); + + /* + * Find or create a proclock entry with this tag + */ + proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash, + (void *) &proclocktag, + proclock_hashcode, + HASH_ENTER_NULL, + &found); + if (!proclock) + { + /* Oops, not enough shmem for the proclock */ + if (lock->nRequested == 0) + { + /* + * There are no other requestors of this lock, so garbage-collect + * the lock object. We *must* do this to avoid a permanent leak + * of shared memory, because there won't be anything to cause + * anyone to release the lock object later. + */ + Assert(SHMQueueEmpty(&(lock->procLocks))); + if (!hash_search_with_hash_value(LockMethodLockHash, + (void *) &(lock->tag), + hashcode, + HASH_REMOVE, + NULL)) + elog(PANIC, "lock table corrupted"); + } + return NULL; + } + + /* + * If new, initialize the new entry + */ + if (!found) + { + uint32 partition = LockHashPartition(hashcode); + + /* + * It might seem unsafe to access proclock->groupLeader without a + * lock, but it's not really. Either we are initializing a proclock + * on our own behalf, in which case our group leader isn't changing + * because the group leader for a process can only ever be changed by + * the process itself; or else we are transferring a fast-path lock to + * the main lock table, in which case that process can't change it's + * lock group leader without first releasing all of its locks (and in + * particular the one we are currently transferring). + */ + proclock->groupLeader = proc->lockGroupLeader != NULL ? + proc->lockGroupLeader : proc; + proclock->holdMask = 0; + proclock->releaseMask = 0; + /* Add proclock to appropriate lists */ + SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); + SHMQueueInsertBefore(&(proc->myProcLocks[partition]), + &proclock->procLink); + PROCLOCK_PRINT("LockAcquire: new", proclock); + } + else + { + PROCLOCK_PRINT("LockAcquire: found", proclock); + Assert((proclock->holdMask & ~lock->grantMask) == 0); + +#ifdef CHECK_DEADLOCK_RISK + + /* + * Issue warning if we already hold a lower-level lock on this object + * and do not hold a lock of the requested level or higher. This + * indicates a deadlock-prone coding practice (eg, we'd have a + * deadlock if another backend were following the same code path at + * about the same time). + * + * This is not enabled by default, because it may generate log entries + * about user-level coding practices that are in fact safe in context. + * It can be enabled to help find system-level problems. + * + * XXX Doing numeric comparison on the lockmodes is a hack; it'd be + * better to use a table. For now, though, this works. + */ + { + int i; + + for (i = lockMethodTable->numLockModes; i > 0; i--) + { + if (proclock->holdMask & LOCKBIT_ON(i)) + { + if (i >= (int) lockmode) + break; /* safe: we have a lock >= req level */ + elog(LOG, "deadlock risk: raising lock level" + " from %s to %s on object %u/%u/%u", + lockMethodTable->lockModeNames[i], + lockMethodTable->lockModeNames[lockmode], + lock->tag.locktag_field1, lock->tag.locktag_field2, + lock->tag.locktag_field3); + break; + } + } + } +#endif /* CHECK_DEADLOCK_RISK */ + } + + /* + * lock->nRequested and lock->requested[] count the total number of + * requests, whether granted or waiting, so increment those immediately. + * The other counts don't increment till we get the lock. + */ + lock->nRequested++; + lock->requested[lockmode]++; + Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); + + /* + * We shouldn't already hold the desired lock; else locallock table is + * broken. + */ + if (proclock->holdMask & LOCKBIT_ON(lockmode)) + elog(ERROR, "lock %s on object %u/%u/%u is already held", + lockMethodTable->lockModeNames[lockmode], + lock->tag.locktag_field1, lock->tag.locktag_field2, + lock->tag.locktag_field3); + + return proclock; +} + +/* + * Check and set/reset the flag that we hold the relation extension/page lock. + * + * It is callers responsibility that this function is called after + * acquiring/releasing the relation extension/page lock. + * + * Pass acquired as true if lock is acquired, false otherwise. + */ +static inline void +CheckAndSetLockHeld(LOCALLOCK *locallock, bool acquired) +{ +#ifdef USE_ASSERT_CHECKING + if (LOCALLOCK_LOCKTAG(*locallock) == LOCKTAG_RELATION_EXTEND) + IsRelationExtensionLockHeld = acquired; + else if (LOCALLOCK_LOCKTAG(*locallock) == LOCKTAG_PAGE) + IsPageLockHeld = acquired; + +#endif +} + +/* + * Subroutine to free a locallock entry + */ +static void +RemoveLocalLock(LOCALLOCK *locallock) +{ + int i; + + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (locallock->lockOwners[i].owner != NULL) + ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock); + } + locallock->numLockOwners = 0; + if (locallock->lockOwners != NULL) + pfree(locallock->lockOwners); + locallock->lockOwners = NULL; + + if (locallock->holdsStrongLockCount) + { + uint32 fasthashcode; + + fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode); + + SpinLockAcquire(&FastPathStrongRelationLocks->mutex); + Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0); + FastPathStrongRelationLocks->count[fasthashcode]--; + locallock->holdsStrongLockCount = false; + SpinLockRelease(&FastPathStrongRelationLocks->mutex); + } + + if (!hash_search(LockMethodLocalHash, + (void *) &(locallock->tag), + HASH_REMOVE, NULL)) + elog(WARNING, "locallock table corrupted"); + + /* + * Indicate that the lock is released for certain types of locks + */ + CheckAndSetLockHeld(locallock, false); +} + +/* + * LockCheckConflicts -- test whether requested lock conflicts + * with those already granted + * + * Returns true if conflict, false if no conflict. + * + * NOTES: + * Here's what makes this complicated: one process's locks don't + * conflict with one another, no matter what purpose they are held for + * (eg, session and transaction locks do not conflict). Nor do the locks + * of one process in a lock group conflict with those of another process in + * the same group. So, we must subtract off these locks when determining + * whether the requested new lock conflicts with those already held. + */ +bool +LockCheckConflicts(LockMethod lockMethodTable, + LOCKMODE lockmode, + LOCK *lock, + PROCLOCK *proclock) +{ + int numLockModes = lockMethodTable->numLockModes; + LOCKMASK myLocks; + int conflictMask = lockMethodTable->conflictTab[lockmode]; + int conflictsRemaining[MAX_LOCKMODES]; + int totalConflictsRemaining = 0; + int i; + SHM_QUEUE *procLocks; + PROCLOCK *otherproclock; + + /* + * first check for global conflicts: If no locks conflict with my request, + * then I get the lock. + * + * Checking for conflict: lock->grantMask represents the types of + * currently held locks. conflictTable[lockmode] has a bit set for each + * type of lock that conflicts with request. Bitwise compare tells if + * there is a conflict. + */ + if (!(conflictMask & lock->grantMask)) + { + PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock); + return false; + } + + /* + * Rats. Something conflicts. But it could still be my own lock, or a + * lock held by another member of my locking group. First, figure out how + * many conflicts remain after subtracting out any locks I hold myself. + */ + myLocks = proclock->holdMask; + for (i = 1; i <= numLockModes; i++) + { + if ((conflictMask & LOCKBIT_ON(i)) == 0) + { + conflictsRemaining[i] = 0; + continue; + } + conflictsRemaining[i] = lock->granted[i]; + if (myLocks & LOCKBIT_ON(i)) + --conflictsRemaining[i]; + totalConflictsRemaining += conflictsRemaining[i]; + } + + /* If no conflicts remain, we get the lock. */ + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock); + return false; + } + + /* If no group locking, it's definitely a conflict. */ + if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL) + { + Assert(proclock->tag.myProc == MyProc); + PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)", + proclock); + return true; + } + + /* + * The relation extension or page lock conflict even between the group + * members. + */ + if (LOCK_LOCKTAG(*lock) == LOCKTAG_RELATION_EXTEND || + (LOCK_LOCKTAG(*lock) == LOCKTAG_PAGE)) + { + PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", + proclock); + return true; + } + + /* + * Locks held in conflicting modes by members of our own lock group are + * not real conflicts; we can subtract those out and see if we still have + * a conflict. This is O(N) in the number of processes holding or + * awaiting locks on this object. We could improve that by making the + * shared memory state more complex (and larger) but it doesn't seem worth + * it. + */ + procLocks = &(lock->procLocks); + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); + while (otherproclock != NULL) + { + if (proclock != otherproclock && + proclock->groupLeader == otherproclock->groupLeader && + (otherproclock->holdMask & conflictMask) != 0) + { + int intersectMask = otherproclock->holdMask & conflictMask; + + for (i = 1; i <= numLockModes; i++) + { + if ((intersectMask & LOCKBIT_ON(i)) != 0) + { + if (conflictsRemaining[i] <= 0) + elog(PANIC, "proclocks held do not match lock"); + conflictsRemaining[i]--; + totalConflictsRemaining--; + } + } + + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("LockCheckConflicts: resolved (group)", + proclock); + return false; + } + } + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, &otherproclock->lockLink, + offsetof(PROCLOCK, lockLink)); + } + + /* Nope, it's a real conflict. */ + PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock); + return true; +} + +/* + * GrantLock -- update the lock and proclock data structures to show + * the lock request has been granted. + * + * NOTE: if proc was blocked, it also needs to be removed from the wait list + * and have its waitLock/waitProcLock fields cleared. That's not done here. + * + * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK + * table entry; but since we may be awaking some other process, we can't do + * that here; it's done by GrantLockLocal, instead. + */ +void +GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode) +{ + lock->nGranted++; + lock->granted[lockmode]++; + lock->grantMask |= LOCKBIT_ON(lockmode); + if (lock->granted[lockmode] == lock->requested[lockmode]) + lock->waitMask &= LOCKBIT_OFF(lockmode); + proclock->holdMask |= LOCKBIT_ON(lockmode); + LOCK_PRINT("GrantLock", lock, lockmode); + Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); + Assert(lock->nGranted <= lock->nRequested); +} + +/* + * UnGrantLock -- opposite of GrantLock. + * + * Updates the lock and proclock data structures to show that the lock + * is no longer held nor requested by the current holder. + * + * Returns true if there were any waiters waiting on the lock that + * should now be woken up with ProcLockWakeup. + */ +static bool +UnGrantLock(LOCK *lock, LOCKMODE lockmode, + PROCLOCK *proclock, LockMethod lockMethodTable) +{ + bool wakeupNeeded = false; + + Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); + Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); + Assert(lock->nGranted <= lock->nRequested); + + /* + * fix the general lock stats + */ + lock->nRequested--; + lock->requested[lockmode]--; + lock->nGranted--; + lock->granted[lockmode]--; + + if (lock->granted[lockmode] == 0) + { + /* change the conflict mask. No more of this lock type. */ + lock->grantMask &= LOCKBIT_OFF(lockmode); + } + + LOCK_PRINT("UnGrantLock: updated", lock, lockmode); + + /* + * We need only run ProcLockWakeup if the released lock conflicts with at + * least one of the lock types requested by waiter(s). Otherwise whatever + * conflict made them wait must still exist. NOTE: before MVCC, we could + * skip wakeup if lock->granted[lockmode] was still positive. But that's + * not true anymore, because the remaining granted locks might belong to + * some waiter, who could now be awakened because he doesn't conflict with + * his own locks. + */ + if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) + wakeupNeeded = true; + + /* + * Now fix the per-proclock state. + */ + proclock->holdMask &= LOCKBIT_OFF(lockmode); + PROCLOCK_PRINT("UnGrantLock: updated", proclock); + + return wakeupNeeded; +} + +/* + * CleanUpLock -- clean up after releasing a lock. We garbage-collect the + * proclock and lock objects if possible, and call ProcLockWakeup if there + * are remaining requests and the caller says it's OK. (Normally, this + * should be called after UnGrantLock, and wakeupNeeded is the result from + * UnGrantLock.) + * + * The appropriate partition lock must be held at entry, and will be + * held at exit. + */ +static void +CleanUpLock(LOCK *lock, PROCLOCK *proclock, + LockMethod lockMethodTable, uint32 hashcode, + bool wakeupNeeded) +{ + /* + * If this was my last hold on this lock, delete my entry in the proclock + * table. + */ + if (proclock->holdMask == 0) + { + uint32 proclock_hashcode; + + PROCLOCK_PRINT("CleanUpLock: deleting", proclock); + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); + proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); + if (!hash_search_with_hash_value(LockMethodProcLockHash, + (void *) &(proclock->tag), + proclock_hashcode, + HASH_REMOVE, + NULL)) + elog(PANIC, "proclock table corrupted"); + } + + if (lock->nRequested == 0) + { + /* + * The caller just released the last lock, so garbage-collect the lock + * object. + */ + LOCK_PRINT("CleanUpLock: deleting", lock, 0); + Assert(SHMQueueEmpty(&(lock->procLocks))); + if (!hash_search_with_hash_value(LockMethodLockHash, + (void *) &(lock->tag), + hashcode, + HASH_REMOVE, + NULL)) + elog(PANIC, "lock table corrupted"); + } + else if (wakeupNeeded) + { + /* There are waiters on this lock, so wake them up. */ + ProcLockWakeup(lockMethodTable, lock); + } +} + +/* + * GrantLockLocal -- update the locallock data structures to show + * the lock request has been granted. + * + * We expect that LockAcquire made sure there is room to add a new + * ResourceOwner entry. + */ +static void +GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner) +{ + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + int i; + + Assert(locallock->numLockOwners < locallock->maxLockOwners); + /* Count the total */ + locallock->nLocks++; + /* Count the per-owner lock */ + for (i = 0; i < locallock->numLockOwners; i++) + { + if (lockOwners[i].owner == owner) + { + lockOwners[i].nLocks++; + return; + } + } + lockOwners[i].owner = owner; + lockOwners[i].nLocks = 1; + locallock->numLockOwners++; + if (owner != NULL) + ResourceOwnerRememberLock(owner, locallock); + + /* Indicate that the lock is acquired for certain types of locks. */ + CheckAndSetLockHeld(locallock, true); +} + +/* + * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK, + * and arrange for error cleanup if it fails + */ +static void +BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode) +{ + Assert(StrongLockInProgress == NULL); + Assert(locallock->holdsStrongLockCount == false); + + /* + * Adding to a memory location is not atomic, so we take a spinlock to + * ensure we don't collide with someone else trying to bump the count at + * the same time. + * + * XXX: It might be worth considering using an atomic fetch-and-add + * instruction here, on architectures where that is supported. + */ + + SpinLockAcquire(&FastPathStrongRelationLocks->mutex); + FastPathStrongRelationLocks->count[fasthashcode]++; + locallock->holdsStrongLockCount = true; + StrongLockInProgress = locallock; + SpinLockRelease(&FastPathStrongRelationLocks->mutex); +} + +/* + * FinishStrongLockAcquire - cancel pending cleanup for a strong lock + * acquisition once it's no longer needed + */ +static void +FinishStrongLockAcquire(void) +{ + StrongLockInProgress = NULL; +} + +/* + * AbortStrongLockAcquire - undo strong lock state changes performed by + * BeginStrongLockAcquire. + */ +void +AbortStrongLockAcquire(void) +{ + uint32 fasthashcode; + LOCALLOCK *locallock = StrongLockInProgress; + + if (locallock == NULL) + return; + + fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode); + Assert(locallock->holdsStrongLockCount == true); + SpinLockAcquire(&FastPathStrongRelationLocks->mutex); + Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0); + FastPathStrongRelationLocks->count[fasthashcode]--; + locallock->holdsStrongLockCount = false; + StrongLockInProgress = NULL; + SpinLockRelease(&FastPathStrongRelationLocks->mutex); +} + +/* + * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing + * WaitOnLock on. + * + * proc.c needs this for the case where we are booted off the lock by + * timeout, but discover that someone granted us the lock anyway. + * + * We could just export GrantLockLocal, but that would require including + * resowner.h in lock.h, which creates circularity. + */ +void +GrantAwaitedLock(void) +{ + GrantLockLocal(awaitedLock, awaitedOwner); +} + +/* + * MarkLockClear -- mark an acquired lock as "clear" + * + * This means that we know we have absorbed all sinval messages that other + * sessions generated before we acquired this lock, and so we can confidently + * assume we know about any catalog changes protected by this lock. + */ +void +MarkLockClear(LOCALLOCK *locallock) +{ + Assert(locallock->nLocks > 0); + locallock->lockCleared = true; +} + +/* + * WaitOnLock -- wait to acquire a lock + * + * Caller must have set MyProc->heldLocks to reflect locks already held + * on the lockable object by this process. + * + * The appropriate partition lock must be held at entry. + */ +static void +WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) +{ + LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock); + LockMethod lockMethodTable = LockMethods[lockmethodid]; + char *volatile new_status = NULL; + + LOCK_PRINT("WaitOnLock: sleeping on lock", + locallock->lock, locallock->tag.mode); + + /* Report change to waiting status */ + if (update_process_title) + { + const char *old_status; + int len; + + old_status = get_ps_display(&len); + new_status = (char *) palloc(len + 8 + 1); + memcpy(new_status, old_status, len); + strcpy(new_status + len, " waiting"); + set_ps_display(new_status); + new_status[len] = '\0'; /* truncate off " waiting" */ + } + + awaitedLock = locallock; + awaitedOwner = owner; + + /* + * NOTE: Think not to put any shared-state cleanup after the call to + * ProcSleep, in either the normal or failure path. The lock state must + * be fully set by the lock grantor, or by CheckDeadLock if we give up + * waiting for the lock. This is necessary because of the possibility + * that a cancel/die interrupt will interrupt ProcSleep after someone else + * grants us the lock, but before we've noticed it. Hence, after granting, + * the locktable state must fully reflect the fact that we own the lock; + * we can't do additional work on return. + * + * We can and do use a PG_TRY block to try to clean up after failure, but + * this still has a major limitation: elog(FATAL) can occur while waiting + * (eg, a "die" interrupt), and then control won't come back here. So all + * cleanup of essential state should happen in LockErrorCleanup, not here. + * We can use PG_TRY to clear the "waiting" status flags, since doing that + * is unimportant if the process exits. + */ + PG_TRY(); + { + if (ProcSleep(locallock, lockMethodTable) != PROC_WAIT_STATUS_OK) + { + /* + * We failed as a result of a deadlock, see CheckDeadLock(). Quit + * now. + */ + awaitedLock = NULL; + LOCK_PRINT("WaitOnLock: aborting on lock", + locallock->lock, locallock->tag.mode); + LWLockRelease(LockHashPartitionLock(locallock->hashcode)); + + /* + * Now that we aren't holding the partition lock, we can give an + * error report including details about the detected deadlock. + */ + DeadLockReport(); + /* not reached */ + } + } + PG_CATCH(); + { + /* In this path, awaitedLock remains set until LockErrorCleanup */ + + /* Report change to non-waiting status */ + if (update_process_title) + { + set_ps_display(new_status); + pfree(new_status); + } + + /* and propagate the error */ + PG_RE_THROW(); + } + PG_END_TRY(); + + awaitedLock = NULL; + + /* Report change to non-waiting status */ + if (update_process_title) + { + set_ps_display(new_status); + pfree(new_status); + } + + LOCK_PRINT("WaitOnLock: wakeup on lock", + locallock->lock, locallock->tag.mode); +} + +/* + * Remove a proc from the wait-queue it is on (caller must know it is on one). + * This is only used when the proc has failed to get the lock, so we set its + * waitStatus to PROC_WAIT_STATUS_ERROR. + * + * Appropriate partition lock must be held by caller. Also, caller is + * responsible for signaling the proc if needed. + * + * NB: this does not clean up any locallock object that may exist for the lock. + */ +void +RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode) +{ + LOCK *waitLock = proc->waitLock; + PROCLOCK *proclock = proc->waitProcLock; + LOCKMODE lockmode = proc->waitLockMode; + LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock); + + /* Make sure proc is waiting */ + Assert(proc->waitStatus == PROC_WAIT_STATUS_WAITING); + Assert(proc->links.next != NULL); + Assert(waitLock); + Assert(waitLock->waitProcs.size > 0); + Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods)); + + /* Remove proc from lock's wait queue */ + SHMQueueDelete(&(proc->links)); + waitLock->waitProcs.size--; + + /* Undo increments of request counts by waiting process */ + Assert(waitLock->nRequested > 0); + Assert(waitLock->nRequested > proc->waitLock->nGranted); + waitLock->nRequested--; + Assert(waitLock->requested[lockmode] > 0); + waitLock->requested[lockmode]--; + /* don't forget to clear waitMask bit if appropriate */ + if (waitLock->granted[lockmode] == waitLock->requested[lockmode]) + waitLock->waitMask &= LOCKBIT_OFF(lockmode); + + /* Clean up the proc's own state, and pass it the ok/fail signal */ + proc->waitLock = NULL; + proc->waitProcLock = NULL; + proc->waitStatus = PROC_WAIT_STATUS_ERROR; + + /* + * Delete the proclock immediately if it represents no already-held locks. + * (This must happen now because if the owner of the lock decides to + * release it, and the requested/granted counts then go to zero, + * LockRelease expects there to be no remaining proclocks.) Then see if + * any other waiters for the lock can be woken up now. + */ + CleanUpLock(waitLock, proclock, + LockMethods[lockmethodid], hashcode, + true); +} + +/* + * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it. + * Release a session lock if 'sessionLock' is true, else release a + * regular transaction lock. + * + * Side Effects: find any waiting processes that are now wakable, + * grant them their requested locks and awaken them. + * (We have to grant the lock here to avoid a race between + * the waking process and any new process to + * come along and request the lock.) + */ +bool +LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) +{ + LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; + LockMethod lockMethodTable; + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; + LOCK *lock; + PROCLOCK *proclock; + LWLock *partitionLock; + bool wakeupNeeded; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + lockMethodTable = LockMethods[lockmethodid]; + if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) + elog(ERROR, "unrecognized lock mode: %d", lockmode); + +#ifdef LOCK_DEBUG + if (LOCK_DEBUG_ENABLED(locktag)) + elog(LOG, "LockRelease: lock [%u,%u] %s", + locktag->locktag_field1, locktag->locktag_field2, + lockMethodTable->lockModeNames[lockmode]); +#endif + + /* + * Find the LOCALLOCK entry for this lock and lockmode + */ + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash, + (void *) &localtag, + HASH_FIND, NULL); + + /* + * let the caller print its own error message, too. Do not ereport(ERROR). + */ + if (!locallock || locallock->nLocks <= 0) + { + elog(WARNING, "you don't own a lock of type %s", + lockMethodTable->lockModeNames[lockmode]); + return false; + } + + /* + * Decrease the count for the resource owner. + */ + { + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + ResourceOwner owner; + int i; + + /* Identify owner for lock */ + if (sessionLock) + owner = NULL; + else + owner = CurrentResourceOwner; + + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == owner) + { + Assert(lockOwners[i].nLocks > 0); + if (--lockOwners[i].nLocks == 0) + { + if (owner != NULL) + ResourceOwnerForgetLock(owner, locallock); + /* compact out unused slot */ + locallock->numLockOwners--; + if (i < locallock->numLockOwners) + lockOwners[i] = lockOwners[locallock->numLockOwners]; + } + break; + } + } + if (i < 0) + { + /* don't release a lock belonging to another owner */ + elog(WARNING, "you don't own a lock of type %s", + lockMethodTable->lockModeNames[lockmode]); + return false; + } + } + + /* + * Decrease the total local count. If we're still holding the lock, we're + * done. + */ + locallock->nLocks--; + + if (locallock->nLocks > 0) + return true; + + /* + * At this point we can no longer suppose we are clear of invalidation + * messages related to this lock. Although we'll delete the LOCALLOCK + * object before any intentional return from this routine, it seems worth + * the trouble to explicitly reset lockCleared right now, just in case + * some error prevents us from deleting the LOCALLOCK. + */ + locallock->lockCleared = false; + + /* Attempt fast release of any lock eligible for the fast path. */ + if (EligibleForRelationFastPath(locktag, lockmode) && + FastPathLocalUseCount > 0) + { + bool released; + + /* + * We might not find the lock here, even if we originally entered it + * here. Another backend may have moved it to the main table. + */ + LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); + released = FastPathUnGrantRelationLock(locktag->locktag_field2, + lockmode); + LWLockRelease(&MyProc->fpInfoLock); + if (released) + { + RemoveLocalLock(locallock); + return true; + } + } + + /* + * Otherwise we've got to mess with the shared lock table. + */ + partitionLock = LockHashPartitionLock(locallock->hashcode); + + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + + /* + * Normally, we don't need to re-find the lock or proclock, since we kept + * their addresses in the locallock table, and they couldn't have been + * removed while we were holding a lock on them. But it's possible that + * the lock was taken fast-path and has since been moved to the main hash + * table by another backend, in which case we will need to look up the + * objects here. We assume the lock field is NULL if so. + */ + lock = locallock->lock; + if (!lock) + { + PROCLOCKTAG proclocktag; + + Assert(EligibleForRelationFastPath(locktag, lockmode)); + lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, + (const void *) locktag, + locallock->hashcode, + HASH_FIND, + NULL); + if (!lock) + elog(ERROR, "failed to re-find shared lock object"); + locallock->lock = lock; + + proclocktag.myLock = lock; + proclocktag.myProc = MyProc; + locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash, + (void *) &proclocktag, + HASH_FIND, + NULL); + if (!locallock->proclock) + elog(ERROR, "failed to re-find shared proclock object"); + } + LOCK_PRINT("LockRelease: found", lock, lockmode); + proclock = locallock->proclock; + PROCLOCK_PRINT("LockRelease: found", proclock); + + /* + * Double-check that we are actually holding a lock of the type we want to + * release. + */ + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) + { + PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock); + LWLockRelease(partitionLock); + elog(WARNING, "you don't own a lock of type %s", + lockMethodTable->lockModeNames[lockmode]); + RemoveLocalLock(locallock); + return false; + } + + /* + * Do the releasing. CleanUpLock will waken any now-wakable waiters. + */ + wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable); + + CleanUpLock(lock, proclock, + lockMethodTable, locallock->hashcode, + wakeupNeeded); + + LWLockRelease(partitionLock); + + RemoveLocalLock(locallock); + return true; +} + +/* + * LockReleaseAll -- Release all locks of the specified lock method that + * are held by the current process. + * + * Well, not necessarily *all* locks. The available behaviors are: + * allLocks == true: release all locks including session locks. + * allLocks == false: release all non-session locks. + */ +void +LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks) +{ + HASH_SEQ_STATUS status; + LockMethod lockMethodTable; + int i, + numLockModes; + LOCALLOCK *locallock; + LOCK *lock; + PROCLOCK *proclock; + int partition; + bool have_fast_path_lwlock = false; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + lockMethodTable = LockMethods[lockmethodid]; + +#ifdef LOCK_DEBUG + if (*(lockMethodTable->trace_flag)) + elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid); +#endif + + /* + * Get rid of our fast-path VXID lock, if appropriate. Note that this is + * the only way that the lock we hold on our own VXID can ever get + * released: it is always and only released when a toplevel transaction + * ends. + */ + if (lockmethodid == DEFAULT_LOCKMETHOD) + VirtualXactLockTableCleanup(); + + numLockModes = lockMethodTable->numLockModes; + + /* + * First we run through the locallock table and get rid of unwanted + * entries, then we scan the process's proclocks and get rid of those. We + * do this separately because we may have multiple locallock entries + * pointing to the same proclock, and we daren't end up with any dangling + * pointers. Fast-path locks are cleaned up during the locallock table + * scan, though. + */ + hash_seq_init(&status, LockMethodLocalHash); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + /* + * If the LOCALLOCK entry is unused, we must've run out of shared + * memory while trying to set up this lock. Just forget the local + * entry. + */ + if (locallock->nLocks == 0) + { + RemoveLocalLock(locallock); + continue; + } + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) + continue; + + /* + * If we are asked to release all locks, we can just zap the entry. + * Otherwise, must scan to see if there are session locks. We assume + * there is at most one lockOwners entry for session locks. + */ + if (!allLocks) + { + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + + /* If session lock is above array position 0, move it down to 0 */ + for (i = 0; i < locallock->numLockOwners; i++) + { + if (lockOwners[i].owner == NULL) + lockOwners[0] = lockOwners[i]; + else + ResourceOwnerForgetLock(lockOwners[i].owner, locallock); + } + + if (locallock->numLockOwners > 0 && + lockOwners[0].owner == NULL && + lockOwners[0].nLocks > 0) + { + /* Fix the locallock to show just the session locks */ + locallock->nLocks = lockOwners[0].nLocks; + locallock->numLockOwners = 1; + /* We aren't deleting this locallock, so done */ + continue; + } + else + locallock->numLockOwners = 0; + } + + /* + * If the lock or proclock pointers are NULL, this lock was taken via + * the relation fast-path (and is not known to have been transferred). + */ + if (locallock->proclock == NULL || locallock->lock == NULL) + { + LOCKMODE lockmode = locallock->tag.mode; + Oid relid; + + /* Verify that a fast-path lock is what we've got. */ + if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode)) + elog(PANIC, "locallock table corrupted"); + + /* + * If we don't currently hold the LWLock that protects our + * fast-path data structures, we must acquire it before attempting + * to release the lock via the fast-path. We will continue to + * hold the LWLock until we're done scanning the locallock table, + * unless we hit a transferred fast-path lock. (XXX is this + * really such a good idea? There could be a lot of entries ...) + */ + if (!have_fast_path_lwlock) + { + LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); + have_fast_path_lwlock = true; + } + + /* Attempt fast-path release. */ + relid = locallock->tag.lock.locktag_field2; + if (FastPathUnGrantRelationLock(relid, lockmode)) + { + RemoveLocalLock(locallock); + continue; + } + + /* + * Our lock, originally taken via the fast path, has been + * transferred to the main lock table. That's going to require + * some extra work, so release our fast-path lock before starting. + */ + LWLockRelease(&MyProc->fpInfoLock); + have_fast_path_lwlock = false; + + /* + * Now dump the lock. We haven't got a pointer to the LOCK or + * PROCLOCK in this case, so we have to handle this a bit + * differently than a normal lock release. Unfortunately, this + * requires an extra LWLock acquire-and-release cycle on the + * partitionLock, but hopefully it shouldn't happen often. + */ + LockRefindAndRelease(lockMethodTable, MyProc, + &locallock->tag.lock, lockmode, false); + RemoveLocalLock(locallock); + continue; + } + + /* Mark the proclock to show we need to release this lockmode */ + if (locallock->nLocks > 0) + locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode); + + /* And remove the locallock hashtable entry */ + RemoveLocalLock(locallock); + } + + /* Done with the fast-path data structures */ + if (have_fast_path_lwlock) + LWLockRelease(&MyProc->fpInfoLock); + + /* + * Now, scan each lock partition separately. + */ + for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) + { + LWLock *partitionLock; + SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); + PROCLOCK *nextplock; + + partitionLock = LockHashPartitionLockByIndex(partition); + + /* + * If the proclock list for this partition is empty, we can skip + * acquiring the partition lock. This optimization is trickier than + * it looks, because another backend could be in process of adding + * something to our proclock list due to promoting one of our + * fast-path locks. However, any such lock must be one that we + * decided not to delete above, so it's okay to skip it again now; + * we'd just decide not to delete it again. We must, however, be + * careful to re-fetch the list header once we've acquired the + * partition lock, to be sure we have a valid, up-to-date pointer. + * (There is probably no significant risk if pointer fetch/store is + * atomic, but we don't wish to assume that.) + * + * XXX This argument assumes that the locallock table correctly + * represents all of our fast-path locks. While allLocks mode + * guarantees to clean up all of our normal locks regardless of the + * locallock situation, we lose that guarantee for fast-path locks. + * This is not ideal. + */ + if (SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)) == NULL) + continue; /* needn't examine this partition */ + + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + + for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + proclock; + proclock = nextplock) + { + bool wakeupNeeded = false; + + /* Get link first, since we may unlink/delete this proclock */ + nextplock = (PROCLOCK *) + SHMQueueNext(procLocks, &proclock->procLink, + offsetof(PROCLOCK, procLink)); + + Assert(proclock->tag.myProc == MyProc); + + lock = proclock->tag.myLock; + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCK_LOCKMETHOD(*lock) != lockmethodid) + continue; + + /* + * In allLocks mode, force release of all locks even if locallock + * table had problems + */ + if (allLocks) + proclock->releaseMask = proclock->holdMask; + else + Assert((proclock->releaseMask & ~proclock->holdMask) == 0); + + /* + * Ignore items that have nothing to be released, unless they have + * holdMask == 0 and are therefore recyclable + */ + if (proclock->releaseMask == 0 && proclock->holdMask != 0) + continue; + + PROCLOCK_PRINT("LockReleaseAll", proclock); + LOCK_PRINT("LockReleaseAll", lock, 0); + Assert(lock->nRequested >= 0); + Assert(lock->nGranted >= 0); + Assert(lock->nGranted <= lock->nRequested); + Assert((proclock->holdMask & ~lock->grantMask) == 0); + + /* + * Release the previously-marked lock modes + */ + for (i = 1; i <= numLockModes; i++) + { + if (proclock->releaseMask & LOCKBIT_ON(i)) + wakeupNeeded |= UnGrantLock(lock, i, proclock, + lockMethodTable); + } + Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); + Assert(lock->nGranted <= lock->nRequested); + LOCK_PRINT("LockReleaseAll: updated", lock, 0); + + proclock->releaseMask = 0; + + /* CleanUpLock will wake up waiters if needed. */ + CleanUpLock(lock, proclock, + lockMethodTable, + LockTagHashCode(&lock->tag), + wakeupNeeded); + } /* loop over PROCLOCKs within this partition */ + + LWLockRelease(partitionLock); + } /* loop over partitions */ + +#ifdef LOCK_DEBUG + if (*(lockMethodTable->trace_flag)) + elog(LOG, "LockReleaseAll done"); +#endif +} + +/* + * LockReleaseSession -- Release all session locks of the specified lock method + * that are held by the current process. + */ +void +LockReleaseSession(LOCKMETHODID lockmethodid) +{ + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + + hash_seq_init(&status, LockMethodLocalHash); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + /* Ignore items that are not of the specified lock method */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) + continue; + + ReleaseLockIfHeld(locallock, true); + } +} + +/* + * LockReleaseCurrentOwner + * Release all locks belonging to CurrentResourceOwner + * + * If the caller knows what those locks are, it can pass them as an array. + * That speeds up the call significantly, when a lot of locks are held. + * Otherwise, pass NULL for locallocks, and we'll traverse through our hash + * table to find them. + */ +void +LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks) +{ + if (locallocks == NULL) + { + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + + hash_seq_init(&status, LockMethodLocalHash); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + ReleaseLockIfHeld(locallock, false); + } + else + { + int i; + + for (i = nlocks - 1; i >= 0; i--) + ReleaseLockIfHeld(locallocks[i], false); + } +} + +/* + * ReleaseLockIfHeld + * Release any session-level locks on this lockable object if sessionLock + * is true; else, release any locks held by CurrentResourceOwner. + * + * It is tempting to pass this a ResourceOwner pointer (or NULL for session + * locks), but without refactoring LockRelease() we cannot support releasing + * locks belonging to resource owners other than CurrentResourceOwner. + * If we were to refactor, it'd be a good idea to fix it so we don't have to + * do a hashtable lookup of the locallock, too. However, currently this + * function isn't used heavily enough to justify refactoring for its + * convenience. + */ +static void +ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock) +{ + ResourceOwner owner; + LOCALLOCKOWNER *lockOwners; + int i; + + /* Identify owner for lock (must match LockRelease!) */ + if (sessionLock) + owner = NULL; + else + owner = CurrentResourceOwner; + + /* Scan to see if there are any locks belonging to the target owner */ + lockOwners = locallock->lockOwners; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == owner) + { + Assert(lockOwners[i].nLocks > 0); + if (lockOwners[i].nLocks < locallock->nLocks) + { + /* + * We will still hold this lock after forgetting this + * ResourceOwner. + */ + locallock->nLocks -= lockOwners[i].nLocks; + /* compact out unused slot */ + locallock->numLockOwners--; + if (owner != NULL) + ResourceOwnerForgetLock(owner, locallock); + if (i < locallock->numLockOwners) + lockOwners[i] = lockOwners[locallock->numLockOwners]; + } + else + { + Assert(lockOwners[i].nLocks == locallock->nLocks); + /* We want to call LockRelease just once */ + lockOwners[i].nLocks = 1; + locallock->nLocks = 1; + if (!LockRelease(&locallock->tag.lock, + locallock->tag.mode, + sessionLock)) + elog(WARNING, "ReleaseLockIfHeld: failed??"); + } + break; + } + } +} + +/* + * LockReassignCurrentOwner + * Reassign all locks belonging to CurrentResourceOwner to belong + * to its parent resource owner. + * + * If the caller knows what those locks are, it can pass them as an array. + * That speeds up the call significantly, when a lot of locks are held + * (e.g pg_dump with a large schema). Otherwise, pass NULL for locallocks, + * and we'll traverse through our hash table to find them. + */ +void +LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks) +{ + ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner); + + Assert(parent != NULL); + + if (locallocks == NULL) + { + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + + hash_seq_init(&status, LockMethodLocalHash); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + LockReassignOwner(locallock, parent); + } + else + { + int i; + + for (i = nlocks - 1; i >= 0; i--) + LockReassignOwner(locallocks[i], parent); + } +} + +/* + * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to + * CurrentResourceOwner to its parent. + */ +static void +LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent) +{ + LOCALLOCKOWNER *lockOwners; + int i; + int ic = -1; + int ip = -1; + + /* + * Scan to see if there are any locks belonging to current owner or its + * parent + */ + lockOwners = locallock->lockOwners; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == CurrentResourceOwner) + ic = i; + else if (lockOwners[i].owner == parent) + ip = i; + } + + if (ic < 0) + return; /* no current locks */ + + if (ip < 0) + { + /* Parent has no slot, so just give it the child's slot */ + lockOwners[ic].owner = parent; + ResourceOwnerRememberLock(parent, locallock); + } + else + { + /* Merge child's count with parent's */ + lockOwners[ip].nLocks += lockOwners[ic].nLocks; + /* compact out unused slot */ + locallock->numLockOwners--; + if (ic < locallock->numLockOwners) + lockOwners[ic] = lockOwners[locallock->numLockOwners]; + } + ResourceOwnerForgetLock(CurrentResourceOwner, locallock); +} + +/* + * FastPathGrantRelationLock + * Grant lock using per-backend fast-path array, if there is space. + */ +static bool +FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode) +{ + uint32 f; + uint32 unused_slot = FP_LOCK_SLOTS_PER_BACKEND; + + /* Scan for existing entry for this relid, remembering empty slot. */ + for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) + { + if (FAST_PATH_GET_BITS(MyProc, f) == 0) + unused_slot = f; + else if (MyProc->fpRelId[f] == relid) + { + Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode)); + FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode); + return true; + } + } + + /* If no existing entry, use any empty slot. */ + if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND) + { + MyProc->fpRelId[unused_slot] = relid; + FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode); + ++FastPathLocalUseCount; + return true; + } + + /* No existing entry, and no empty slot. */ + return false; +} + +/* + * FastPathUnGrantRelationLock + * Release fast-path lock, if present. Update backend-private local + * use count, while we're at it. + */ +static bool +FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode) +{ + uint32 f; + bool result = false; + + FastPathLocalUseCount = 0; + for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) + { + if (MyProc->fpRelId[f] == relid + && FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode)) + { + Assert(!result); + FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode); + result = true; + /* we continue iterating so as to update FastPathLocalUseCount */ + } + if (FAST_PATH_GET_BITS(MyProc, f) != 0) + ++FastPathLocalUseCount; + } + return result; +} + +/* + * FastPathTransferRelationLocks + * Transfer locks matching the given lock tag from per-backend fast-path + * arrays to the shared hash table. + * + * Returns true if successful, false if ran out of shared memory. + */ +static bool +FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag, + uint32 hashcode) +{ + LWLock *partitionLock = LockHashPartitionLock(hashcode); + Oid relid = locktag->locktag_field2; + uint32 i; + + /* + * Every PGPROC that can potentially hold a fast-path lock is present in + * ProcGlobal->allProcs. Prepared transactions are not, but any + * outstanding fast-path locks held by prepared transactions are + * transferred to the main lock table. + */ + for (i = 0; i < ProcGlobal->allProcCount; i++) + { + PGPROC *proc = &ProcGlobal->allProcs[i]; + uint32 f; + + LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE); + + /* + * If the target backend isn't referencing the same database as the + * lock, then we needn't examine the individual relation IDs at all; + * none of them can be relevant. + * + * proc->databaseId is set at backend startup time and never changes + * thereafter, so it might be safe to perform this test before + * acquiring &proc->fpInfoLock. In particular, it's certainly safe to + * assume that if the target backend holds any fast-path locks, it + * must have performed a memory-fencing operation (in particular, an + * LWLock acquisition) since setting proc->databaseId. However, it's + * less clear that our backend is certain to have performed a memory + * fencing operation since the other backend set proc->databaseId. So + * for now, we test it after acquiring the LWLock just to be safe. + */ + if (proc->databaseId != locktag->locktag_field1) + { + LWLockRelease(&proc->fpInfoLock); + continue; + } + + for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) + { + uint32 lockmode; + + /* Look for an allocated slot matching the given relid. */ + if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0) + continue; + + /* Find or create lock object. */ + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET; + lockmode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT; + ++lockmode) + { + PROCLOCK *proclock; + + if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode)) + continue; + proclock = SetupLockInTable(lockMethodTable, proc, locktag, + hashcode, lockmode); + if (!proclock) + { + LWLockRelease(partitionLock); + LWLockRelease(&proc->fpInfoLock); + return false; + } + GrantLock(proclock->tag.myLock, proclock, lockmode); + FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode); + } + LWLockRelease(partitionLock); + + /* No need to examine remaining slots. */ + break; + } + LWLockRelease(&proc->fpInfoLock); + } + return true; +} + +/* + * FastPathGetRelationLockEntry + * Return the PROCLOCK for a lock originally taken via the fast-path, + * transferring it to the primary lock table if necessary. + * + * Note: caller takes care of updating the locallock object. + */ +static PROCLOCK * +FastPathGetRelationLockEntry(LOCALLOCK *locallock) +{ + LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD]; + LOCKTAG *locktag = &locallock->tag.lock; + PROCLOCK *proclock = NULL; + LWLock *partitionLock = LockHashPartitionLock(locallock->hashcode); + Oid relid = locktag->locktag_field2; + uint32 f; + + LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); + + for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) + { + uint32 lockmode; + + /* Look for an allocated slot matching the given relid. */ + if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0) + continue; + + /* If we don't have a lock of the given mode, forget it! */ + lockmode = locallock->tag.mode; + if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode)) + break; + + /* Find or create lock object. */ + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + + proclock = SetupLockInTable(lockMethodTable, MyProc, locktag, + locallock->hashcode, lockmode); + if (!proclock) + { + LWLockRelease(partitionLock); + LWLockRelease(&MyProc->fpInfoLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You might need to increase max_locks_per_transaction."))); + } + GrantLock(proclock->tag.myLock, proclock, lockmode); + FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode); + + LWLockRelease(partitionLock); + + /* No need to examine remaining slots. */ + break; + } + + LWLockRelease(&MyProc->fpInfoLock); + + /* Lock may have already been transferred by some other backend. */ + if (proclock == NULL) + { + LOCK *lock; + PROCLOCKTAG proclocktag; + uint32 proclock_hashcode; + + LWLockAcquire(partitionLock, LW_SHARED); + + lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, + (void *) locktag, + locallock->hashcode, + HASH_FIND, + NULL); + if (!lock) + elog(ERROR, "failed to re-find shared lock object"); + + proclocktag.myLock = lock; + proclocktag.myProc = MyProc; + + proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode); + proclock = (PROCLOCK *) + hash_search_with_hash_value(LockMethodProcLockHash, + (void *) &proclocktag, + proclock_hashcode, + HASH_FIND, + NULL); + if (!proclock) + elog(ERROR, "failed to re-find shared proclock object"); + LWLockRelease(partitionLock); + } + + return proclock; +} + +/* + * GetLockConflicts + * Get an array of VirtualTransactionIds of xacts currently holding locks + * that would conflict with the specified lock/lockmode. + * xacts merely awaiting such a lock are NOT reported. + * + * The result array is palloc'd and is terminated with an invalid VXID. + * *countp, if not null, is updated to the number of items set. + * + * Of course, the result could be out of date by the time it's returned, so + * use of this function has to be thought about carefully. Similarly, a + * PGPROC with no "lxid" will be considered non-conflicting regardless of any + * lock it holds. Existing callers don't care about a locker after that + * locker's pg_xact updates complete. CommitTransaction() clears "lxid" after + * pg_xact updates and before releasing locks. + * + * Note we never include the current xact's vxid in the result array, + * since an xact never blocks itself. + */ +VirtualTransactionId * +GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp) +{ + static VirtualTransactionId *vxids; + LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; + LockMethod lockMethodTable; + LOCK *lock; + LOCKMASK conflictMask; + SHM_QUEUE *procLocks; + PROCLOCK *proclock; + uint32 hashcode; + LWLock *partitionLock; + int count = 0; + int fast_count = 0; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + lockMethodTable = LockMethods[lockmethodid]; + if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) + elog(ERROR, "unrecognized lock mode: %d", lockmode); + + /* + * Allocate memory to store results, and fill with InvalidVXID. We only + * need enough space for MaxBackends + max_prepared_xacts + a terminator. + * InHotStandby allocate once in TopMemoryContext. + */ + if (InHotStandby) + { + if (vxids == NULL) + vxids = (VirtualTransactionId *) + MemoryContextAlloc(TopMemoryContext, + sizeof(VirtualTransactionId) * + (MaxBackends + max_prepared_xacts + 1)); + } + else + vxids = (VirtualTransactionId *) + palloc0(sizeof(VirtualTransactionId) * + (MaxBackends + max_prepared_xacts + 1)); + + /* Compute hash code and partition lock, and look up conflicting modes. */ + hashcode = LockTagHashCode(locktag); + partitionLock = LockHashPartitionLock(hashcode); + conflictMask = lockMethodTable->conflictTab[lockmode]; + + /* + * Fast path locks might not have been entered in the primary lock table. + * If the lock we're dealing with could conflict with such a lock, we must + * examine each backend's fast-path array for conflicts. + */ + if (ConflictsWithRelationFastPath(locktag, lockmode)) + { + int i; + Oid relid = locktag->locktag_field2; + VirtualTransactionId vxid; + + /* + * Iterate over relevant PGPROCs. Anything held by a prepared + * transaction will have been transferred to the primary lock table, + * so we need not worry about those. This is all a bit fuzzy, because + * new locks could be taken after we've visited a particular + * partition, but the callers had better be prepared to deal with that + * anyway, since the locks could equally well be taken between the + * time we return the value and the time the caller does something + * with it. + */ + for (i = 0; i < ProcGlobal->allProcCount; i++) + { + PGPROC *proc = &ProcGlobal->allProcs[i]; + uint32 f; + + /* A backend never blocks itself */ + if (proc == MyProc) + continue; + + LWLockAcquire(&proc->fpInfoLock, LW_SHARED); + + /* + * If the target backend isn't referencing the same database as + * the lock, then we needn't examine the individual relation IDs + * at all; none of them can be relevant. + * + * See FastPathTransferRelationLocks() for discussion of why we do + * this test after acquiring the lock. + */ + if (proc->databaseId != locktag->locktag_field1) + { + LWLockRelease(&proc->fpInfoLock); + continue; + } + + for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) + { + uint32 lockmask; + + /* Look for an allocated slot matching the given relid. */ + if (relid != proc->fpRelId[f]) + continue; + lockmask = FAST_PATH_GET_BITS(proc, f); + if (!lockmask) + continue; + lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET; + + /* + * There can only be one entry per relation, so if we found it + * and it doesn't conflict, we can skip the rest of the slots. + */ + if ((lockmask & conflictMask) == 0) + break; + + /* Conflict! */ + GET_VXID_FROM_PGPROC(vxid, *proc); + + if (VirtualTransactionIdIsValid(vxid)) + vxids[count++] = vxid; + /* else, xact already committed or aborted */ + + /* No need to examine remaining slots. */ + break; + } + + LWLockRelease(&proc->fpInfoLock); + } + } + + /* Remember how many fast-path conflicts we found. */ + fast_count = count; + + /* + * Look up the lock object matching the tag. + */ + LWLockAcquire(partitionLock, LW_SHARED); + + lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, + (const void *) locktag, + hashcode, + HASH_FIND, + NULL); + if (!lock) + { + /* + * If the lock object doesn't exist, there is nothing holding a lock + * on this lockable object. + */ + LWLockRelease(partitionLock); + vxids[count].backendId = InvalidBackendId; + vxids[count].localTransactionId = InvalidLocalTransactionId; + if (countp) + *countp = count; + return vxids; + } + + /* + * Examine each existing holder (or awaiter) of the lock. + */ + + procLocks = &(lock->procLocks); + + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, lockLink)); + + while (proclock) + { + if (conflictMask & proclock->holdMask) + { + PGPROC *proc = proclock->tag.myProc; + + /* A backend never blocks itself */ + if (proc != MyProc) + { + VirtualTransactionId vxid; + + GET_VXID_FROM_PGPROC(vxid, *proc); + + if (VirtualTransactionIdIsValid(vxid)) + { + int i; + + /* Avoid duplicate entries. */ + for (i = 0; i < fast_count; ++i) + if (VirtualTransactionIdEquals(vxids[i], vxid)) + break; + if (i >= fast_count) + vxids[count++] = vxid; + } + /* else, xact already committed or aborted */ + } + } + + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, + offsetof(PROCLOCK, lockLink)); + } + + LWLockRelease(partitionLock); + + if (count > MaxBackends + max_prepared_xacts) /* should never happen */ + elog(PANIC, "too many conflicting locks found"); + + vxids[count].backendId = InvalidBackendId; + vxids[count].localTransactionId = InvalidLocalTransactionId; + if (countp) + *countp = count; + return vxids; +} + +/* + * Find a lock in the shared lock table and release it. It is the caller's + * responsibility to verify that this is a sane thing to do. (For example, it + * would be bad to release a lock here if there might still be a LOCALLOCK + * object with pointers to it.) + * + * We currently use this in two situations: first, to release locks held by + * prepared transactions on commit (see lock_twophase_postcommit); and second, + * to release locks taken via the fast-path, transferred to the main hash + * table, and then released (see LockReleaseAll). + */ +static void +LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc, + LOCKTAG *locktag, LOCKMODE lockmode, + bool decrement_strong_lock_count) +{ + LOCK *lock; + PROCLOCK *proclock; + PROCLOCKTAG proclocktag; + uint32 hashcode; + uint32 proclock_hashcode; + LWLock *partitionLock; + bool wakeupNeeded; + + hashcode = LockTagHashCode(locktag); + partitionLock = LockHashPartitionLock(hashcode); + + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + + /* + * Re-find the lock object (it had better be there). + */ + lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, + (void *) locktag, + hashcode, + HASH_FIND, + NULL); + if (!lock) + elog(PANIC, "failed to re-find shared lock object"); + + /* + * Re-find the proclock object (ditto). + */ + proclocktag.myLock = lock; + proclocktag.myProc = proc; + + proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode); + + proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash, + (void *) &proclocktag, + proclock_hashcode, + HASH_FIND, + NULL); + if (!proclock) + elog(PANIC, "failed to re-find shared proclock object"); + + /* + * Double-check that we are actually holding a lock of the type we want to + * release. + */ + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) + { + PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock); + LWLockRelease(partitionLock); + elog(WARNING, "you don't own a lock of type %s", + lockMethodTable->lockModeNames[lockmode]); + return; + } + + /* + * Do the releasing. CleanUpLock will waken any now-wakable waiters. + */ + wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable); + + CleanUpLock(lock, proclock, + lockMethodTable, hashcode, + wakeupNeeded); + + LWLockRelease(partitionLock); + + /* + * Decrement strong lock count. This logic is needed only for 2PC. + */ + if (decrement_strong_lock_count + && ConflictsWithRelationFastPath(locktag, lockmode)) + { + uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode); + + SpinLockAcquire(&FastPathStrongRelationLocks->mutex); + Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0); + FastPathStrongRelationLocks->count[fasthashcode]--; + SpinLockRelease(&FastPathStrongRelationLocks->mutex); + } +} + +/* + * CheckForSessionAndXactLocks + * Check to see if transaction holds both session-level and xact-level + * locks on the same object; if so, throw an error. + * + * If we have both session- and transaction-level locks on the same object, + * PREPARE TRANSACTION must fail. This should never happen with regular + * locks, since we only take those at session level in some special operations + * like VACUUM. It's possible to hit this with advisory locks, though. + * + * It would be nice if we could keep the session hold and give away the + * transactional hold to the prepared xact. However, that would require two + * PROCLOCK objects, and we cannot be sure that another PROCLOCK will be + * available when it comes time for PostPrepare_Locks to do the deed. + * So for now, we error out while we can still do so safely. + * + * Since the LOCALLOCK table stores a separate entry for each lockmode, + * we can't implement this check by examining LOCALLOCK entries in isolation. + * We must build a transient hashtable that is indexed by locktag only. + */ +static void +CheckForSessionAndXactLocks(void) +{ + typedef struct + { + LOCKTAG lock; /* identifies the lockable object */ + bool sessLock; /* is any lockmode held at session level? */ + bool xactLock; /* is any lockmode held at xact level? */ + } PerLockTagEntry; + + HASHCTL hash_ctl; + HTAB *lockhtab; + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + + /* Create a local hash table keyed by LOCKTAG only */ + hash_ctl.keysize = sizeof(LOCKTAG); + hash_ctl.entrysize = sizeof(PerLockTagEntry); + hash_ctl.hcxt = CurrentMemoryContext; + + lockhtab = hash_create("CheckForSessionAndXactLocks table", + 256, /* arbitrary initial size */ + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + /* Scan local lock table to find entries for each LOCKTAG */ + hash_seq_init(&status, LockMethodLocalHash); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + PerLockTagEntry *hentry; + bool found; + int i; + + /* + * Ignore VXID locks. We don't want those to be held by prepared + * transactions, since they aren't meaningful after a restart. + */ + if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION) + continue; + + /* Ignore it if we don't actually hold the lock */ + if (locallock->nLocks <= 0) + continue; + + /* Otherwise, find or make an entry in lockhtab */ + hentry = (PerLockTagEntry *) hash_search(lockhtab, + (void *) &locallock->tag.lock, + HASH_ENTER, &found); + if (!found) /* initialize, if newly created */ + hentry->sessLock = hentry->xactLock = false; + + /* Scan to see if we hold lock at session or xact level or both */ + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == NULL) + hentry->sessLock = true; + else + hentry->xactLock = true; + } + + /* + * We can throw error immediately when we see both types of locks; no + * need to wait around to see if there are more violations. + */ + if (hentry->sessLock && hentry->xactLock) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object"))); + } + + /* Success, so clean up */ + hash_destroy(lockhtab); +} + +/* + * AtPrepare_Locks + * Do the preparatory work for a PREPARE: make 2PC state file records + * for all locks currently held. + * + * Session-level locks are ignored, as are VXID locks. + * + * For the most part, we don't need to touch shared memory for this --- + * all the necessary state information is in the locallock table. + * Fast-path locks are an exception, however: we move any such locks to + * the main table before allowing PREPARE TRANSACTION to succeed. + */ +void +AtPrepare_Locks(void) +{ + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + + /* First, verify there aren't locks of both xact and session level */ + CheckForSessionAndXactLocks(); + + /* Now do the per-locallock cleanup work */ + hash_seq_init(&status, LockMethodLocalHash); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + TwoPhaseLockRecord record; + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + bool haveSessionLock; + bool haveXactLock; + int i; + + /* + * Ignore VXID locks. We don't want those to be held by prepared + * transactions, since they aren't meaningful after a restart. + */ + if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION) + continue; + + /* Ignore it if we don't actually hold the lock */ + if (locallock->nLocks <= 0) + continue; + + /* Scan to see whether we hold it at session or transaction level */ + haveSessionLock = haveXactLock = false; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == NULL) + haveSessionLock = true; + else + haveXactLock = true; + } + + /* Ignore it if we have only session lock */ + if (!haveXactLock) + continue; + + /* This can't happen, because we already checked it */ + if (haveSessionLock) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object"))); + + /* + * If the local lock was taken via the fast-path, we need to move it + * to the primary lock table, or just get a pointer to the existing + * primary lock table entry if by chance it's already been + * transferred. + */ + if (locallock->proclock == NULL) + { + locallock->proclock = FastPathGetRelationLockEntry(locallock); + locallock->lock = locallock->proclock->tag.myLock; + } + + /* + * Arrange to not release any strong lock count held by this lock + * entry. We must retain the count until the prepared transaction is + * committed or rolled back. + */ + locallock->holdsStrongLockCount = false; + + /* + * Create a 2PC record. + */ + memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG)); + record.lockmode = locallock->tag.mode; + + RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0, + &record, sizeof(TwoPhaseLockRecord)); + } +} + +/* + * PostPrepare_Locks + * Clean up after successful PREPARE + * + * Here, we want to transfer ownership of our locks to a dummy PGPROC + * that's now associated with the prepared transaction, and we want to + * clean out the corresponding entries in the LOCALLOCK table. + * + * Note: by removing the LOCALLOCK entries, we are leaving dangling + * pointers in the transaction's resource owner. This is OK at the + * moment since resowner.c doesn't try to free locks retail at a toplevel + * transaction commit or abort. We could alternatively zero out nLocks + * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll, + * but that probably costs more cycles. + */ +void +PostPrepare_Locks(TransactionId xid) +{ + PGPROC *newproc = TwoPhaseGetDummyProc(xid, false); + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + LOCK *lock; + PROCLOCK *proclock; + PROCLOCKTAG proclocktag; + int partition; + + /* Can't prepare a lock group follower. */ + Assert(MyProc->lockGroupLeader == NULL || + MyProc->lockGroupLeader == MyProc); + + /* This is a critical section: any error means big trouble */ + START_CRIT_SECTION(); + + /* + * First we run through the locallock table and get rid of unwanted + * entries, then we scan the process's proclocks and transfer them to the + * target proc. + * + * We do this separately because we may have multiple locallock entries + * pointing to the same proclock, and we daren't end up with any dangling + * pointers. + */ + hash_seq_init(&status, LockMethodLocalHash); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + bool haveSessionLock; + bool haveXactLock; + int i; + + if (locallock->proclock == NULL || locallock->lock == NULL) + { + /* + * We must've run out of shared memory while trying to set up this + * lock. Just forget the local entry. + */ + Assert(locallock->nLocks == 0); + RemoveLocalLock(locallock); + continue; + } + + /* Ignore VXID locks */ + if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION) + continue; + + /* Scan to see whether we hold it at session or transaction level */ + haveSessionLock = haveXactLock = false; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == NULL) + haveSessionLock = true; + else + haveXactLock = true; + } + + /* Ignore it if we have only session lock */ + if (!haveXactLock) + continue; + + /* This can't happen, because we already checked it */ + if (haveSessionLock) + ereport(PANIC, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object"))); + + /* Mark the proclock to show we need to release this lockmode */ + if (locallock->nLocks > 0) + locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode); + + /* And remove the locallock hashtable entry */ + RemoveLocalLock(locallock); + } + + /* + * Now, scan each lock partition separately. + */ + for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) + { + LWLock *partitionLock; + SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); + PROCLOCK *nextplock; + + partitionLock = LockHashPartitionLockByIndex(partition); + + /* + * If the proclock list for this partition is empty, we can skip + * acquiring the partition lock. This optimization is safer than the + * situation in LockReleaseAll, because we got rid of any fast-path + * locks during AtPrepare_Locks, so there cannot be any case where + * another backend is adding something to our lists now. For safety, + * though, we code this the same way as in LockReleaseAll. + */ + if (SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)) == NULL) + continue; /* needn't examine this partition */ + + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + + for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + proclock; + proclock = nextplock) + { + /* Get link first, since we may unlink/relink this proclock */ + nextplock = (PROCLOCK *) + SHMQueueNext(procLocks, &proclock->procLink, + offsetof(PROCLOCK, procLink)); + + Assert(proclock->tag.myProc == MyProc); + + lock = proclock->tag.myLock; + + /* Ignore VXID locks */ + if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION) + continue; + + PROCLOCK_PRINT("PostPrepare_Locks", proclock); + LOCK_PRINT("PostPrepare_Locks", lock, 0); + Assert(lock->nRequested >= 0); + Assert(lock->nGranted >= 0); + Assert(lock->nGranted <= lock->nRequested); + Assert((proclock->holdMask & ~lock->grantMask) == 0); + + /* Ignore it if nothing to release (must be a session lock) */ + if (proclock->releaseMask == 0) + continue; + + /* Else we should be releasing all locks */ + if (proclock->releaseMask != proclock->holdMask) + elog(PANIC, "we seem to have dropped a bit somewhere"); + + /* + * We cannot simply modify proclock->tag.myProc to reassign + * ownership of the lock, because that's part of the hash key and + * the proclock would then be in the wrong hash chain. Instead + * use hash_update_hash_key. (We used to create a new hash entry, + * but that risks out-of-memory failure if other processes are + * busy making proclocks too.) We must unlink the proclock from + * our procLink chain and put it into the new proc's chain, too. + * + * Note: the updated proclock hash key will still belong to the + * same hash partition, cf proclock_hash(). So the partition lock + * we already hold is sufficient for this. + */ + SHMQueueDelete(&proclock->procLink); + + /* + * Create the new hash key for the proclock. + */ + proclocktag.myLock = lock; + proclocktag.myProc = newproc; + + /* + * Update groupLeader pointer to point to the new proc. (We'd + * better not be a member of somebody else's lock group!) + */ + Assert(proclock->groupLeader == proclock->tag.myProc); + proclock->groupLeader = newproc; + + /* + * Update the proclock. We should not find any existing entry for + * the same hash key, since there can be only one entry for any + * given lock with my own proc. + */ + if (!hash_update_hash_key(LockMethodProcLockHash, + (void *) proclock, + (void *) &proclocktag)) + elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks"); + + /* Re-link into the new proc's proclock list */ + SHMQueueInsertBefore(&(newproc->myProcLocks[partition]), + &proclock->procLink); + + PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock); + } /* loop over PROCLOCKs within this partition */ + + LWLockRelease(partitionLock); + } /* loop over partitions */ + + END_CRIT_SECTION(); +} + + +/* + * Estimate shared-memory space used for lock tables + */ +Size +LockShmemSize(void) +{ + Size size = 0; + long max_table_size; + + /* lock hash table */ + max_table_size = NLOCKENTS(); + size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK))); + + /* proclock hash table */ + max_table_size *= 2; + size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK))); + + /* + * Since NLOCKENTS is only an estimate, add 10% safety margin. + */ + size = add_size(size, size / 10); + + return size; +} + +/* + * GetLockStatusData - Return a summary of the lock manager's internal + * status, for use in a user-level reporting function. + * + * The return data consists of an array of LockInstanceData objects, + * which are a lightly abstracted version of the PROCLOCK data structures, + * i.e. there is one entry for each unique lock and interested PGPROC. + * It is the caller's responsibility to match up related items (such as + * references to the same lockable object or PGPROC) if wanted. + * + * The design goal is to hold the LWLocks for as short a time as possible; + * thus, this function simply makes a copy of the necessary data and releases + * the locks, allowing the caller to contemplate and format the data for as + * long as it pleases. + */ +LockData * +GetLockStatusData(void) +{ + LockData *data; + PROCLOCK *proclock; + HASH_SEQ_STATUS seqstat; + int els; + int el; + int i; + + data = (LockData *) palloc(sizeof(LockData)); + + /* Guess how much space we'll need. */ + els = MaxBackends; + el = 0; + data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els); + + /* + * First, we iterate through the per-backend fast-path arrays, locking + * them one at a time. This might produce an inconsistent picture of the + * system state, but taking all of those LWLocks at the same time seems + * impractical (in particular, note MAX_SIMUL_LWLOCKS). It shouldn't + * matter too much, because none of these locks can be involved in lock + * conflicts anyway - anything that might must be present in the main lock + * table. (For the same reason, we don't sweat about making leaderPid + * completely valid. We cannot safely dereference another backend's + * lockGroupLeader field without holding all lock partition locks, and + * it's not worth that.) + */ + for (i = 0; i < ProcGlobal->allProcCount; ++i) + { + PGPROC *proc = &ProcGlobal->allProcs[i]; + uint32 f; + + LWLockAcquire(&proc->fpInfoLock, LW_SHARED); + + for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f) + { + LockInstanceData *instance; + uint32 lockbits = FAST_PATH_GET_BITS(proc, f); + + /* Skip unallocated slots. */ + if (!lockbits) + continue; + + if (el >= els) + { + els += MaxBackends; + data->locks = (LockInstanceData *) + repalloc(data->locks, sizeof(LockInstanceData) * els); + } + + instance = &data->locks[el]; + SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId, + proc->fpRelId[f]); + instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET; + instance->waitLockMode = NoLock; + instance->backend = proc->backendId; + instance->lxid = proc->lxid; + instance->pid = proc->pid; + instance->leaderPid = proc->pid; + instance->fastpath = true; + + /* + * Successfully taking fast path lock means there were no + * conflicting locks. + */ + instance->waitStart = 0; + + el++; + } + + if (proc->fpVXIDLock) + { + VirtualTransactionId vxid; + LockInstanceData *instance; + + if (el >= els) + { + els += MaxBackends; + data->locks = (LockInstanceData *) + repalloc(data->locks, sizeof(LockInstanceData) * els); + } + + vxid.backendId = proc->backendId; + vxid.localTransactionId = proc->fpLocalTransactionId; + + instance = &data->locks[el]; + SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid); + instance->holdMask = LOCKBIT_ON(ExclusiveLock); + instance->waitLockMode = NoLock; + instance->backend = proc->backendId; + instance->lxid = proc->lxid; + instance->pid = proc->pid; + instance->leaderPid = proc->pid; + instance->fastpath = true; + instance->waitStart = 0; + + el++; + } + + LWLockRelease(&proc->fpInfoLock); + } + + /* + * Next, acquire lock on the entire shared lock data structure. We do + * this so that, at least for locks in the primary lock table, the state + * will be self-consistent. + * + * Since this is a read-only operation, we take shared instead of + * exclusive lock. There's not a whole lot of point to this, because all + * the normal operations require exclusive lock, but it doesn't hurt + * anything either. It will at least allow two backends to do + * GetLockStatusData in parallel. + * + * Must grab LWLocks in partition-number order to avoid LWLock deadlock. + */ + for (i = 0; i < NUM_LOCK_PARTITIONS; i++) + LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED); + + /* Now we can safely count the number of proclocks */ + data->nelements = el + hash_get_num_entries(LockMethodProcLockHash); + if (data->nelements > els) + { + els = data->nelements; + data->locks = (LockInstanceData *) + repalloc(data->locks, sizeof(LockInstanceData) * els); + } + + /* Now scan the tables to copy the data */ + hash_seq_init(&seqstat, LockMethodProcLockHash); + + while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat))) + { + PGPROC *proc = proclock->tag.myProc; + LOCK *lock = proclock->tag.myLock; + LockInstanceData *instance = &data->locks[el]; + + memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG)); + instance->holdMask = proclock->holdMask; + if (proc->waitLock == proclock->tag.myLock) + instance->waitLockMode = proc->waitLockMode; + else + instance->waitLockMode = NoLock; + instance->backend = proc->backendId; + instance->lxid = proc->lxid; + instance->pid = proc->pid; + instance->leaderPid = proclock->groupLeader->pid; + instance->fastpath = false; + instance->waitStart = (TimestampTz) pg_atomic_read_u64(&proc->waitStart); + + el++; + } + + /* + * And release locks. We do this in reverse order for two reasons: (1) + * Anyone else who needs more than one of the locks will be trying to lock + * them in increasing order; we don't want to release the other process + * until it can get all the locks it needs. (2) This avoids O(N^2) + * behavior inside LWLockRelease. + */ + for (i = NUM_LOCK_PARTITIONS; --i >= 0;) + LWLockRelease(LockHashPartitionLockByIndex(i)); + + Assert(el == data->nelements); + + return data; +} + +/* + * GetBlockerStatusData - Return a summary of the lock manager's state + * concerning locks that are blocking the specified PID or any member of + * the PID's lock group, for use in a user-level reporting function. + * + * For each PID within the lock group that is awaiting some heavyweight lock, + * the return data includes an array of LockInstanceData objects, which are + * the same data structure used by GetLockStatusData; but unlike that function, + * this one reports only the PROCLOCKs associated with the lock that that PID + * is blocked on. (Hence, all the locktags should be the same for any one + * blocked PID.) In addition, we return an array of the PIDs of those backends + * that are ahead of the blocked PID in the lock's wait queue. These can be + * compared with the PIDs in the LockInstanceData objects to determine which + * waiters are ahead of or behind the blocked PID in the queue. + * + * If blocked_pid isn't a valid backend PID or nothing in its lock group is + * waiting on any heavyweight lock, return empty arrays. + * + * The design goal is to hold the LWLocks for as short a time as possible; + * thus, this function simply makes a copy of the necessary data and releases + * the locks, allowing the caller to contemplate and format the data for as + * long as it pleases. + */ +BlockedProcsData * +GetBlockerStatusData(int blocked_pid) +{ + BlockedProcsData *data; + PGPROC *proc; + int i; + + data = (BlockedProcsData *) palloc(sizeof(BlockedProcsData)); + + /* + * Guess how much space we'll need, and preallocate. Most of the time + * this will avoid needing to do repalloc while holding the LWLocks. (We + * assume, but check with an Assert, that MaxBackends is enough entries + * for the procs[] array; the other two could need enlargement, though.) + */ + data->nprocs = data->nlocks = data->npids = 0; + data->maxprocs = data->maxlocks = data->maxpids = MaxBackends; + data->procs = (BlockedProcData *) palloc(sizeof(BlockedProcData) * data->maxprocs); + data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * data->maxlocks); + data->waiter_pids = (int *) palloc(sizeof(int) * data->maxpids); + + /* + * In order to search the ProcArray for blocked_pid and assume that that + * entry won't immediately disappear under us, we must hold ProcArrayLock. + * In addition, to examine the lock grouping fields of any other backend, + * we must hold all the hash partition locks. (Only one of those locks is + * actually relevant for any one lock group, but we can't know which one + * ahead of time.) It's fairly annoying to hold all those locks + * throughout this, but it's no worse than GetLockStatusData(), and it + * does have the advantage that we're guaranteed to return a + * self-consistent instantaneous state. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + proc = BackendPidGetProcWithLock(blocked_pid); + + /* Nothing to do if it's gone */ + if (proc != NULL) + { + /* + * Acquire lock on the entire shared lock data structure. See notes + * in GetLockStatusData(). + */ + for (i = 0; i < NUM_LOCK_PARTITIONS; i++) + LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED); + + if (proc->lockGroupLeader == NULL) + { + /* Easy case, proc is not a lock group member */ + GetSingleProcBlockerStatusData(proc, data); + } + else + { + /* Examine all procs in proc's lock group */ + dlist_iter iter; + + dlist_foreach(iter, &proc->lockGroupLeader->lockGroupMembers) + { + PGPROC *memberProc; + + memberProc = dlist_container(PGPROC, lockGroupLink, iter.cur); + GetSingleProcBlockerStatusData(memberProc, data); + } + } + + /* + * And release locks. See notes in GetLockStatusData(). + */ + for (i = NUM_LOCK_PARTITIONS; --i >= 0;) + LWLockRelease(LockHashPartitionLockByIndex(i)); + + Assert(data->nprocs <= data->maxprocs); + } + + LWLockRelease(ProcArrayLock); + + return data; +} + +/* Accumulate data about one possibly-blocked proc for GetBlockerStatusData */ +static void +GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) +{ + LOCK *theLock = blocked_proc->waitLock; + BlockedProcData *bproc; + SHM_QUEUE *procLocks; + PROCLOCK *proclock; + PROC_QUEUE *waitQueue; + PGPROC *proc; + int queue_size; + int i; + + /* Nothing to do if this proc is not blocked */ + if (theLock == NULL) + return; + + /* Set up a procs[] element */ + bproc = &data->procs[data->nprocs++]; + bproc->pid = blocked_proc->pid; + bproc->first_lock = data->nlocks; + bproc->first_waiter = data->npids; + + /* + * We may ignore the proc's fast-path arrays, since nothing in those could + * be related to a contended lock. + */ + + /* Collect all PROCLOCKs associated with theLock */ + procLocks = &(theLock->procLocks); + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, lockLink)); + while (proclock) + { + PGPROC *proc = proclock->tag.myProc; + LOCK *lock = proclock->tag.myLock; + LockInstanceData *instance; + + if (data->nlocks >= data->maxlocks) + { + data->maxlocks += MaxBackends; + data->locks = (LockInstanceData *) + repalloc(data->locks, sizeof(LockInstanceData) * data->maxlocks); + } + + instance = &data->locks[data->nlocks]; + memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG)); + instance->holdMask = proclock->holdMask; + if (proc->waitLock == lock) + instance->waitLockMode = proc->waitLockMode; + else + instance->waitLockMode = NoLock; + instance->backend = proc->backendId; + instance->lxid = proc->lxid; + instance->pid = proc->pid; + instance->leaderPid = proclock->groupLeader->pid; + instance->fastpath = false; + data->nlocks++; + + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, + offsetof(PROCLOCK, lockLink)); + } + + /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */ + waitQueue = &(theLock->waitProcs); + queue_size = waitQueue->size; + + if (queue_size > data->maxpids - data->npids) + { + data->maxpids = Max(data->maxpids + MaxBackends, + data->npids + queue_size); + data->waiter_pids = (int *) repalloc(data->waiter_pids, + sizeof(int) * data->maxpids); + } + + /* Collect PIDs from the lock's wait queue, stopping at blocked_proc */ + proc = (PGPROC *) waitQueue->links.next; + for (i = 0; i < queue_size; i++) + { + if (proc == blocked_proc) + break; + data->waiter_pids[data->npids++] = proc->pid; + proc = (PGPROC *) proc->links.next; + } + + bproc->num_locks = data->nlocks - bproc->first_lock; + bproc->num_waiters = data->npids - bproc->first_waiter; +} + +/* + * Returns a list of currently held AccessExclusiveLocks, for use by + * LogStandbySnapshot(). The result is a palloc'd array, + * with the number of elements returned into *nlocks. + * + * XXX This currently takes a lock on all partitions of the lock table, + * but it's possible to do better. By reference counting locks and storing + * the value in the ProcArray entry for each backend we could tell if any + * locks need recording without having to acquire the partition locks and + * scan the lock table. Whether that's worth the additional overhead + * is pretty dubious though. + */ +xl_standby_lock * +GetRunningTransactionLocks(int *nlocks) +{ + xl_standby_lock *accessExclusiveLocks; + PROCLOCK *proclock; + HASH_SEQ_STATUS seqstat; + int i; + int index; + int els; + + /* + * Acquire lock on the entire shared lock data structure. + * + * Must grab LWLocks in partition-number order to avoid LWLock deadlock. + */ + for (i = 0; i < NUM_LOCK_PARTITIONS; i++) + LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED); + + /* Now we can safely count the number of proclocks */ + els = hash_get_num_entries(LockMethodProcLockHash); + + /* + * Allocating enough space for all locks in the lock table is overkill, + * but it's more convenient and faster than having to enlarge the array. + */ + accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock)); + + /* Now scan the tables to copy the data */ + hash_seq_init(&seqstat, LockMethodProcLockHash); + + /* + * If lock is a currently granted AccessExclusiveLock then it will have + * just one proclock holder, so locks are never accessed twice in this + * particular case. Don't copy this code for use elsewhere because in the + * general case this will give you duplicate locks when looking at + * non-exclusive lock types. + */ + index = 0; + while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat))) + { + /* make sure this definition matches the one used in LockAcquire */ + if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) && + proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION) + { + PGPROC *proc = proclock->tag.myProc; + LOCK *lock = proclock->tag.myLock; + TransactionId xid = proc->xid; + + /* + * Don't record locks for transactions if we know they have + * already issued their WAL record for commit but not yet released + * lock. It is still possible that we see locks held by already + * complete transactions, if they haven't yet zeroed their xids. + */ + if (!TransactionIdIsValid(xid)) + continue; + + accessExclusiveLocks[index].xid = xid; + accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1; + accessExclusiveLocks[index].relOid = lock->tag.locktag_field2; + + index++; + } + } + + Assert(index <= els); + + /* + * And release locks. We do this in reverse order for two reasons: (1) + * Anyone else who needs more than one of the locks will be trying to lock + * them in increasing order; we don't want to release the other process + * until it can get all the locks it needs. (2) This avoids O(N^2) + * behavior inside LWLockRelease. + */ + for (i = NUM_LOCK_PARTITIONS; --i >= 0;) + LWLockRelease(LockHashPartitionLockByIndex(i)); + + *nlocks = index; + return accessExclusiveLocks; +} + +/* Provide the textual name of any lock mode */ +const char * +GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode) +{ + Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods)); + Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes); + return LockMethods[lockmethodid]->lockModeNames[mode]; +} + +#ifdef LOCK_DEBUG +/* + * Dump all locks in the given proc's myProcLocks lists. + * + * Caller is responsible for having acquired appropriate LWLocks. + */ +void +DumpLocks(PGPROC *proc) +{ + SHM_QUEUE *procLocks; + PROCLOCK *proclock; + LOCK *lock; + int i; + + if (proc == NULL) + return; + + if (proc->waitLock) + LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0); + + for (i = 0; i < NUM_LOCK_PARTITIONS; i++) + { + procLocks = &(proc->myProcLocks[i]); + + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); + + while (proclock) + { + Assert(proclock->tag.myProc == proc); + + lock = proclock->tag.myLock; + + PROCLOCK_PRINT("DumpLocks", proclock); + LOCK_PRINT("DumpLocks", lock, 0); + + proclock = (PROCLOCK *) + SHMQueueNext(procLocks, &proclock->procLink, + offsetof(PROCLOCK, procLink)); + } + } +} + +/* + * Dump all lmgr locks. + * + * Caller is responsible for having acquired appropriate LWLocks. + */ +void +DumpAllLocks(void) +{ + PGPROC *proc; + PROCLOCK *proclock; + LOCK *lock; + HASH_SEQ_STATUS status; + + proc = MyProc; + + if (proc && proc->waitLock) + LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0); + + hash_seq_init(&status, LockMethodProcLockHash); + + while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL) + { + PROCLOCK_PRINT("DumpAllLocks", proclock); + + lock = proclock->tag.myLock; + if (lock) + LOCK_PRINT("DumpAllLocks", lock, 0); + else + elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL"); + } +} +#endif /* LOCK_DEBUG */ + +/* + * LOCK 2PC resource manager's routines + */ + +/* + * Re-acquire a lock belonging to a transaction that was prepared. + * + * Because this function is run at db startup, re-acquiring the locks should + * never conflict with running transactions because there are none. We + * assume that the lock state represented by the stored 2PC files is legal. + * + * When switching from Hot Standby mode to normal operation, the locks will + * be already held by the startup process. The locks are acquired for the new + * procs without checking for conflicts, so we don't get a conflict between the + * startup process and the dummy procs, even though we will momentarily have + * a situation where two procs are holding the same AccessExclusiveLock, + * which isn't normally possible because the conflict. If we're in standby + * mode, but a recovery snapshot hasn't been established yet, it's possible + * that some but not all of the locks are already held by the startup process. + * + * This approach is simple, but also a bit dangerous, because if there isn't + * enough shared memory to acquire the locks, an error will be thrown, which + * is promoted to FATAL and recovery will abort, bringing down postmaster. + * A safer approach would be to transfer the locks like we do in + * AtPrepare_Locks, but then again, in hot standby mode it's possible for + * read-only backends to use up all the shared lock memory anyway, so that + * replaying the WAL record that needs to acquire a lock will throw an error + * and PANIC anyway. + */ +void +lock_twophase_recover(TransactionId xid, uint16 info, + void *recdata, uint32 len) +{ + TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; + PGPROC *proc = TwoPhaseGetDummyProc(xid, false); + LOCKTAG *locktag; + LOCKMODE lockmode; + LOCKMETHODID lockmethodid; + LOCK *lock; + PROCLOCK *proclock; + PROCLOCKTAG proclocktag; + bool found; + uint32 hashcode; + uint32 proclock_hashcode; + int partition; + LWLock *partitionLock; + LockMethod lockMethodTable; + + Assert(len == sizeof(TwoPhaseLockRecord)); + locktag = &rec->locktag; + lockmode = rec->lockmode; + lockmethodid = locktag->locktag_lockmethodid; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + lockMethodTable = LockMethods[lockmethodid]; + + hashcode = LockTagHashCode(locktag); + partition = LockHashPartition(hashcode); + partitionLock = LockHashPartitionLock(hashcode); + + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + + /* + * Find or create a lock with this tag. + */ + lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, + (void *) locktag, + hashcode, + HASH_ENTER_NULL, + &found); + if (!lock) + { + LWLockRelease(partitionLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You might need to increase max_locks_per_transaction."))); + } + + /* + * if it's a new lock object, initialize it + */ + if (!found) + { + lock->grantMask = 0; + lock->waitMask = 0; + SHMQueueInit(&(lock->procLocks)); + ProcQueueInit(&(lock->waitProcs)); + lock->nRequested = 0; + lock->nGranted = 0; + MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES); + MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES); + LOCK_PRINT("lock_twophase_recover: new", lock, lockmode); + } + else + { + LOCK_PRINT("lock_twophase_recover: found", lock, lockmode); + Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0)); + Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); + Assert(lock->nGranted <= lock->nRequested); + } + + /* + * Create the hash key for the proclock table. + */ + proclocktag.myLock = lock; + proclocktag.myProc = proc; + + proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode); + + /* + * Find or create a proclock entry with this tag + */ + proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash, + (void *) &proclocktag, + proclock_hashcode, + HASH_ENTER_NULL, + &found); + if (!proclock) + { + /* Oops, not enough shmem for the proclock */ + if (lock->nRequested == 0) + { + /* + * There are no other requestors of this lock, so garbage-collect + * the lock object. We *must* do this to avoid a permanent leak + * of shared memory, because there won't be anything to cause + * anyone to release the lock object later. + */ + Assert(SHMQueueEmpty(&(lock->procLocks))); + if (!hash_search_with_hash_value(LockMethodLockHash, + (void *) &(lock->tag), + hashcode, + HASH_REMOVE, + NULL)) + elog(PANIC, "lock table corrupted"); + } + LWLockRelease(partitionLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You might need to increase max_locks_per_transaction."))); + } + + /* + * If new, initialize the new entry + */ + if (!found) + { + Assert(proc->lockGroupLeader == NULL); + proclock->groupLeader = proc; + proclock->holdMask = 0; + proclock->releaseMask = 0; + /* Add proclock to appropriate lists */ + SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); + SHMQueueInsertBefore(&(proc->myProcLocks[partition]), + &proclock->procLink); + PROCLOCK_PRINT("lock_twophase_recover: new", proclock); + } + else + { + PROCLOCK_PRINT("lock_twophase_recover: found", proclock); + Assert((proclock->holdMask & ~lock->grantMask) == 0); + } + + /* + * lock->nRequested and lock->requested[] count the total number of + * requests, whether granted or waiting, so increment those immediately. + */ + lock->nRequested++; + lock->requested[lockmode]++; + Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); + + /* + * We shouldn't already hold the desired lock. + */ + if (proclock->holdMask & LOCKBIT_ON(lockmode)) + elog(ERROR, "lock %s on object %u/%u/%u is already held", + lockMethodTable->lockModeNames[lockmode], + lock->tag.locktag_field1, lock->tag.locktag_field2, + lock->tag.locktag_field3); + + /* + * We ignore any possible conflicts and just grant ourselves the lock. Not + * only because we don't bother, but also to avoid deadlocks when + * switching from standby to normal mode. See function comment. + */ + GrantLock(lock, proclock, lockmode); + + /* + * Bump strong lock count, to make sure any fast-path lock requests won't + * be granted without consulting the primary lock table. + */ + if (ConflictsWithRelationFastPath(&lock->tag, lockmode)) + { + uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode); + + SpinLockAcquire(&FastPathStrongRelationLocks->mutex); + FastPathStrongRelationLocks->count[fasthashcode]++; + SpinLockRelease(&FastPathStrongRelationLocks->mutex); + } + + LWLockRelease(partitionLock); +} + +/* + * Re-acquire a lock belonging to a transaction that was prepared, when + * starting up into hot standby mode. + */ +void +lock_twophase_standby_recover(TransactionId xid, uint16 info, + void *recdata, uint32 len) +{ + TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; + LOCKTAG *locktag; + LOCKMODE lockmode; + LOCKMETHODID lockmethodid; + + Assert(len == sizeof(TwoPhaseLockRecord)); + locktag = &rec->locktag; + lockmode = rec->lockmode; + lockmethodid = locktag->locktag_lockmethodid; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + + if (lockmode == AccessExclusiveLock && + locktag->locktag_type == LOCKTAG_RELATION) + { + StandbyAcquireAccessExclusiveLock(xid, + locktag->locktag_field1 /* dboid */ , + locktag->locktag_field2 /* reloid */ ); + } +} + + +/* + * 2PC processing routine for COMMIT PREPARED case. + * + * Find and release the lock indicated by the 2PC record. + */ +void +lock_twophase_postcommit(TransactionId xid, uint16 info, + void *recdata, uint32 len) +{ + TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; + PGPROC *proc = TwoPhaseGetDummyProc(xid, true); + LOCKTAG *locktag; + LOCKMETHODID lockmethodid; + LockMethod lockMethodTable; + + Assert(len == sizeof(TwoPhaseLockRecord)); + locktag = &rec->locktag; + lockmethodid = locktag->locktag_lockmethodid; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + lockMethodTable = LockMethods[lockmethodid]; + + LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true); +} + +/* + * 2PC processing routine for ROLLBACK PREPARED case. + * + * This is actually just the same as the COMMIT case. + */ +void +lock_twophase_postabort(TransactionId xid, uint16 info, + void *recdata, uint32 len) +{ + lock_twophase_postcommit(xid, info, recdata, len); +} + +/* + * VirtualXactLockTableInsert + * + * Take vxid lock via the fast-path. There can't be any pre-existing + * lockers, as we haven't advertised this vxid via the ProcArray yet. + * + * Since MyProc->fpLocalTransactionId will normally contain the same data + * as MyProc->lxid, you might wonder if we really need both. The + * difference is that MyProc->lxid is set and cleared unlocked, and + * examined by procarray.c, while fpLocalTransactionId is protected by + * fpInfoLock and is used only by the locking subsystem. Doing it this + * way makes it easier to verify that there are no funny race conditions. + * + * We don't bother recording this lock in the local lock table, since it's + * only ever released at the end of a transaction. Instead, + * LockReleaseAll() calls VirtualXactLockTableCleanup(). + */ +void +VirtualXactLockTableInsert(VirtualTransactionId vxid) +{ + Assert(VirtualTransactionIdIsValid(vxid)); + + LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); + + Assert(MyProc->backendId == vxid.backendId); + Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId); + Assert(MyProc->fpVXIDLock == false); + + MyProc->fpVXIDLock = true; + MyProc->fpLocalTransactionId = vxid.localTransactionId; + + LWLockRelease(&MyProc->fpInfoLock); +} + +/* + * VirtualXactLockTableCleanup + * + * Check whether a VXID lock has been materialized; if so, release it, + * unblocking waiters. + */ +void +VirtualXactLockTableCleanup(void) +{ + bool fastpath; + LocalTransactionId lxid; + + Assert(MyProc->backendId != InvalidBackendId); + + /* + * Clean up shared memory state. + */ + LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); + + fastpath = MyProc->fpVXIDLock; + lxid = MyProc->fpLocalTransactionId; + MyProc->fpVXIDLock = false; + MyProc->fpLocalTransactionId = InvalidLocalTransactionId; + + LWLockRelease(&MyProc->fpInfoLock); + + /* + * If fpVXIDLock has been cleared without touching fpLocalTransactionId, + * that means someone transferred the lock to the main lock table. + */ + if (!fastpath && LocalTransactionIdIsValid(lxid)) + { + VirtualTransactionId vxid; + LOCKTAG locktag; + + vxid.backendId = MyBackendId; + vxid.localTransactionId = lxid; + SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid); + + LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc, + &locktag, ExclusiveLock, false); + } +} + +/* + * XactLockForVirtualXact + * + * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid, + * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid). Unlike those + * functions, it assumes "xid" is never a subtransaction and that "xid" is + * prepared, committed, or aborted. + * + * If !TransactionIdIsValid(xid), this locks every prepared XID having been + * known as "vxid" before its PREPARE TRANSACTION. + */ +static bool +XactLockForVirtualXact(VirtualTransactionId vxid, + TransactionId xid, bool wait) +{ + bool more = false; + + /* There is no point to wait for 2PCs if you have no 2PCs. */ + if (max_prepared_xacts == 0) + return true; + + do + { + LockAcquireResult lar; + LOCKTAG tag; + + /* Clear state from previous iterations. */ + if (more) + { + xid = InvalidTransactionId; + more = false; + } + + /* If we have no xid, try to find one. */ + if (!TransactionIdIsValid(xid)) + xid = TwoPhaseGetXidByVirtualXID(vxid, &more); + if (!TransactionIdIsValid(xid)) + { + Assert(!more); + return true; + } + + /* Check or wait for XID completion. */ + SET_LOCKTAG_TRANSACTION(tag, xid); + lar = LockAcquire(&tag, ShareLock, false, !wait); + if (lar == LOCKACQUIRE_NOT_AVAIL) + return false; + LockRelease(&tag, ShareLock, false); + } while (more); + + return true; +} + +/* + * VirtualXactLock + * + * If wait = true, wait as long as the given VXID or any XID acquired by the + * same transaction is still running. Then, return true. + * + * If wait = false, just check whether that VXID or one of those XIDs is still + * running, and return true or false. + */ +bool +VirtualXactLock(VirtualTransactionId vxid, bool wait) +{ + LOCKTAG tag; + PGPROC *proc; + TransactionId xid = InvalidTransactionId; + + Assert(VirtualTransactionIdIsValid(vxid)); + + if (VirtualTransactionIdIsRecoveredPreparedXact(vxid)) + /* no vxid lock; localTransactionId is a normal, locked XID */ + return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait); + + SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid); + + /* + * If a lock table entry must be made, this is the PGPROC on whose behalf + * it must be done. Note that the transaction might end or the PGPROC + * might be reassigned to a new backend before we get around to examining + * it, but it doesn't matter. If we find upon examination that the + * relevant lxid is no longer running here, that's enough to prove that + * it's no longer running anywhere. + */ + proc = BackendIdGetProc(vxid.backendId); + if (proc == NULL) + return XactLockForVirtualXact(vxid, InvalidTransactionId, wait); + + /* + * We must acquire this lock before checking the backendId and lxid + * against the ones we're waiting for. The target backend will only set + * or clear lxid while holding this lock. + */ + LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE); + + if (proc->backendId != vxid.backendId + || proc->fpLocalTransactionId != vxid.localTransactionId) + { + /* VXID ended */ + LWLockRelease(&proc->fpInfoLock); + return XactLockForVirtualXact(vxid, InvalidTransactionId, wait); + } + + /* + * If we aren't asked to wait, there's no need to set up a lock table + * entry. The transaction is still in progress, so just return false. + */ + if (!wait) + { + LWLockRelease(&proc->fpInfoLock); + return false; + } + + /* + * OK, we're going to need to sleep on the VXID. But first, we must set + * up the primary lock table entry, if needed (ie, convert the proc's + * fast-path lock on its VXID to a regular lock). + */ + if (proc->fpVXIDLock) + { + PROCLOCK *proclock; + uint32 hashcode; + LWLock *partitionLock; + + hashcode = LockTagHashCode(&tag); + + partitionLock = LockHashPartitionLock(hashcode); + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + + proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc, + &tag, hashcode, ExclusiveLock); + if (!proclock) + { + LWLockRelease(partitionLock); + LWLockRelease(&proc->fpInfoLock); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You might need to increase max_locks_per_transaction."))); + } + GrantLock(proclock->tag.myLock, proclock, ExclusiveLock); + + LWLockRelease(partitionLock); + + proc->fpVXIDLock = false; + } + + /* + * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID() + * search. The proc might have assigned this XID but not yet locked it, + * in which case the proc will lock this XID before releasing the VXID. + * The fpInfoLock critical section excludes VirtualXactLockTableCleanup(), + * so we won't save an XID of a different VXID. It doesn't matter whether + * we save this before or after setting up the primary lock table entry. + */ + xid = proc->xid; + + /* Done with proc->fpLockBits */ + LWLockRelease(&proc->fpInfoLock); + + /* Time to wait. */ + (void) LockAcquire(&tag, ShareLock, false, false); + + LockRelease(&tag, ShareLock, false); + return XactLockForVirtualXact(vxid, xid, wait); +} + +/* + * LockWaiterCount + * + * Find the number of lock requester on this locktag + */ +int +LockWaiterCount(const LOCKTAG *locktag) +{ + LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; + LOCK *lock; + bool found; + uint32 hashcode; + LWLock *partitionLock; + int waiters = 0; + + if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) + elog(ERROR, "unrecognized lock method: %d", lockmethodid); + + hashcode = LockTagHashCode(locktag); + partitionLock = LockHashPartitionLock(hashcode); + LWLockAcquire(partitionLock, LW_EXCLUSIVE); + + lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, + (const void *) locktag, + hashcode, + HASH_FIND, + &found); + if (found) + { + Assert(lock != NULL); + waiters = lock->nRequested; + } + LWLockRelease(partitionLock); + + return waiters; +} |