diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/storage/ipc/sinvaladt.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-upstream.tar.xz postgresql-14-upstream.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/storage/ipc/sinvaladt.c')
-rw-r--r-- | src/backend/storage/ipc/sinvaladt.c | 777 |
1 files changed, 777 insertions, 0 deletions
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c new file mode 100644 index 0000000..946bd8e --- /dev/null +++ b/src/backend/storage/ipc/sinvaladt.c @@ -0,0 +1,777 @@ +/*------------------------------------------------------------------------- + * + * sinvaladt.c + * POSTGRES shared cache invalidation data manager. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/storage/ipc/sinvaladt.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <signal.h> +#include <unistd.h> + +#include "access/transam.h" +#include "miscadmin.h" +#include "storage/backendid.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "storage/procsignal.h" +#include "storage/shmem.h" +#include "storage/sinvaladt.h" +#include "storage/spin.h" + +/* + * Conceptually, the shared cache invalidation messages are stored in an + * infinite array, where maxMsgNum is the next array subscript to store a + * submitted message in, minMsgNum is the smallest array subscript containing + * a message not yet read by all backends, and we always have maxMsgNum >= + * minMsgNum. (They are equal when there are no messages pending.) For each + * active backend, there is a nextMsgNum pointer indicating the next message it + * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every + * backend. + * + * (In the current implementation, minMsgNum is a lower bound for the + * per-process nextMsgNum values, but it isn't rigorously kept equal to the + * smallest nextMsgNum --- it may lag behind. We only update it when + * SICleanupQueue is called, and we try not to do that often.) + * + * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES + * entries. We translate MsgNum values into circular-buffer indexes by + * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as + * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum + * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space + * in the buffer. If the buffer does overflow, we recover by setting the + * "reset" flag for each backend that has fallen too far behind. A backend + * that is in "reset" state is ignored while determining minMsgNum. When + * it does finally attempt to receive inval messages, it must discard all + * its invalidatable state, since it won't know what it missed. + * + * To reduce the probability of needing resets, we send a "catchup" interrupt + * to any backend that seems to be falling unreasonably far behind. The + * normal behavior is that at most one such interrupt is in flight at a time; + * when a backend completes processing a catchup interrupt, it executes + * SICleanupQueue, which will signal the next-furthest-behind backend if + * needed. This avoids undue contention from multiple backends all trying + * to catch up at once. However, the furthest-back backend might be stuck + * in a state where it can't catch up. Eventually it will get reset, so it + * won't cause any more problems for anyone but itself. But we don't want + * to find that a bunch of other backends are now too close to the reset + * threshold to be saved. So SICleanupQueue is designed to occasionally + * send extra catchup interrupts as the queue gets fuller, to backends that + * are far behind and haven't gotten one yet. As long as there aren't a lot + * of "stuck" backends, we won't need a lot of extra interrupts, since ones + * that aren't stuck will propagate their interrupts to the next guy. + * + * We would have problems if the MsgNum values overflow an integer, so + * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND + * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be + * large so that we don't need to do this often. It must be a multiple of + * MAXNUMMESSAGES so that the existing circular-buffer entries don't need + * to be moved when we do it. + * + * Access to the shared sinval array is protected by two locks, SInvalReadLock + * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this + * authorizes them to modify their own ProcState but not to modify or even + * look at anyone else's. When we need to perform array-wide updates, + * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to + * lock out all readers. Writers take SInvalWriteLock (always in exclusive + * mode) to serialize adding messages to the queue. Note that a writer + * can operate in parallel with one or more readers, because the writer + * has no need to touch anyone's ProcState, except in the infrequent cases + * when SICleanupQueue is needed. The only point of overlap is that + * the writer wants to change maxMsgNum while readers need to read it. + * We deal with that by having a spinlock that readers must take for just + * long enough to read maxMsgNum, while writers take it for just long enough + * to write maxMsgNum. (The exact rule is that you need the spinlock to + * read maxMsgNum if you are not holding SInvalWriteLock, and you need the + * spinlock to write maxMsgNum unless you are holding both locks.) + * + * Note: since maxMsgNum is an int and hence presumably atomically readable/ + * writable, the spinlock might seem unnecessary. The reason it is needed + * is to provide a memory barrier: we need to be sure that messages written + * to the array are actually there before maxMsgNum is increased, and that + * readers will see that data after fetching maxMsgNum. Multiprocessors + * that have weak memory-ordering guarantees can fail without the memory + * barrier instructions that are included in the spinlock sequences. + */ + + +/* + * Configurable parameters. + * + * MAXNUMMESSAGES: max number of shared-inval messages we can buffer. + * Must be a power of 2 for speed. + * + * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow. + * Must be a multiple of MAXNUMMESSAGES. Should be large. + * + * CLEANUP_MIN: the minimum number of messages that must be in the buffer + * before we bother to call SICleanupQueue. + * + * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once + * we exceed CLEANUP_MIN. Should be a power of 2 for speed. + * + * SIG_THRESHOLD: the minimum number of messages a backend must have fallen + * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT. + * + * WRITE_QUANTUM: the max number of messages to push into the buffer per + * iteration of SIInsertDataEntries. Noncritical but should be less than + * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once + * per iteration. + */ + +#define MAXNUMMESSAGES 4096 +#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) +#define CLEANUP_MIN (MAXNUMMESSAGES / 2) +#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) +#define SIG_THRESHOLD (MAXNUMMESSAGES / 2) +#define WRITE_QUANTUM 64 + +/* Per-backend state in shared invalidation structure */ +typedef struct ProcState +{ + /* procPid is zero in an inactive ProcState array entry. */ + pid_t procPid; /* PID of backend, for signaling */ + PGPROC *proc; /* PGPROC of backend */ + /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ + int nextMsgNum; /* next message number to read */ + bool resetState; /* backend needs to reset its state */ + bool signaled; /* backend has been sent catchup signal */ + bool hasMessages; /* backend has unread messages */ + + /* + * Backend only sends invalidations, never receives them. This only makes + * sense for Startup process during recovery because it doesn't maintain a + * relcache, yet it fires inval messages to allow query backends to see + * schema changes. + */ + bool sendOnly; /* backend only sends, never receives */ + + /* + * Next LocalTransactionId to use for each idle backend slot. We keep + * this here because it is indexed by BackendId and it is convenient to + * copy the value to and from local memory when MyBackendId is set. It's + * meaningless in an active ProcState entry. + */ + LocalTransactionId nextLXID; +} ProcState; + +/* Shared cache invalidation memory segment */ +typedef struct SISeg +{ + /* + * General state information + */ + int minMsgNum; /* oldest message still needed */ + int maxMsgNum; /* next message number to be assigned */ + int nextThreshold; /* # of messages to call SICleanupQueue */ + int lastBackend; /* index of last active procState entry, +1 */ + int maxBackends; /* size of procState array */ + + slock_t msgnumLock; /* spinlock protecting maxMsgNum */ + + /* + * Circular buffer holding shared-inval messages + */ + SharedInvalidationMessage buffer[MAXNUMMESSAGES]; + + /* + * Per-backend invalidation state info (has MaxBackends entries). + */ + ProcState procState[FLEXIBLE_ARRAY_MEMBER]; +} SISeg; + +static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */ + + +static LocalTransactionId nextLocalTransactionId; + +static void CleanupInvalidationState(int status, Datum arg); + + +/* + * SInvalShmemSize --- return shared-memory space needed + */ +Size +SInvalShmemSize(void) +{ + Size size; + + size = offsetof(SISeg, procState); + size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); + + return size; +} + +/* + * CreateSharedInvalidationState + * Create and initialize the SI message buffer + */ +void +CreateSharedInvalidationState(void) +{ + int i; + bool found; + + /* Allocate space in shared memory */ + shmInvalBuffer = (SISeg *) + ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found); + if (found) + return; + + /* Clear message counters, save size of procState array, init spinlock */ + shmInvalBuffer->minMsgNum = 0; + shmInvalBuffer->maxMsgNum = 0; + shmInvalBuffer->nextThreshold = CLEANUP_MIN; + shmInvalBuffer->lastBackend = 0; + shmInvalBuffer->maxBackends = MaxBackends; + SpinLockInit(&shmInvalBuffer->msgnumLock); + + /* The buffer[] array is initially all unused, so we need not fill it */ + + /* Mark all backends inactive, and initialize nextLXID */ + for (i = 0; i < shmInvalBuffer->maxBackends; i++) + { + shmInvalBuffer->procState[i].procPid = 0; /* inactive */ + shmInvalBuffer->procState[i].proc = NULL; + shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */ + shmInvalBuffer->procState[i].resetState = false; + shmInvalBuffer->procState[i].signaled = false; + shmInvalBuffer->procState[i].hasMessages = false; + shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId; + } +} + +/* + * SharedInvalBackendInit + * Initialize a new backend to operate on the sinval buffer + */ +void +SharedInvalBackendInit(bool sendOnly) +{ + int index; + ProcState *stateP = NULL; + SISeg *segP = shmInvalBuffer; + + /* + * This can run in parallel with read operations, but not with write + * operations, since SIInsertDataEntries relies on lastBackend to set + * hasMessages appropriately. + */ + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); + + /* Look for a free entry in the procState array */ + for (index = 0; index < segP->lastBackend; index++) + { + if (segP->procState[index].procPid == 0) /* inactive slot? */ + { + stateP = &segP->procState[index]; + break; + } + } + + if (stateP == NULL) + { + if (segP->lastBackend < segP->maxBackends) + { + stateP = &segP->procState[segP->lastBackend]; + Assert(stateP->procPid == 0); + segP->lastBackend++; + } + else + { + /* + * out of procState slots: MaxBackends exceeded -- report normally + */ + MyBackendId = InvalidBackendId; + LWLockRelease(SInvalWriteLock); + ereport(FATAL, + (errcode(ERRCODE_TOO_MANY_CONNECTIONS), + errmsg("sorry, too many clients already"))); + } + } + + MyBackendId = (stateP - &segP->procState[0]) + 1; + + /* Advertise assigned backend ID in MyProc */ + MyProc->backendId = MyBackendId; + + /* Fetch next local transaction ID into local memory */ + nextLocalTransactionId = stateP->nextLXID; + + /* mark myself active, with all extant messages already read */ + stateP->procPid = MyProcPid; + stateP->proc = MyProc; + stateP->nextMsgNum = segP->maxMsgNum; + stateP->resetState = false; + stateP->signaled = false; + stateP->hasMessages = false; + stateP->sendOnly = sendOnly; + + LWLockRelease(SInvalWriteLock); + + /* register exit routine to mark my entry inactive at exit */ + on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP)); + + elog(DEBUG4, "my backend ID is %d", MyBackendId); +} + +/* + * CleanupInvalidationState + * Mark the current backend as no longer active. + * + * This function is called via on_shmem_exit() during backend shutdown. + * + * arg is really of type "SISeg*". + */ +static void +CleanupInvalidationState(int status, Datum arg) +{ + SISeg *segP = (SISeg *) DatumGetPointer(arg); + ProcState *stateP; + int i; + + Assert(PointerIsValid(segP)); + + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); + + stateP = &segP->procState[MyBackendId - 1]; + + /* Update next local transaction ID for next holder of this backendID */ + stateP->nextLXID = nextLocalTransactionId; + + /* Mark myself inactive */ + stateP->procPid = 0; + stateP->proc = NULL; + stateP->nextMsgNum = 0; + stateP->resetState = false; + stateP->signaled = false; + + /* Recompute index of last active backend */ + for (i = segP->lastBackend; i > 0; i--) + { + if (segP->procState[i - 1].procPid != 0) + break; + } + segP->lastBackend = i; + + LWLockRelease(SInvalWriteLock); +} + +/* + * BackendIdGetProc + * Get the PGPROC structure for a backend, given the backend ID. + * The result may be out of date arbitrarily quickly, so the caller + * must be careful about how this information is used. NULL is + * returned if the backend is not active. + */ +PGPROC * +BackendIdGetProc(int backendID) +{ + PGPROC *result = NULL; + SISeg *segP = shmInvalBuffer; + + /* Need to lock out additions/removals of backends */ + LWLockAcquire(SInvalWriteLock, LW_SHARED); + + if (backendID > 0 && backendID <= segP->lastBackend) + { + ProcState *stateP = &segP->procState[backendID - 1]; + + result = stateP->proc; + } + + LWLockRelease(SInvalWriteLock); + + return result; +} + +/* + * BackendIdGetTransactionIds + * Get the xid and xmin of the backend. The result may be out of date + * arbitrarily quickly, so the caller must be careful about how this + * information is used. + */ +void +BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin) +{ + SISeg *segP = shmInvalBuffer; + + *xid = InvalidTransactionId; + *xmin = InvalidTransactionId; + + /* Need to lock out additions/removals of backends */ + LWLockAcquire(SInvalWriteLock, LW_SHARED); + + if (backendID > 0 && backendID <= segP->lastBackend) + { + ProcState *stateP = &segP->procState[backendID - 1]; + PGPROC *proc = stateP->proc; + + if (proc != NULL) + { + *xid = proc->xid; + *xmin = proc->xmin; + } + } + + LWLockRelease(SInvalWriteLock); +} + +/* + * SIInsertDataEntries + * Add new invalidation message(s) to the buffer. + */ +void +SIInsertDataEntries(const SharedInvalidationMessage *data, int n) +{ + SISeg *segP = shmInvalBuffer; + + /* + * N can be arbitrarily large. We divide the work into groups of no more + * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for + * an unreasonably long time. (This is not so much because we care about + * letting in other writers, as that some just-caught-up backend might be + * trying to do SICleanupQueue to pass on its signal, and we don't want it + * to have to wait a long time.) Also, we need to consider calling + * SICleanupQueue every so often. + */ + while (n > 0) + { + int nthistime = Min(n, WRITE_QUANTUM); + int numMsgs; + int max; + int i; + + n -= nthistime; + + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); + + /* + * If the buffer is full, we *must* acquire some space. Clean the + * queue and reset anyone who is preventing space from being freed. + * Otherwise, clean the queue only when it's exceeded the next + * fullness threshold. We have to loop and recheck the buffer state + * after any call of SICleanupQueue. + */ + for (;;) + { + numMsgs = segP->maxMsgNum - segP->minMsgNum; + if (numMsgs + nthistime > MAXNUMMESSAGES || + numMsgs >= segP->nextThreshold) + SICleanupQueue(true, nthistime); + else + break; + } + + /* + * Insert new message(s) into proper slot of circular buffer + */ + max = segP->maxMsgNum; + while (nthistime-- > 0) + { + segP->buffer[max % MAXNUMMESSAGES] = *data++; + max++; + } + + /* Update current value of maxMsgNum using spinlock */ + SpinLockAcquire(&segP->msgnumLock); + segP->maxMsgNum = max; + SpinLockRelease(&segP->msgnumLock); + + /* + * Now that the maxMsgNum change is globally visible, we give everyone + * a swift kick to make sure they read the newly added messages. + * Releasing SInvalWriteLock will enforce a full memory barrier, so + * these (unlocked) changes will be committed to memory before we exit + * the function. + */ + for (i = 0; i < segP->lastBackend; i++) + { + ProcState *stateP = &segP->procState[i]; + + stateP->hasMessages = true; + } + + LWLockRelease(SInvalWriteLock); + } +} + +/* + * SIGetDataEntries + * get next SI message(s) for current backend, if there are any + * + * Possible return values: + * 0: no SI message available + * n>0: next n SI messages have been extracted into data[] + * -1: SI reset message extracted + * + * If the return value is less than the array size "datasize", the caller + * can assume that there are no more SI messages after the one(s) returned. + * Otherwise, another call is needed to collect more messages. + * + * NB: this can run in parallel with other instances of SIGetDataEntries + * executing on behalf of other backends, since each instance will modify only + * fields of its own backend's ProcState, and no instance will look at fields + * of other backends' ProcStates. We express this by grabbing SInvalReadLock + * in shared mode. Note that this is not exactly the normal (read-only) + * interpretation of a shared lock! Look closely at the interactions before + * allowing SInvalReadLock to be grabbed in shared mode for any other reason! + * + * NB: this can also run in parallel with SIInsertDataEntries. It is not + * guaranteed that we will return any messages added after the routine is + * entered. + * + * Note: we assume that "datasize" is not so large that it might be important + * to break our hold on SInvalReadLock into segments. + */ +int +SIGetDataEntries(SharedInvalidationMessage *data, int datasize) +{ + SISeg *segP; + ProcState *stateP; + int max; + int n; + + segP = shmInvalBuffer; + stateP = &segP->procState[MyBackendId - 1]; + + /* + * Before starting to take locks, do a quick, unlocked test to see whether + * there can possibly be anything to read. On a multiprocessor system, + * it's possible that this load could migrate backwards and occur before + * we actually enter this function, so we might miss a sinval message that + * was just added by some other processor. But they can't migrate + * backwards over a preceding lock acquisition, so it should be OK. If we + * haven't acquired a lock preventing against further relevant + * invalidations, any such occurrence is not much different than if the + * invalidation had arrived slightly later in the first place. + */ + if (!stateP->hasMessages) + return 0; + + LWLockAcquire(SInvalReadLock, LW_SHARED); + + /* + * We must reset hasMessages before determining how many messages we're + * going to read. That way, if new messages arrive after we have + * determined how many we're reading, the flag will get reset and we'll + * notice those messages part-way through. + * + * Note that, if we don't end up reading all of the messages, we had + * better be certain to reset this flag before exiting! + */ + stateP->hasMessages = false; + + /* Fetch current value of maxMsgNum using spinlock */ + SpinLockAcquire(&segP->msgnumLock); + max = segP->maxMsgNum; + SpinLockRelease(&segP->msgnumLock); + + if (stateP->resetState) + { + /* + * Force reset. We can say we have dealt with any messages added + * since the reset, as well; and that means we should clear the + * signaled flag, too. + */ + stateP->nextMsgNum = max; + stateP->resetState = false; + stateP->signaled = false; + LWLockRelease(SInvalReadLock); + return -1; + } + + /* + * Retrieve messages and advance backend's counter, until data array is + * full or there are no more messages. + * + * There may be other backends that haven't read the message(s), so we + * cannot delete them here. SICleanupQueue() will eventually remove them + * from the queue. + */ + n = 0; + while (n < datasize && stateP->nextMsgNum < max) + { + data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; + stateP->nextMsgNum++; + } + + /* + * If we have caught up completely, reset our "signaled" flag so that + * we'll get another signal if we fall behind again. + * + * If we haven't caught up completely, reset the hasMessages flag so that + * we see the remaining messages next time. + */ + if (stateP->nextMsgNum >= max) + stateP->signaled = false; + else + stateP->hasMessages = true; + + LWLockRelease(SInvalReadLock); + return n; +} + +/* + * SICleanupQueue + * Remove messages that have been consumed by all active backends + * + * callerHasWriteLock is true if caller is holding SInvalWriteLock. + * minFree is the minimum number of message slots to make free. + * + * Possible side effects of this routine include marking one or more + * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT + * to some backend that seems to be getting too far behind. We signal at + * most one backend at a time, for reasons explained at the top of the file. + * + * Caution: because we transiently release write lock when we have to signal + * some other backend, it is NOT guaranteed that there are still minFree + * free message slots at exit. Caller must recheck and perhaps retry. + */ +void +SICleanupQueue(bool callerHasWriteLock, int minFree) +{ + SISeg *segP = shmInvalBuffer; + int min, + minsig, + lowbound, + numMsgs, + i; + ProcState *needSig = NULL; + + /* Lock out all writers and readers */ + if (!callerHasWriteLock) + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); + LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE); + + /* + * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the + * furthest-back backend that needs signaling (if any), and reset any + * backends that are too far back. Note that because we ignore sendOnly + * backends here it is possible for them to keep sending messages without + * a problem even when they are the only active backend. + */ + min = segP->maxMsgNum; + minsig = min - SIG_THRESHOLD; + lowbound = min - MAXNUMMESSAGES + minFree; + + for (i = 0; i < segP->lastBackend; i++) + { + ProcState *stateP = &segP->procState[i]; + int n = stateP->nextMsgNum; + + /* Ignore if inactive or already in reset state */ + if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly) + continue; + + /* + * If we must free some space and this backend is preventing it, force + * him into reset state and then ignore until he catches up. + */ + if (n < lowbound) + { + stateP->resetState = true; + /* no point in signaling him ... */ + continue; + } + + /* Track the global minimum nextMsgNum */ + if (n < min) + min = n; + + /* Also see who's furthest back of the unsignaled backends */ + if (n < minsig && !stateP->signaled) + { + minsig = n; + needSig = stateP; + } + } + segP->minMsgNum = min; + + /* + * When minMsgNum gets really large, decrement all message counters so as + * to forestall overflow of the counters. This happens seldom enough that + * folding it into the previous loop would be a loser. + */ + if (min >= MSGNUMWRAPAROUND) + { + segP->minMsgNum -= MSGNUMWRAPAROUND; + segP->maxMsgNum -= MSGNUMWRAPAROUND; + for (i = 0; i < segP->lastBackend; i++) + { + /* we don't bother skipping inactive entries here */ + segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; + } + } + + /* + * Determine how many messages are still in the queue, and set the + * threshold at which we should repeat SICleanupQueue(). + */ + numMsgs = segP->maxMsgNum - segP->minMsgNum; + if (numMsgs < CLEANUP_MIN) + segP->nextThreshold = CLEANUP_MIN; + else + segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM; + + /* + * Lastly, signal anyone who needs a catchup interrupt. Since + * SendProcSignal() might not be fast, we don't want to hold locks while + * executing it. + */ + if (needSig) + { + pid_t his_pid = needSig->procPid; + BackendId his_backendId = (needSig - &segP->procState[0]) + 1; + + needSig->signaled = true; + LWLockRelease(SInvalReadLock); + LWLockRelease(SInvalWriteLock); + elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid); + SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId); + if (callerHasWriteLock) + LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); + } + else + { + LWLockRelease(SInvalReadLock); + if (!callerHasWriteLock) + LWLockRelease(SInvalWriteLock); + } +} + + +/* + * GetNextLocalTransactionId --- allocate a new LocalTransactionId + * + * We split VirtualTransactionIds into two parts so that it is possible + * to allocate a new one without any contention for shared memory, except + * for a bit of additional overhead during backend startup/shutdown. + * The high-order part of a VirtualTransactionId is a BackendId, and the + * low-order part is a LocalTransactionId, which we assign from a local + * counter. To avoid the risk of a VirtualTransactionId being reused + * within a short interval, successive procs occupying the same backend ID + * slot should use a consecutive sequence of local IDs, which is implemented + * by copying nextLocalTransactionId as seen above. + */ +LocalTransactionId +GetNextLocalTransactionId(void) +{ + LocalTransactionId result; + + /* loop to avoid returning InvalidLocalTransactionId at wraparound */ + do + { + result = nextLocalTransactionId++; + } while (!LocalTransactionIdIsValid(result)); + + return result; +} |