/*------------------------------------------------------------------------- * * lock.c * POSTGRES primary lock mechanism * * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * src/backend/storage/lmgr/lock.c * * NOTES * A lock table is a shared memory hash table. When * a process tries to acquire a lock of a type that conflicts * with existing locks, it is put to sleep using the routines * in storage/lmgr/proc.c. * * For the most part, this code should be invoked via lmgr.c * or another lock-management module, not directly. * * Interface: * * InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(), * LockAcquire(), LockRelease(), LockReleaseAll(), * LockCheckConflicts(), GrantLock() * *------------------------------------------------------------------------- */ #include "postgres.h" #include #include #include "access/transam.h" #include "access/twophase.h" #include "access/twophase_rmgr.h" #include "access/xact.h" #include "access/xlog.h" #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" #include "storage/proc.h" #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/spin.h" #include "storage/standby.h" #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/resowner_private.h" /* This configuration variable is used to set the lock table size */ int max_locks_per_xact; /* set by guc.c */ #define NLOCKENTS() \ mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts)) /* * Data structures defining the semantics of the standard lock methods. * * The conflict table defines the semantics of the various lock modes. 
*/
/*
 * Each entry below is the bitmask of lock modes that conflict with the mode
 * named in the comment directly above it.  The leading 0 entry lines up with
 * the "INVALID" slot of lock_mode_names[], so the table is indexed by lock
 * mode number starting at 1.
 */
static const LOCKMASK LockConflicts[] = {
	0,

	/* AccessShareLock */
	LOCKBIT_ON(AccessExclusiveLock),

	/* RowShareLock */
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* RowExclusiveLock */
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* ShareUpdateExclusiveLock */
	LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* ShareLock */
	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* ShareRowExclusiveLock */
	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* ExclusiveLock */
	LOCKBIT_ON(RowShareLock) |
	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),

	/* AccessExclusiveLock */
	LOCKBIT_ON(AccessShareLock) | LOCKBIT_ON(RowShareLock) |
	LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
	LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
	LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock)

};

/*
 * Names of lock modes, for debug printouts.  Entries are in the same order
 * as LockConflicts[]; entry 0 is the placeholder "INVALID".
 */
static const char *const lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareUpdateExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};

/*
 * Without LOCK_DEBUG there are no Trace_* variables, so the lock method
 * descriptors below point their trace flag at this always-false dummy.
 */
#ifndef LOCK_DEBUG
static bool Dummy_trace = false;
#endif

/*
 * Descriptor for the default lock method: highest mode number, conflict
 * table, mode names, and a pointer to the flag that enables tracing.
 */
static const LockMethodData default_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_locks
#else
	&Dummy_trace
#endif
};

/*
 * Descriptor for the user lock method.  It shares the conflict table and
 * mode names with the default method and differs only in its trace flag.
 */
static const LockMethodData user_lockmethod = {
	AccessExclusiveLock,		/* highest valid lock mode number */
	LockConflicts,
	lock_mode_names,
#ifdef LOCK_DEBUG
	&Trace_userlocks
#else
	&Dummy_trace
#endif
};

/*
 * map from lock method id to the lock table data structures
 *
 * Slot 0 is NULL; the lookups in this file reject a lock method id of 0
 * before indexing this array.
 */
static const LockMethod LockMethods[] = {
	NULL,
	&default_lockmethod,
	&user_lockmethod
};


/* Record that's written to 2PC state file when a lock is persisted */
typedef struct TwoPhaseLockRecord
{
	LOCKTAG		locktag;		/* identifies the locked object */
	LOCKMODE	lockmode;		/* mode in which it is locked */
} TwoPhaseLockRecord;


/*
 * Count of the number of fast path lock slots we believe to be used.  This
 * might be higher than the real number if another backend has transferred
 * our locks to the primary lock table, but it can never be lower than the
 * real value, since only we can acquire locks on our own behalf.
 */
static int	FastPathLocalUseCount = 0;

/*
 * Flag to indicate if the relation extension lock is held by this backend.
 * This flag is used to ensure that while holding the relation extension lock
 * we don't try to acquire a heavyweight lock on any other object.  This
 * restriction implies that the relation extension lock won't ever participate
 * in the deadlock cycle because we can never wait for any other heavyweight
 * lock after acquiring this lock.
 *
 * Such a restriction is okay for relation extension locks as unlike other
 * heavyweight locks these are not held till the transaction end.  These are
 * taken for a short duration to extend a particular relation and then
 * released.
 */
static bool IsRelationExtensionLockHeld PG_USED_FOR_ASSERTS_ONLY = false;

/*
 * Flag to indicate if the page lock is held by this backend.  We don't
 * acquire any other heavyweight lock while holding the page lock except for
 * relation extension.  However, these locks are never taken in reverse order
 * which implies that page locks will also never participate in the deadlock
 * cycle.
 *
 * Similar to relation extension, page locks are also held for a short
 * duration, so imposing such a restriction won't hurt.
*/ static bool IsPageLockHeld PG_USED_FOR_ASSERTS_ONLY = false; /* Macros for manipulating proc->fpLockBits */ #define FAST_PATH_BITS_PER_SLOT 3 #define FAST_PATH_LOCKNUMBER_OFFSET 1 #define FAST_PATH_MASK ((1 << FAST_PATH_BITS_PER_SLOT) - 1) #define FAST_PATH_GET_BITS(proc, n) \ (((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * n)) & FAST_PATH_MASK) #define FAST_PATH_BIT_POSITION(n, l) \ (AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \ AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \ AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \ ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n))) #define FAST_PATH_SET_LOCKMODE(proc, n, l) \ (proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l) #define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \ (proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)) #define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \ ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))) /* * The fast-path lock mechanism is concerned only with relation locks on * unshared relations by backends bound to a database. The fast-path * mechanism exists mostly to accelerate acquisition and release of locks * that rarely conflict. Because ShareUpdateExclusiveLock is * self-conflicting, it can't use the fast-path mechanism; but it also does * not conflict with any of the locks that do, so we can ignore it completely. 
*/

/*
 * True when (locktag, mode) may be taken through the fast path: the tag
 * uses the default lock method, names a relation whose locktag_field1
 * equals MyDatabaseId (which must itself be a valid OID), and the mode is
 * weaker than ShareUpdateExclusiveLock.
 */
#define EligibleForRelationFastPath(locktag, mode) \
	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
	(locktag)->locktag_type == LOCKTAG_RELATION && \
	(locktag)->locktag_field1 == MyDatabaseId && \
	MyDatabaseId != InvalidOid && \
	(mode) < ShareUpdateExclusiveLock)

/*
 * True when acquiring (locktag, mode) could conflict with a lock some
 * backend holds via the fast path: the tag uses the default lock method,
 * names a relation with a valid locktag_field1, and the mode is stronger
 * than ShareUpdateExclusiveLock.
 */
#define ConflictsWithRelationFastPath(locktag, mode) \
	((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
	(locktag)->locktag_type == LOCKTAG_RELATION && \
	(locktag)->locktag_field1 != InvalidOid && \
	(mode) > ShareUpdateExclusiveLock)

/* Fast-path helpers; their definitions are not in this part of the file */
static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
										  const LOCKTAG *locktag, uint32 hashcode);
static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);

