summaryrefslogtreecommitdiffstats
path: root/src/backend/replication/logical/message.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/message.c')
-rw-r--r--src/backend/replication/logical/message.c89
1 files changed, 89 insertions, 0 deletions
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
new file mode 100644
index 0000000..1c34912
--- /dev/null
+++ b/src/backend/replication/logical/message.c
@@ -0,0 +1,89 @@
+/*-------------------------------------------------------------------------
+ *
+ * message.c
+ * Generic logical messages.
+ *
+ * Copyright (c) 2013-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/message.c
+ *
+ * NOTES
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG. This also means that transactional
+ * messages won't be delivered if the transaction was rolled back but the
+ * non-transactional one will always be delivered.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The plugin authors must take extra care to use unique prefix,
+ * good options seems to be for example to use the name of the extension.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/xloginsert.h"
+#include "miscadmin.h"
+#include "nodes/execnodes.h"
+#include "replication/logical.h"
+#include "replication/message.h"
+#include "utils/memutils.h"
+
+/*
+ * Write logical decoding message into XLog.
+ */
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+ bool transactional)
+{
+ xl_logical_message xlrec;
+
+ /*
+ * Force xid to be allocated if we're emitting a transactional message.
+ */
+ if (transactional)
+ {
+ Assert(IsTransactionState());
+ GetCurrentTransactionId();
+ }
+
+ xlrec.dbId = MyDatabaseId;
+ xlrec.transactional = transactional;
+ /* trailing zero is critical; see logicalmsg_desc */
+ xlrec.prefix_size = strlen(prefix) + 1;
+ xlrec.message_size = size;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+ XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
+ XLogRegisterData(unconstify(char *, message), size);
+
+ /* allow origin filtering */
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+ return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+/*
+ * Redo is basically just noop for logical decoding messages.
+ */
+void
+logicalmsg_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ if (info != XLOG_LOGICAL_MESSAGE)
+ elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+
+ /* This is only interesting for logical decoding, see decode.c. */
+}