/*-------------------------------------------------------------------------
 *
 * message.c
 *	  Generic logical messages.
 *
 * Copyright (c) 2013-2021, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/message.c
 *
 * NOTES
 *
 * Generic logical messages allow XLOG logging of arbitrary binary blobs that
 * get passed to the logical decoding plugin.  In normal XLOG processing they
 * are the same as a NOOP.
 *
 * These messages can be either transactional or non-transactional.
 * Transactional messages are part of the current transaction and are sent to
 * the decoding plugin in the same way as DML operations.
 * Non-transactional messages are sent to the plugin at the time when
 * logical decoding reads them from XLOG.  This also means that transactional
 * messages won't be delivered if the transaction was rolled back, but
 * non-transactional ones will always be delivered.
 *
 * Every message carries a prefix to avoid conflicts between different
 * decoding plugins.  Plugin authors must take extra care to use a unique
 * prefix; a good option is, for example, to use the name of the extension.
 *
 * ---------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/xact.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "replication/logical.h"
#include "replication/message.h"
#include "utils/memutils.h"

/*
 * Emit a generic logical decoding message as an XLOG record.
 *
 * prefix		 NUL-terminated string identifying the message's owner
 * message		 payload bytes
 * size			 number of payload bytes
 * transactional whether the message belongs to the current transaction
 *
 * The record consists of the fixed xl_logical_message header, followed by
 * the prefix (terminator included) and then the payload.  The result is the
 * XLogRecPtr handed back by XLogInsert().
 */
XLogRecPtr
LogLogicalMessage(const char *prefix, const char *message, size_t size,
				  bool transactional)
{
	xl_logical_message msg_hdr;
	Size		prefix_len;
	XLogRecPtr	result;

	if (transactional)
	{
		/*
		 * A transactional message needs an xid, so make sure one has been
		 * assigned before the record is written.
		 */
		Assert(IsTransactionState());
		GetCurrentTransactionId();
	}

	/* trailing zero is critical; see logicalmsg_desc */
	prefix_len = strlen(prefix) + 1;

	/* Fill in the fixed-size header. */
	msg_hdr.dbId = MyDatabaseId;
	msg_hdr.transactional = transactional;
	msg_hdr.prefix_size = prefix_len;
	msg_hdr.message_size = size;

	/* Assemble the record: header, then prefix, then payload. */
	XLogBeginInsert();
	XLogRegisterData((char *) &msg_hdr, SizeOfLogicalMessage);
	XLogRegisterData(unconstify(char *, prefix), prefix_len);
	XLogRegisterData(unconstify(char *, message), size);

	/* allow origin filtering */
	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

	result = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);

	return result;
}

/*
 * Redo routine for logical decoding messages.
 *
 * There is nothing to replay: the only record type of this resource manager
 * is accepted without any action, and any other op code raises a PANIC.
 */
void
logicalmsg_redo(XLogReaderState *record)
{
	uint8		opcode = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	/* This is only interesting for logical decoding, see decode.c. */
	if (opcode == XLOG_LOGICAL_MESSAGE)
		return;

	elog(PANIC, "logicalmsg_redo: unknown op code %u", opcode);
}