/*
 * To make the fast-path lock mechanism work, we must have some way of
 * preventing the use of the fast-path when a conflicting lock might be present.
 * We partition the locktag space into FAST_PATH_STRONG_LOCK_HASH_PARTITIONS,
 * and maintain an integer count of the number of "strong" lockers
 * in each partition.  When any "strong" lockers are present (which is
 * hopefully not very often), the fast-path mechanism can't be used, and we
 * must fall back to the slower method of pushing matching locks directly
 * into the main lock tables.
 *
 * The deadlock detector does not know anything about the fast path mechanism,
 * so any locks that might be involved in a deadlock must be transferred from
 * the fast-path queues to the main lock table.
*/ #define FAST_PATH_STRONG_LOCK_HASH_BITS 10 #define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \ (1 << FAST_PATH_STRONG_LOCK_HASH_BITS) #define FastPathStrongLockHashPartition(hashcode) \ ((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS) typedef struct { slock_t mutex; uint32 count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS]; } FastPathStrongRelationLockData; static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks; /* * Pointers to hash tables containing lock state * * The LockMethodLockHash and LockMethodProcLockHash hash tables are in * shared memory; LockMethodLocalHash is local to each backend. */ static HTAB *LockMethodLockHash; static HTAB *LockMethodProcLockHash; static HTAB *LockMethodLocalHash; /* private state for error cleanup */ static LOCALLOCK *StrongLockInProgress; static LOCALLOCK *awaitedLock; static ResourceOwner awaitedOwner; #ifdef LOCK_DEBUG /*------ * The following configuration options are available for lock debugging: * * TRACE_LOCKS -- give a bunch of output what's going on in this file * TRACE_USERLOCKS -- same but for user locks * TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid * (use to avoid output on system tables) * TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally * DEBUG_DEADLOCKS -- currently dumps locks at untimely occasions ;) * * Furthermore, but in storage/lmgr/lwlock.c: * TRACE_LWLOCKS -- trace lightweight locks (pretty useless) * * Define LOCK_DEBUG at compile time to get all these enabled. 
* -------- */ int Trace_lock_oidmin = FirstNormalObjectId; bool Trace_locks = false; bool Trace_userlocks = false; int Trace_lock_table = 0; bool Debug_deadlocks = false; inline static bool LOCK_DEBUG_ENABLED(const LOCKTAG *tag) { return (*(LockMethods[tag->locktag_lockmethodid]->trace_flag) && ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin)) || (Trace_lock_table && (tag->locktag_field2 == Trace_lock_table)); } inline static void LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type) { if (LOCK_DEBUG_ENABLED(&lock->tag)) elog(LOG, "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) " "req(%d,%d,%d,%d,%d,%d,%d)=%d " "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", where, lock, lock->tag.locktag_field1, lock->tag.locktag_field2, lock->tag.locktag_field3, lock->tag.locktag_field4, lock->tag.locktag_type, lock->tag.locktag_lockmethodid, lock->grantMask, lock->requested[1], lock->requested[2], lock->requested[3], lock->requested[4], lock->requested[5], lock->requested[6], lock->requested[7], lock->nRequested, lock->granted[1], lock->granted[2], lock->granted[3], lock->granted[4], lock->granted[5], lock->granted[6], lock->granted[7], lock->nGranted, lock->waitProcs.size, LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]); } inline static void PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) { if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag)) elog(LOG, "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)", where, proclockP, proclockP->tag.myLock, PROCLOCK_LOCKMETHOD(*(proclockP)), proclockP->tag.myProc, (int) proclockP->holdMask); } #else /* not LOCK_DEBUG */ #define LOCK_PRINT(where, lock, type) ((void) 0) #define PROCLOCK_PRINT(where, proclockP) ((void) 0) #endif /* not LOCK_DEBUG */ static uint32 proclock_hash(const void *key, Size keysize); static void RemoveLocalLock(LOCALLOCK *locallock); static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode); 
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode); static void FinishStrongLockAcquire(void); static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner); static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock); static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent); static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, PROCLOCK *proclock, LockMethod lockMethodTable); static void CleanUpLock(LOCK *lock, PROCLOCK *proclock, LockMethod lockMethodTable, uint32 hashcode, bool wakeupNeeded); static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc, LOCKTAG *locktag, LOCKMODE lockmode, bool decrement_strong_lock_count); static void GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data); /* * InitLocks -- Initialize the lock manager's data structures. * * This is called from CreateSharedMemoryAndSemaphores(), which see for * more comments. In the normal postmaster case, the shared hash tables * are created here, as well as a locallock hash table that will remain * unused and empty in the postmaster itself. Backends inherit the pointers * to the shared tables via fork(), and also inherit an image of the locallock * hash table, which they proceed to use. In the EXEC_BACKEND case, each * backend re-executes this code to obtain pointers to the already existing * shared hash tables and to create its locallock hash table. */ void InitLocks(void) { HASHCTL info; long init_table_size, max_table_size; bool found; /* * Compute init/max size to request for lock hashtables. Note these * calculations must agree with LockShmemSize! */ max_table_size = NLOCKENTS(); init_table_size = max_table_size / 2; /* * Allocate hash table for LOCK structs. This stores per-locked-object * information. 
*/ MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(LOCKTAG); info.entrysize = sizeof(LOCK); info.num_partitions = NUM_LOCK_PARTITIONS; LockMethodLockHash = ShmemInitHash("LOCK hash", init_table_size, max_table_size, &info, HASH_ELEM | HASH_BLOBS | HASH_PARTITION); /* Assume an average of 2 holders per lock */ max_table_size *= 2; init_table_size *= 2; /* * Allocate hash table for PROCLOCK structs. This stores * per-lock-per-holder information. */ info.keysize = sizeof(PROCLOCKTAG); info.entrysize = sizeof(PROCLOCK); info.hash = proclock_hash; info.num_partitions = NUM_LOCK_PARTITIONS; LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash", init_table_size, max_table_size, &info, HASH_ELEM | HASH_FUNCTION | HASH_PARTITION); /* * Allocate fast-path structures. */ FastPathStrongRelationLocks = ShmemInitStruct("Fast Path Strong Relation Lock Data", sizeof(FastPathStrongRelationLockData), &found); if (!found) SpinLockInit(&FastPathStrongRelationLocks->mutex); /* * Allocate non-shared hash table for LOCALLOCK structs. This stores lock * counts and resource owner information. * * The non-shared table could already exist in this process (this occurs * when the postmaster is recreating shared memory after a backend crash). * If so, delete and recreate it. (We could simply leave it, since it * ought to be empty in the postmaster, but for safety let's zap it.) 
*/ if (LockMethodLocalHash) hash_destroy(LockMethodLocalHash); info.keysize = sizeof(LOCALLOCKTAG); info.entrysize = sizeof(LOCALLOCK); LockMethodLocalHash = hash_create("LOCALLOCK hash", 16, &info, HASH_ELEM | HASH_BLOBS); } /* * Fetch the lock method table associated with a given lock */ LockMethod GetLocksMethodTable(const LOCK *lock) { LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock); Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods)); return LockMethods[lockmethodid]; } /* * Fetch the lock method table associated with a given locktag */ LockMethod GetLockTagsMethodTable(const LOCKTAG *locktag) { LOCKMETHODID lockmethodid = (LOCKMETHODID) locktag->locktag_lockmethodid; Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods)); return LockMethods[lockmethodid]; } /* * Compute the hash code associated with a LOCKTAG. * * To avoid unnecessary recomputations of the hash code, we try to do this * just once per function, and then pass it around as needed. Aside from * passing the hashcode to hash_search_with_hash_value(), we can extract * the lock partition number from the hashcode. */ uint32 LockTagHashCode(const LOCKTAG *locktag) { return get_hash_value(LockMethodLockHash, (const void *) locktag); } /* * Compute the hash code associated with a PROCLOCKTAG. * * Because we want to use just one set of partition locks for both the * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs * fall into the same partition number as their associated LOCKs. * dynahash.c expects the partition number to be the low-order bits of * the hash code, and therefore a PROCLOCKTAG's hash code must have the * same low-order bits as the associated LOCKTAG's hash code. We achieve * this with this specialized hash function. 
*/ static uint32 proclock_hash(const void *key, Size keysize) { const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key; uint32 lockhash; Datum procptr; Assert(keysize == sizeof(PROCLOCKTAG)); /* Look into the associated LOCK object, and compute its hash code */ lockhash = LockTagHashCode(&proclocktag->myLock->tag); /* * To make the hash code also depend on the PGPROC, we xor the proc * struct's address into the hash code, left-shifted so that the * partition-number bits don't change. Since this is only a hash, we * don't care if we lose high-order bits of the address; use an * intermediate variable to suppress cast-pointer-to-int warnings. */ procptr = PointerGetDatum(proclocktag->myProc); lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS; return lockhash; } /* * Compute the hash code associated with a PROCLOCKTAG, given the hashcode * for its underlying LOCK. * * We use this just to avoid redundant calls of LockTagHashCode(). */ static inline uint32 ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode) { uint32 lockhash = hashcode; Datum procptr; /* * This must match proclock_hash()! */ procptr = PointerGetDatum(proclocktag->myProc); lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS; return lockhash; } /* * Given two lock modes, return whether they would conflict. 
*/ bool DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2) { LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD]; if (lockMethodTable->conflictTab[mode1] & LOCKBIT_ON(mode2)) return true; return false; } /* * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode' * by the current transaction */ bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode) { LOCALLOCKTAG localtag; LOCALLOCK *locallock; /* * See if there is a LOCALLOCK entry for this lock and lockmode */ MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ localtag.lock = *locktag; localtag.mode = lockmode; locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash, (void *) &localtag, HASH_FIND, NULL); return (locallock && locallock->nLocks > 0); } #ifdef USE_ASSERT_CHECKING /* * GetLockMethodLocalHash -- return the hash of local locks, for modules that * evaluate assertions based on all locks held. */ HTAB * GetLockMethodLocalHash(void) { return LockMethodLocalHash; } #endif /* * LockHasWaiters -- look up 'locktag' and check if releasing this * lock would wake up other processes waiting for it. 
*/ bool LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock) { LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; LockMethod lockMethodTable; LOCALLOCKTAG localtag; LOCALLOCK *locallock; LOCK *lock; PROCLOCK *proclock; LWLock *partitionLock; bool hasWaiters = false; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) elog(ERROR, "unrecognized lock method: %d", lockmethodid); lockMethodTable = LockMethods[lockmethodid]; if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) elog(ERROR, "unrecognized lock mode: %d", lockmode); #ifdef LOCK_DEBUG if (LOCK_DEBUG_ENABLED(locktag)) elog(LOG, "LockHasWaiters: lock [%u,%u] %s", locktag->locktag_field1, locktag->locktag_field2, lockMethodTable->lockModeNames[lockmode]); #endif /* * Find the LOCALLOCK entry for this lock and lockmode */ MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ localtag.lock = *locktag; localtag.mode = lockmode; locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash, (void *) &localtag, HASH_FIND, NULL); /* * let the caller print its own error message, too. Do not ereport(ERROR). */ if (!locallock || locallock->nLocks <= 0) { elog(WARNING, "you don't own a lock of type %s", lockMethodTable->lockModeNames[lockmode]); return false; } /* * Check the shared lock table. */ partitionLock = LockHashPartitionLock(locallock->hashcode); LWLockAcquire(partitionLock, LW_SHARED); /* * We don't need to re-find the lock or proclock, since we kept their * addresses in the locallock table, and they couldn't have been removed * while we were holding a lock on them. */ lock = locallock->lock; LOCK_PRINT("LockHasWaiters: found", lock, lockmode); proclock = locallock->proclock; PROCLOCK_PRINT("LockHasWaiters: found", proclock); /* * Double-check that we are actually holding a lock of the type we want to * release. 
*/ if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock); LWLockRelease(partitionLock); elog(WARNING, "you don't own a lock of type %s", lockMethodTable->lockModeNames[lockmode]); RemoveLocalLock(locallock); return false; } /* * Do the checking. */ if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0) hasWaiters = true; LWLockRelease(partitionLock); return hasWaiters; } /* * LockAcquire -- Check for lock conflicts, sleep if conflict found, * set lock if/when no conflicts. * * Inputs: * locktag: unique identifier for the lockable object * lockmode: lock mode to acquire * sessionLock: if true, acquire lock for session not current transaction * dontWait: if true, don't wait to acquire lock * * Returns one of: * LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true * LOCKACQUIRE_OK lock successfully acquired * LOCKACQUIRE_ALREADY_HELD incremented count for lock already held * LOCKACQUIRE_ALREADY_CLEAR incremented count for lock already clear * * In the normal case where dontWait=false and the caller doesn't need to * distinguish a freshly acquired lock from one already taken earlier in * this same transaction, there is no need to examine the return value. * * Side Effects: The lock is acquired and recorded in lock tables. * * NOTE: if we wait for the lock, there is no way to abort the wait * short of aborting the transaction. */ LockAcquireResult LockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait) { return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true, NULL); } /* * LockAcquireExtended - allows us to specify additional options * * reportMemoryError specifies whether a lock request that fills the lock * table should generate an ERROR or not. Passing "false" allows the caller * to attempt to recover from lock-table-full situations, perhaps by forcibly * canceling other lock holders and then retrying. 
Note, however, that the * return code for that is LOCKACQUIRE_NOT_AVAIL, so that it's unsafe to use * in combination with dontWait = true, as the cause of failure couldn't be * distinguished. * * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK * table entry if a lock is successfully acquired, or NULL if not. */ LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait, bool reportMemoryError, LOCALLOCK **locallockp) { LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; LockMethod lockMethodTable; LOCALLOCKTAG localtag; LOCALLOCK *locallock; LOCK *lock; PROCLOCK *proclock; bool found; ResourceOwner owner; uint32 hashcode; LWLock *partitionLock; bool found_conflict; bool log_lock = false; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) elog(ERROR, "unrecognized lock method: %d", lockmethodid); lockMethodTable = LockMethods[lockmethodid]; if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) elog(ERROR, "unrecognized lock mode: %d", lockmode); if (RecoveryInProgress() && !InRecovery && (locktag->locktag_type == LOCKTAG_OBJECT || locktag->locktag_type == LOCKTAG_RELATION) && lockmode > RowExclusiveLock) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot acquire lock mode %s on database objects while recovery is in progress", lockMethodTable->lockModeNames[lockmode]), errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery."))); #ifdef LOCK_DEBUG if (LOCK_DEBUG_ENABLED(locktag)) elog(LOG, "LockAcquire: lock [%u,%u] %s", locktag->locktag_field1, locktag->locktag_field2, lockMethodTable->lockModeNames[lockmode]); #endif /* Identify owner for lock */ if (sessionLock) owner = NULL; else owner = CurrentResourceOwner; /* * Find or create a LOCALLOCK entry for this lock and lockmode */ MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ localtag.lock = *locktag; localtag.mode = lockmode; 
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash, (void *) &localtag, HASH_ENTER, &found); /* * if it's a new locallock object, initialize it */ if (!found) { locallock->lock = NULL; locallock->proclock = NULL; locallock->hashcode = LockTagHashCode(&(localtag.lock)); locallock->nLocks = 0; locallock->holdsStrongLockCount = false; locallock->lockCleared = false; locallock->numLockOwners = 0; locallock->maxLockOwners = 8; locallock->lockOwners = NULL; /* in case next line fails */ locallock->lockOwners = (LOCALLOCKOWNER *) MemoryContextAlloc(TopMemoryContext, locallock->maxLockOwners * sizeof(LOCALLOCKOWNER)); } else { /* Make sure there will be room to remember the lock */ if (locallock->numLockOwners >= locallock->maxLockOwners) { int newsize = locallock->maxLockOwners * 2; locallock->lockOwners = (LOCALLOCKOWNER *) repalloc(locallock->lockOwners, newsize * sizeof(LOCALLOCKOWNER)); locallock->maxLockOwners = newsize; } } hashcode = locallock->hashcode; if (locallockp) *locallockp = locallock; /* * If we already hold the lock, we can just increase the count locally. * * If lockCleared is already set, caller need not worry about absorbing * sinval messages related to the lock's object. */ if (locallock->nLocks > 0) { GrantLockLocal(locallock, owner); if (locallock->lockCleared) return LOCKACQUIRE_ALREADY_CLEAR; else return LOCKACQUIRE_ALREADY_HELD; } /* * We don't acquire any other heavyweight lock while holding the relation * extension lock. We do allow to acquire the same relation extension * lock more than once but that case won't reach here. */ Assert(!IsRelationExtensionLockHeld); /* * We don't acquire any other heavyweight lock while holding the page lock * except for relation extension. */ Assert(!IsPageLockHeld || (locktag->locktag_type == LOCKTAG_RELATION_EXTEND)); /* * Prepare to emit a WAL record if acquisition of this lock needs to be * replayed in a standby server. * * Here we prepare to log; after lock is acquired we'll issue log record. 
* This arrangement simplifies error recovery in case the preparation step * fails. * * Only AccessExclusiveLocks can conflict with lock types that read-only * transactions can acquire in a standby server. Make sure this definition * matches the one in GetRunningTransactionLocks(). */ if (lockmode >= AccessExclusiveLock && locktag->locktag_type == LOCKTAG_RELATION && !RecoveryInProgress() && XLogStandbyInfoActive()) { LogAccessExclusiveLockPrepare(); log_lock = true; } /* * Attempt to take lock via fast path, if eligible. But if we remember * having filled up the fast path array, we don't attempt to make any * further use of it until we release some locks. It's possible that some * other backend has transferred some of those locks to the shared hash * table, leaving space free, but it's not worth acquiring the LWLock just * to check. It's also possible that we're acquiring a second or third * lock type on a relation we have already locked using the fast-path, but * for now we don't worry about that case either. */ if (EligibleForRelationFastPath(locktag, lockmode) && FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND) { uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode); bool acquired; /* * LWLockAcquire acts as a memory sequencing point, so it's safe to * assume that any strong locker whose increment to * FastPathStrongRelationLocks->counts becomes visible after we test * it has yet to begin to transfer fast-path locks. */ LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); if (FastPathStrongRelationLocks->count[fasthashcode] != 0) acquired = false; else acquired = FastPathGrantRelationLock(locktag->locktag_field2, lockmode); LWLockRelease(&MyProc->fpInfoLock); if (acquired) { /* * The locallock might contain stale pointers to some old shared * objects; we MUST reset these to null before considering the * lock to be acquired via fast-path. 
*/ locallock->lock = NULL; locallock->proclock = NULL; GrantLockLocal(locallock, owner); return LOCKACQUIRE_OK; } } /* * If this lock could potentially have been taken via the fast-path by * some other backend, we must (temporarily) disable further use of the * fast-path for this lock tag, and migrate any locks already taken via * this method to the main lock table. */ if (ConflictsWithRelationFastPath(locktag, lockmode)) { uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode); BeginStrongLockAcquire(locallock, fasthashcode); if (!FastPathTransferRelationLocks(lockMethodTable, locktag, hashcode)) { AbortStrongLockAcquire(); if (locallock->nLocks == 0) RemoveLocalLock(locallock); if (locallockp) *locallockp = NULL; if (reportMemoryError) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"), errhint("You might need to increase max_locks_per_transaction."))); else return LOCKACQUIRE_NOT_AVAIL; } } /* * We didn't find the lock in our LOCALLOCK table, and we didn't manage to * take it via the fast-path, either, so we've got to mess with the shared * lock table. */ partitionLock = LockHashPartitionLock(hashcode); LWLockAcquire(partitionLock, LW_EXCLUSIVE); /* * Find or create lock and proclock entries with this tag * * Note: if the locallock object already existed, it might have a pointer * to the lock already ... but we should not assume that that pointer is * valid, since a lock object with zero hold and request counts can go * away anytime. So we have to use SetupLockInTable() to recompute the * lock and proclock pointers, even if they're already set. 
*/ proclock = SetupLockInTable(lockMethodTable, MyProc, locktag, hashcode, lockmode); if (!proclock) { AbortStrongLockAcquire(); LWLockRelease(partitionLock); if (locallock->nLocks == 0) RemoveLocalLock(locallock); if (locallockp) *locallockp = NULL; if (reportMemoryError) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"), errhint("You might need to increase max_locks_per_transaction."))); else return LOCKACQUIRE_NOT_AVAIL; } locallock->proclock = proclock; lock = proclock->tag.myLock; locallock->lock = lock; /* * If lock requested conflicts with locks requested by waiters, must join * wait queue. Otherwise, check for conflict with already-held locks. * (That's last because most complex check.) */ if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) found_conflict = true; else found_conflict = LockCheckConflicts(lockMethodTable, lockmode, lock, proclock); if (!found_conflict) { /* No conflict with held or previously requested locks */ GrantLock(lock, proclock, lockmode); GrantLockLocal(locallock, owner); } else { /* * We can't acquire the lock immediately. If caller specified no * blocking, remove useless table entries and return * LOCKACQUIRE_NOT_AVAIL without waiting. 
*/ if (dontWait) { AbortStrongLockAcquire(); if (proclock->holdMask == 0) { uint32 proclock_hashcode; proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); SHMQueueDelete(&proclock->lockLink); SHMQueueDelete(&proclock->procLink); if (!hash_search_with_hash_value(LockMethodProcLockHash, (void *) &(proclock->tag), proclock_hashcode, HASH_REMOVE, NULL)) elog(PANIC, "proclock table corrupted"); } else PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock); lock->nRequested--; lock->requested[lockmode]--; LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode); Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); LWLockRelease(partitionLock); if (locallock->nLocks == 0) RemoveLocalLock(locallock); if (locallockp) *locallockp = NULL; return LOCKACQUIRE_NOT_AVAIL; } /* * Set bitmask of locks this process already holds on this object. */ MyProc->heldLocks = proclock->holdMask; /* * Sleep till someone wakes me up. */ TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1, locktag->locktag_field2, locktag->locktag_field3, locktag->locktag_field4, locktag->locktag_type, lockmode); WaitOnLock(locallock, owner); TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1, locktag->locktag_field2, locktag->locktag_field3, locktag->locktag_field4, locktag->locktag_type, lockmode); /* * NOTE: do not do any material change of state between here and * return. All required changes in locktable state must have been * done when the lock was granted to us --- see notes in WaitOnLock. */ /* * Check the proclock entry status, in case something in the ipc * communication doesn't work correctly. */ if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { AbortStrongLockAcquire(); PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); /* Should we retry ? 
*/
			LWLockRelease(partitionLock);
			elog(ERROR, "LockAcquire failed");
		}
		PROCLOCK_PRINT("LockAcquire: granted", proclock);
		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
	}

	/*
	 * Lock state is fully up-to-date now; if we error out after this, no
	 * special error cleanup is required.
	 */
	FinishStrongLockAcquire();

	LWLockRelease(partitionLock);

	/*
	 * Emit a WAL record if acquisition of this lock needs to be replayed in a
	 * standby server.
	 */
	if (log_lock)
	{
		/*
		 * Decode the locktag back to the original values, to avoid sending
		 * lots of empty bytes with every message.  See lock.h to check how a
		 * locktag is defined for LOCKTAG_RELATION
		 */
		LogAccessExclusiveLock(locktag->locktag_field1,
							   locktag->locktag_field2);
	}

	return LOCKACQUIRE_OK;
}

/*
 * Find or create LOCK and PROCLOCK objects as needed for a new lock
 * request.
 *
 * Returns the PROCLOCK object, or NULL if we failed to create the objects
 * for lack of shared memory.
 *
 * The appropriate partition lock must be held at entry, and will be
 * held at exit.
 *
 * Arguments:
 *	lockMethodTable: lock method table; used here for the lock mode names
 *		in messages and, under CHECK_DEADLOCK_RISK, for numLockModes
 *	proc: process on whose behalf the PROCLOCK is looked up or created
 *	locktag: tag identifying the lockable object (key of the LOCK table)
 *	hashcode: hash value used for the LOCK table lookup; the PROCLOCK hash
 *		code and the partition number are both derived from it
 *	lockmode: lock mode being requested
 *
 * On successful return, lock->nRequested and lock->requested[lockmode] have
 * been incremented; nothing is granted here.  The LOCK object is reachable
 * from the result through proclock->tag.myLock.
 */
