summaryrefslogtreecommitdiffstats
path: root/src/backend/storage/ipc/sinvaladt.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 12:15:05 +0000
commit46651ce6fe013220ed397add242004d764fc0153 (patch)
tree6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/storage/ipc/sinvaladt.c
parentInitial commit. (diff)
downloadpostgresql-14-upstream.tar.xz
postgresql-14-upstream.zip
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/storage/ipc/sinvaladt.c')
-rw-r--r--src/backend/storage/ipc/sinvaladt.c777
1 files changed, 777 insertions, 0 deletions
diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c
new file mode 100644
index 0000000..946bd8e
--- /dev/null
+++ b/src/backend/storage/ipc/sinvaladt.c
@@ -0,0 +1,777 @@
+/*-------------------------------------------------------------------------
+ *
+ * sinvaladt.c
+ * POSTGRES shared cache invalidation data manager.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/storage/ipc/sinvaladt.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <signal.h>
+#include <unistd.h>
+
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "storage/backendid.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/procsignal.h"
+#include "storage/shmem.h"
+#include "storage/sinvaladt.h"
+#include "storage/spin.h"
+
+/*
+ * Conceptually, the shared cache invalidation messages are stored in an
+ * infinite array, where maxMsgNum is the next array subscript to store a
+ * submitted message in, minMsgNum is the smallest array subscript containing
+ * a message not yet read by all backends, and we always have maxMsgNum >=
+ * minMsgNum. (They are equal when there are no messages pending.) For each
+ * active backend, there is a nextMsgNum pointer indicating the next message it
+ * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
+ * backend.
+ *
+ * (In the current implementation, minMsgNum is a lower bound for the
+ * per-process nextMsgNum values, but it isn't rigorously kept equal to the
+ * smallest nextMsgNum --- it may lag behind. We only update it when
+ * SICleanupQueue is called, and we try not to do that often.)
+ *
+ * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
+ * entries. We translate MsgNum values into circular-buffer indexes by
+ * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
+ * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
+ * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
+ * in the buffer. If the buffer does overflow, we recover by setting the
+ * "reset" flag for each backend that has fallen too far behind. A backend
+ * that is in "reset" state is ignored while determining minMsgNum. When
+ * it does finally attempt to receive inval messages, it must discard all
+ * its invalidatable state, since it won't know what it missed.
+ *
+ * To reduce the probability of needing resets, we send a "catchup" interrupt
+ * to any backend that seems to be falling unreasonably far behind. The
+ * normal behavior is that at most one such interrupt is in flight at a time;
+ * when a backend completes processing a catchup interrupt, it executes
+ * SICleanupQueue, which will signal the next-furthest-behind backend if
+ * needed. This avoids undue contention from multiple backends all trying
+ * to catch up at once. However, the furthest-back backend might be stuck
+ * in a state where it can't catch up. Eventually it will get reset, so it
+ * won't cause any more problems for anyone but itself. But we don't want
+ * to find that a bunch of other backends are now too close to the reset
+ * threshold to be saved. So SICleanupQueue is designed to occasionally
+ * send extra catchup interrupts as the queue gets fuller, to backends that
+ * are far behind and haven't gotten one yet. As long as there aren't a lot
+ * of "stuck" backends, we won't need a lot of extra interrupts, since ones
+ * that aren't stuck will propagate their interrupts to the next guy.
+ *
+ * We would have problems if the MsgNum values overflow an integer, so
+ * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
+ * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
+ * large so that we don't need to do this often. It must be a multiple of
+ * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
+ * to be moved when we do it.
+ *
+ * Access to the shared sinval array is protected by two locks, SInvalReadLock
+ * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
+ * authorizes them to modify their own ProcState but not to modify or even
+ * look at anyone else's. When we need to perform array-wide updates,
+ * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
+ * lock out all readers. Writers take SInvalWriteLock (always in exclusive
+ * mode) to serialize adding messages to the queue. Note that a writer
+ * can operate in parallel with one or more readers, because the writer
+ * has no need to touch anyone's ProcState, except in the infrequent cases
+ * when SICleanupQueue is needed. The only point of overlap is that
+ * the writer wants to change maxMsgNum while readers need to read it.
+ * We deal with that by having a spinlock that readers must take for just
+ * long enough to read maxMsgNum, while writers take it for just long enough
+ * to write maxMsgNum. (The exact rule is that you need the spinlock to
+ * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
+ * spinlock to write maxMsgNum unless you are holding both locks.)
+ *
+ * Note: since maxMsgNum is an int and hence presumably atomically readable/
+ * writable, the spinlock might seem unnecessary. The reason it is needed
+ * is to provide a memory barrier: we need to be sure that messages written
+ * to the array are actually there before maxMsgNum is increased, and that
+ * readers will see that data after fetching maxMsgNum. Multiprocessors
+ * that have weak memory-ordering guarantees can fail without the memory
+ * barrier instructions that are included in the spinlock sequences.
+ */
+
+
+/*
+ * Configurable parameters.
+ *
+ * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
+ * Must be a power of 2 for speed.
+ *
+ * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
+ * Must be a multiple of MAXNUMMESSAGES. Should be large.
+ *
+ * CLEANUP_MIN: the minimum number of messages that must be in the buffer
+ * before we bother to call SICleanupQueue.
+ *
+ * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
+ * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
+ *
+ * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
+ * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
+ *
+ * WRITE_QUANTUM: the max number of messages to push into the buffer per
+ * iteration of SIInsertDataEntries. Noncritical but should be less than
+ * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
+ * per iteration.
+ */
+
+#define MAXNUMMESSAGES 4096
+#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
+#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
+#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
+#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
+#define WRITE_QUANTUM 64
+
+/* Per-backend state in shared invalidation structure */
+typedef struct ProcState
+{
+ /* procPid is zero in an inactive ProcState array entry. */
+ pid_t procPid; /* PID of backend, for signaling */
+ PGPROC *proc; /* PGPROC of backend */
+ /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
+ int nextMsgNum; /* next message number to read */
+ bool resetState; /* backend needs to reset its state */
+ bool signaled; /* backend has been sent catchup signal */
+ bool hasMessages; /* backend has unread messages */
+
+ /*
+ * Backend only sends invalidations, never receives them. This only makes
+ * sense for Startup process during recovery because it doesn't maintain a
+ * relcache, yet it fires inval messages to allow query backends to see
+ * schema changes.
+ */
+ bool sendOnly; /* backend only sends, never receives */
+
+ /*
+ * Next LocalTransactionId to use for each idle backend slot. We keep
+ * this here because it is indexed by BackendId and it is convenient to
+ * copy the value to and from local memory when MyBackendId is set. It's
+ * meaningless in an active ProcState entry.
+ */
+ LocalTransactionId nextLXID;
+} ProcState;
+
+/* Shared cache invalidation memory segment */
+typedef struct SISeg
+{
+ /*
+ * General state information
+ */
+ int minMsgNum; /* oldest message still needed */
+ int maxMsgNum; /* next message number to be assigned */
+ int nextThreshold; /* # of messages to call SICleanupQueue */
+ int lastBackend; /* index of last active procState entry, +1 */
+ int maxBackends; /* size of procState array */
+
+ slock_t msgnumLock; /* spinlock protecting maxMsgNum */
+
+ /*
+ * Circular buffer holding shared-inval messages
+ */
+ SharedInvalidationMessage buffer[MAXNUMMESSAGES];
+
+ /*
+ * Per-backend invalidation state info (has MaxBackends entries).
+ */
+ ProcState procState[FLEXIBLE_ARRAY_MEMBER];
+} SISeg;
+
+static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
+
+
+static LocalTransactionId nextLocalTransactionId;
+
+static void CleanupInvalidationState(int status, Datum arg);
+
+
+/*
+ * SInvalShmemSize --- return shared-memory space needed
+ */
+Size
+SInvalShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(SISeg, procState);
+ size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
+
+ return size;
+}
+
+/*
+ * CreateSharedInvalidationState
+ * Create and initialize the SI message buffer
+ */
+void
+CreateSharedInvalidationState(void)
+{
+ int i;
+ bool found;
+
+ /* Allocate space in shared memory */
+ shmInvalBuffer = (SISeg *)
+ ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
+ if (found)
+ return;
+
+ /* Clear message counters, save size of procState array, init spinlock */
+ shmInvalBuffer->minMsgNum = 0;
+ shmInvalBuffer->maxMsgNum = 0;
+ shmInvalBuffer->nextThreshold = CLEANUP_MIN;
+ shmInvalBuffer->lastBackend = 0;
+ shmInvalBuffer->maxBackends = MaxBackends;
+ SpinLockInit(&shmInvalBuffer->msgnumLock);
+
+ /* The buffer[] array is initially all unused, so we need not fill it */
+
+ /* Mark all backends inactive, and initialize nextLXID */
+ for (i = 0; i < shmInvalBuffer->maxBackends; i++)
+ {
+ shmInvalBuffer->procState[i].procPid = 0; /* inactive */
+ shmInvalBuffer->procState[i].proc = NULL;
+ shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
+ shmInvalBuffer->procState[i].resetState = false;
+ shmInvalBuffer->procState[i].signaled = false;
+ shmInvalBuffer->procState[i].hasMessages = false;
+ shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
+ }
+}
+
+/*
+ * SharedInvalBackendInit
+ * Initialize a new backend to operate on the sinval buffer
+ */
+void
+SharedInvalBackendInit(bool sendOnly)
+{
+ int index;
+ ProcState *stateP = NULL;
+ SISeg *segP = shmInvalBuffer;
+
+ /*
+ * This can run in parallel with read operations, but not with write
+ * operations, since SIInsertDataEntries relies on lastBackend to set
+ * hasMessages appropriately.
+ */
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+
+ /* Look for a free entry in the procState array */
+ for (index = 0; index < segP->lastBackend; index++)
+ {
+ if (segP->procState[index].procPid == 0) /* inactive slot? */
+ {
+ stateP = &segP->procState[index];
+ break;
+ }
+ }
+
+ if (stateP == NULL)
+ {
+ if (segP->lastBackend < segP->maxBackends)
+ {
+ stateP = &segP->procState[segP->lastBackend];
+ Assert(stateP->procPid == 0);
+ segP->lastBackend++;
+ }
+ else
+ {
+ /*
+ * out of procState slots: MaxBackends exceeded -- report normally
+ */
+ MyBackendId = InvalidBackendId;
+ LWLockRelease(SInvalWriteLock);
+ ereport(FATAL,
+ (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
+ errmsg("sorry, too many clients already")));
+ }
+ }
+
+ MyBackendId = (stateP - &segP->procState[0]) + 1;
+
+ /* Advertise assigned backend ID in MyProc */
+ MyProc->backendId = MyBackendId;
+
+ /* Fetch next local transaction ID into local memory */
+ nextLocalTransactionId = stateP->nextLXID;
+
+ /* mark myself active, with all extant messages already read */
+ stateP->procPid = MyProcPid;
+ stateP->proc = MyProc;
+ stateP->nextMsgNum = segP->maxMsgNum;
+ stateP->resetState = false;
+ stateP->signaled = false;
+ stateP->hasMessages = false;
+ stateP->sendOnly = sendOnly;
+
+ LWLockRelease(SInvalWriteLock);
+
+ /* register exit routine to mark my entry inactive at exit */
+ on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
+
+ elog(DEBUG4, "my backend ID is %d", MyBackendId);
+}
+
+/*
+ * CleanupInvalidationState
+ * Mark the current backend as no longer active.
+ *
+ * This function is called via on_shmem_exit() during backend shutdown.
+ *
+ * arg is really of type "SISeg*".
+ */
+static void
+CleanupInvalidationState(int status, Datum arg)
+{
+ SISeg *segP = (SISeg *) DatumGetPointer(arg);
+ ProcState *stateP;
+ int i;
+
+ Assert(PointerIsValid(segP));
+
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+
+ stateP = &segP->procState[MyBackendId - 1];
+
+ /* Update next local transaction ID for next holder of this backendID */
+ stateP->nextLXID = nextLocalTransactionId;
+
+ /* Mark myself inactive */
+ stateP->procPid = 0;
+ stateP->proc = NULL;
+ stateP->nextMsgNum = 0;
+ stateP->resetState = false;
+ stateP->signaled = false;
+
+ /* Recompute index of last active backend */
+ for (i = segP->lastBackend; i > 0; i--)
+ {
+ if (segP->procState[i - 1].procPid != 0)
+ break;
+ }
+ segP->lastBackend = i;
+
+ LWLockRelease(SInvalWriteLock);
+}
+
+/*
+ * BackendIdGetProc
+ * Get the PGPROC structure for a backend, given the backend ID.
+ * The result may be out of date arbitrarily quickly, so the caller
+ * must be careful about how this information is used. NULL is
+ * returned if the backend is not active.
+ */
+PGPROC *
+BackendIdGetProc(int backendID)
+{
+ PGPROC *result = NULL;
+ SISeg *segP = shmInvalBuffer;
+
+ /* Need to lock out additions/removals of backends */
+ LWLockAcquire(SInvalWriteLock, LW_SHARED);
+
+ if (backendID > 0 && backendID <= segP->lastBackend)
+ {
+ ProcState *stateP = &segP->procState[backendID - 1];
+
+ result = stateP->proc;
+ }
+
+ LWLockRelease(SInvalWriteLock);
+
+ return result;
+}
+
+/*
+ * BackendIdGetTransactionIds
+ * Get the xid and xmin of the backend. The result may be out of date
+ * arbitrarily quickly, so the caller must be careful about how this
+ * information is used.
+ */
+void
+BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
+{
+ SISeg *segP = shmInvalBuffer;
+
+ *xid = InvalidTransactionId;
+ *xmin = InvalidTransactionId;
+
+ /* Need to lock out additions/removals of backends */
+ LWLockAcquire(SInvalWriteLock, LW_SHARED);
+
+ if (backendID > 0 && backendID <= segP->lastBackend)
+ {
+ ProcState *stateP = &segP->procState[backendID - 1];
+ PGPROC *proc = stateP->proc;
+
+ if (proc != NULL)
+ {
+ *xid = proc->xid;
+ *xmin = proc->xmin;
+ }
+ }
+
+ LWLockRelease(SInvalWriteLock);
+}
+
+/*
+ * SIInsertDataEntries
+ * Add new invalidation message(s) to the buffer.
+ */
+void
+SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
+{
+ SISeg *segP = shmInvalBuffer;
+
+ /*
+ * N can be arbitrarily large. We divide the work into groups of no more
+ * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
+ * an unreasonably long time. (This is not so much because we care about
+ * letting in other writers, as that some just-caught-up backend might be
+ * trying to do SICleanupQueue to pass on its signal, and we don't want it
+ * to have to wait a long time.) Also, we need to consider calling
+ * SICleanupQueue every so often.
+ */
+ while (n > 0)
+ {
+ int nthistime = Min(n, WRITE_QUANTUM);
+ int numMsgs;
+ int max;
+ int i;
+
+ n -= nthistime;
+
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+
+ /*
+ * If the buffer is full, we *must* acquire some space. Clean the
+ * queue and reset anyone who is preventing space from being freed.
+ * Otherwise, clean the queue only when it's exceeded the next
+ * fullness threshold. We have to loop and recheck the buffer state
+ * after any call of SICleanupQueue.
+ */
+ for (;;)
+ {
+ numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ if (numMsgs + nthistime > MAXNUMMESSAGES ||
+ numMsgs >= segP->nextThreshold)
+ SICleanupQueue(true, nthistime);
+ else
+ break;
+ }
+
+ /*
+ * Insert new message(s) into proper slot of circular buffer
+ */
+ max = segP->maxMsgNum;
+ while (nthistime-- > 0)
+ {
+ segP->buffer[max % MAXNUMMESSAGES] = *data++;
+ max++;
+ }
+
+ /* Update current value of maxMsgNum using spinlock */
+ SpinLockAcquire(&segP->msgnumLock);
+ segP->maxMsgNum = max;
+ SpinLockRelease(&segP->msgnumLock);
+
+ /*
+ * Now that the maxMsgNum change is globally visible, we give everyone
+ * a swift kick to make sure they read the newly added messages.
+ * Releasing SInvalWriteLock will enforce a full memory barrier, so
+ * these (unlocked) changes will be committed to memory before we exit
+ * the function.
+ */
+ for (i = 0; i < segP->lastBackend; i++)
+ {
+ ProcState *stateP = &segP->procState[i];
+
+ stateP->hasMessages = true;
+ }
+
+ LWLockRelease(SInvalWriteLock);
+ }
+}
+
+/*
+ * SIGetDataEntries
+ * get next SI message(s) for current backend, if there are any
+ *
+ * Possible return values:
+ * 0: no SI message available
+ * n>0: next n SI messages have been extracted into data[]
+ * -1: SI reset message extracted
+ *
+ * If the return value is less than the array size "datasize", the caller
+ * can assume that there are no more SI messages after the one(s) returned.
+ * Otherwise, another call is needed to collect more messages.
+ *
+ * NB: this can run in parallel with other instances of SIGetDataEntries
+ * executing on behalf of other backends, since each instance will modify only
+ * fields of its own backend's ProcState, and no instance will look at fields
+ * of other backends' ProcStates. We express this by grabbing SInvalReadLock
+ * in shared mode. Note that this is not exactly the normal (read-only)
+ * interpretation of a shared lock! Look closely at the interactions before
+ * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
+ *
+ * NB: this can also run in parallel with SIInsertDataEntries. It is not
+ * guaranteed that we will return any messages added after the routine is
+ * entered.
+ *
+ * Note: we assume that "datasize" is not so large that it might be important
+ * to break our hold on SInvalReadLock into segments.
+ */
+int
+SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
+{
+ SISeg *segP;
+ ProcState *stateP;
+ int max;
+ int n;
+
+ segP = shmInvalBuffer;
+ stateP = &segP->procState[MyBackendId - 1];
+
+ /*
+ * Before starting to take locks, do a quick, unlocked test to see whether
+ * there can possibly be anything to read. On a multiprocessor system,
+ * it's possible that this load could migrate backwards and occur before
+ * we actually enter this function, so we might miss a sinval message that
+ * was just added by some other processor. But they can't migrate
+ * backwards over a preceding lock acquisition, so it should be OK. If we
+ * haven't acquired a lock preventing against further relevant
+ * invalidations, any such occurrence is not much different than if the
+ * invalidation had arrived slightly later in the first place.
+ */
+ if (!stateP->hasMessages)
+ return 0;
+
+ LWLockAcquire(SInvalReadLock, LW_SHARED);
+
+ /*
+ * We must reset hasMessages before determining how many messages we're
+ * going to read. That way, if new messages arrive after we have
+ * determined how many we're reading, the flag will get reset and we'll
+ * notice those messages part-way through.
+ *
+ * Note that, if we don't end up reading all of the messages, we had
+ * better be certain to reset this flag before exiting!
+ */
+ stateP->hasMessages = false;
+
+ /* Fetch current value of maxMsgNum using spinlock */
+ SpinLockAcquire(&segP->msgnumLock);
+ max = segP->maxMsgNum;
+ SpinLockRelease(&segP->msgnumLock);
+
+ if (stateP->resetState)
+ {
+ /*
+ * Force reset. We can say we have dealt with any messages added
+ * since the reset, as well; and that means we should clear the
+ * signaled flag, too.
+ */
+ stateP->nextMsgNum = max;
+ stateP->resetState = false;
+ stateP->signaled = false;
+ LWLockRelease(SInvalReadLock);
+ return -1;
+ }
+
+ /*
+ * Retrieve messages and advance backend's counter, until data array is
+ * full or there are no more messages.
+ *
+ * There may be other backends that haven't read the message(s), so we
+ * cannot delete them here. SICleanupQueue() will eventually remove them
+ * from the queue.
+ */
+ n = 0;
+ while (n < datasize && stateP->nextMsgNum < max)
+ {
+ data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
+ stateP->nextMsgNum++;
+ }
+
+ /*
+ * If we have caught up completely, reset our "signaled" flag so that
+ * we'll get another signal if we fall behind again.
+ *
+ * If we haven't caught up completely, reset the hasMessages flag so that
+ * we see the remaining messages next time.
+ */
+ if (stateP->nextMsgNum >= max)
+ stateP->signaled = false;
+ else
+ stateP->hasMessages = true;
+
+ LWLockRelease(SInvalReadLock);
+ return n;
+}
+
+/*
+ * SICleanupQueue
+ * Remove messages that have been consumed by all active backends
+ *
+ * callerHasWriteLock is true if caller is holding SInvalWriteLock.
+ * minFree is the minimum number of message slots to make free.
+ *
+ * Possible side effects of this routine include marking one or more
+ * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
+ * to some backend that seems to be getting too far behind. We signal at
+ * most one backend at a time, for reasons explained at the top of the file.
+ *
+ * Caution: because we transiently release write lock when we have to signal
+ * some other backend, it is NOT guaranteed that there are still minFree
+ * free message slots at exit. Caller must recheck and perhaps retry.
+ */
+void
+SICleanupQueue(bool callerHasWriteLock, int minFree)
+{
+ SISeg *segP = shmInvalBuffer;
+ int min,
+ minsig,
+ lowbound,
+ numMsgs,
+ i;
+ ProcState *needSig = NULL;
+
+ /* Lock out all writers and readers */
+ if (!callerHasWriteLock)
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+ LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);
+
+ /*
+ * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
+ * furthest-back backend that needs signaling (if any), and reset any
+ * backends that are too far back. Note that because we ignore sendOnly
+ * backends here it is possible for them to keep sending messages without
+ * a problem even when they are the only active backend.
+ */
+ min = segP->maxMsgNum;
+ minsig = min - SIG_THRESHOLD;
+ lowbound = min - MAXNUMMESSAGES + minFree;
+
+ for (i = 0; i < segP->lastBackend; i++)
+ {
+ ProcState *stateP = &segP->procState[i];
+ int n = stateP->nextMsgNum;
+
+ /* Ignore if inactive or already in reset state */
+ if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
+ continue;
+
+ /*
+ * If we must free some space and this backend is preventing it, force
+ * him into reset state and then ignore until he catches up.
+ */
+ if (n < lowbound)
+ {
+ stateP->resetState = true;
+ /* no point in signaling him ... */
+ continue;
+ }
+
+ /* Track the global minimum nextMsgNum */
+ if (n < min)
+ min = n;
+
+ /* Also see who's furthest back of the unsignaled backends */
+ if (n < minsig && !stateP->signaled)
+ {
+ minsig = n;
+ needSig = stateP;
+ }
+ }
+ segP->minMsgNum = min;
+
+ /*
+ * When minMsgNum gets really large, decrement all message counters so as
+ * to forestall overflow of the counters. This happens seldom enough that
+ * folding it into the previous loop would be a loser.
+ */
+ if (min >= MSGNUMWRAPAROUND)
+ {
+ segP->minMsgNum -= MSGNUMWRAPAROUND;
+ segP->maxMsgNum -= MSGNUMWRAPAROUND;
+ for (i = 0; i < segP->lastBackend; i++)
+ {
+ /* we don't bother skipping inactive entries here */
+ segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
+ }
+ }
+
+ /*
+ * Determine how many messages are still in the queue, and set the
+ * threshold at which we should repeat SICleanupQueue().
+ */
+ numMsgs = segP->maxMsgNum - segP->minMsgNum;
+ if (numMsgs < CLEANUP_MIN)
+ segP->nextThreshold = CLEANUP_MIN;
+ else
+ segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
+
+ /*
+ * Lastly, signal anyone who needs a catchup interrupt. Since
+ * SendProcSignal() might not be fast, we don't want to hold locks while
+ * executing it.
+ */
+ if (needSig)
+ {
+ pid_t his_pid = needSig->procPid;
+ BackendId his_backendId = (needSig - &segP->procState[0]) + 1;
+
+ needSig->signaled = true;
+ LWLockRelease(SInvalReadLock);
+ LWLockRelease(SInvalWriteLock);
+ elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
+ SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
+ if (callerHasWriteLock)
+ LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
+ }
+ else
+ {
+ LWLockRelease(SInvalReadLock);
+ if (!callerHasWriteLock)
+ LWLockRelease(SInvalWriteLock);
+ }
+}
+
+
+/*
+ * GetNextLocalTransactionId --- allocate a new LocalTransactionId
+ *
+ * We split VirtualTransactionIds into two parts so that it is possible
+ * to allocate a new one without any contention for shared memory, except
+ * for a bit of additional overhead during backend startup/shutdown.
+ * The high-order part of a VirtualTransactionId is a BackendId, and the
+ * low-order part is a LocalTransactionId, which we assign from a local
+ * counter. To avoid the risk of a VirtualTransactionId being reused
+ * within a short interval, successive procs occupying the same backend ID
+ * slot should use a consecutive sequence of local IDs, which is implemented
+ * by copying nextLocalTransactionId as seen above.
+ */
+LocalTransactionId
+GetNextLocalTransactionId(void)
+{
+ LocalTransactionId result;
+
+ /* loop to avoid returning InvalidLocalTransactionId at wraparound */
+ do
+ {
+ result = nextLocalTransactionId++;
+ } while (!LocalTransactionIdIsValid(result));
+
+ return result;
+}