diff options
Diffstat (limited to 'src/backend/replication/logical/message.c')
-rw-r--r-- | src/backend/replication/logical/message.c | 89 |
1 files changed, 89 insertions, 0 deletions
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c new file mode 100644 index 0000000..c5de14a --- /dev/null +++ b/src/backend/replication/logical/message.c @@ -0,0 +1,89 @@ +/*------------------------------------------------------------------------- + * + * message.c + * Generic logical messages. + * + * Copyright (c) 2013-2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/message.c + * + * NOTES + * + * Generic logical messages allow XLOG logging of arbitrary binary blobs that + * get passed to the logical decoding plugin. In normal XLOG processing they + * are same as NOOP. + * + * These messages can be either transactional or non-transactional. + * Transactional messages are part of current transaction and will be sent to + * decoding plugin using in a same way as DML operations. + * Non-transactional messages are sent to the plugin at the time when the + * logical decoding reads them from XLOG. This also means that transactional + * messages won't be delivered if the transaction was rolled back but the + * non-transactional one will always be delivered. + * + * Every message carries prefix to avoid conflicts between different decoding + * plugins. The plugin authors must take extra care to use unique prefix, + * good options seems to be for example to use the name of the extension. + * + * --------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/xloginsert.h" +#include "miscadmin.h" +#include "nodes/execnodes.h" +#include "replication/logical.h" +#include "replication/message.h" +#include "utils/memutils.h" + +/* + * Write logical decoding message into XLog. + */ +XLogRecPtr +LogLogicalMessage(const char *prefix, const char *message, size_t size, + bool transactional) +{ + xl_logical_message xlrec; + + /* + * Force xid to be allocated if we're emitting a transactional message. + */ + if (transactional) + { + Assert(IsTransactionState()); + GetCurrentTransactionId(); + } + + xlrec.dbId = MyDatabaseId; + xlrec.transactional = transactional; + /* trailing zero is critical; see logicalmsg_desc */ + xlrec.prefix_size = strlen(prefix) + 1; + xlrec.message_size = size; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage); + XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size); + XLogRegisterData(unconstify(char *, message), size); + + /* allow origin filtering */ + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + + return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); +} + +/* + * Redo is basically just noop for logical decoding messages. + */ +void +logicalmsg_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info != XLOG_LOGICAL_MESSAGE) + elog(PANIC, "logicalmsg_redo: unknown op code %u", info); + + /* This is only interesting for logical decoding, see decode.c. */ +} |