static PROCLOCK *
SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
				 const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
{
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	uint32		proclock_hashcode;
	bool		found;

	/*
	 * Find or create a lock with this tag.
	 *
	 * HASH_ENTER_NULL is used, and a NULL result is handled just below by
	 * returning NULL to the caller.
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(const void *) locktag,
												hashcode,
												HASH_ENTER_NULL,
												&found);
	if (!lock)
		return NULL;

	/*
	 * if it's a new lock object, initialize it
	 */
	if (!found)
	{
		/* Fresh entry: empty masks, empty queues, all counters zero. */
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->procLocks));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("LockAcquire: new", lock, lockmode);
	}
	else
	{
		/* Existing entry: sanity-check its counters before touching it. */
		LOCK_PRINT("LockAcquire: found", lock, lockmode);
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}

	/*
	 * Create the hash key for the proclock table.
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	/*
	 * Find or create a proclock entry with this tag
	 */
	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_ENTER_NULL,
														&found);
	if (!proclock)
	{
		/* Oops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
			if (!hash_search_with_hash_value(LockMethodLockHash,
											 (void *) &(lock->tag),
											 hashcode,
											 HASH_REMOVE,
											 NULL))
				elog(PANIC, "lock table corrupted");
		}
		return NULL;
	}

	/*
	 * If new, initialize the new entry
	 */
	if (!found)
	{
		uint32		partition = LockHashPartition(hashcode);

		/*
		 * It might seem unsafe to access proclock->groupLeader without a
		 * lock, but it's not really.  Either we are initializing a proclock
		 * on our own behalf, in which case our group leader isn't changing
		 * because the group leader for a process can only ever be changed by
		 * the process itself; or else we are transferring a fast-path lock to
		 * the main lock table, in which case that process can't change its
		 * lock group leader without first releasing all of its locks (and in
		 * particular the one we are currently transferring).
		 */
		proclock->groupLeader = proc->lockGroupLeader != NULL ?
			proc->lockGroupLeader : proc;
		proclock->holdMask = 0;
		proclock->releaseMask = 0;
		/* Add proclock to appropriate lists */
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
							 &proclock->procLink);
		PROCLOCK_PRINT("LockAcquire: new", proclock);
	}
	else
	{
		PROCLOCK_PRINT("LockAcquire: found", proclock);
		/* Every mode this proclock holds must also show in the grant mask. */
		Assert((proclock->holdMask & ~lock->grantMask) == 0);

#ifdef CHECK_DEADLOCK_RISK

		/*
		 * Issue warning if we already hold a lower-level lock on this object
		 * and do not hold a lock of the requested level or higher. This
		 * indicates a deadlock-prone coding practice (eg, we'd have a
		 * deadlock if another backend were following the same code path at
		 * about the same time).
		 *
		 * This is not enabled by default, because it may generate log entries
		 * about user-level coding practices that are in fact safe in context.
		 * It can be enabled to help find system-level problems.
		 *
		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
		 * better to use a table.  For now, though, this works.
		 */
		{
			int			i;

			/* Scan from the highest mode down; stop at the first one held. */
			for (i = lockMethodTable->numLockModes; i > 0; i--)
			{
				if (proclock->holdMask & LOCKBIT_ON(i))
				{
					if (i >= (int) lockmode)
						break;	/* safe: we have a lock >= req level */
					elog(LOG, "deadlock risk: raising lock level"
						 " from %s to %s on object %u/%u/%u",
						 lockMethodTable->lockModeNames[i],
						 lockMethodTable->lockModeNames[lockmode],
						 lock->tag.locktag_field1, lock->tag.locktag_field2,
						 lock->tag.locktag_field3);
					break;
				}
			}
		}
#endif							/* CHECK_DEADLOCK_RISK */
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those immediately.
	 * The other counts don't increment till we get the lock.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	/*
	 * We shouldn't already hold the desired lock; else locallock table is
	 * broken.
	 *
	 * Note that the request counts above have already been incremented by
	 * the time this error is raised.
	 */
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
			 lockMethodTable->lockModeNames[lockmode],
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);

	return proclock;
}

/*
 * Check and set/reset the flag that we hold the relation extension/page lock.
 *
 * It is callers responsibility that this function is called after
 * acquiring/releasing the relation extension/page lock.
 *
 * Pass acquired as true if lock is acquired, false otherwise.
*/ static inline void CheckAndSetLockHeld(LOCALLOCK *locallock, bool acquired) { #ifdef USE_ASSERT_CHECKING if (LOCALLOCK_LOCKTAG(*locallock) == LOCKTAG_RELATION_EXTEND) IsRelationExtensionLockHeld = acquired; else if (LOCALLOCK_LOCKTAG(*locallock) == LOCKTAG_PAGE) IsPageLockHeld = acquired; #endif } /* * Subroutine to free a locallock entry */ static void RemoveLocalLock(LOCALLOCK *locallock) { int i; for (i = locallock->numLockOwners - 1; i >= 0; i--) { if (locallock->lockOwners[i].owner != NULL) ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock); } locallock->numLockOwners = 0; if (locallock->lockOwners != NULL) pfree(locallock->lockOwners); locallock->lockOwners = NULL; if (locallock->holdsStrongLockCount) { uint32 fasthashcode; fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode); SpinLockAcquire(&FastPathStrongRelationLocks->mutex); Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0); FastPathStrongRelationLocks->count[fasthashcode]--; locallock->holdsStrongLockCount = false; SpinLockRelease(&FastPathStrongRelationLocks->mutex); } if (!hash_search(LockMethodLocalHash, (void *) &(locallock->tag), HASH_REMOVE, NULL)) elog(WARNING, "locallock table corrupted"); /* * Indicate that the lock is released for certain types of locks */ CheckAndSetLockHeld(locallock, false); } /* * LockCheckConflicts -- test whether requested lock conflicts * with those already granted * * Returns true if conflict, false if no conflict. * * NOTES: * Here's what makes this complicated: one process's locks don't * conflict with one another, no matter what purpose they are held for * (eg, session and transaction locks do not conflict). Nor do the locks * of one process in a lock group conflict with those of another process in * the same group. So, we must subtract off these locks when determining * whether the requested new lock conflicts with those already held. 
*/
/*
 * Arguments:
 *	lockMethodTable: supplies numLockModes and the conflictTab[] masks
 *	lockmode: the mode being requested
 *	lock: the shared LOCK object; its grantMask, granted[] counts and
 *		procLocks list are read
 *	proclock: the requester's PROCLOCK for this lock; its holdMask and
 *		groupLeader are used to discount holders that do not conflict
 *
 * No field of lock or proclock is assigned in this function.
 */
bool
LockCheckConflicts(LockMethod lockMethodTable,
				   LOCKMODE lockmode,
				   LOCK *lock,
				   PROCLOCK *proclock)
{
	int			numLockModes = lockMethodTable->numLockModes;
	LOCKMASK	myLocks;
	int			conflictMask = lockMethodTable->conflictTab[lockmode];
	/* Entries 1..numLockModes are filled in below; element 0 is not used. */
	int			conflictsRemaining[MAX_LOCKMODES];
	int			totalConflictsRemaining = 0;
	int			i;
	SHM_QUEUE  *procLocks;
	PROCLOCK   *otherproclock;

	/*
	 * first check for global conflicts: If no locks conflict with my request,
	 * then I get the lock.
	 *
	 * Checking for conflict: lock->grantMask represents the types of
	 * currently held locks.  conflictTable[lockmode] has a bit set for each
	 * type of lock that conflicts with request.   Bitwise compare tells if
	 * there is a conflict.
	 */
	if (!(conflictMask & lock->grantMask))
	{
		PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
		return false;
	}

	/*
	 * Rats.  Something conflicts.  But it could still be my own lock, or a
	 * lock held by another member of my locking group.  First, figure out how
	 * many conflicts remain after subtracting out any locks I hold myself.
	 */
	myLocks = proclock->holdMask;
	for (i = 1; i <= numLockModes; i++)
	{
		/* Modes that do not conflict with the request contribute nothing. */
		if ((conflictMask & LOCKBIT_ON(i)) == 0)
		{
			conflictsRemaining[i] = 0;
			continue;
		}
		conflictsRemaining[i] = lock->granted[i];
		/* Discount our own hold of mode i, if we have one. */
		if (myLocks & LOCKBIT_ON(i))
			--conflictsRemaining[i];
		totalConflictsRemaining += conflictsRemaining[i];
	}

	/* If no conflicts remain, we get the lock. */
	if (totalConflictsRemaining == 0)
	{
		PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
		return false;
	}

	/* If no group locking, it's definitely a conflict. */
	if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL)
	{
		Assert(proclock->tag.myProc == MyProc);
		PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)",
					   proclock);
		return true;
	}

	/*
	 * The relation extension or page lock conflict even between the group
	 * members.
	 */
	if (LOCK_LOCKTAG(*lock) == LOCKTAG_RELATION_EXTEND ||
		(LOCK_LOCKTAG(*lock) == LOCKTAG_PAGE))
	{
		PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)",
					   proclock);
		return true;
	}

	/*
	 * Locks held in conflicting modes by members of our own lock group are
	 * not real conflicts; we can subtract those out and see if we still have
	 * a conflict.  This is O(N) in the number of processes holding or
	 * awaiting locks on this object.  We could improve that by making the
	 * shared memory state more complex (and larger) but it doesn't seem worth
	 * it.
	 */
	procLocks = &(lock->procLocks);
	otherproclock = (PROCLOCK *)
		SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
	while (otherproclock != NULL)
	{
		/*
		 * Consider only other proclocks with the same group leader as ours
		 * that hold at least one mode conflicting with the request.
		 */
		if (proclock != otherproclock &&
			proclock->groupLeader == otherproclock->groupLeader &&
			(otherproclock->holdMask & conflictMask) != 0)
		{
			int			intersectMask = otherproclock->holdMask & conflictMask;

			for (i = 1; i <= numLockModes; i++)
			{
				if ((intersectMask & LOCKBIT_ON(i)) != 0)
				{
					/*
					 * A group member claims to hold mode i, yet no grant of
					 * that mode is left to discount: the per-proclock masks
					 * and the lock's granted[] counts disagree.
					 */
					if (conflictsRemaining[i] <= 0)
						elog(PANIC, "proclocks held do not match lock");
					conflictsRemaining[i]--;
					totalConflictsRemaining--;
				}
			}

			if (totalConflictsRemaining == 0)
			{
				PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
							   proclock);
				return false;
			}
		}
		otherproclock = (PROCLOCK *)
			SHMQueueNext(procLocks, &otherproclock->lockLink,
						 offsetof(PROCLOCK, lockLink));
	}

	/* Nope, it's a real conflict. */
	PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
	return true;
}

/*
 * GrantLock -- update the lock and proclock data structures to show
 *		the lock request has been granted.
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
 * and have its waitLock/waitProcLock fields cleared.  That's not done here.
 *
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 * table entry; but since we may be awaking some other process, we can't do
 * that here; it's done by GrantLockLocal, instead.
*/ void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode) { lock->nGranted++; lock->granted[lockmode]++; lock->grantMask |= LOCKBIT_ON(lockmode); if (lock->granted[lockmode] == lock->requested[lockmode]) lock->waitMask &= LOCKBIT_OFF(lockmode); proclock->holdMask |= LOCKBIT_ON(lockmode); LOCK_PRINT("GrantLock", lock, lockmode); Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); Assert(lock->nGranted <= lock->nRequested); } /* * UnGrantLock -- opposite of GrantLock. * * Updates the lock and proclock data structures to show that the lock * is no longer held nor requested by the current holder. * * Returns true if there were any waiters waiting on the lock that * should now be woken up with ProcLockWakeup. */ static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, PROCLOCK *proclock, LockMethod lockMethodTable) { bool wakeupNeeded = false; Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); Assert(lock->nGranted <= lock->nRequested); /* * fix the general lock stats */ lock->nRequested--; lock->requested[lockmode]--; lock->nGranted--; lock->granted[lockmode]--; if (lock->granted[lockmode] == 0) { /* change the conflict mask. No more of this lock type. */ lock->grantMask &= LOCKBIT_OFF(lockmode); } LOCK_PRINT("UnGrantLock: updated", lock, lockmode); /* * We need only run ProcLockWakeup if the released lock conflicts with at * least one of the lock types requested by waiter(s). Otherwise whatever * conflict made them wait must still exist. NOTE: before MVCC, we could * skip wakeup if lock->granted[lockmode] was still positive. But that's * not true anymore, because the remaining granted locks might belong to * some waiter, who could now be awakened because he doesn't conflict with * his own locks. */ if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) wakeupNeeded = true; /* * Now fix the per-proclock state. 
*/ proclock->holdMask &= LOCKBIT_OFF(lockmode); PROCLOCK_PRINT("UnGrantLock: updated", proclock); return wakeupNeeded; } /* * CleanUpLock -- clean up after releasing a lock. We garbage-collect the * proclock and lock objects if possible, and call ProcLockWakeup if there * are remaining requests and the caller says it's OK. (Normally, this * should be called after UnGrantLock, and wakeupNeeded is the result from * UnGrantLock.) * * The appropriate partition lock must be held at entry, and will be * held at exit. */ static void CleanUpLock(LOCK *lock, PROCLOCK *proclock, LockMethod lockMethodTable, uint32 hashcode, bool wakeupNeeded) { /* * If this was my last hold on this lock, delete my entry in the proclock * table. */ if (proclock->holdMask == 0) { uint32 proclock_hashcode; PROCLOCK_PRINT("CleanUpLock: deleting", proclock); SHMQueueDelete(&proclock->lockLink); SHMQueueDelete(&proclock->procLink); proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); if (!hash_search_with_hash_value(LockMethodProcLockHash, (void *) &(proclock->tag), proclock_hashcode, HASH_REMOVE, NULL)) elog(PANIC, "proclock table corrupted"); } if (lock->nRequested == 0) { /* * The caller just released the last lock, so garbage-collect the lock * object. */ LOCK_PRINT("CleanUpLock: deleting", lock, 0); Assert(SHMQueueEmpty(&(lock->procLocks))); if (!hash_search_with_hash_value(LockMethodLockHash, (void *) &(lock->tag), hashcode, HASH_REMOVE, NULL)) elog(PANIC, "lock table corrupted"); } else if (wakeupNeeded) { /* There are waiters on this lock, so wake them up. */ ProcLockWakeup(lockMethodTable, lock); } } /* * GrantLockLocal -- update the locallock data structures to show * the lock request has been granted. * * We expect that LockAcquire made sure there is room to add a new * ResourceOwner entry. 
*/ static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner) { LOCALLOCKOWNER *lockOwners = locallock->lockOwners; int i; Assert(locallock->numLockOwners < locallock->maxLockOwners); /* Count the total */ locallock->nLocks++; /* Count the per-owner lock */ for (i = 0; i < locallock->numLockOwners; i++) { if (lockOwners[i].owner == owner) { lockOwners[i].nLocks++; return; } } lockOwners[i].owner = owner; lockOwners[i].nLocks = 1; locallock->numLockOwners++; if (owner != NULL) ResourceOwnerRememberLock(owner, locallock); /* Indicate that the lock is acquired for certain types of locks. */ CheckAndSetLockHeld(locallock, true); } /* * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK, * and arrange for error cleanup if it fails */ static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode) { Assert(StrongLockInProgress == NULL); Assert(locallock->holdsStrongLockCount == false); /* * Adding to a memory location is not atomic, so we take a spinlock to * ensure we don't collide with someone else trying to bump the count at * the same time. * * XXX: It might be worth considering using an atomic fetch-and-add * instruction here, on architectures where that is supported. */ SpinLockAcquire(&FastPathStrongRelationLocks->mutex); FastPathStrongRelationLocks->count[fasthashcode]++; locallock->holdsStrongLockCount = true; StrongLockInProgress = locallock; SpinLockRelease(&FastPathStrongRelationLocks->mutex); } /* * FinishStrongLockAcquire - cancel pending cleanup for a strong lock * acquisition once it's no longer needed */ static void FinishStrongLockAcquire(void) { StrongLockInProgress = NULL; } /* * AbortStrongLockAcquire - undo strong lock state changes performed by * BeginStrongLockAcquire. 
*/ void AbortStrongLockAcquire(void) { uint32 fasthashcode; LOCALLOCK *locallock = StrongLockInProgress; if (locallock == NULL) return; fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode); Assert(locallock->holdsStrongLockCount == true); SpinLockAcquire(&FastPathStrongRelationLocks->mutex); Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0); FastPathStrongRelationLocks->count[fasthashcode]--; locallock->holdsStrongLockCount = false; StrongLockInProgress = NULL; SpinLockRelease(&FastPathStrongRelationLocks->mutex); } /* * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing * WaitOnLock on. * * proc.c needs this for the case where we are booted off the lock by * timeout, but discover that someone granted us the lock anyway. * * We could just export GrantLockLocal, but that would require including * resowner.h in lock.h, which creates circularity. */ void GrantAwaitedLock(void) { GrantLockLocal(awaitedLock, awaitedOwner); } /* * MarkLockClear -- mark an acquired lock as "clear" * * This means that we know we have absorbed all sinval messages that other * sessions generated before we acquired this lock, and so we can confidently * assume we know about any catalog changes protected by this lock. */ void MarkLockClear(LOCALLOCK *locallock) { Assert(locallock->nLocks > 0); locallock->lockCleared = true; } /* * WaitOnLock -- wait to acquire a lock * * Caller must have set MyProc->heldLocks to reflect locks already held * on the lockable object by this process. * * The appropriate partition lock must be held at entry. 
*/
/*
 * Arguments:
 *	locallock: local entry for the lock being waited for; it is passed on
 *		to ProcSleep and saved in awaitedLock
 *	owner: resource owner for the request; saved in awaitedOwner
 *
 * Returns normally only when ProcSleep reports STATUS_OK.  Otherwise the
 * partition lock is released and DeadLockReport() is called.
 */
