diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/commands/async.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-upstream.tar.xz postgresql-14-upstream.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/backend/commands/async.c | 2460 |
1 files changed, 2460 insertions, 0 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c new file mode 100644 index 0000000..409fece --- /dev/null +++ b/src/backend/commands/async.c @@ -0,0 +1,2460 @@ +/*------------------------------------------------------------------------- + * + * async.c + * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/commands/async.c + * + *------------------------------------------------------------------------- + */ + +/*------------------------------------------------------------------------- + * Async Notification Model as of 9.0: + * + * 1. Multiple backends on same machine. Multiple backends listening on + * several channels. (Channels are also called "conditions" in other + * parts of the code.) + * + * 2. There is one central queue in disk-based storage (directory pg_notify/), + * with actively-used pages mapped into shared memory by the slru.c module. + * All notification messages are placed in the queue and later read out + * by listening backends. + * + * There is no central knowledge of which backend listens on which channel; + * every backend has its own list of interesting channels. + * + * Although there is only one queue, notifications are treated as being + * database-local; this is done by including the sender's database OID + * in each notification message. Listening backends ignore messages + * that don't match their database OID. This is important because it + * ensures senders and receivers have the same database encoding and won't + * misinterpret non-ASCII text in the channel name or payload string. + * + * Since notifications are not expected to survive database crashes, + * we can simply clean out the pg_notify data at any reboot, and there + * is no need for WAL support or fsync'ing. + * + * 3. Every backend that is listening on at least one channel registers by + * entering its PID into the array in AsyncQueueControl. It then scans all + * incoming notifications in the central queue and first compares the + * database OID of the notification with its own database OID and then + * compares the notified channel with the list of channels that it listens + * to. In case there is a match it delivers the notification event to its + * frontend. Non-matching events are simply skipped. + * + * 4. The NOTIFY statement (routine Async_Notify) stores the notification in + * a backend-local list which will not be processed until transaction end. + * + * Duplicate notifications from the same transaction are sent out as one + * notification only. This is done to save work when for example a trigger + * on a 2 million row table fires a notification for each row that has been + * changed. If the application needs to receive every single notification + * that has been sent, it can easily add some unique string into the extra + * payload parameter. + * + * When the transaction is ready to commit, PreCommit_Notify() adds the + * pending notifications to the head of the queue. The head pointer of the + * queue always points to the next free position and a position is just a + * page number and the offset in that page. This is done before marking the + * transaction as committed in clog. If we run into problems writing the + * notifications, we can still call elog(ERROR, ...) and the transaction + * will roll back. + * + * Once we have put all of the notifications into the queue, we return to + * CommitTransaction() which will then do the actual transaction commit. + * + * After commit we are called another time (AtCommit_Notify()). Here we + * make any actual updates to the effective listen state (listenChannels). + * Then we signal any backends that may be interested in our messages + * (including our own backend, if listening). This is done by + * SignalBackends(), which scans the list of listening backends and sends a + * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't + * know which backend is listening on which channel so we must signal them + * all). We can exclude backends that are already up to date, though, and + * we can also exclude backends that are in other databases (unless they + * are way behind and should be kicked to make them advance their + * pointers). + * + * Finally, after we are out of the transaction altogether and about to go + * idle, we scan the queue for messages that need to be sent to our + * frontend (which might be notifies from other backends, or self-notifies + * from our own). This step is not part of the CommitTransaction sequence + * for two important reasons. First, we could get errors while sending + * data to our frontend, and it's really bad for errors to happen in + * post-commit cleanup. Second, in cases where a procedure issues commits + * within a single frontend command, we don't want to send notifies to our + * frontend until the command is done; but notifies to other backends + * should go out immediately after each commit. + * + * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler + * sets the process's latch, which triggers the event to be processed + * immediately if this backend is idle (i.e., it is waiting for a frontend + * command and is not within a transaction block. C.f. + * ProcessClientReadInterrupt()). Otherwise the handler may only set a + * flag, which will cause the processing to occur just before we next go + * idle. + * + * Inbound-notify processing consists of reading all of the notifications + * that have arrived since scanning last time. We read every notification + * until we reach either a notification from an uncommitted transaction or + * the head pointer's position. + * + * 6. To avoid SLRU wraparound and limit disk space consumption, the tail + * pointer needs to be advanced so that old pages can be truncated. + * This is relatively expensive (notably, it requires an exclusive lock), + * so we don't want to do it often. We make sending backends do this work + * if they advanced the queue head into a new page, but only once every + * QUEUE_CLEANUP_DELAY pages. + * + * An application that listens on the same channel it notifies will get + * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, + * by comparing be_pid in the NOTIFY message to the application's own backend's + * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the + * frontend during startup.) The above design guarantees that notifies from + * other backends will never be missed by ignoring self-notifies. + * + * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS) + * can be varied without affecting anything but performance. The maximum + * amount of notification data that can be queued at one time is determined + * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below. + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <limits.h> +#include <unistd.h> +#include <signal.h> + +#include "access/parallel.h" +#include "access/slru.h" +#include "access/transam.h" +#include "access/xact.h" +#include "catalog/pg_database.h" +#include "commands/async.h" +#include "common/hashfn.h" +#include "funcapi.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" +#include "storage/sinval.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" +#include "utils/snapmgr.h" +#include "utils/timestamp.h" + + +/* + * Maximum size of a NOTIFY payload, including terminating NULL. This + * must be kept small enough so that a notification message fits on one + * SLRU page. The magic fudge factor here is noncritical as long as it's + * more than AsyncQueueEntryEmptySize --- we make it significantly bigger + * than that, so changes in that data structure won't affect user-visible + * restrictions. + */ +#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128) + +/* + * Struct representing an entry in the global notify queue + * + * This struct declaration has the maximal length, but in a real queue entry + * the data area is only big enough for the actual channel and payload strings + * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible + * entry size, if both channel and payload strings are empty (but note it + * doesn't include alignment padding). + * + * The "length" field should always be rounded up to the next QUEUEALIGN + * multiple so that all fields are properly aligned. + */ +typedef struct AsyncQueueEntry +{ + int length; /* total allocated length of entry */ + Oid dboid; /* sender's database OID */ + TransactionId xid; /* sender's XID */ + int32 srcPid; /* sender's PID */ + char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; +} AsyncQueueEntry; + +/* Currently, no field of AsyncQueueEntry requires more than int alignment */ +#define QUEUEALIGN(len) INTALIGN(len) + +#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2) + +/* + * Struct describing a queue position, and assorted macros for working with it + */ +typedef struct QueuePosition +{ + int page; /* SLRU page number */ + int offset; /* byte offset within page */ +} QueuePosition; + +#define QUEUE_POS_PAGE(x) ((x).page) +#define QUEUE_POS_OFFSET(x) ((x).offset) + +#define SET_QUEUE_POS(x,y,z) \ + do { \ + (x).page = (y); \ + (x).offset = (z); \ + } while (0) + +#define QUEUE_POS_EQUAL(x,y) \ + ((x).page == (y).page && (x).offset == (y).offset) + +#define QUEUE_POS_IS_ZERO(x) \ + ((x).page == 0 && (x).offset == 0) + +/* choose logically smaller QueuePosition */ +#define QUEUE_POS_MIN(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \ + (x).page != (y).page ? (y) : \ + (x).offset < (y).offset ? (x) : (y)) + +/* choose logically larger QueuePosition */ +#define QUEUE_POS_MAX(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \ + (x).page != (y).page ? (x) : \ + (x).offset > (y).offset ? (x) : (y)) + +/* + * Parameter determining how often we try to advance the tail pointer: + * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is + * also the distance by which a backend in another database needs to be + * behind before we'll decide we need to wake it up to advance its pointer. + * + * Resist the temptation to make this really large. While that would save + * work in some places, it would add cost in others. In particular, this + * should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends + * catch up before the pages they'll need to read fall out of SLRU cache. + */ +#define QUEUE_CLEANUP_DELAY 4 + +/* + * Struct describing a listening backend's status + */ +typedef struct QueueBackendStatus +{ + int32 pid; /* either a PID or InvalidPid */ + Oid dboid; /* backend's database OID, or InvalidOid */ + BackendId nextListener; /* id of next listener, or InvalidBackendId */ + QueuePosition pos; /* backend has read queue up to here */ +} QueueBackendStatus; + +/* + * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff) + * + * The AsyncQueueControl structure is protected by the NotifyQueueLock and + * NotifyQueueTailLock. + * + * When holding NotifyQueueLock in SHARED mode, backends may only inspect + * their own entries as well as the head and tail pointers. Consequently we + * can allow a backend to update its own record while holding only SHARED lock + * (since no other backend will inspect it). + * + * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the + * entries of other backends and also change the head pointer. When holding + * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends + * can change the tail pointers. + * + * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers. + * In order to avoid deadlocks, whenever we need multiple locks, we first get + * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock. + * + * Each backend uses the backend[] array entry with index equal to its + * BackendId (which can range from 1 to MaxBackends). We rely on this to make + * SendProcSignal fast. + * + * The backend[] array entries for actively-listening backends are threaded + * together using firstListener and the nextListener links, so that we can + * scan them without having to iterate over inactive entries. We keep this + * list in order by BackendId so that the scan is cache-friendly when there + * are many active entries. + */ +typedef struct AsyncQueueControl +{ + QueuePosition head; /* head points to the next free location */ + QueuePosition tail; /* tail must be <= the queue position of every + * listening backend */ + int stopPage; /* oldest unrecycled page; must be <= + * tail.page */ + BackendId firstListener; /* id of first listener, or InvalidBackendId */ + TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ + QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; + /* backend[0] is not used; used entries are from [1] to [MaxBackends] */ +} AsyncQueueControl; + +static AsyncQueueControl *asyncQueueControl; + +#define QUEUE_HEAD (asyncQueueControl->head) +#define QUEUE_TAIL (asyncQueueControl->tail) +#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage) +#define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener) +#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid) +#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) +#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) +#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) + +/* + * The SLRU buffer area through which we access the notification queue + */ +static SlruCtlData NotifyCtlData; + +#define NotifyCtl (&NotifyCtlData) +#define QUEUE_PAGESIZE BLCKSZ +#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ + +/* + * Use segments 0000 through FFFF. Each contains SLRU_PAGES_PER_SEGMENT pages + * which gives us the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1. + * We could use as many segments as SlruScanDirectory() allows, but this gives + * us so much space already that it doesn't seem worth the trouble. + * + * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2 + * pages, because more than that would confuse slru.c into thinking there + * was a wraparound condition. With the default BLCKSZ this means there + * can be up to 8GB of queued-and-not-read data. + * + * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of + * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour. + */ +#define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1) + +/* + * listenChannels identifies the channels we are actually listening to + * (ie, have committed a LISTEN on). It is a simple list of channel names, + * allocated in TopMemoryContext. + */ +static List *listenChannels = NIL; /* list of C strings */ + +/* + * State for pending LISTEN/UNLISTEN actions consists of an ordered list of + * all actions requested in the current transaction. As explained above, + * we don't actually change listenChannels until we reach transaction commit. + * + * The list is kept in CurTransactionContext. In subtransactions, each + * subtransaction has its own list in its own CurTransactionContext, but + * successful subtransactions attach their lists to their parent's list. + * Failed subtransactions simply discard their lists. + */ +typedef enum +{ + LISTEN_LISTEN, + LISTEN_UNLISTEN, + LISTEN_UNLISTEN_ALL +} ListenActionKind; + +typedef struct +{ + ListenActionKind action; + char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ +} ListenAction; + +typedef struct ActionList +{ + int nestingLevel; /* current transaction nesting depth */ + List *actions; /* list of ListenAction structs */ + struct ActionList *upper; /* details for upper transaction levels */ +} ActionList; + +static ActionList *pendingActions = NULL; + +/* + * State for outbound notifies consists of a list of all channels+payloads + * NOTIFYed in the current transaction. We do not actually perform a NOTIFY + * until and unless the transaction commits. pendingNotifies is NULL if no + * NOTIFYs have been done in the current (sub) transaction. + * + * We discard duplicate notify events issued in the same transaction. + * Hence, in addition to the list proper (which we need to track the order + * of the events, since we guarantee to deliver them in order), we build a + * hash table which we can probe to detect duplicates. Since building the + * hash table is somewhat expensive, we do so only once we have at least + * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction; + * before that we just scan the events linearly. + * + * The list is kept in CurTransactionContext. In subtransactions, each + * subtransaction has its own list in its own CurTransactionContext, but + * successful subtransactions add their entries to their parent's list. + * Failed subtransactions simply discard their lists. Since these lists + * are independent, there may be notify events in a subtransaction's list + * that duplicate events in some ancestor (sub) transaction; we get rid of + * the dups when merging the subtransaction's list into its parent's. + * + * Note: the action and notify lists do not interact within a transaction. + * In particular, if a transaction does NOTIFY and then LISTEN on the same + * condition name, it will get a self-notify at commit. This is a bit odd + * but is consistent with our historical behavior. + */ +typedef struct Notification +{ + uint16 channel_len; /* length of channel-name string */ + uint16 payload_len; /* length of payload string */ + /* null-terminated channel name, then null-terminated payload follow */ + char data[FLEXIBLE_ARRAY_MEMBER]; +} Notification; + +typedef struct NotificationList +{ + int nestingLevel; /* current transaction nesting depth */ + List *events; /* list of Notification structs */ + HTAB *hashtab; /* hash of NotificationHash structs, or NULL */ + struct NotificationList *upper; /* details for upper transaction levels */ +} NotificationList; + +#define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */ + +typedef struct NotificationHash +{ + Notification *event; /* => the actual Notification struct */ +} NotificationHash; + +static NotificationList *pendingNotifies = NULL; + +/* + * Inbound notifications are initially processed by HandleNotifyInterrupt(), + * called from inside a signal handler. That just sets the + * notifyInterruptPending flag and sets the process + * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to + * actually deal with the interrupt. + */ +volatile sig_atomic_t notifyInterruptPending = false; + +/* True if we've registered an on_shmem_exit cleanup */ +static bool unlistenExitRegistered = false; + +/* True if we're currently registered as a listener in asyncQueueControl */ +static bool amRegisteredListener = false; + +/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ +static bool tryAdvanceTail = false; + +/* GUC parameter */ +bool Trace_notify = false; + +/* local function prototypes */ +static int asyncQueuePageDiff(int p, int q); +static bool asyncQueuePagePrecedes(int p, int q); +static void queue_listen(ListenActionKind action, const char *channel); +static void Async_UnlistenOnExit(int code, Datum arg); +static void Exec_ListenPreCommit(void); +static void Exec_ListenCommit(const char *channel); +static void Exec_UnlistenCommit(const char *channel); +static void Exec_UnlistenAllCommit(void); +static bool IsListeningOn(const char *channel); +static void asyncQueueUnregister(void); +static bool asyncQueueIsFull(void); +static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); +static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); +static ListCell *asyncQueueAddEntries(ListCell *nextNotify); +static double asyncQueueUsage(void); +static void asyncQueueFillWarning(void); +static void SignalBackends(void); +static void asyncQueueReadAllNotifications(void); +static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, + QueuePosition stop, + char *page_buffer, + Snapshot snapshot); +static void asyncQueueAdvanceTail(void); +static void ProcessIncomingNotify(bool flush); +static bool AsyncExistsPendingNotify(Notification *n); +static void AddEventToPendingNotifies(Notification *n); +static uint32 notification_hash(const void *key, Size keysize); +static int notification_match(const void *key1, const void *key2, Size keysize); +static void ClearPendingActionsAndNotifies(void); + +/* + * Compute the difference between two queue page numbers (i.e., p - q), + * accounting for wraparound. + */ +static int +asyncQueuePageDiff(int p, int q) +{ + int diff; + + /* + * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be + * in the range 0..QUEUE_MAX_PAGE. + */ + Assert(p >= 0 && p <= QUEUE_MAX_PAGE); + Assert(q >= 0 && q <= QUEUE_MAX_PAGE); + + diff = p - q; + if (diff >= ((QUEUE_MAX_PAGE + 1) / 2)) + diff -= QUEUE_MAX_PAGE + 1; + else if (diff < -((QUEUE_MAX_PAGE + 1) / 2)) + diff += QUEUE_MAX_PAGE + 1; + return diff; +} + +/* + * Is p < q, accounting for wraparound? + * + * Since asyncQueueIsFull() blocks creation of a page that could precede any + * extant page, we need not assess entries within a page. + */ +static bool +asyncQueuePagePrecedes(int p, int q) +{ + return asyncQueuePageDiff(p, q) < 0; +} + +/* + * Report space needed for our shared memory area + */ +Size +AsyncShmemSize(void) +{ + Size size; + + /* This had better match AsyncShmemInit */ + size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); + size = add_size(size, offsetof(AsyncQueueControl, backend)); + + size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0)); + + return size; +} + +/* + * Initialize our shared memory area + */ +void +AsyncShmemInit(void) +{ + bool found; + Size size; + + /* + * Create or attach to the AsyncQueueControl structure. + * + * The used entries in the backend[] array run from 1 to MaxBackends; the + * zero'th entry is unused but must be allocated. + */ + size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); + size = add_size(size, offsetof(AsyncQueueControl, backend)); + + asyncQueueControl = (AsyncQueueControl *) + ShmemInitStruct("Async Queue Control", size, &found); + + if (!found) + { + /* First time through, so initialize it */ + SET_QUEUE_POS(QUEUE_HEAD, 0, 0); + SET_QUEUE_POS(QUEUE_TAIL, 0, 0); + QUEUE_STOP_PAGE = 0; + QUEUE_FIRST_LISTENER = InvalidBackendId; + asyncQueueControl->lastQueueFillWarn = 0; + /* zero'th entry won't be used, but let's initialize it anyway */ + for (int i = 0; i <= MaxBackends; i++) + { + QUEUE_BACKEND_PID(i) = InvalidPid; + QUEUE_BACKEND_DBOID(i) = InvalidOid; + QUEUE_NEXT_LISTENER(i) = InvalidBackendId; + SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + } + } + + /* + * Set up SLRU management of the pg_notify data. + */ + NotifyCtl->PagePrecedes = asyncQueuePagePrecedes; + SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0, + NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER, + SYNC_HANDLER_NONE); + + if (!found) + { + /* + * During start or reboot, clean out the pg_notify directory. + */ + (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL); + } +} + + +/* + * pg_notify - + * SQL function to send a notification event + */ +Datum +pg_notify(PG_FUNCTION_ARGS) +{ + const char *channel; + const char *payload; + + if (PG_ARGISNULL(0)) + channel = ""; + else + channel = text_to_cstring(PG_GETARG_TEXT_PP(0)); + + if (PG_ARGISNULL(1)) + payload = ""; + else + payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); + + /* For NOTIFY as a statement, this is checked in ProcessUtility */ + PreventCommandDuringRecovery("NOTIFY"); + + Async_Notify(channel, payload); + + PG_RETURN_VOID(); +} + + +/* + * Async_Notify + * + * This is executed by the SQL notify command. + * + * Adds the message to the list of pending notifies. + * Actual notification happens during transaction commit. + * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + */ +void +Async_Notify(const char *channel, const char *payload) +{ + int my_level = GetCurrentTransactionNestLevel(); + size_t channel_len; + size_t payload_len; + Notification *n; + MemoryContext oldcontext; + + if (IsParallelWorker()) + elog(ERROR, "cannot send notifications from a parallel worker"); + + if (Trace_notify) + elog(DEBUG1, "Async_Notify(%s)", channel); + + channel_len = channel ? strlen(channel) : 0; + payload_len = payload ? strlen(payload) : 0; + + /* a channel name must be specified */ + if (channel_len == 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("channel name cannot be empty"))); + + /* enforce length limits */ + if (channel_len >= NAMEDATALEN) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("channel name too long"))); + + if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("payload string too long"))); + + /* + * We must construct the Notification entry, even if we end up not using + * it, in order to compare it cheaply to existing list entries. + * + * The notification list needs to live until end of transaction, so store + * it in the transaction context. + */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + n = (Notification *) palloc(offsetof(Notification, data) + + channel_len + payload_len + 2); + n->channel_len = channel_len; + n->payload_len = payload_len; + strcpy(n->data, channel); + if (payload) + strcpy(n->data + channel_len + 1, payload); + else + n->data[channel_len + 1] = '\0'; + + if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel) + { + NotificationList *notifies; + + /* + * First notify event in current (sub)xact. Note that we allocate the + * NotificationList in TopTransactionContext; the nestingLevel might + * get changed later by AtSubCommit_Notify. + */ + notifies = (NotificationList *) + MemoryContextAlloc(TopTransactionContext, + sizeof(NotificationList)); + notifies->nestingLevel = my_level; + notifies->events = list_make1(n); + /* We certainly don't need a hashtable yet */ + notifies->hashtab = NULL; + notifies->upper = pendingNotifies; + pendingNotifies = notifies; + } + else + { + /* Now check for duplicates */ + if (AsyncExistsPendingNotify(n)) + { + /* It's a dup, so forget it */ + pfree(n); + MemoryContextSwitchTo(oldcontext); + return; + } + + /* Append more events to existing list */ + AddEventToPendingNotifies(n); + } + + MemoryContextSwitchTo(oldcontext); +} + +/* + * queue_listen + * Common code for listen, unlisten, unlisten all commands. + * + * Adds the request to the list of pending actions. + * Actual update of the listenChannels list happens during transaction + * commit. + */ +static void +queue_listen(ListenActionKind action, const char *channel) +{ + MemoryContext oldcontext; + ListenAction *actrec; + int my_level = GetCurrentTransactionNestLevel(); + + /* + * Unlike Async_Notify, we don't try to collapse out duplicates. It would + * be too complicated to ensure we get the right interactions of + * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there + * would be any performance benefit anyway in sane applications. + */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + + /* space for terminating null is included in sizeof(ListenAction) */ + actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) + + strlen(channel) + 1); + actrec->action = action; + strcpy(actrec->channel, channel); + + if (pendingActions == NULL || my_level > pendingActions->nestingLevel) + { + ActionList *actions; + + /* + * First action in current sub(xact). Note that we allocate the + * ActionList in TopTransactionContext; the nestingLevel might get + * changed later by AtSubCommit_Notify. + */ + actions = (ActionList *) + MemoryContextAlloc(TopTransactionContext, sizeof(ActionList)); + actions->nestingLevel = my_level; + actions->actions = list_make1(actrec); + actions->upper = pendingActions; + pendingActions = actions; + } + else + pendingActions->actions = lappend(pendingActions->actions, actrec); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Async_Listen + * + * This is executed by the SQL listen command. + */ +void +Async_Listen(const char *channel) +{ + if (Trace_notify) + elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); + + queue_listen(LISTEN_LISTEN, channel); +} + +/* + * Async_Unlisten + * + * This is executed by the SQL unlisten command. + */ +void +Async_Unlisten(const char *channel) +{ + if (Trace_notify) + elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); + + /* If we couldn't possibly be listening, no need to queue anything */ + if (pendingActions == NULL && !unlistenExitRegistered) + return; + + queue_listen(LISTEN_UNLISTEN, channel); +} + +/* + * Async_UnlistenAll + * + * This is invoked by UNLISTEN * command, and also at backend exit. + */ +void +Async_UnlistenAll(void) +{ + if (Trace_notify) + elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); + + /* If we couldn't possibly be listening, no need to queue anything */ + if (pendingActions == NULL && !unlistenExitRegistered) + return; + + queue_listen(LISTEN_UNLISTEN_ALL, ""); +} + +/* + * SQL function: return a set of the channel names this backend is actively + * listening to. + * + * Note: this coding relies on the fact that the listenChannels list cannot + * change within a transaction. + */ +Datum +pg_listening_channels(PG_FUNCTION_ARGS) +{ + FuncCallContext *funcctx; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + if (funcctx->call_cntr < list_length(listenChannels)) + { + char *channel = (char *) list_nth(listenChannels, + funcctx->call_cntr); + + SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * Async_UnlistenOnExit + * + * This is executed at backend exit if we have done any LISTENs in this + * backend. It might not be necessary anymore, if the user UNLISTENed + * everything, but we don't try to detect that case. + */ +static void +Async_UnlistenOnExit(int code, Datum arg) +{ + Exec_UnlistenAllCommit(); + asyncQueueUnregister(); +} + +/* + * AtPrepare_Notify + * + * This is called at the prepare phase of a two-phase + * transaction. Save the state for possible commit later. + */ +void +AtPrepare_Notify(void) +{ + /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */ + if (pendingActions || pendingNotifies) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY"))); +} + +/* + * PreCommit_Notify + * + * This is called at transaction commit, before actually committing to + * clog. + * + * If there are pending LISTEN actions, make sure we are listed in the + * shared-memory listener array. This must happen before commit to + * ensure we don't miss any notifies from transactions that commit + * just after ours. + * + * If there are outbound notify requests in the pendingNotifies list, + * add them to the global queue. We do that before commit so that + * we can still throw error if we run out of queue space. + */ +void +PreCommit_Notify(void) +{ + ListCell *p; + + if (!pendingActions && !pendingNotifies) + return; /* no relevant statements in this xact */ + + if (Trace_notify) + elog(DEBUG1, "PreCommit_Notify"); + + /* Preflight for any pending listen/unlisten actions */ + if (pendingActions != NULL) + { + foreach(p, pendingActions->actions) + { + ListenAction *actrec = (ListenAction *) lfirst(p); + + switch (actrec->action) + { + case LISTEN_LISTEN: + Exec_ListenPreCommit(); + break; + case LISTEN_UNLISTEN: + /* there is no Exec_UnlistenPreCommit() */ + break; + case LISTEN_UNLISTEN_ALL: + /* there is no Exec_UnlistenAllPreCommit() */ + break; + } + } + } + + /* Queue any pending notifies (must happen after the above) */ + if (pendingNotifies) + { + ListCell *nextNotify; + + /* + * Make sure that we have an XID assigned to the current transaction. + * GetCurrentTransactionId is cheap if we already have an XID, but not + * so cheap if we don't, and we'd prefer not to do that work while + * holding NotifyQueueLock. + */ + (void) GetCurrentTransactionId(); + + /* + * Serialize writers by acquiring a special lock that we hold till + * after commit. This ensures that queue entries appear in commit + * order, and in particular that there are never uncommitted queue + * entries ahead of committed ones, so an uncommitted transaction + * can't block delivery of deliverable notifications. + * + * We use a heavyweight lock so that it'll automatically be released + * after either commit or abort. This also allows deadlocks to be + * detected, though really a deadlock shouldn't be possible here. + * + * The lock is on "database 0", which is pretty ugly but it doesn't + * seem worth inventing a special locktag category just for this. + * (Historical note: before PG 9.0, a similar lock on "database 0" was + * used by the flatfiles mechanism.) + */ + LockSharedObject(DatabaseRelationId, InvalidOid, 0, + AccessExclusiveLock); + + /* Now push the notifications into the queue */ + nextNotify = list_head(pendingNotifies->events); + while (nextNotify != NULL) + { + /* + * Add the pending notifications to the queue. We acquire and + * release NotifyQueueLock once per page, which might be overkill + * but it does allow readers to get in while we're doing this. + * + * A full queue is very uncommon and should really not happen, + * given that we have so much space available in the SLRU pages. + * Nevertheless we need to deal with this possibility. Note that + * when we get here we are in the process of committing our + * transaction, but we have not yet committed to clog, so at this + * point in time we can still roll the transaction back. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + asyncQueueFillWarning(); + if (asyncQueueIsFull()) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("too many notifications in the NOTIFY queue"))); + nextNotify = asyncQueueAddEntries(nextNotify); + LWLockRelease(NotifyQueueLock); + } + + /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */ + } +} + +/* + * AtCommit_Notify + * + * This is called at transaction commit, after committing to clog. + * + * Update listenChannels and clear transaction-local state. + * + * If we issued any notifications in the transaction, send signals to + * listening backends (possibly including ourselves) to process them. + * Also, if we filled enough queue pages with new notifies, try to + * advance the queue tail pointer. + */ +void +AtCommit_Notify(void) +{ + ListCell *p; + + /* + * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to + * return as soon as possible + */ + if (!pendingActions && !pendingNotifies) + return; + + if (Trace_notify) + elog(DEBUG1, "AtCommit_Notify"); + + /* Perform any pending listen/unlisten actions */ + if (pendingActions != NULL) + { + foreach(p, pendingActions->actions) + { + ListenAction *actrec = (ListenAction *) lfirst(p); + + switch (actrec->action) + { + case LISTEN_LISTEN: + Exec_ListenCommit(actrec->channel); + break; + case LISTEN_UNLISTEN: + Exec_UnlistenCommit(actrec->channel); + break; + case LISTEN_UNLISTEN_ALL: + Exec_UnlistenAllCommit(); + break; + } + } + } + + /* If no longer listening to anything, get out of listener array */ + if (amRegisteredListener && listenChannels == NIL) + asyncQueueUnregister(); + + /* + * Send signals to listening backends. We need do this only if there are + * pending notifies, which were previously added to the shared queue by + * PreCommit_Notify(). + */ + if (pendingNotifies != NULL) + SignalBackends(); + + /* + * If it's time to try to advance the global tail pointer, do that. + * + * (It might seem odd to do this in the sender, when more than likely the + * listeners won't yet have read the messages we just sent. However, + * there's less contention if only the sender does it, and there is little + * need for urgency in advancing the global tail. So this typically will + * be clearing out messages that were sent some time ago.) + */ + if (tryAdvanceTail) + { + tryAdvanceTail = false; + asyncQueueAdvanceTail(); + } + + /* And clean up */ + ClearPendingActionsAndNotifies(); +} + +/* + * Exec_ListenPreCommit --- subroutine for PreCommit_Notify + * + * This function must make sure we are ready to catch any incoming messages. + */ +static void +Exec_ListenPreCommit(void) +{ + QueuePosition head; + QueuePosition max; + BackendId prevListener; + + /* + * Nothing to do if we are already listening to something, nor if we + * already ran this routine in this transaction. + */ + if (amRegisteredListener) + return; + + if (Trace_notify) + elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); + + /* + * Before registering, make sure we will unlisten before dying. (Note: + * this action does not get undone if we abort later.) + */ + if (!unlistenExitRegistered) + { + before_shmem_exit(Async_UnlistenOnExit, 0); + unlistenExitRegistered = true; + } + + /* + * This is our first LISTEN, so establish our pointer. + * + * We set our pointer to the global tail pointer and then move it forward + * over already-committed notifications. This ensures we cannot miss any + * not-yet-committed notifications. We might get a few more but that + * doesn't hurt. + * + * In some scenarios there might be a lot of committed notifications that + * have not yet been pruned away (because some backend is being lazy about + * reading them). To reduce our startup time, we can look at other + * backends and adopt the maximum "pos" pointer of any backend that's in + * our database; any notifications it's already advanced over are surely + * committed and need not be re-examined by us. (We must consider only + * backends connected to our DB, because others will not have bothered to + * check committed-ness of notifications in our DB.) + * + * We need exclusive lock here so we can look at other backends' entries + * and manipulate the list links. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + head = QUEUE_HEAD; + max = QUEUE_TAIL; + prevListener = InvalidBackendId; + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + { + if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); + /* Also find last listening backend before this one */ + if (i < MyBackendId) + prevListener = i; + } + QUEUE_BACKEND_POS(MyBackendId) = max; + QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; + QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId; + /* Insert backend into list of listeners at correct position */ + if (prevListener > 0) + { + QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_NEXT_LISTENER(prevListener); + QUEUE_NEXT_LISTENER(prevListener) = MyBackendId; + } + else + { + QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER; + QUEUE_FIRST_LISTENER = MyBackendId; + } + LWLockRelease(NotifyQueueLock); + + /* Now we are listed in the global array, so remember we're listening */ + amRegisteredListener = true; + + /* + * Try to move our pointer forward as far as possible. This will skip + * over already-committed notifications, which we want to do because they + * might be quite stale. Note that we are not yet listening on anything, + * so we won't deliver such notifications to our frontend. Also, although + * our transaction might have executed NOTIFY, those message(s) aren't + * queued yet so we won't skip them here. + */ + if (!QUEUE_POS_EQUAL(max, head)) + asyncQueueReadAllNotifications(); +} + +/* + * Exec_ListenCommit --- subroutine for AtCommit_Notify + * + * Add the channel to the list of channels we are listening on. + */ +static void +Exec_ListenCommit(const char *channel) +{ + MemoryContext oldcontext; + + /* Do nothing if we are already listening on this channel */ + if (IsListeningOn(channel)) + return; + + /* + * Add the new channel name to listenChannels. + * + * XXX It is theoretically possible to get an out-of-memory failure here, + * which would be bad because we already committed. For the moment it + * doesn't seem worth trying to guard against that, but maybe improve this + * later. + */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + listenChannels = lappend(listenChannels, pstrdup(channel)); + MemoryContextSwitchTo(oldcontext); +} + +/* + * Exec_UnlistenCommit --- subroutine for AtCommit_Notify + * + * Remove the specified channel name from listenChannels. + */ +static void +Exec_UnlistenCommit(const char *channel) +{ + ListCell *q; + + if (Trace_notify) + elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); + + foreach(q, listenChannels) + { + char *lchan = (char *) lfirst(q); + + if (strcmp(lchan, channel) == 0) + { + listenChannels = foreach_delete_current(listenChannels, q); + pfree(lchan); + break; + } + } + + /* + * We do not complain about unlistening something not being listened; + * should we? + */ +} + +/* + * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify + * + * Unlisten on all channels for this backend. + */ +static void +Exec_UnlistenAllCommit(void) +{ + if (Trace_notify) + elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); + + list_free_deep(listenChannels); + listenChannels = NIL; +} + +/* + * ProcessCompletedNotifies --- nowadays this does nothing + * + * This routine used to send signals and handle self-notifies, + * but that functionality has been moved elsewhere. + * We'd delete it entirely, except that the documentation used to instruct + * background-worker authors to call it. To avoid an ABI break in stable + * branches, keep it as a no-op routine. + */ +void +ProcessCompletedNotifies(void) +{ +} + +/* + * Test whether we are actively listening on the given channel name. + * + * Note: this function is executed for every notification found in the queue. + * Perhaps it is worth further optimization, eg convert the list to a sorted + * array so we can binary-search it. In practice the list is likely to be + * fairly short, though. + */ +static bool +IsListeningOn(const char *channel) +{ + ListCell *p; + + foreach(p, listenChannels) + { + char *lchan = (char *) lfirst(p); + + if (strcmp(lchan, channel) == 0) + return true; + } + return false; +} + +/* + * Remove our entry from the listeners array when we are no longer listening + * on any channel. NB: must not fail if we're already not listening. + */ +static void +asyncQueueUnregister(void) +{ + Assert(listenChannels == NIL); /* else caller error */ + + if (!amRegisteredListener) /* nothing to do */ + return; + + /* + * Need exclusive lock here to manipulate list links. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + /* Mark our entry as invalid */ + QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; + QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid; + /* and remove it from the list */ + if (QUEUE_FIRST_LISTENER == MyBackendId) + QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyBackendId); + else + { + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + { + if (QUEUE_NEXT_LISTENER(i) == MyBackendId) + { + QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyBackendId); + break; + } + } + } + QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId; + LWLockRelease(NotifyQueueLock); + + /* mark ourselves as no longer listed in the global array */ + amRegisteredListener = false; +} + +/* + * Test whether there is room to insert more notification messages. + * + * Caller must hold at least shared NotifyQueueLock. + */ +static bool +asyncQueueIsFull(void) +{ + int nexthead; + int boundary; + + /* + * The queue is full if creating a new head page would create a page that + * logically precedes the current global tail pointer, ie, the head + * pointer would wrap around compared to the tail. We cannot create such + * a head page for fear of confusing slru.c. For safety we round the tail + * pointer back to a segment boundary (truncation logic in + * asyncQueueAdvanceTail does not do this, so doing it here is optional). + * + * Note that this test is *not* dependent on how much space there is on + * the current head page. This is necessary because asyncQueueAddEntries + * might try to create the next head page in any case. + */ + nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1; + if (nexthead > QUEUE_MAX_PAGE) + nexthead = 0; /* wrap around */ + boundary = QUEUE_STOP_PAGE; + boundary -= boundary % SLRU_PAGES_PER_SEGMENT; + return asyncQueuePagePrecedes(nexthead, boundary); +} + +/* + * Advance the QueuePosition to the next entry, assuming that the current + * entry is of length entryLength. If we jump to a new page the function + * returns true, else false. + */ +static bool +asyncQueueAdvance(volatile QueuePosition *position, int entryLength) +{ + int pageno = QUEUE_POS_PAGE(*position); + int offset = QUEUE_POS_OFFSET(*position); + bool pageJump = false; + + /* + * Move to the next writing position: First jump over what we have just + * written or read. + */ + offset += entryLength; + Assert(offset <= QUEUE_PAGESIZE); + + /* + * In a second step check if another entry can possibly be written to the + * page. If so, stay here, we have reached the next position. If not, then + * we need to move on to the next page. + */ + if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE) + { + pageno++; + if (pageno > QUEUE_MAX_PAGE) + pageno = 0; /* wrap around */ + offset = 0; + pageJump = true; + } + + SET_QUEUE_POS(*position, pageno, offset); + return pageJump; +} + +/* + * Fill the AsyncQueueEntry at *qe with an outbound notification message. + */ +static void +asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) +{ + size_t channellen = n->channel_len; + size_t payloadlen = n->payload_len; + int entryLength; + + Assert(channellen < NAMEDATALEN); + Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); + + /* The terminators are already included in AsyncQueueEntryEmptySize */ + entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; + entryLength = QUEUEALIGN(entryLength); + qe->length = entryLength; + qe->dboid = MyDatabaseId; + qe->xid = GetCurrentTransactionId(); + qe->srcPid = MyProcPid; + memcpy(qe->data, n->data, channellen + payloadlen + 2); +} + +/* + * Add pending notifications to the queue. + * + * We go page by page here, i.e. we stop once we have to go to a new page but + * we will be called again and then fill that next page. If an entry does not + * fit into the current page, we write a dummy entry with an InvalidOid as the + * database OID in order to fill the page. So every page is always used up to + * the last byte which simplifies reading the page later. + * + * We are passed the list cell (in pendingNotifies->events) containing the next + * notification to write and return the first still-unwritten cell back. + * Eventually we will return NULL indicating all is done. + * + * We are holding NotifyQueueLock already from the caller and grab + * NotifySLRULock locally in this function. + */ +static ListCell * +asyncQueueAddEntries(ListCell *nextNotify) +{ + AsyncQueueEntry qe; + QueuePosition queue_head; + int pageno; + int offset; + int slotno; + + /* We hold both NotifyQueueLock and NotifySLRULock during this operation */ + LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE); + + /* + * We work with a local copy of QUEUE_HEAD, which we write back to shared + * memory upon exiting. The reason for this is that if we have to advance + * to a new page, SimpleLruZeroPage might fail (out of disk space, for + * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise, + * subsequent insertions would try to put entries into a page that slru.c + * thinks doesn't exist yet.) So, use a local position variable. Note + * that if we do fail, any already-inserted queue entries are forgotten; + * this is okay, since they'd be useless anyway after our transaction + * rolls back. + */ + queue_head = QUEUE_HEAD; + + /* + * If this is the first write since the postmaster started, we need to + * initialize the first page of the async SLRU. Otherwise, the current + * page should be initialized already, so just fetch it. + * + * (We could also take the first path when the SLRU position has just + * wrapped around, but re-zeroing the page is harmless in that case.) + */ + pageno = QUEUE_POS_PAGE(queue_head); + if (QUEUE_POS_IS_ZERO(queue_head)) + slotno = SimpleLruZeroPage(NotifyCtl, pageno); + else + slotno = SimpleLruReadPage(NotifyCtl, pageno, true, + InvalidTransactionId); + + /* Note we mark the page dirty before writing in it */ + NotifyCtl->shared->page_dirty[slotno] = true; + + while (nextNotify != NULL) + { + Notification *n = (Notification *) lfirst(nextNotify); + + /* Construct a valid queue entry in local variable qe */ + asyncQueueNotificationToEntry(n, &qe); + + offset = QUEUE_POS_OFFSET(queue_head); + + /* Check whether the entry really fits on the current page */ + if (offset + qe.length <= QUEUE_PAGESIZE) + { + /* OK, so advance nextNotify past this item */ + nextNotify = lnext(pendingNotifies->events, nextNotify); + } + else + { + /* + * Write a dummy entry to fill up the page. Actually readers will + * only check dboid and since it won't match any reader's database + * OID, they will ignore this entry and move on. + */ + qe.length = QUEUE_PAGESIZE - offset; + qe.dboid = InvalidOid; + qe.data[0] = '\0'; /* empty channel */ + qe.data[1] = '\0'; /* empty payload */ + } + + /* Now copy qe into the shared buffer page */ + memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, + &qe, + qe.length); + + /* Advance queue_head appropriately, and detect if page is full */ + if (asyncQueueAdvance(&(queue_head), qe.length)) + { + /* + * Page is full, so we're done here, but first fill the next page + * with zeroes. The reason to do this is to ensure that slru.c's + * idea of the head page is always the same as ours, which avoids + * boundary problems in SimpleLruTruncate. The test in + * asyncQueueIsFull() ensured that there is room to create this + * page without overrunning the queue. + */ + slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + + /* + * If the new page address is a multiple of QUEUE_CLEANUP_DELAY, + * set flag to remember that we should try to advance the tail + * pointer (we don't want to actually do that right here). + */ + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + tryAdvanceTail = true; + + /* And exit the loop */ + break; + } + } + + /* Success, so update the global QUEUE_HEAD */ + QUEUE_HEAD = queue_head; + + LWLockRelease(NotifySLRULock); + + return nextNotify; +} + +/* + * SQL function to return the fraction of the notification queue currently + * occupied. + */ +Datum +pg_notification_queue_usage(PG_FUNCTION_ARGS) +{ + double usage; + + /* Advance the queue tail so we don't report a too-large result */ + asyncQueueAdvanceTail(); + + LWLockAcquire(NotifyQueueLock, LW_SHARED); + usage = asyncQueueUsage(); + LWLockRelease(NotifyQueueLock); + + PG_RETURN_FLOAT8(usage); +} + +/* + * Return the fraction of the queue that is currently occupied. + * + * The caller must hold NotifyQueueLock in (at least) shared mode. + * + * Note: we measure the distance to the logical tail page, not the physical + * tail page. In some sense that's wrong, but the relative position of the + * physical tail is affected by details such as SLRU segment boundaries, + * so that a result based on that is unpleasantly unstable. + */ +static double +asyncQueueUsage(void) +{ + int headPage = QUEUE_POS_PAGE(QUEUE_HEAD); + int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); + int occupied; + + occupied = headPage - tailPage; + + if (occupied == 0) + return (double) 0; /* fast exit for common case */ + + if (occupied < 0) + { + /* head has wrapped around, tail not yet */ + occupied += QUEUE_MAX_PAGE + 1; + } + + return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2); +} + +/* + * Check whether the queue is at least half full, and emit a warning if so. + * + * This is unlikely given the size of the queue, but possible. + * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. + * + * Caller must hold exclusive NotifyQueueLock. + */ +static void +asyncQueueFillWarning(void) +{ + double fillDegree; + TimestampTz t; + + fillDegree = asyncQueueUsage(); + if (fillDegree < 0.5) + return; + + t = GetCurrentTimestamp(); + + if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, + t, QUEUE_FULL_WARN_INTERVAL)) + { + QueuePosition min = QUEUE_HEAD; + int32 minPid = InvalidPid; + + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + { + Assert(QUEUE_BACKEND_PID(i) != InvalidPid); + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) + minPid = QUEUE_BACKEND_PID(i); + } + + ereport(WARNING, + (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100), + (minPid != InvalidPid ? + errdetail("The server process with PID %d is among those with the oldest transactions.", minPid) + : 0), + (minPid != InvalidPid ? + errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.") + : 0))); + + asyncQueueControl->lastQueueFillWarn = t; + } +} + +/* + * Send signals to listening backends. + * + * Normally we signal only backends in our own database, since only those + * backends could be interested in notifies we send. However, if there's + * notify traffic in our database but no traffic in another database that + * does have listener(s), those listeners will fall further and further + * behind. Waken them anyway if they're far enough behind, so that they'll + * advance their queue position pointers, allowing the global tail to advance. + * + * Since we know the BackendId and the Pid the signaling is quite cheap. + * + * This is called during CommitTransaction(), so it's important for it + * to have very low probability of failure. + */ +static void +SignalBackends(void) +{ + int32 *pids; + BackendId *ids; + int count; + + /* + * Identify backends that we need to signal. We don't want to send + * signals while holding the NotifyQueueLock, so this loop just builds a + * list of target PIDs. + * + * XXX in principle these pallocs could fail, which would be bad. Maybe + * preallocate the arrays? They're not that large, though. + */ + pids = (int32 *) palloc(MaxBackends * sizeof(int32)); + ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId)); + count = 0; + + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + { + int32 pid = QUEUE_BACKEND_PID(i); + QueuePosition pos; + + Assert(pid != InvalidPid); + pos = QUEUE_BACKEND_POS(i); + if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + { + /* + * Always signal listeners in our own database, unless they're + * already caught up (unlikely, but possible). + */ + if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + continue; + } + else + { + /* + * Listeners in other databases should be signaled only if they + * are far behind. + */ + if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), + QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY) + continue; + } + /* OK, need to signal this one */ + pids[count] = pid; + ids[count] = i; + count++; + } + LWLockRelease(NotifyQueueLock); + + /* Now send signals */ + for (int i = 0; i < count; i++) + { + int32 pid = pids[i]; + + /* + * If we are signaling our own process, no need to involve the kernel; + * just set the flag directly. + */ + if (pid == MyProcPid) + { + notifyInterruptPending = true; + continue; + } + + /* + * Note: assuming things aren't broken, a signal failure here could + * only occur if the target backend exited since we released + * NotifyQueueLock; which is unlikely but certainly possible. So we + * just log a low-level debug message if it happens. + */ + if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0) + elog(DEBUG3, "could not signal backend with PID %d: %m", pid); + } + + pfree(pids); + pfree(ids); +} + +/* + * AtAbort_Notify + * + * This is called at transaction abort. + * + * Gets rid of pending actions and outbound notifies that we would have + * executed if the transaction got committed. + */ +void +AtAbort_Notify(void) +{ + /* + * If we LISTEN but then roll back the transaction after PreCommit_Notify, + * we have registered as a listener but have not made any entry in + * listenChannels. In that case, deregister again. + */ + if (amRegisteredListener && listenChannels == NIL) + asyncQueueUnregister(); + + /* And clean up */ + ClearPendingActionsAndNotifies(); +} + +/* + * AtSubCommit_Notify() --- Take care of subtransaction commit. + * + * Reassign all items in the pending lists to the parent transaction. + */ +void +AtSubCommit_Notify(void) +{ + int my_level = GetCurrentTransactionNestLevel(); + + /* If there are actions at our nesting level, we must reparent them. */ + if (pendingActions != NULL && + pendingActions->nestingLevel >= my_level) + { + if (pendingActions->upper == NULL || + pendingActions->upper->nestingLevel < my_level - 1) + { + /* nothing to merge; give the whole thing to the parent */ + --pendingActions->nestingLevel; + } + else + { + ActionList *childPendingActions = pendingActions; + + pendingActions = pendingActions->upper; + + /* + * Mustn't try to eliminate duplicates here --- see queue_listen() + */ + pendingActions->actions = + list_concat(pendingActions->actions, + childPendingActions->actions); + pfree(childPendingActions); + } + } + + /* If there are notifies at our nesting level, we must reparent them. */ + if (pendingNotifies != NULL && + pendingNotifies->nestingLevel >= my_level) + { + Assert(pendingNotifies->nestingLevel == my_level); + + if (pendingNotifies->upper == NULL || + pendingNotifies->upper->nestingLevel < my_level - 1) + { + /* nothing to merge; give the whole thing to the parent */ + --pendingNotifies->nestingLevel; + } + else + { + /* + * Formerly, we didn't bother to eliminate duplicates here, but + * now we must, else we fall foul of "Assert(!found)", either here + * or during a later attempt to build the parent-level hashtable. + */ + NotificationList *childPendingNotifies = pendingNotifies; + ListCell *l; + + pendingNotifies = pendingNotifies->upper; + /* Insert all the subxact's events into parent, except for dups */ + foreach(l, childPendingNotifies->events) + { + Notification *childn = (Notification *) lfirst(l); + + if (!AsyncExistsPendingNotify(childn)) + AddEventToPendingNotifies(childn); + } + pfree(childPendingNotifies); + } + } +} + +/* + * AtSubAbort_Notify() --- Take care of subtransaction abort. + */ +void +AtSubAbort_Notify(void) +{ + int my_level = GetCurrentTransactionNestLevel(); + + /* + * All we have to do is pop the stack --- the actions/notifies made in + * this subxact are no longer interesting, and the space will be freed + * when CurTransactionContext is recycled. We still have to free the + * ActionList and NotificationList objects themselves, though, because + * those are allocated in TopTransactionContext. + * + * Note that there might be no entries at all, or no entries for the + * current subtransaction level, either because none were ever created, or + * because we reentered this routine due to trouble during subxact abort. + */ + while (pendingActions != NULL && + pendingActions->nestingLevel >= my_level) + { + ActionList *childPendingActions = pendingActions; + + pendingActions = pendingActions->upper; + pfree(childPendingActions); + } + + while (pendingNotifies != NULL && + pendingNotifies->nestingLevel >= my_level) + { + NotificationList *childPendingNotifies = pendingNotifies; + + pendingNotifies = pendingNotifies->upper; + pfree(childPendingNotifies); + } +} + +/* + * HandleNotifyInterrupt + * + * Signal handler portion of interrupt handling. Let the backend know + * that there's a pending notify interrupt. If we're currently reading + * from the client, this will interrupt the read and + * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt(). + */ +void +HandleNotifyInterrupt(void) +{ + /* + * Note: this is called by a SIGNAL HANDLER. You must be very wary what + * you do here. + */ + + /* signal that work needs to be done */ + notifyInterruptPending = true; + + /* make sure the event is processed in due course */ + SetLatch(MyLatch); +} + +/* + * ProcessNotifyInterrupt + * + * This is called if we see notifyInterruptPending set, just before + * transmitting ReadyForQuery at the end of a frontend command, and + * also if a notify signal occurs while reading from the frontend. + * HandleNotifyInterrupt() will cause the read to be interrupted + * via the process's latch, and this routine will get called. + * If we are truly idle (ie, *not* inside a transaction block), + * process the incoming notifies. + * + * If "flush" is true, force any frontend messages out immediately. + * This can be false when being called at the end of a frontend command, + * since we'll flush after sending ReadyForQuery. + */ +void +ProcessNotifyInterrupt(bool flush) +{ + if (IsTransactionOrTransactionBlock()) + return; /* not really idle */ + + /* Loop in case another signal arrives while sending messages */ + while (notifyInterruptPending) + ProcessIncomingNotify(flush); +} + + +/* + * Read all pending notifications from the queue, and deliver appropriate + * ones to my frontend. Stop when we reach queue head or an uncommitted + * notification. + */ +static void +asyncQueueReadAllNotifications(void) +{ + volatile QueuePosition pos; + QueuePosition head; + Snapshot snapshot; + + /* page_buffer must be adequately aligned, so use a union */ + union + { + char buf[QUEUE_PAGESIZE]; + AsyncQueueEntry align; + } page_buffer; + + /* Fetch current state */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + /* Assert checks that we have a valid state entry */ + Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId)); + pos = QUEUE_BACKEND_POS(MyBackendId); + head = QUEUE_HEAD; + LWLockRelease(NotifyQueueLock); + + if (QUEUE_POS_EQUAL(pos, head)) + { + /* Nothing to do, we have read all notifications already. */ + return; + } + + /*---------- + * Get snapshot we'll use to decide which xacts are still in progress. + * This is trickier than it might seem, because of race conditions. + * Consider the following example: + * + * Backend 1: Backend 2: + * + * transaction starts + * UPDATE foo SET ...; + * NOTIFY foo; + * commit starts + * queue the notify message + * transaction starts + * LISTEN foo; -- first LISTEN in session + * SELECT * FROM foo WHERE ...; + * commit to clog + * commit starts + * add backend 2 to array of listeners + * advance to queue head (this code) + * commit to clog + * + * Transaction 2's SELECT has not seen the UPDATE's effects, since that + * wasn't committed yet. Ideally we'd ensure that client 2 would + * eventually get transaction 1's notify message, but there's no way + * to do that; until we're in the listener array, there's no guarantee + * that the notify message doesn't get removed from the queue. + * + * Therefore the coding technique transaction 2 is using is unsafe: + * applications must commit a LISTEN before inspecting database state, + * if they want to ensure they will see notifications about subsequent + * changes to that state. + * + * What we do guarantee is that we'll see all notifications from + * transactions committing after the snapshot we take here. + * Exec_ListenPreCommit has already added us to the listener array, + * so no not-yet-committed messages can be removed from the queue + * before we see them. + *---------- + */ + snapshot = RegisterSnapshot(GetLatestSnapshot()); + + /* + * It is possible that we fail while trying to send a message to our + * frontend (for example, because of encoding conversion failure). If + * that happens it is critical that we not try to send the same message + * over and over again. Therefore, we place a PG_TRY block here that will + * forcibly advance our queue position before we lose control to an error. + * (We could alternatively retake NotifyQueueLock and move the position + * before handling each individual message, but that seems like too much + * lock traffic.) + */ + PG_TRY(); + { + bool reachedStop; + + do + { + int curpage = QUEUE_POS_PAGE(pos); + int curoffset = QUEUE_POS_OFFSET(pos); + int slotno; + int copysize; + + /* + * We copy the data from SLRU into a local buffer, so as to avoid + * holding the NotifySLRULock while we are examining the entries + * and possibly transmitting them to our frontend. Copy only the + * part of the page we will actually inspect. + */ + slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, + InvalidTransactionId); + if (curpage == QUEUE_POS_PAGE(head)) + { + /* we only want to read as far as head */ + copysize = QUEUE_POS_OFFSET(head) - curoffset; + if (copysize < 0) + copysize = 0; /* just for safety */ + } + else + { + /* fetch all the rest of the page */ + copysize = QUEUE_PAGESIZE - curoffset; + } + memcpy(page_buffer.buf + curoffset, + NotifyCtl->shared->page_buffer[slotno] + curoffset, + copysize); + /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ + LWLockRelease(NotifySLRULock); + + /* + * Process messages up to the stop position, end of page, or an + * uncommitted message. + * + * Our stop position is what we found to be the head's position + * when we entered this function. It might have changed already. + * But if it has, we will receive (or have already received and + * queued) another signal and come here again. + * + * We are not holding NotifyQueueLock here! The queue can only + * extend beyond the head pointer (see above) and we leave our + * backend's pointer where it is so nobody will truncate or + * rewrite pages under us. Especially we don't want to hold a lock + * while sending the notifications to the frontend. + */ + reachedStop = asyncQueueProcessPageEntries(&pos, head, + page_buffer.buf, + snapshot); + } while (!reachedStop); + } + PG_FINALLY(); + { + /* Update shared state */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + QUEUE_BACKEND_POS(MyBackendId) = pos; + LWLockRelease(NotifyQueueLock); + } + PG_END_TRY(); + + /* Done with snapshot */ + UnregisterSnapshot(snapshot); +} + +/* + * Fetch notifications from the shared queue, beginning at position current, + * and deliver relevant ones to my frontend. + * + * The current page must have been fetched into page_buffer from shared + * memory. (We could access the page right in shared memory, but that + * would imply holding the NotifySLRULock throughout this routine.) + * + * We stop if we reach the "stop" position, or reach a notification from an + * uncommitted transaction, or reach the end of the page. + * + * The function returns true once we have reached the stop position or an + * uncommitted notification, and false if we have finished with the page. + * In other words: once it returns true there is no need to look further. + * The QueuePosition *current is advanced past all processed messages. + */ +static bool +asyncQueueProcessPageEntries(volatile QueuePosition *current, + QueuePosition stop, + char *page_buffer, + Snapshot snapshot) +{ + bool reachedStop = false; + bool reachedEndOfPage; + AsyncQueueEntry *qe; + + do + { + QueuePosition thisentry = *current; + + if (QUEUE_POS_EQUAL(thisentry, stop)) + break; + + qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); + + /* + * Advance *current over this message, possibly to the next page. As + * noted in the comments for asyncQueueReadAllNotifications, we must + * do this before possibly failing while processing the message. + */ + reachedEndOfPage = asyncQueueAdvance(current, qe->length); + + /* Ignore messages destined for other databases */ + if (qe->dboid == MyDatabaseId) + { + if (XidInMVCCSnapshot(qe->xid, snapshot)) + { + /* + * The source transaction is still in progress, so we can't + * process this message yet. Break out of the loop, but first + * back up *current so we will reprocess the message next + * time. (Note: it is unlikely but not impossible for + * TransactionIdDidCommit to fail, so we can't really avoid + * this advance-then-back-up behavior when dealing with an + * uncommitted message.) + * + * Note that we must test XidInMVCCSnapshot before we test + * TransactionIdDidCommit, else we might return a message from + * a transaction that is not yet visible to snapshots; compare + * the comments at the head of heapam_visibility.c. + * + * Also, while our own xact won't be listed in the snapshot, + * we need not check for TransactionIdIsCurrentTransactionId + * because our transaction cannot (yet) have queued any + * messages. + */ + *current = thisentry; + reachedStop = true; + break; + } + else if (TransactionIdDidCommit(qe->xid)) + { + /* qe->data is the null-terminated channel name */ + char *channel = qe->data; + + if (IsListeningOn(channel)) + { + /* payload follows channel name */ + char *payload = qe->data + strlen(channel) + 1; + + NotifyMyFrontEnd(channel, payload, qe->srcPid); + } + } + else + { + /* + * The source transaction aborted or crashed, so we just + * ignore its notifications. + */ + } + } + + /* Loop back if we're not at end of page */ + } while (!reachedEndOfPage); + + if (QUEUE_POS_EQUAL(*current, stop)) + reachedStop = true; + + return reachedStop; +} + +/* + * Advance the shared queue tail variable to the minimum of all the + * per-backend tail pointers. Truncate pg_notify space if possible. + * + * This is (usually) called during CommitTransaction(), so it's important for + * it to have very low probability of failure. + */ +static void +asyncQueueAdvanceTail(void) +{ + QueuePosition min; + int oldtailpage; + int newtailpage; + int boundary; + + /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */ + LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE); + + /* + * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact + * (ie, exactly match at least one backend's queue position), so it must + * be updated atomically with the actual computation. Since v13, we could + * get away with not doing it like that, but it seems prudent to keep it + * so. + * + * Also, because incoming backends will scan forward from QUEUE_TAIL, that + * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is + * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest + * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL), + * there are pages we can truncate but haven't yet finished doing so. + * + * For concurrency's sake, we don't want to hold NotifyQueueLock while + * performing SimpleLruTruncate. This is OK because no backend will try + * to access the pages we are in the midst of truncating. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + min = QUEUE_HEAD; + for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) + { + Assert(QUEUE_BACKEND_PID(i) != InvalidPid); + min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); + } + QUEUE_TAIL = min; + oldtailpage = QUEUE_STOP_PAGE; + LWLockRelease(NotifyQueueLock); + + /* + * We can truncate something if the global tail advanced across an SLRU + * segment boundary. + * + * XXX it might be better to truncate only once every several segments, to + * reduce the number of directory scans. + */ + newtailpage = QUEUE_POS_PAGE(min); + boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); + if (asyncQueuePagePrecedes(oldtailpage, boundary)) + { + /* + * SimpleLruTruncate() will ask for NotifySLRULock but will also + * release the lock again. + */ + SimpleLruTruncate(NotifyCtl, newtailpage); + + /* + * Update QUEUE_STOP_PAGE. This changes asyncQueueIsFull()'s verdict + * for the segment immediately prior to the old tail, allowing fresh + * data into that segment. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + QUEUE_STOP_PAGE = newtailpage; + LWLockRelease(NotifyQueueLock); + } + + LWLockRelease(NotifyQueueTailLock); +} + +/* + * ProcessIncomingNotify + * + * Scan the queue for arriving notifications and report them to the front + * end. The notifications might be from other sessions, or our own; + * there's no need to distinguish here. + * + * If "flush" is true, force any frontend messages out immediately. + * + * NOTE: since we are outside any transaction, we must create our own. + */ +static void +ProcessIncomingNotify(bool flush) +{ + /* We *must* reset the flag */ + notifyInterruptPending = false; + + /* Do nothing else if we aren't actively listening */ + if (listenChannels == NIL) + return; + + if (Trace_notify) + elog(DEBUG1, "ProcessIncomingNotify"); + + set_ps_display("notify interrupt"); + + /* + * We must run asyncQueueReadAllNotifications inside a transaction, else + * bad things happen if it gets an error. + */ + StartTransactionCommand(); + + asyncQueueReadAllNotifications(); + + CommitTransactionCommand(); + + /* + * If this isn't an end-of-command case, we must flush the notify messages + * to ensure frontend gets them promptly. + */ + if (flush) + pq_flush(); + + set_ps_display("idle"); + + if (Trace_notify) + elog(DEBUG1, "ProcessIncomingNotify: done"); +} + +/* + * Send NOTIFY message to my front end. + */ +void +NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) +{ + if (whereToSendOutput == DestRemote) + { + StringInfoData buf; + + pq_beginmessage(&buf, 'A'); + pq_sendint32(&buf, srcPid); + pq_sendstring(&buf, channel); + pq_sendstring(&buf, payload); + pq_endmessage(&buf); + + /* + * NOTE: we do not do pq_flush() here. Some level of caller will + * handle it later, allowing this message to be combined into a packet + * with other ones. + */ + } + else + elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload); +} + +/* Does pendingNotifies include a match for the given event? */ +static bool +AsyncExistsPendingNotify(Notification *n) +{ + if (pendingNotifies == NULL) + return false; + + if (pendingNotifies->hashtab != NULL) + { + /* Use the hash table to probe for a match */ + if (hash_search(pendingNotifies->hashtab, + &n, + HASH_FIND, + NULL)) + return true; + } + else + { + /* Must scan the event list */ + ListCell *l; + + foreach(l, pendingNotifies->events) + { + Notification *oldn = (Notification *) lfirst(l); + + if (n->channel_len == oldn->channel_len && + n->payload_len == oldn->payload_len && + memcmp(n->data, oldn->data, + n->channel_len + n->payload_len + 2) == 0) + return true; + } + } + + return false; +} + +/* + * Add a notification event to a pre-existing pendingNotifies list. + * + * Because pendingNotifies->events is already nonempty, this works + * correctly no matter what CurrentMemoryContext is. + */ +static void +AddEventToPendingNotifies(Notification *n) +{ + Assert(pendingNotifies->events != NIL); + + /* Create the hash table if it's time to */ + if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES && + pendingNotifies->hashtab == NULL) + { + HASHCTL hash_ctl; + ListCell *l; + + /* Create the hash table */ + hash_ctl.keysize = sizeof(Notification *); + hash_ctl.entrysize = sizeof(NotificationHash); + hash_ctl.hash = notification_hash; + hash_ctl.match = notification_match; + hash_ctl.hcxt = CurTransactionContext; + pendingNotifies->hashtab = + hash_create("Pending Notifies", + 256L, + &hash_ctl, + HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); + + /* Insert all the already-existing events */ + foreach(l, pendingNotifies->events) + { + Notification *oldn = (Notification *) lfirst(l); + NotificationHash *hentry; + bool found; + + hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab, + &oldn, + HASH_ENTER, + &found); + Assert(!found); + hentry->event = oldn; + } + } + + /* Add new event to the list, in order */ + pendingNotifies->events = lappend(pendingNotifies->events, n); + + /* Add event to the hash table if needed */ + if (pendingNotifies->hashtab != NULL) + { + NotificationHash *hentry; + bool found; + + hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab, + &n, + HASH_ENTER, + &found); + Assert(!found); + hentry->event = n; + } +} + +/* + * notification_hash: hash function for notification hash table + * + * The hash "keys" are pointers to Notification structs. + */ +static uint32 +notification_hash(const void *key, Size keysize) +{ + const Notification *k = *(const Notification *const *) key; + + Assert(keysize == sizeof(Notification *)); + /* We don't bother to include the payload's trailing null in the hash */ + return DatumGetUInt32(hash_any((const unsigned char *) k->data, + k->channel_len + k->payload_len + 1)); +} + +/* + * notification_match: match function to use with notification_hash + */ +static int +notification_match(const void *key1, const void *key2, Size keysize) +{ + const Notification *k1 = *(const Notification *const *) key1; + const Notification *k2 = *(const Notification *const *) key2; + + Assert(keysize == sizeof(Notification *)); + if (k1->channel_len == k2->channel_len && + k1->payload_len == k2->payload_len && + memcmp(k1->data, k2->data, + k1->channel_len + k1->payload_len + 2) == 0) + return 0; /* equal */ + return 1; /* not equal */ +} + +/* Clear the pendingActions and pendingNotifies lists. */ +static void +ClearPendingActionsAndNotifies(void) +{ + /* + * Everything's allocated in either TopTransactionContext or the context + * for the subtransaction to which it corresponds. So, there's nothing to + * do here except reset the pointers; the space will be reclaimed when the + * contexts are deleted. + */ + pendingActions = NULL; + pendingNotifies = NULL; +} |