1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
/*-------------------------------------------------------------------------
*
* message.c
* Generic logical messages.
*
* Copyright (c) 2013-2023, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/message.c
*
* NOTES
*
* Generic logical messages allow XLOG logging of arbitrary binary blobs that
* get passed to the logical decoding plugin. In normal XLOG processing they
* are same as NOOP.
*
* These messages can be either transactional or non-transactional.
* Transactional messages are part of current transaction and will be sent to
* decoding plugin using in a same way as DML operations.
* Non-transactional messages are sent to the plugin at the time when the
* logical decoding reads them from XLOG. This also means that transactional
* messages won't be delivered if the transaction was rolled back but the
* non-transactional one will always be delivered.
*
* Every message carries prefix to avoid conflicts between different decoding
* plugins. The plugin authors must take extra care to use unique prefix,
* good options seems to be for example to use the name of the extension.
*
* ---------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xact.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "replication/logical.h"
#include "replication/message.h"
#include "utils/memutils.h"
/*
* Write logical decoding message into XLog.
*/
XLogRecPtr
LogLogicalMessage(const char *prefix, const char *message, size_t size,
bool transactional)
{
xl_logical_message xlrec;
/*
* Force xid to be allocated if we're emitting a transactional message.
*/
if (transactional)
{
Assert(IsTransactionState());
GetCurrentTransactionId();
}
xlrec.dbId = MyDatabaseId;
xlrec.transactional = transactional;
/* trailing zero is critical; see logicalmsg_desc */
xlrec.prefix_size = strlen(prefix) + 1;
xlrec.message_size = size;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
XLogRegisterData(unconstify(char *, prefix), xlrec.prefix_size);
XLogRegisterData(unconstify(char *, message), size);
/* allow origin filtering */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
}
/*
* Redo is basically just noop for logical decoding messages.
*/
void
logicalmsg_redo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (info != XLOG_LOGICAL_MESSAGE)
elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
/* This is only interesting for logical decoding, see decode.c. */
}
|