static void
WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
{
	LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
	LockMethod	lockMethodTable = LockMethods[lockmethodid];

	/*
	 * NOTE(review): presumably declared volatile because it is assigned
	 * before PG_TRY and read again inside PG_CATCH -- confirm.
	 */
	char	   *volatile new_status = NULL;

	LOCK_PRINT("WaitOnLock: sleeping on lock",
			   locallock->lock, locallock->tag.mode);

	/* Report change to waiting status */
	if (update_process_title)
	{
		const char *old_status;
		int			len;

		old_status = get_ps_display(&len);
		/* len bytes of old title + 8 for " waiting" + 1 for the terminator */
		new_status = (char *) palloc(len + 8 + 1);
		memcpy(new_status, old_status, len);
		strcpy(new_status + len, " waiting");
		set_ps_display(new_status);
		/* keep the buffer around; it is used to restore the title later */
		new_status[len] = '\0'; /* truncate off " waiting" */
	}

	/* Remember what we are waiting for; GrantAwaitedLock() reads these. */
	awaitedLock = locallock;
	awaitedOwner = owner;

	/*
	 * NOTE: Think not to put any shared-state cleanup after the call to
	 * ProcSleep, in either the normal or failure path.  The lock state must
	 * be fully set by the lock grantor, or by CheckDeadLock if we give up
	 * waiting for the lock.  This is necessary because of the possibility
	 * that a cancel/die interrupt will interrupt ProcSleep after someone else
	 * grants us the lock, but before we've noticed it. Hence, after granting,
	 * the locktable state must fully reflect the fact that we own the lock;
	 * we can't do additional work on return.
	 *
	 * We can and do use a PG_TRY block to try to clean up after failure, but
	 * this still has a major limitation: elog(FATAL) can occur while waiting
	 * (eg, a "die" interrupt), and then control won't come back here. So all
	 * cleanup of essential state should happen in LockErrorCleanup, not here.
	 * We can use PG_TRY to clear the "waiting" status flags, since doing that
	 * is unimportant if the process exits.
	 */
	PG_TRY();
	{
		if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
		{
			/*
			 * We failed as a result of a deadlock, see CheckDeadLock(). Quit
			 * now.
			 */
			awaitedLock = NULL;
			LOCK_PRINT("WaitOnLock: aborting on lock",
					   locallock->lock, locallock->tag.mode);
			LWLockRelease(LockHashPartitionLock(locallock->hashcode));

			/*
			 * Now that we aren't holding the partition lock, we can give an
			 * error report including details about the detected deadlock.
			 */
			DeadLockReport();
			/* not reached */
		}
	}
	PG_CATCH();
	{
		/* In this path, awaitedLock remains set until LockErrorCleanup */

		/*
		 * Report change to non-waiting status
		 *
		 * NOTE(review): this re-tests update_process_title rather than
		 * new_status; if the setting could flip from false to true during
		 * the wait, new_status would still be NULL here -- confirm that it
		 * cannot change while we sleep.
		 */
		if (update_process_title)
		{
			set_ps_display(new_status);
			pfree(new_status);
		}

		/* and propagate the error */
		PG_RE_THROW();
	}
	PG_END_TRY();

	awaitedLock = NULL;

	/* Report change to non-waiting status */
	if (update_process_title)
	{
		/* new_status was truncated above, so this restores the old title */
		set_ps_display(new_status);
		pfree(new_status);
	}

	LOCK_PRINT("WaitOnLock: wakeup on lock",
			   locallock->lock, locallock->tag.mode);
}

/*
 * Remove a proc from the wait-queue it is on (caller must know it is on one).
 * This is only used when the proc has failed to get the lock, so we set its
 * waitStatus to STATUS_ERROR.
 *
 * Appropriate partition lock must be held by caller.  Also, caller is
 * responsible for signaling the proc if needed.
 *
 * NB: this does not clean up any locallock object that may exist for the lock.
*/
/*
 * Arguments:
 *	proc: the waiting process; its waitLock, waitProcLock and waitLockMode
 *		identify the request being withdrawn
 *	hashcode: hash code handed to CleanUpLock for the hash table removals
 *		(presumably the hash of the waited-for lock's tag -- verify against
 *		callers)
 */
void
RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
{
	LOCK	   *waitLock = proc->waitLock;
	PROCLOCK   *proclock = proc->waitProcLock;
	LOCKMODE	lockmode = proc->waitLockMode;

	/*
	 * NOTE(review): this initializer dereferences waitLock before the
	 * Assert(waitLock) below gets a chance to check it.
	 */
	LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);

	/* Make sure proc is waiting */
	Assert(proc->waitStatus == STATUS_WAITING);
	Assert(proc->links.next != NULL);
	Assert(waitLock);
	Assert(waitLock->waitProcs.size > 0);
	Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));

	/* Remove proc from lock's wait queue */
	SHMQueueDelete(&(proc->links));
	waitLock->waitProcs.size--;

	/* Undo increments of request counts by waiting process */
	Assert(waitLock->nRequested > 0);
	/* waitLock was initialized from proc->waitLock above */
	Assert(waitLock->nRequested > proc->waitLock->nGranted);
	waitLock->nRequested--;
	Assert(waitLock->requested[lockmode] > 0);
	waitLock->requested[lockmode]--;
	/* don't forget to clear waitMask bit if appropriate */
	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
		waitLock->waitMask &= LOCKBIT_OFF(lockmode);

	/* Clean up the proc's own state, and pass it the ok/fail signal */
	proc->waitLock = NULL;
	proc->waitProcLock = NULL;
	proc->waitStatus = STATUS_ERROR;

	/*
	 * Delete the proclock immediately if it represents no already-held locks.
	 * (This must happen now because if the owner of the lock decides to
	 * release it, and the requested/granted counts then go to zero,
	 * LockRelease expects there to be no remaining proclocks.) Then see if
	 * any other waiters for the lock can be woken up now.
	 *
	 * wakeupNeeded is passed as true unconditionally here.
	 */
	CleanUpLock(waitLock, proclock,
				LockMethods[lockmethodid], hashcode,
				true);
}

/*
 * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
 *		Release a session lock if 'sessionLock' is true, else release a
 *		regular transaction lock.
 *
 * Side Effects: find any waiting processes that are now wakable,
 *		grant them their requested locks and awaken them.
 *		(We have to grant the lock here to avoid a race between
 *		the waking process and any new process to
 *		come along and request the lock.)
*/
/*
 * Arguments:
 *	locktag: tag of the lockable object; its locktag_lockmethodid selects
 *		the lock method table
 *	lockmode: mode to release; must be in 1..numLockModes of that method
 *	sessionLock: if true the hold is looked up under a NULL owner, otherwise
 *		under CurrentResourceOwner
 *
 * Returns true if one hold was released (or merely decremented because
 * further holds remain).  Returns false, after a WARNING, if no matching
 * hold is found for this lock, mode and owner.  An unrecognized lock method
 * or lock mode raises ERROR.
 */
bool
LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
{
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LockMethod	lockMethodTable;
	LOCALLOCKTAG localtag;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	LWLock	   *partitionLock;
	bool		wakeupNeeded;

	/* Validate the lock method and mode before using them as indexes. */
	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];
	if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
		elog(ERROR, "unrecognized lock mode: %d", lockmode);

#ifdef LOCK_DEBUG
	if (LOCK_DEBUG_ENABLED(locktag))
		elog(LOG, "LockRelease: lock [%u,%u] %s",
			 locktag->locktag_field1, locktag->locktag_field2,
			 lockMethodTable->lockModeNames[lockmode]);
#endif

	/*
	 * Find the LOCALLOCK entry for this lock and lockmode
	 */
	MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
	localtag.lock = *locktag;
	localtag.mode = lockmode;

	locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
										  (void *) &localtag,
										  HASH_FIND, NULL);

	/*
	 * let the caller print its own error message, too. Do not ereport(ERROR).
	 */
	if (!locallock || locallock->nLocks <= 0)
	{
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		return false;
	}

	/*
	 * Decrease the count for the resource owner.
	 */
	{
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		ResourceOwner owner;
		int			i;

		/* Identify owner for lock */
		if (sessionLock)
			owner = NULL;
		else
			owner = CurrentResourceOwner;

		/* Search the owner array from the last slot down to the first. */
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == owner)
			{
				Assert(lockOwners[i].nLocks > 0);
				if (--lockOwners[i].nLocks == 0)
				{
					if (owner != NULL)
						ResourceOwnerForgetLock(owner, locallock);
					/* compact out unused slot */
					locallock->numLockOwners--;
					/* fill the hole with the (former) last entry */
					if (i < locallock->numLockOwners)
						lockOwners[i] = lockOwners[locallock->numLockOwners];
				}
				break;
			}
		}
		/* i < 0 means the loop ran off the front without finding owner */
		if (i < 0)
		{
			/* don't release a lock belonging to another owner */
			elog(WARNING, "you don't own a lock of type %s",
				 lockMethodTable->lockModeNames[lockmode]);
			return false;
		}
	}

	/*
	 * Decrease the total local count.  If we're still holding the lock, we're
	 * done.
	 */
	locallock->nLocks--;

	if (locallock->nLocks > 0)
		return true;

	/*
	 * At this point we can no longer suppose we are clear of invalidation
	 * messages related to this lock.  Although we'll delete the LOCALLOCK
	 * object before any intentional return from this routine, it seems worth
	 * the trouble to explicitly reset lockCleared right now, just in case
	 * some error prevents us from deleting the LOCALLOCK.
	 */
	locallock->lockCleared = false;

	/* Attempt fast release of any lock eligible for the fast path. */
	if (EligibleForRelationFastPath(locktag, lockmode) &&
		FastPathLocalUseCount > 0)
	{
		bool		released;

		/*
		 * We might not find the lock here, even if we originally entered it
		 * here.  Another backend may have moved it to the main table.
		 */
		LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE);
		released = FastPathUnGrantRelationLock(locktag->locktag_field2,
											   lockmode);
		LWLockRelease(&MyProc->fpInfoLock);
		if (released)
		{
			RemoveLocalLock(locallock);
			return true;
		}
	}

	/*
	 * Otherwise we've got to mess with the shared lock table.
	 */
	partitionLock = LockHashPartitionLock(locallock->hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Normally, we don't need to re-find the lock or proclock, since we kept
	 * their addresses in the locallock table, and they couldn't have been
	 * removed while we were holding a lock on them.  But it's possible that
	 * the lock was taken fast-path and has since been moved to the main hash
	 * table by another backend, in which case we will need to look up the
	 * objects here.  We assume the lock field is NULL if so.
	 */
	lock = locallock->lock;
	if (!lock)
	{
		PROCLOCKTAG proclocktag;

		Assert(EligibleForRelationFastPath(locktag, lockmode));
		lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
													(const void *) locktag,
													locallock->hashcode,
													HASH_FIND,
													NULL);
		if (!lock)
			elog(ERROR, "failed to re-find shared lock object");
		/* Cache the re-found pointers in the locallock entry. */
		locallock->lock = lock;

		proclocktag.myLock = lock;
		proclocktag.myProc = MyProc;
		locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
													   (void *) &proclocktag,
													   HASH_FIND,
													   NULL);
		if (!locallock->proclock)
			elog(ERROR, "failed to re-find shared proclock object");
	}
	LOCK_PRINT("LockRelease: found", lock, lockmode);
	proclock = locallock->proclock;
	PROCLOCK_PRINT("LockRelease: found", proclock);

	/*
	 * Double-check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
	{
		PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
		/* give up the partition lock before reporting and cleaning up */
		LWLockRelease(partitionLock);
		elog(WARNING, "you don't own a lock of type %s",
			 lockMethodTable->lockModeNames[lockmode]);
		RemoveLocalLock(locallock);
		return false;
	}

	/*
	 * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
	 */
	wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);

	CleanUpLock(lock, proclock,
				lockMethodTable, locallock->hashcode,
				wakeupNeeded);

	LWLockRelease(partitionLock);

	RemoveLocalLock(locallock);
	return true;
}

/*
 * LockReleaseAll -- Release all locks of the specified lock method that
 *		are held by the current process.
 *
 * Well, not necessarily *all* locks.  The available behaviors are:
 *		allLocks == true: release all locks including session locks.
 *		allLocks == false: release all non-session locks.
 */
void
LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{
	HASH_SEQ_STATUS status;
	LockMethod	lockMethodTable;
	int			i,
				numLockModes;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	int			partition;
	bool		have_fast_path_lwlock = false;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

#ifdef LOCK_DEBUG
	if (*(lockMethodTable->trace_flag))
		elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
#endif

	/*
	 * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
	 * the only way that the lock we hold on our own VXID can ever get
	 * released: it is always and only released when a toplevel transaction
	 * ends.
	 */
	if (lockmethodid == DEFAULT_LOCKMETHOD)
		VirtualXactLockTableCleanup();

	numLockModes = lockMethodTable->numLockModes;

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and get rid of those. We
	 * do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.  Fast-path locks are cleaned up during the locallock table
	 * scan, though.
	 */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		/*
		 * If the LOCALLOCK entry is unused, we must've run out of shared
		 * memory while trying to set up this lock.
Just forget the local * entry. */ if (locallock->nLocks == 0) { RemoveLocalLock(locallock); continue; } /* Ignore items that are not of the lockmethod to be removed */ if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) continue; /* * If we are asked to release all locks, we can just zap the entry. * Otherwise, must scan to see if there are session locks. We assume * there is at most one lockOwners entry for session locks. */ if (!allLocks) { LOCALLOCKOWNER *lockOwners = locallock->lockOwners; /* If session lock is above array position 0, move it down to 0 */ for (i = 0; i < locallock->numLockOwners; i++) { if (lockOwners[i].owner == NULL) lockOwners[0] = lockOwners[i]; else ResourceOwnerForgetLock(lockOwners[i].owner, locallock); } if (locallock->numLockOwners > 0 && lockOwners[0].owner == NULL && lockOwners[0].nLocks > 0) { /* Fix the locallock to show just the session locks */ locallock->nLocks = lockOwners[0].nLocks; locallock->numLockOwners = 1; /* We aren't deleting this locallock, so done */ continue; } else locallock->numLockOwners = 0; } /* * If the lock or proclock pointers are NULL, this lock was taken via * the relation fast-path (and is not known to have been transferred). */ if (locallock->proclock == NULL || locallock->lock == NULL) { LOCKMODE lockmode = locallock->tag.mode; Oid relid; /* Verify that a fast-path lock is what we've got. */ if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode)) elog(PANIC, "locallock table corrupted"); /* * If we don't currently hold the LWLock that protects our * fast-path data structures, we must acquire it before attempting * to release the lock via the fast-path. We will continue to * hold the LWLock until we're done scanning the locallock table, * unless we hit a transferred fast-path lock. (XXX is this * really such a good idea? There could be a lot of entries ...) 
*/ if (!have_fast_path_lwlock) { LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); have_fast_path_lwlock = true; } /* Attempt fast-path release. */ relid = locallock->tag.lock.locktag_field2; if (FastPathUnGrantRelationLock(relid, lockmode)) { RemoveLocalLock(locallock); continue; } /* * Our lock, originally taken via the fast path, has been * transferred to the main lock table. That's going to require * some extra work, so release our fast-path lock before starting. */ LWLockRelease(&MyProc->fpInfoLock); have_fast_path_lwlock = false; /* * Now dump the lock. We haven't got a pointer to the LOCK or * PROCLOCK in this case, so we have to handle this a bit * differently than a normal lock release. Unfortunately, this * requires an extra LWLock acquire-and-release cycle on the * partitionLock, but hopefully it shouldn't happen often. */ LockRefindAndRelease(lockMethodTable, MyProc, &locallock->tag.lock, lockmode, false); RemoveLocalLock(locallock); continue; } /* Mark the proclock to show we need to release this lockmode */ if (locallock->nLocks > 0) locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode); /* And remove the locallock hashtable entry */ RemoveLocalLock(locallock); } /* Done with the fast-path data structures */ if (have_fast_path_lwlock) LWLockRelease(&MyProc->fpInfoLock); /* * Now, scan each lock partition separately. */ for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { LWLock *partitionLock; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); PROCLOCK *nextplock; partitionLock = LockHashPartitionLockByIndex(partition); /* * If the proclock list for this partition is empty, we can skip * acquiring the partition lock. This optimization is trickier than * it looks, because another backend could be in process of adding * something to our proclock list due to promoting one of our * fast-path locks. 
However, any such lock must be one that we * decided not to delete above, so it's okay to skip it again now; * we'd just decide not to delete it again. We must, however, be * careful to re-fetch the list header once we've acquired the * partition lock, to be sure we have a valid, up-to-date pointer. * (There is probably no significant risk if pointer fetch/store is * atomic, but we don't wish to assume that.) * * XXX This argument assumes that the locallock table correctly * represents all of our fast-path locks. While allLocks mode * guarantees to clean up all of our normal locks regardless of the * locallock situation, we lose that guarantee for fast-path locks. * This is not ideal. */ if (SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)) == NULL) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)); proclock; proclock = nextplock) { bool wakeupNeeded = false; /* Get link first, since we may unlink/delete this proclock */ nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); Assert(proclock->tag.myProc == MyProc); lock = proclock->tag.myLock; /* Ignore items that are not of the lockmethod to be removed */ if (LOCK_LOCKMETHOD(*lock) != lockmethodid) continue; /* * In allLocks mode, force release of all locks even if locallock * table had problems */ if (allLocks) proclock->releaseMask = proclock->holdMask; else Assert((proclock->releaseMask & ~proclock->holdMask) == 0); /* * Ignore items that have nothing to be released, unless they have * holdMask == 0 and are therefore recyclable */ if (proclock->releaseMask == 0 && proclock->holdMask != 0) continue; PROCLOCK_PRINT("LockReleaseAll", proclock); LOCK_PRINT("LockReleaseAll", lock, 0); Assert(lock->nRequested >= 0); Assert(lock->nGranted >= 0); Assert(lock->nGranted <= lock->nRequested); Assert((proclock->holdMask & 
~lock->grantMask) == 0); /* * Release the previously-marked lock modes */ for (i = 1; i <= numLockModes; i++) { if (proclock->releaseMask & LOCKBIT_ON(i)) wakeupNeeded |= UnGrantLock(lock, i, proclock, lockMethodTable); } Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); Assert(lock->nGranted <= lock->nRequested); LOCK_PRINT("LockReleaseAll: updated", lock, 0); proclock->releaseMask = 0; /* CleanUpLock will wake up waiters if needed. */ CleanUpLock(lock, proclock, lockMethodTable, LockTagHashCode(&lock->tag), wakeupNeeded); } /* loop over PROCLOCKs within this partition */ LWLockRelease(partitionLock); } /* loop over partitions */ #ifdef LOCK_DEBUG if (*(lockMethodTable->trace_flag)) elog(LOG, "LockReleaseAll done"); #endif } /* * LockReleaseSession -- Release all session locks of the specified lock method * that are held by the current process. */ void LockReleaseSession(LOCKMETHODID lockmethodid) { HASH_SEQ_STATUS status; LOCALLOCK *locallock; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) elog(ERROR, "unrecognized lock method: %d", lockmethodid); hash_seq_init(&status, LockMethodLocalHash); while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) { /* Ignore items that are not of the specified lock method */ if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) continue; ReleaseLockIfHeld(locallock, true); } } /* * LockReleaseCurrentOwner * Release all locks belonging to CurrentResourceOwner * * If the caller knows what those locks are, it can pass them as an array. * That speeds up the call significantly, when a lot of locks are held. * Otherwise, pass NULL for locallocks, and we'll traverse through our hash * table to find them. 
*/ void LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks) { if (locallocks == NULL) { HASH_SEQ_STATUS status; LOCALLOCK *locallock; hash_seq_init(&status, LockMethodLocalHash); while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) ReleaseLockIfHeld(locallock, false); } else { int i; for (i = nlocks - 1; i >= 0; i--) ReleaseLockIfHeld(locallocks[i], false); } } /* * ReleaseLockIfHeld * Release any session-level locks on this lockable object if sessionLock * is true; else, release any locks held by CurrentResourceOwner. * * It is tempting to pass this a ResourceOwner pointer (or NULL for session * locks), but without refactoring LockRelease() we cannot support releasing * locks belonging to resource owners other than CurrentResourceOwner. * If we were to refactor, it'd be a good idea to fix it so we don't have to * do a hashtable lookup of the locallock, too. However, currently this * function isn't used heavily enough to justify refactoring for its * convenience. */ static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock) { ResourceOwner owner; LOCALLOCKOWNER *lockOwners; int i; /* Identify owner for lock (must match LockRelease!) */ if (sessionLock) owner = NULL; else owner = CurrentResourceOwner; /* Scan to see if there are any locks belonging to the target owner */ lockOwners = locallock->lockOwners; for (i = locallock->numLockOwners - 1; i >= 0; i--) { if (lockOwners[i].owner == owner) { Assert(lockOwners[i].nLocks > 0); if (lockOwners[i].nLocks < locallock->nLocks) { /* * We will still hold this lock after forgetting this * ResourceOwner. 
*/ locallock->nLocks -= lockOwners[i].nLocks; /* compact out unused slot */ locallock->numLockOwners--; if (owner != NULL) ResourceOwnerForgetLock(owner, locallock); if (i < locallock->numLockOwners) lockOwners[i] = lockOwners[locallock->numLockOwners]; } else { Assert(lockOwners[i].nLocks == locallock->nLocks); /* We want to call LockRelease just once */ lockOwners[i].nLocks = 1; locallock->nLocks = 1; if (!LockRelease(&locallock->tag.lock, locallock->tag.mode, sessionLock)) elog(WARNING, "ReleaseLockIfHeld: failed??"); } break; } } } /* * LockReassignCurrentOwner * Reassign all locks belonging to CurrentResourceOwner to belong * to its parent resource owner. * * If the caller knows what those locks are, it can pass them as an array. * That speeds up the call significantly, when a lot of locks are held * (e.g pg_dump with a large schema). Otherwise, pass NULL for locallocks, * and we'll traverse through our hash table to find them. */ void LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks) { ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner); Assert(parent != NULL); if (locallocks == NULL) { HASH_SEQ_STATUS status; LOCALLOCK *locallock; hash_seq_init(&status, LockMethodLocalHash); while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) LockReassignOwner(locallock, parent); } else { int i; for (i = nlocks - 1; i >= 0; i--) LockReassignOwner(locallocks[i], parent); } } /* * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to * CurrentResourceOwner to its parent. 
*/ static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent) { LOCALLOCKOWNER *lockOwners; int i; int ic = -1; int ip = -1; /* * Scan to see if there are any locks belonging to current owner or its * parent */ lockOwners = locallock->lockOwners; for (i = locallock->numLockOwners - 1; i >= 0; i--) { if (lockOwners[i].owner == CurrentResourceOwner) ic = i; else if (lockOwners[i].owner == parent) ip = i; } if (ic < 0) return; /* no current locks */ if (ip < 0) { /* Parent has no slot, so just give it the child's slot */ lockOwners[ic].owner = parent; ResourceOwnerRememberLock(parent, locallock); } else { /* Merge child's count with parent's */ lockOwners[ip].nLocks += lockOwners[ic].nLocks; /* compact out unused slot */ locallock->numLockOwners--; if (ic < locallock->numLockOwners) lockOwners[ic] = lockOwners[locallock->numLockOwners]; } ResourceOwnerForgetLock(CurrentResourceOwner, locallock); } /* * FastPathGrantRelationLock * Grant lock using per-backend fast-path array, if there is space. */ static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode) { uint32 f; uint32 unused_slot = FP_LOCK_SLOTS_PER_BACKEND; /* Scan for existing entry for this relid, remembering empty slot. */ for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) { if (FAST_PATH_GET_BITS(MyProc, f) == 0) unused_slot = f; else if (MyProc->fpRelId[f] == relid) { Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode)); FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode); return true; } } /* If no existing entry, use any empty slot. */ if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND) { MyProc->fpRelId[unused_slot] = relid; FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode); ++FastPathLocalUseCount; return true; } /* No existing entry, and no empty slot. */ return false; } /* * FastPathUnGrantRelationLock * Release fast-path lock, if present. Update backend-private local * use count, while we're at it. 
*/ static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode) { uint32 f; bool result = false; FastPathLocalUseCount = 0; for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) { if (MyProc->fpRelId[f] == relid && FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode)) { Assert(!result); FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode); result = true; /* we continue iterating so as to update FastPathLocalUseCount */ } if (FAST_PATH_GET_BITS(MyProc, f) != 0) ++FastPathLocalUseCount; } return result; } /* * FastPathTransferRelationLocks * Transfer locks matching the given lock tag from per-backend fast-path * arrays to the shared hash table. * * Returns true if successful, false if ran out of shared memory. */ static bool FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag, uint32 hashcode) { LWLock *partitionLock = LockHashPartitionLock(hashcode); Oid relid = locktag->locktag_field2; uint32 i; /* * Every PGPROC that can potentially hold a fast-path lock is present in * ProcGlobal->allProcs. Prepared transactions are not, but any * outstanding fast-path locks held by prepared transactions are * transferred to the main lock table. */ for (i = 0; i < ProcGlobal->allProcCount; i++) { PGPROC *proc = &ProcGlobal->allProcs[i]; uint32 f; LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE); /* * If the target backend isn't referencing the same database as the * lock, then we needn't examine the individual relation IDs at all; * none of them can be relevant. * * proc->databaseId is set at backend startup time and never changes * thereafter, so it might be safe to perform this test before * acquiring &proc->fpInfoLock. In particular, it's certainly safe to * assume that if the target backend holds any fast-path locks, it * must have performed a memory-fencing operation (in particular, an * LWLock acquisition) since setting proc->databaseId. 
However, it's * less clear that our backend is certain to have performed a memory * fencing operation since the other backend set proc->databaseId. So * for now, we test it after acquiring the LWLock just to be safe. */ if (proc->databaseId != locktag->locktag_field1) { LWLockRelease(&proc->fpInfoLock); continue; } for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) { uint32 lockmode; /* Look for an allocated slot matching the given relid. */ if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0) continue; /* Find or create lock object. */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET; lockmode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT; ++lockmode) { PROCLOCK *proclock; if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode)) continue; proclock = SetupLockInTable(lockMethodTable, proc, locktag, hashcode, lockmode); if (!proclock) { LWLockRelease(partitionLock); LWLockRelease(&proc->fpInfoLock); return false; } GrantLock(proclock->tag.myLock, proclock, lockmode); FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode); } LWLockRelease(partitionLock); /* No need to examine remaining slots. */ break; } LWLockRelease(&proc->fpInfoLock); } return true; } /* * FastPathGetRelationLockEntry * Return the PROCLOCK for a lock originally taken via the fast-path, * transferring it to the primary lock table if necessary. * * Note: caller takes care of updating the locallock object. */ static PROCLOCK * FastPathGetRelationLockEntry(LOCALLOCK *locallock) { LockMethod lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD]; LOCKTAG *locktag = &locallock->tag.lock; PROCLOCK *proclock = NULL; LWLock *partitionLock = LockHashPartitionLock(locallock->hashcode); Oid relid = locktag->locktag_field2; uint32 f; LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE); for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) { uint32 lockmode; /* Look for an allocated slot matching the given relid. 
*/ if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0) continue; /* If we don't have a lock of the given mode, forget it! */ lockmode = locallock->tag.mode; if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode)) break; /* Find or create lock object. */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); proclock = SetupLockInTable(lockMethodTable, MyProc, locktag, locallock->hashcode, lockmode); if (!proclock) { LWLockRelease(partitionLock); LWLockRelease(&MyProc->fpInfoLock); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"), errhint("You might need to increase max_locks_per_transaction."))); } GrantLock(proclock->tag.myLock, proclock, lockmode); FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode); LWLockRelease(partitionLock); /* No need to examine remaining slots. */ break; } LWLockRelease(&MyProc->fpInfoLock); /* Lock may have already been transferred by some other backend. */ if (proclock == NULL) { LOCK *lock; PROCLOCKTAG proclocktag; uint32 proclock_hashcode; LWLockAcquire(partitionLock, LW_SHARED); lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, (void *) locktag, locallock->hashcode, HASH_FIND, NULL); if (!lock) elog(ERROR, "failed to re-find shared lock object"); proclocktag.myLock = lock; proclocktag.myProc = MyProc; proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode); proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash, (void *) &proclocktag, proclock_hashcode, HASH_FIND, NULL); if (!proclock) elog(ERROR, "failed to re-find shared proclock object"); LWLockRelease(partitionLock); } return proclock; } /* * GetLockConflicts * Get an array of VirtualTransactionIds of xacts currently holding locks * that would conflict with the specified lock/lockmode. * xacts merely awaiting such a lock are NOT reported. * * The result array is palloc'd and is terminated with an invalid VXID. * *countp, if not null, is updated to the number of items set. 
* * Of course, the result could be out of date by the time it's returned, * so use of this function has to be thought about carefully. * * Note we never include the current xact's vxid in the result array, * since an xact never blocks itself. */ VirtualTransactionId * GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp) { static VirtualTransactionId *vxids; LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid; LockMethod lockMethodTable; LOCK *lock; LOCKMASK conflictMask; SHM_QUEUE *procLocks; PROCLOCK *proclock; uint32 hashcode; LWLock *partitionLock; int count = 0; int fast_count = 0; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) elog(ERROR, "unrecognized lock method: %d", lockmethodid); lockMethodTable = LockMethods[lockmethodid]; if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes) elog(ERROR, "unrecognized lock mode: %d", lockmode); /* * Allocate memory to store results, and fill with InvalidVXID. We only * need enough space for MaxBackends + max_prepared_xacts + a terminator. * InHotStandby allocate once in TopMemoryContext. */ if (InHotStandby) { if (vxids == NULL) vxids = (VirtualTransactionId *) MemoryContextAlloc(TopMemoryContext, sizeof(VirtualTransactionId) * (MaxBackends + max_prepared_xacts + 1)); } else vxids = (VirtualTransactionId *) palloc0(sizeof(VirtualTransactionId) * (MaxBackends + max_prepared_xacts + 1)); /* Compute hash code and partition lock, and look up conflicting modes. */ hashcode = LockTagHashCode(locktag); partitionLock = LockHashPartitionLock(hashcode); conflictMask = lockMethodTable->conflictTab[lockmode]; /* * Fast path locks might not have been entered in the primary lock table. * If the lock we're dealing with could conflict with such a lock, we must * examine each backend's fast-path array for conflicts. */ if (ConflictsWithRelationFastPath(locktag, lockmode)) { int i; Oid relid = locktag->locktag_field2; VirtualTransactionId vxid; /* * Iterate over relevant PGPROCs. 
Anything held by a prepared * transaction will have been transferred to the primary lock table, * so we need not worry about those. This is all a bit fuzzy, because * new locks could be taken after we've visited a particular * partition, but the callers had better be prepared to deal with that * anyway, since the locks could equally well be taken between the * time we return the value and the time the caller does something * with it. */ for (i = 0; i < ProcGlobal->allProcCount; i++) { PGPROC *proc = &ProcGlobal->allProcs[i]; uint32 f; /* A backend never blocks itself */ if (proc == MyProc) continue; LWLockAcquire(&proc->fpInfoLock, LW_SHARED); /* * If the target backend isn't referencing the same database as * the lock, then we needn't examine the individual relation IDs * at all; none of them can be relevant. * * See FastPathTransferRelationLocks() for discussion of why we do * this test after acquiring the lock. */ if (proc->databaseId != locktag->locktag_field1) { LWLockRelease(&proc->fpInfoLock); continue; } for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++) { uint32 lockmask; /* Look for an allocated slot matching the given relid. */ if (relid != proc->fpRelId[f]) continue; lockmask = FAST_PATH_GET_BITS(proc, f); if (!lockmask) continue; lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET; /* * There can only be one entry per relation, so if we found it * and it doesn't conflict, we can skip the rest of the slots. */ if ((lockmask & conflictMask) == 0) break; /* Conflict! */ GET_VXID_FROM_PGPROC(vxid, *proc); if (VirtualTransactionIdIsValid(vxid)) vxids[count++] = vxid; /* else, xact already committed or aborted */ /* No need to examine remaining slots. */ break; } LWLockRelease(&proc->fpInfoLock); } } /* Remember how many fast-path conflicts we found. */ fast_count = count; /* * Look up the lock object matching the tag. 
*/ LWLockAcquire(partitionLock, LW_SHARED); lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, (const void *) locktag, hashcode, HASH_FIND, NULL); if (!lock) { /* * If the lock object doesn't exist, there is nothing holding a lock * on this lockable object. */ LWLockRelease(partitionLock); vxids[count].backendId = InvalidBackendId; vxids[count].localTransactionId = InvalidLocalTransactionId; if (countp) *countp = count; return vxids; } /* * Examine each existing holder (or awaiter) of the lock. */ procLocks = &(lock->procLocks); proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); while (proclock) { if (conflictMask & proclock->holdMask) { PGPROC *proc = proclock->tag.myProc; /* A backend never blocks itself */ if (proc != MyProc) { VirtualTransactionId vxid; GET_VXID_FROM_PGPROC(vxid, *proc); if (VirtualTransactionIdIsValid(vxid)) { int i; /* Avoid duplicate entries. */ for (i = 0; i < fast_count; ++i) if (VirtualTransactionIdEquals(vxids[i], vxid)) break; if (i >= fast_count) vxids[count++] = vxid; } /* else, xact already committed or aborted */ } } proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, offsetof(PROCLOCK, lockLink)); } LWLockRelease(partitionLock); if (count > MaxBackends + max_prepared_xacts) /* should never happen */ elog(PANIC, "too many conflicting locks found"); vxids[count].backendId = InvalidBackendId; vxids[count].localTransactionId = InvalidLocalTransactionId; if (countp) *countp = count; return vxids; } /* * Find a lock in the shared lock table and release it. It is the caller's * responsibility to verify that this is a sane thing to do. (For example, it * would be bad to release a lock here if there might still be a LOCALLOCK * object with pointers to it.) 
* * We currently use this in two situations: first, to release locks held by * prepared transactions on commit (see lock_twophase_postcommit); and second, * to release locks taken via the fast-path, transferred to the main hash * table, and then released (see LockReleaseAll). */ static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc, LOCKTAG *locktag, LOCKMODE lockmode, bool decrement_strong_lock_count) { LOCK *lock; PROCLOCK *proclock; PROCLOCKTAG proclocktag; uint32 hashcode; uint32 proclock_hashcode; LWLock *partitionLock; bool wakeupNeeded; hashcode = LockTagHashCode(locktag); partitionLock = LockHashPartitionLock(hashcode); LWLockAcquire(partitionLock, LW_EXCLUSIVE); /* * Re-find the lock object (it had better be there). */ lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash, (void *) locktag, hashcode, HASH_FIND, NULL); if (!lock) elog(PANIC, "failed to re-find shared lock object"); /* * Re-find the proclock object (ditto). */ proclocktag.myLock = lock; proclocktag.myProc = proc; proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode); proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash, (void *) &proclocktag, proclock_hashcode, HASH_FIND, NULL); if (!proclock) elog(PANIC, "failed to re-find shared proclock object"); /* * Double-check that we are actually holding a lock of the type we want to * release. */ if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock); LWLockRelease(partitionLock); elog(WARNING, "you don't own a lock of type %s", lockMethodTable->lockModeNames[lockmode]); return; } /* * Do the releasing. CleanUpLock will waken any now-wakable waiters. */ wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable); CleanUpLock(lock, proclock, lockMethodTable, hashcode, wakeupNeeded); LWLockRelease(partitionLock); /* * Decrement strong lock count. This logic is needed only for 2PC. 
*/ if (decrement_strong_lock_count && ConflictsWithRelationFastPath(locktag, lockmode)) { uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode); SpinLockAcquire(&FastPathStrongRelationLocks->mutex); Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0); FastPathStrongRelationLocks->count[fasthashcode]--; SpinLockRelease(&FastPathStrongRelationLocks->mutex); } } /* * CheckForSessionAndXactLocks * Check to see if transaction holds both session-level and xact-level * locks on the same object; if so, throw an error. * * If we have both session- and transaction-level locks on the same object, * PREPARE TRANSACTION must fail. This should never happen with regular * locks, since we only take those at session level in some special operations * like VACUUM. It's possible to hit this with advisory locks, though. * * It would be nice if we could keep the session hold and give away the * transactional hold to the prepared xact. However, that would require two * PROCLOCK objects, and we cannot be sure that another PROCLOCK will be * available when it comes time for PostPrepare_Locks to do the deed. * So for now, we error out while we can still do so safely. * * Since the LOCALLOCK table stores a separate entry for each lockmode, * we can't implement this check by examining LOCALLOCK entries in isolation. * We must build a transient hashtable that is indexed by locktag only. */ static void CheckForSessionAndXactLocks(void) { typedef struct { LOCKTAG lock; /* identifies the lockable object */ bool sessLock; /* is any lockmode held at session level? */ bool xactLock; /* is any lockmode held at xact level? 
*/ } PerLockTagEntry; HASHCTL hash_ctl; HTAB *lockhtab; HASH_SEQ_STATUS status; LOCALLOCK *locallock; /* Create a local hash table keyed by LOCKTAG only */ hash_ctl.keysize = sizeof(LOCKTAG); hash_ctl.entrysize = sizeof(PerLockTagEntry); hash_ctl.hcxt = CurrentMemoryContext; lockhtab = hash_create("CheckForSessionAndXactLocks table", 256, /* arbitrary initial size */ &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); /* Scan local lock table to find entries for each LOCKTAG */ hash_seq_init(&status, LockMethodLocalHash); while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) { LOCALLOCKOWNER *lockOwners = locallock->lockOwners; PerLockTagEntry *hentry; bool found; int i; /* * Ignore VXID locks. We don't want those to be held by prepared * transactions, since they aren't meaningful after a restart. */ if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION) continue; /* Ignore it if we don't actually hold the lock */ if (locallock->nLocks <= 0) continue; /* Otherwise, find or make an entry in lockhtab */ hentry = (PerLockTagEntry *) hash_search(lockhtab, (void *) &locallock->tag.lock, HASH_ENTER, &found); if (!found) /* initialize, if newly created */ hentry->sessLock = hentry->xactLock = false; /* Scan to see if we hold lock at session or xact level or both */ for (i = locallock->numLockOwners - 1; i >= 0; i--) { if (lockOwners[i].owner == NULL) hentry->sessLock = true; else hentry->xactLock = true; } /* * We can throw error immediately when we see both types of locks; no * need to wait around to see if there are more violations. */ if (hentry->sessLock && hentry->xactLock) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object"))); } /* Success, so clean up */ hash_destroy(lockhtab); } /* * AtPrepare_Locks * Do the preparatory work for a PREPARE: make 2PC state file records * for all locks currently held. 
*
 * Session-level locks are ignored, as are VXID locks.
 *
 * For the most part, we don't need to touch shared memory for this ---
 * all the necessary state information is in the locallock table.
 * Fast-path locks are an exception, however: we move any such locks to
 * the main table before allowing PREPARE TRANSACTION to succeed.
 *
 * Each qualifying locallock entry produces one TwoPhaseLockRecord (locktag
 * plus lock mode), registered under TWOPHASE_RM_LOCK_ID.  An ERROR is
 * raised if any object is held at both session and transaction level.
 */
void
AtPrepare_Locks(void)
{
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;

	/* First, verify there aren't locks of both xact and session level */
	CheckForSessionAndXactLocks();

	/* Now do the per-locallock cleanup work */
	hash_seq_init(&status, LockMethodLocalHash);

	while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
	{
		TwoPhaseLockRecord record;
		LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
		bool		haveSessionLock;
		bool		haveXactLock;
		int			i;

		/*
		 * Ignore VXID locks.  We don't want those to be held by prepared
		 * transactions, since they aren't meaningful after a restart.
		 */
		if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
			continue;

		/* Ignore it if we don't actually hold the lock */
		if (locallock->nLocks <= 0)
			continue;

		/*
		 * Scan to see whether we hold it at session or transaction level.
		 * A NULL owner marks a session-level hold.
		 */
		haveSessionLock = haveXactLock = false;
		for (i = locallock->numLockOwners - 1; i >= 0; i--)
		{
			if (lockOwners[i].owner == NULL)
				haveSessionLock = true;
			else
				haveXactLock = true;
		}

		/* Ignore it if we have only session lock */
		if (!haveXactLock)
			continue;

		/* This can't happen, because we already checked it */
		if (haveSessionLock)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));

		/*
		 * If the local lock was taken via the fast-path, we need to move it
		 * to the primary lock table, or just get a pointer to the existing
		 * primary lock table entry if by chance it's already been
		 * transferred.
		 */
		if (locallock->proclock == NULL)
		{
			locallock->proclock = FastPathGetRelationLockEntry(locallock);
			locallock->lock = locallock->proclock->tag.myLock;
		}

		/*
		 * Arrange to not release any strong lock count held by this lock
		 * entry.  We must retain the count until the prepared transaction is
		 * committed or rolled back.
		 */
		locallock->holdsStrongLockCount = false;

		/*
		 * Create a 2PC record.
		 */
		memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
		record.lockmode = locallock->tag.mode;

		RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
							   &record, sizeof(TwoPhaseLockRecord));
	}
}

/*
 * PostPrepare_Locks
 *		Clean up after successful PREPARE
 *
 * Here, we want to transfer ownership of our locks to a dummy PGPROC
 * that's now associated with the prepared transaction, and we want to
 * clean out the corresponding entries in the LOCALLOCK table.
 *
 * Note: by removing the LOCALLOCK entries, we are leaving dangling
 * pointers in the transaction's resource owner.  This is OK at the
 * moment since resowner.c doesn't try to free locks retail at a toplevel
 * transaction commit or abort.  We could alternatively zero out nLocks
 * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
 * but that probably costs more cycles.
 */
void
PostPrepare_Locks(TransactionId xid)
{
	PGPROC	   *newproc = TwoPhaseGetDummyProc(xid, false);
	HASH_SEQ_STATUS status;
	LOCALLOCK  *locallock;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	int			partition;

	/* Can't prepare a lock group follower. */
	Assert(MyProc->lockGroupLeader == NULL ||
		   MyProc->lockGroupLeader == MyProc);

	/* This is a critical section: any error means big trouble */
	START_CRIT_SECTION();

	/*
	 * First we run through the locallock table and get rid of unwanted
	 * entries, then we scan the process's proclocks and transfer them to the
	 * target proc.
	 *
	 * We do this separately because we may have multiple locallock entries
	 * pointing to the same proclock, and we daren't end up with any dangling
	 * pointers.
*/ hash_seq_init(&status, LockMethodLocalHash); while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) { LOCALLOCKOWNER *lockOwners = locallock->lockOwners; bool haveSessionLock; bool haveXactLock; int i; if (locallock->proclock == NULL || locallock->lock == NULL) { /* * We must've run out of shared memory while trying to set up this * lock. Just forget the local entry. */ Assert(locallock->nLocks == 0); RemoveLocalLock(locallock); continue; } /* Ignore VXID locks */ if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION) continue; /* Scan to see whether we hold it at session or transaction level */ haveSessionLock = haveXactLock = false; for (i = locallock->numLockOwners - 1; i >= 0; i--) { if (lockOwners[i].owner == NULL) haveSessionLock = true; else haveXactLock = true; } /* Ignore it if we have only session lock */ if (!haveXactLock) continue; /* This can't happen, because we already checked it */ if (haveSessionLock) ereport(PANIC, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object"))); /* Mark the proclock to show we need to release this lockmode */ if (locallock->nLocks > 0) locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode); /* And remove the locallock hashtable entry */ RemoveLocalLock(locallock); } /* * Now, scan each lock partition separately. */ for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++) { LWLock *partitionLock; SHM_QUEUE *procLocks = &(MyProc->myProcLocks[partition]); PROCLOCK *nextplock; partitionLock = LockHashPartitionLockByIndex(partition); /* * If the proclock list for this partition is empty, we can skip * acquiring the partition lock. This optimization is safer than the * situation in LockReleaseAll, because we got rid of any fast-path * locks during AtPrepare_Locks, so there cannot be any case where * another backend is adding something to our lists now. 
For safety, * though, we code this the same way as in LockReleaseAll. */ if (SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)) == NULL) continue; /* needn't examine this partition */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)); proclock; proclock = nextplock) { /* Get link first, since we may unlink/relink this proclock */ nextplock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); Assert(proclock->tag.myProc == MyProc); lock = proclock->tag.myLock; /* Ignore VXID locks */ if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION) continue; PROCLOCK_PRINT("PostPrepare_Locks", proclock); LOCK_PRINT("PostPrepare_Locks", lock, 0); Assert(lock->nRequested >= 0); Assert(lock->nGranted >= 0); Assert(lock->nGranted <= lock->nRequested); Assert((proclock->holdMask & ~lock->grantMask) == 0); /* Ignore it if nothing to release (must be a session lock) */ if (proclock->releaseMask == 0) continue; /* Else we should be releasing all locks */ if (proclock->releaseMask != proclock->holdMask) elog(PANIC, "we seem to have dropped a bit somewhere"); /* * We cannot simply modify proclock->tag.myProc to reassign * ownership of the lock, because that's part of the hash key and * the proclock would then be in the wrong hash chain. Instead * use hash_update_hash_key. (We used to create a new hash entry, * but that risks out-of-memory failure if other processes are * busy making proclocks too.) We must unlink the proclock from * our procLink chain and put it into the new proc's chain, too. * * Note: the updated proclock hash key will still belong to the * same hash partition, cf proclock_hash(). So the partition lock * we already hold is sufficient for this. */ SHMQueueDelete(&proclock->procLink); /* * Create the new hash key for the proclock. 
*/ proclocktag.myLock = lock; proclocktag.myProc = newproc; /* * Update groupLeader pointer to point to the new proc. (We'd * better not be a member of somebody else's lock group!) */ Assert(proclock->groupLeader == proclock->tag.myProc); proclock->groupLeader = newproc; /* * Update the proclock. We should not find any existing entry for * the same hash key, since there can be only one entry for any * given lock with my own proc. */ if (!hash_update_hash_key(LockMethodProcLockHash, (void *) proclock, (void *) &proclocktag)) elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks"); /* Re-link into the new proc's proclock list */ SHMQueueInsertBefore(&(newproc->myProcLocks[partition]), &proclock->procLink); PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock); } /* loop over PROCLOCKs within this partition */ LWLockRelease(partitionLock); } /* loop over partitions */ END_CRIT_SECTION(); } /* * Estimate shared-memory space used for lock tables */ Size LockShmemSize(void) { Size size = 0; long max_table_size; /* lock hash table */ max_table_size = NLOCKENTS(); size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK))); /* proclock hash table */ max_table_size *= 2; size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK))); /* * Since NLOCKENTS is only an estimate, add 10% safety margin. */ size = add_size(size, size / 10); return size; } /* * GetLockStatusData - Return a summary of the lock manager's internal * status, for use in a user-level reporting function. * * The return data consists of an array of LockInstanceData objects, * which are a lightly abstracted version of the PROCLOCK data structures, * i.e. there is one entry for each unique lock and interested PGPROC. * It is the caller's responsibility to match up related items (such as * references to the same lockable object or PGPROC) if wanted. 
*
 * The design goal is to hold the LWLocks for as short a time as possible;
 * thus, this function simply makes a copy of the necessary data and releases
 * the locks, allowing the caller to contemplate and format the data for as
 * long as it pleases.
 */
LockData *
GetLockStatusData(void)
{
	LockData   *data;
	PROCLOCK   *proclock;
	HASH_SEQ_STATUS seqstat;
	int			els;			/* allocated length of data->locks[] */
	int			el;				/* number of entries filled so far */
	int			i;

	data = (LockData *) palloc(sizeof(LockData));

	/* Guess how much space we'll need. */
	els = MaxBackends;
	el = 0;
	data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);

	/*
	 * First, we iterate through the per-backend fast-path arrays, locking
	 * them one at a time.  This might produce an inconsistent picture of the
	 * system state, but taking all of those LWLocks at the same time seems
	 * impractical (in particular, note MAX_SIMUL_LWLOCKS).  It shouldn't
	 * matter too much, because none of these locks can be involved in lock
	 * conflicts anyway - anything that might must be present in the main lock
	 * table.  (For the same reason, we don't sweat about making leaderPid
	 * completely valid.  We cannot safely dereference another backend's
	 * lockGroupLeader field without holding all lock partition locks, and
	 * it's not worth that.)
	 */
	for (i = 0; i < ProcGlobal->allProcCount; ++i)
	{
		PGPROC	   *proc = &ProcGlobal->allProcs[i];
		uint32		f;

		LWLockAcquire(&proc->fpInfoLock, LW_SHARED);

		for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
		{
			LockInstanceData *instance;
			uint32		lockbits = FAST_PATH_GET_BITS(proc, f);

			/* Skip unallocated slots. */
			if (!lockbits)
				continue;

			/* Grow the output array by another MaxBackends entries if full */
			if (el >= els)
			{
				els += MaxBackends;
				data->locks = (LockInstanceData *)
					repalloc(data->locks, sizeof(LockInstanceData) * els);
			}

			/*
			 * Fast-path entries are reported as relation locks that are
			 * not waiting, with the proc's own pid as leaderPid.
			 */
			instance = &data->locks[el];
			SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
								 proc->fpRelId[f]);
			instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
			instance->waitLockMode = NoLock;
			instance->backend = proc->backendId;
			instance->lxid = proc->lxid;
			instance->pid = proc->pid;
			instance->leaderPid = proc->pid;
			instance->fastpath = true;

			el++;
		}

		/* Report the proc's fast-path VXID lock, if it has one */
		if (proc->fpVXIDLock)
		{
			VirtualTransactionId vxid;
			LockInstanceData *instance;

			if (el >= els)
			{
				els += MaxBackends;
				data->locks = (LockInstanceData *)
					repalloc(data->locks, sizeof(LockInstanceData) * els);
			}

			vxid.backendId = proc->backendId;
			vxid.localTransactionId = proc->fpLocalTransactionId;

			instance = &data->locks[el];
			SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
			instance->holdMask = LOCKBIT_ON(ExclusiveLock);
			instance->waitLockMode = NoLock;
			instance->backend = proc->backendId;
			instance->lxid = proc->lxid;
			instance->pid = proc->pid;
			instance->leaderPid = proc->pid;
			instance->fastpath = true;

			el++;
		}

		LWLockRelease(&proc->fpInfoLock);
	}

	/*
	 * Next, acquire lock on the entire shared lock data structure.  We do
	 * this so that, at least for locks in the primary lock table, the state
	 * will be self-consistent.
	 *
	 * Since this is a read-only operation, we take shared instead of
	 * exclusive lock.  There's not a whole lot of point to this, because all
	 * the normal operations require exclusive lock, but it doesn't hurt
	 * anything either. It will at least allow two backends to do
	 * GetLockStatusData in parallel.
	 *
	 * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
	 */
	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
		LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);

	/* Now we can safely count the number of proclocks */
	data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
	if (data->nelements > els)
	{
		els = data->nelements;
		data->locks = (LockInstanceData *)
			repalloc(data->locks, sizeof(LockInstanceData) * els);
	}

	/* Now scan the tables to copy the data */
	hash_seq_init(&seqstat, LockMethodProcLockHash);

	while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
	{
		PGPROC	   *proc = proclock->tag.myProc;
		LOCK	   *lock = proclock->tag.myLock;
		LockInstanceData *instance = &data->locks[el];

		memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
		instance->holdMask = proclock->holdMask;
		/* Report a wait mode only if the proc is waiting on this very lock */
		if (proc->waitLock == proclock->tag.myLock)
			instance->waitLockMode = proc->waitLockMode;
		else
			instance->waitLockMode = NoLock;
		instance->backend = proc->backendId;
		instance->lxid = proc->lxid;
		instance->pid = proc->pid;
		instance->leaderPid = proclock->groupLeader->pid;
		instance->fastpath = false;

		el++;
	}

	/*
	 * And release locks.  We do this in reverse order for two reasons: (1)
	 * Anyone else who needs more than one of the locks will be trying to lock
	 * them in increasing order; we don't want to release the other process
	 * until it can get all the locks it needs. (2) This avoids O(N^2)
	 * behavior inside LWLockRelease.
	 */
	for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
		LWLockRelease(LockHashPartitionLockByIndex(i));

	/* Every slot counted above must have been filled in exactly once */
	Assert(el == data->nelements);

	return data;
}

/*
 * GetBlockerStatusData - Return a summary of the lock manager's state
 * concerning locks that are blocking the specified PID or any member of
 * the PID's lock group, for use in a user-level reporting function.
* * For each PID within the lock group that is awaiting some heavyweight lock, * the return data includes an array of LockInstanceData objects, which are * the same data structure used by GetLockStatusData; but unlike that function, * this one reports only the PROCLOCKs associated with the lock that that PID * is blocked on. (Hence, all the locktags should be the same for any one * blocked PID.) In addition, we return an array of the PIDs of those backends * that are ahead of the blocked PID in the lock's wait queue. These can be * compared with the PIDs in the LockInstanceData objects to determine which * waiters are ahead of or behind the blocked PID in the queue. * * If blocked_pid isn't a valid backend PID or nothing in its lock group is * waiting on any heavyweight lock, return empty arrays. * * The design goal is to hold the LWLocks for as short a time as possible; * thus, this function simply makes a copy of the necessary data and releases * the locks, allowing the caller to contemplate and format the data for as * long as it pleases. */ BlockedProcsData * GetBlockerStatusData(int blocked_pid) { BlockedProcsData *data; PGPROC *proc; int i; data = (BlockedProcsData *) palloc(sizeof(BlockedProcsData)); /* * Guess how much space we'll need, and preallocate. Most of the time * this will avoid needing to do repalloc while holding the LWLocks. (We * assume, but check with an Assert, that MaxBackends is enough entries * for the procs[] array; the other two could need enlargement, though.) 
*/ data->nprocs = data->nlocks = data->npids = 0; data->maxprocs = data->maxlocks = data->maxpids = MaxBackends; data->procs = (BlockedProcData *) palloc(sizeof(BlockedProcData) * data->maxprocs); data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * data->maxlocks); data->waiter_pids = (int *) palloc(sizeof(int) * data->maxpids); /* * In order to search the ProcArray for blocked_pid and assume that that * entry won't immediately disappear under us, we must hold ProcArrayLock. * In addition, to examine the lock grouping fields of any other backend, * we must hold all the hash partition locks. (Only one of those locks is * actually relevant for any one lock group, but we can't know which one * ahead of time.) It's fairly annoying to hold all those locks * throughout this, but it's no worse than GetLockStatusData(), and it * does have the advantage that we're guaranteed to return a * self-consistent instantaneous state. */ LWLockAcquire(ProcArrayLock, LW_SHARED); proc = BackendPidGetProcWithLock(blocked_pid); /* Nothing to do if it's gone */ if (proc != NULL) { /* * Acquire lock on the entire shared lock data structure. See notes * in GetLockStatusData(). */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED); if (proc->lockGroupLeader == NULL) { /* Easy case, proc is not a lock group member */ GetSingleProcBlockerStatusData(proc, data); } else { /* Examine all procs in proc's lock group */ dlist_iter iter; dlist_foreach(iter, &proc->lockGroupLeader->lockGroupMembers) { PGPROC *memberProc; memberProc = dlist_container(PGPROC, lockGroupLink, iter.cur); GetSingleProcBlockerStatusData(memberProc, data); } } /* * And release locks. See notes in GetLockStatusData(). 
*/ for (i = NUM_LOCK_PARTITIONS; --i >= 0;) LWLockRelease(LockHashPartitionLockByIndex(i)); Assert(data->nprocs <= data->maxprocs); } LWLockRelease(ProcArrayLock); return data; } /* Accumulate data about one possibly-blocked proc for GetBlockerStatusData */ static void GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data) { LOCK *theLock = blocked_proc->waitLock; BlockedProcData *bproc; SHM_QUEUE *procLocks; PROCLOCK *proclock; PROC_QUEUE *waitQueue; PGPROC *proc; int queue_size; int i; /* Nothing to do if this proc is not blocked */ if (theLock == NULL) return; /* Set up a procs[] element */ bproc = &data->procs[data->nprocs++]; bproc->pid = blocked_proc->pid; bproc->first_lock = data->nlocks; bproc->first_waiter = data->npids; /* * We may ignore the proc's fast-path arrays, since nothing in those could * be related to a contended lock. */ /* Collect all PROCLOCKs associated with theLock */ procLocks = &(theLock->procLocks); proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); while (proclock) { PGPROC *proc = proclock->tag.myProc; LOCK *lock = proclock->tag.myLock; LockInstanceData *instance; if (data->nlocks >= data->maxlocks) { data->maxlocks += MaxBackends; data->locks = (LockInstanceData *) repalloc(data->locks, sizeof(LockInstanceData) * data->maxlocks); } instance = &data->locks[data->nlocks]; memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG)); instance->holdMask = proclock->holdMask; if (proc->waitLock == lock) instance->waitLockMode = proc->waitLockMode; else instance->waitLockMode = NoLock; instance->backend = proc->backendId; instance->lxid = proc->lxid; instance->pid = proc->pid; instance->leaderPid = proclock->groupLeader->pid; instance->fastpath = false; data->nlocks++; proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink, offsetof(PROCLOCK, lockLink)); } /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */ waitQueue = &(theLock->waitProcs); 
queue_size = waitQueue->size; if (queue_size > data->maxpids - data->npids) { data->maxpids = Max(data->maxpids + MaxBackends, data->npids + queue_size); data->waiter_pids = (int *) repalloc(data->waiter_pids, sizeof(int) * data->maxpids); } /* Collect PIDs from the lock's wait queue, stopping at blocked_proc */ proc = (PGPROC *) waitQueue->links.next; for (i = 0; i < queue_size; i++) { if (proc == blocked_proc) break; data->waiter_pids[data->npids++] = proc->pid; proc = (PGPROC *) proc->links.next; } bproc->num_locks = data->nlocks - bproc->first_lock; bproc->num_waiters = data->npids - bproc->first_waiter; } /* * Returns a list of currently held AccessExclusiveLocks, for use by * LogStandbySnapshot(). The result is a palloc'd array, * with the number of elements returned into *nlocks. * * XXX This currently takes a lock on all partitions of the lock table, * but it's possible to do better. By reference counting locks and storing * the value in the ProcArray entry for each backend we could tell if any * locks need recording without having to acquire the partition locks and * scan the lock table. Whether that's worth the additional overhead * is pretty dubious though. */ xl_standby_lock * GetRunningTransactionLocks(int *nlocks) { xl_standby_lock *accessExclusiveLocks; PROCLOCK *proclock; HASH_SEQ_STATUS seqstat; int i; int index; int els; /* * Acquire lock on the entire shared lock data structure. * * Must grab LWLocks in partition-number order to avoid LWLock deadlock. */ for (i = 0; i < NUM_LOCK_PARTITIONS; i++) LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED); /* Now we can safely count the number of proclocks */ els = hash_get_num_entries(LockMethodProcLockHash); /* * Allocating enough space for all locks in the lock table is overkill, * but it's more convenient and faster than having to enlarge the array. 
*/ accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock)); /* Now scan the tables to copy the data */ hash_seq_init(&seqstat, LockMethodProcLockHash); /* * If lock is a currently granted AccessExclusiveLock then it will have * just one proclock holder, so locks are never accessed twice in this * particular case. Don't copy this code for use elsewhere because in the * general case this will give you duplicate locks when looking at * non-exclusive lock types. */ index = 0; while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat))) { /* make sure this definition matches the one used in LockAcquire */ if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) && proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION) { PGPROC *proc = proclock->tag.myProc; PGXACT *pgxact = &ProcGlobal->allPgXact[proc->pgprocno]; LOCK *lock = proclock->tag.myLock; TransactionId xid = pgxact->xid; /* * Don't record locks for transactions if we know they have * already issued their WAL record for commit but not yet released * lock. It is still possible that we see locks held by already * complete transactions, if they haven't yet zeroed their xids. */ if (!TransactionIdIsValid(xid)) continue; accessExclusiveLocks[index].xid = xid; accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1; accessExclusiveLocks[index].relOid = lock->tag.locktag_field2; index++; } } Assert(index <= els); /* * And release locks. We do this in reverse order for two reasons: (1) * Anyone else who needs more than one of the locks will be trying to lock * them in increasing order; we don't want to release the other process * until it can get all the locks it needs. (2) This avoids O(N^2) * behavior inside LWLockRelease. 
*/ for (i = NUM_LOCK_PARTITIONS; --i >= 0;) LWLockRelease(LockHashPartitionLockByIndex(i)); *nlocks = index; return accessExclusiveLocks; } /* Provide the textual name of any lock mode */ const char * GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode) { Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods)); Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes); return LockMethods[lockmethodid]->lockModeNames[mode]; } #ifdef LOCK_DEBUG /* * Dump all locks in the given proc's myProcLocks lists. * * Caller is responsible for having acquired appropriate LWLocks. */ void DumpLocks(PGPROC *proc) { SHM_QUEUE *procLocks; PROCLOCK *proclock; LOCK *lock; int i; if (proc == NULL) return; if (proc->waitLock) LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0); for (i = 0; i < NUM_LOCK_PARTITIONS; i++) { procLocks = &(proc->myProcLocks[i]); proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)); while (proclock) { Assert(proclock->tag.myProc == proc); lock = proclock->tag.myLock; PROCLOCK_PRINT("DumpLocks", proclock); LOCK_PRINT("DumpLocks", lock, 0); proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); } } } /* * Dump all lmgr locks. * * Caller is responsible for having acquired appropriate LWLocks. */ void DumpAllLocks(void) { PGPROC *proc; PROCLOCK *proclock; LOCK *lock; HASH_SEQ_STATUS status; proc = MyProc; if (proc && proc->waitLock) LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0); hash_seq_init(&status, LockMethodProcLockHash); while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL) { PROCLOCK_PRINT("DumpAllLocks", proclock); lock = proclock->tag.myLock; if (lock) LOCK_PRINT("DumpAllLocks", lock, 0); else elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL"); } } #endif /* LOCK_DEBUG */ /* * LOCK 2PC resource manager's routines */ /* * Re-acquire a lock belonging to a transaction that was prepared. 
*
 * Because this function is run at db startup, re-acquiring the locks should
 * never conflict with running transactions because there are none.  We
 * assume that the lock state represented by the stored 2PC files is legal.
 *
 * When switching from Hot Standby mode to normal operation, the locks will
 * be already held by the startup process. The locks are acquired for the new
 * procs without checking for conflicts, so we don't get a conflict between the
 * startup process and the dummy procs, even though we will momentarily have
 * a situation where two procs are holding the same AccessExclusiveLock,
 * which isn't normally possible because of the conflict. If we're in standby
 * mode, but a recovery snapshot hasn't been established yet, it's possible
 * that some but not all of the locks are already held by the startup process.
 *
 * This approach is simple, but also a bit dangerous, because if there isn't
 * enough shared memory to acquire the locks, an error will be thrown, which
 * is promoted to FATAL and recovery will abort, bringing down postmaster.
 * A safer approach would be to transfer the locks like we do in
 * AtPrepare_Locks, but then again, in hot standby mode it's possible for
 * read-only backends to use up all the shared lock memory anyway, so that
 * replaying the WAL record that needs to acquire a lock will throw an error
 * and PANIC anyway.
*/
void
lock_twophase_recover(TransactionId xid, uint16 info,
					  void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	PGPROC	   *proc = TwoPhaseGetDummyProc(xid, false);
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;
	LOCK	   *lock;
	PROCLOCK   *proclock;
	PROCLOCKTAG proclocktag;
	bool		found;
	uint32		hashcode;
	uint32		proclock_hashcode;
	int			partition;
	LWLock	   *partitionLock;
	LockMethod	lockMethodTable;

	/* recdata must be exactly one TwoPhaseLockRecord; info is not used */
	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);
	lockMethodTable = LockMethods[lockmethodid];

	hashcode = LockTagHashCode(locktag);
	partition = LockHashPartition(hashcode);
	partitionLock = LockHashPartitionLock(hashcode);

	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	/*
	 * Find or create a lock with this tag.
	 */
	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(void *) locktag,
												hashcode,
												HASH_ENTER_NULL,
												&found);
	if (!lock)
	{
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
				 errhint("You might need to increase max_locks_per_transaction.")));
	}

	/*
	 * if it's a new lock object, initialize it
	 */
	if (!found)
	{
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->procLocks));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
	}
	else
	{
		LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}

	/*
	 * Create the hash key for the proclock table.
	 */
	proclocktag.myLock = lock;
	proclocktag.myProc = proc;

	proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);

	/*
	 * Find or create a proclock entry with this tag
	 */
	proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
														(void *) &proclocktag,
														proclock_hashcode,
														HASH_ENTER_NULL,
														&found);
	if (!proclock)
	{
		/* Oops, not enough shmem for the proclock */
		if (lock->nRequested == 0)
		{
			/*
			 * There are no other requestors of this lock, so garbage-collect
			 * the lock object.  We *must* do this to avoid a permanent leak
			 * of shared memory, because there won't be anything to cause
			 * anyone to release the lock object later.
			 */
			Assert(SHMQueueEmpty(&(lock->procLocks)));
			if (!hash_search_with_hash_value(LockMethodLockHash,
											 (void *) &(lock->tag),
											 hashcode,
											 HASH_REMOVE,
											 NULL))
				elog(PANIC, "lock table corrupted");
		}
		LWLockRelease(partitionLock);
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of shared memory"),
				 errhint("You might need to increase max_locks_per_transaction.")));
	}

	/*
	 * If new, initialize the new entry
	 */
	if (!found)
	{
		Assert(proc->lockGroupLeader == NULL);
		proclock->groupLeader = proc;
		proclock->holdMask = 0;
		proclock->releaseMask = 0;
		/* Add proclock to appropriate lists */
		SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
		SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
							 &proclock->procLink);
		PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
	}
	else
	{
		PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
		Assert((proclock->holdMask & ~lock->grantMask) == 0);
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those immediately.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	/*
	 * We shouldn't already hold the desired lock.
	 */
	if (proclock->holdMask & LOCKBIT_ON(lockmode))
		elog(ERROR, "lock %s on object %u/%u/%u is already held",
			 lockMethodTable->lockModeNames[lockmode],
			 lock->tag.locktag_field1, lock->tag.locktag_field2,
			 lock->tag.locktag_field3);

	/*
	 * We ignore any possible conflicts and just grant ourselves the lock. Not
	 * only because we don't bother, but also to avoid deadlocks when
	 * switching from standby to normal mode. See function comment.
	 */
	GrantLock(lock, proclock, lockmode);

	/*
	 * Bump strong lock count, to make sure any fast-path lock requests won't
	 * be granted without consulting the primary lock table.
	 */
	if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
	{
		uint32		fasthashcode = FastPathStrongLockHashPartition(hashcode);

		SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
		FastPathStrongRelationLocks->count[fasthashcode]++;
		SpinLockRelease(&FastPathStrongRelationLocks->mutex);
	}

	LWLockRelease(partitionLock);
}

/*
 * Re-acquire a lock belonging to a transaction that was prepared, when
 * starting up into hot standby mode.
 *
 * Only AccessExclusiveLock records on relations lead to any action: those
 * are passed to StandbyAcquireAccessExclusiveLock().  Every other record is
 * ignored after its lock method has been validated.
 */
void
lock_twophase_standby_recover(TransactionId xid, uint16 info,
							  void *recdata, uint32 len)
{
	TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
	LOCKTAG    *locktag;
	LOCKMODE	lockmode;
	LOCKMETHODID lockmethodid;

	Assert(len == sizeof(TwoPhaseLockRecord));
	locktag = &rec->locktag;
	lockmode = rec->lockmode;
	lockmethodid = locktag->locktag_lockmethodid;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);

	if (lockmode == AccessExclusiveLock &&
		locktag->locktag_type == LOCKTAG_RELATION)
	{
		StandbyAcquireAccessExclusiveLock(xid,
										  locktag->locktag_field1 /* dboid */ ,
										  locktag->locktag_field2 /* reloid */ );
	}
}


/*
 * 2PC processing routine for COMMIT PREPARED case.
 *
 * Find and release the lock indicated by the 2PC record.
*/ void lock_twophase_postcommit(TransactionId xid, uint16 info, void *recdata, uint32 len) { TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata; PGPROC *proc = TwoPhaseGetDummyProc(xid, true); LOCKTAG *locktag; LOCKMETHODID lockmethodid; LockMethod lockMethodTable; Assert(len == sizeof(TwoPhaseLockRecord)); locktag = &rec->locktag; lockmethodid = locktag->locktag_lockmethodid; if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)) elog(ERROR, "unrecognized lock method: %d", lockmethodid); lockMethodTable = LockMethods[lockmethodid]; LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true); } /* * 2PC processing routine for ROLLBACK PREPARED case. * * This is actually just the same as the COMMIT case. */ void lock_twophase_postabort(TransactionId xid, uint16 info, void *recdata, uint32 len) { lock_twophase_postcommit(xid, info, recdata, len); } /* * VirtualXactLockTableInsert * * Take vxid lock via the fast-path. There can't be any pre-existing * lockers, as we haven't advertised this vxid via the ProcArray yet. * * Since MyProc->fpLocalTransactionId will normally contain the same data * as MyProc->lxid, you might wonder if we really need both. The * difference is that MyProc->lxid is set and cleared unlocked, and * examined by procarray.c, while fpLocalTransactionId is protected by * fpInfoLock and is used only by the locking subsystem. Doing it this * way makes it easier to verify that there are no funny race conditions. * * We don't bother recording this lock in the local lock table, since it's * only ever released at the end of a transaction. Instead, * LockReleaseAll() calls VirtualXactLockTableCleanup(). 
*/
void
VirtualXactLockTableInsert(VirtualTransactionId vxid)
{
	Assert(VirtualTransactionIdIsValid(vxid));

	LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE);

	/* We must be the vxid's backend, and must not already hold a VXID lock */
	Assert(MyProc->backendId == vxid.backendId);
	Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
	Assert(MyProc->fpVXIDLock == false);

	MyProc->fpVXIDLock = true;
	MyProc->fpLocalTransactionId = vxid.localTransactionId;

	LWLockRelease(&MyProc->fpInfoLock);
}

/*
 *		VirtualXactLockTableCleanup
 *
 *		Check whether a VXID lock has been materialized; if so, release it,
 *		unblocking waiters.
 */
void
VirtualXactLockTableCleanup(void)
{
	bool		fastpath;
	LocalTransactionId lxid;

	Assert(MyProc->backendId != InvalidBackendId);

	/*
	 * Clean up shared memory state.  The previous values are saved first so
	 * that they can be examined after fpInfoLock has been released.
	 */
	LWLockAcquire(&MyProc->fpInfoLock, LW_EXCLUSIVE);

	fastpath = MyProc->fpVXIDLock;
	lxid = MyProc->fpLocalTransactionId;
	MyProc->fpVXIDLock = false;
	MyProc->fpLocalTransactionId = InvalidLocalTransactionId;

	LWLockRelease(&MyProc->fpInfoLock);

	/*
	 * If fpVXIDLock has been cleared without touching fpLocalTransactionId,
	 * that means someone transferred the lock to the main lock table.
	 */
	if (!fastpath && LocalTransactionIdIsValid(lxid))
	{
		VirtualTransactionId vxid;
		LOCKTAG		locktag;

		vxid.backendId = MyBackendId;
		vxid.localTransactionId = lxid;
		SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);

		LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
							 &locktag, ExclusiveLock, false);
	}
}

/*
 *		VirtualXactLock
 *
 * If wait = true, wait until the given VXID has been released, and then
 * return true.
 *
 * If wait = false, just check whether the VXID is still running, and return
 * true or false.
 */
bool
VirtualXactLock(VirtualTransactionId vxid, bool wait)
{
	LOCKTAG		tag;
	PGPROC	   *proc;

	Assert(VirtualTransactionIdIsValid(vxid));

	if (VirtualTransactionIdIsPreparedXact(vxid))
	{
		LockAcquireResult lar;

		/*
		 * Prepared transactions don't hold vxid locks.  The
		 * LocalTransactionId is always a normal, locked XID.
		 */
		SET_LOCKTAG_TRANSACTION(tag, vxid.localTransactionId);
		/* dontWait is the inverse of "wait": fail rather than sleep */
		lar = LockAcquire(&tag, ShareLock, false, !wait);
		if (lar != LOCKACQUIRE_NOT_AVAIL)
			LockRelease(&tag, ShareLock, false);
		return lar != LOCKACQUIRE_NOT_AVAIL;
	}

	SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);

	/*
	 * If a lock table entry must be made, this is the PGPROC on whose behalf
	 * it must be done.  Note that the transaction might end or the PGPROC
	 * might be reassigned to a new backend before we get around to examining
	 * it, but it doesn't matter.  If we find upon examination that the
	 * relevant lxid is no longer running here, that's enough to prove that
	 * it's no longer running anywhere.
	 */
	proc = BackendIdGetProc(vxid.backendId);
	if (proc == NULL)
		return true;

	/*
	 * We must acquire this lock before checking the backendId and lxid
	 * against the ones we're waiting for.  The target backend will only set
	 * or clear lxid while holding this lock.
	 */
	LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE);

	/* If the transaction has ended, our work here is done. */
	if (proc->backendId != vxid.backendId
		|| proc->fpLocalTransactionId != vxid.localTransactionId)
	{
		LWLockRelease(&proc->fpInfoLock);
		return true;
	}

	/*
	 * If we aren't asked to wait, there's no need to set up a lock table
	 * entry.  The transaction is still in progress, so just return false.
	 */
	if (!wait)
	{
		LWLockRelease(&proc->fpInfoLock);
		return false;
	}

	/*
	 * OK, we're going to need to sleep on the VXID.  But first, we must set
	 * up the primary lock table entry, if needed (ie, convert the proc's
	 * fast-path lock on its VXID to a regular lock).
	 */
	if (proc->fpVXIDLock)
	{
		PROCLOCK   *proclock;
		uint32		hashcode;
		LWLock	   *partitionLock;

		hashcode = LockTagHashCode(&tag);

		partitionLock = LockHashPartitionLock(hashcode);
		LWLockAcquire(partitionLock, LW_EXCLUSIVE);

		proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
									&tag, hashcode, ExclusiveLock);
		if (!proclock)
		{
			/* Release both LWLocks ourselves before reporting the failure */
			LWLockRelease(partitionLock);
			LWLockRelease(&proc->fpInfoLock);
			ereport(ERROR,
					(errcode(ERRCODE_OUT_OF_MEMORY),
					 errmsg("out of shared memory"),
					 errhint("You might need to increase max_locks_per_transaction.")));
		}
		GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);

		LWLockRelease(partitionLock);

		/* The lock now lives in the main table, no longer in the fast path */
		proc->fpVXIDLock = false;
	}

	/* Done with the target proc's fast-path VXID state */
	LWLockRelease(&proc->fpInfoLock);

	/* Time to wait. */
	(void) LockAcquire(&tag, ShareLock, false, false);

	LockRelease(&tag, ShareLock, false);
	return true;
}

/*
 * LockWaiterCount
 *
 * Find the number of lock requests recorded for this locktag.
 *
 * Returns the nRequested counter of the matching entry in the shared lock
 * hash table, or 0 if the table has no entry for locktag.  The lookup is
 * done while holding the tag's partition lock.
 */
int
LockWaiterCount(const LOCKTAG *locktag)
{
	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
	LOCK	   *lock;
	bool		found;
	uint32		hashcode;
	LWLock	   *partitionLock;
	int			waiters = 0;

	if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
		elog(ERROR, "unrecognized lock method: %d", lockmethodid);

	hashcode = LockTagHashCode(locktag);
	partitionLock = LockHashPartitionLock(hashcode);
	LWLockAcquire(partitionLock, LW_EXCLUSIVE);

	lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
												(const void *) locktag,
												hashcode,
												HASH_FIND,
												&found);
	if (found)
	{
		Assert(lock != NULL);
		waiters = lock->nRequested;
	}
	LWLockRelease(partitionLock);

	return waiters;
}