summaryrefslogtreecommitdiffstats
path: root/src/backend/replication/logical/proto.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/proto.c')
-rw-r--r--src/backend/replication/logical/proto.c900
1 files changed, 900 insertions, 0 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
new file mode 100644
index 0000000..1cf59e0
--- /dev/null
+++ b/src/backend/replication/logical/proto.c
@@ -0,0 +1,900 @@
+/*-------------------------------------------------------------------------
+ *
+ * proto.c
+ * logical replication protocol functions
+ *
+ * Copyright (c) 2015-2021, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/proto.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/sysattr.h"
+#include "catalog/pg_namespace.h"
+#include "catalog/pg_type.h"
+#include "libpq/pqformat.h"
+#include "replication/logicalproto.h"
+#include "utils/lsyscache.h"
+#include "utils/syscache.h"
+
+/*
+ * Protocol message flags.
+ */
+#define LOGICALREP_IS_REPLICA_IDENTITY 1
+
+#define MESSAGE_TRANSACTIONAL (1<<0)
+#define TRUNCATE_CASCADE (1<<0)
+#define TRUNCATE_RESTART_SEQS (1<<1)
+
+static void logicalrep_write_attrs(StringInfo out, Relation rel);
+static void logicalrep_write_tuple(StringInfo out, Relation rel,
+ HeapTuple tuple, bool binary);
+
+static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
+static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
+
+static void logicalrep_write_namespace(StringInfo out, Oid nspid);
+static const char *logicalrep_read_namespace(StringInfo in);
+
+/*
+ * Write BEGIN to the output stream.
+ */
+void
+logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
+
+ /* fixed fields */
+ pq_sendint64(out, txn->final_lsn);
+ pq_sendint64(out, txn->commit_time);
+ pq_sendint32(out, txn->xid);
+}
+
+/*
+ * Read transaction BEGIN from the stream.
+ */
+void
+logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data)
+{
+ /* read fields */
+ begin_data->final_lsn = pq_getmsgint64(in);
+ if (begin_data->final_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "final_lsn not set in begin message");
+ begin_data->committime = pq_getmsgint64(in);
+ begin_data->xid = pq_getmsgint(in, 4);
+}
+
+
+/*
+ * Write COMMIT to the output stream.
+ */
+void
+logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
+
+ /* send the flags field (unused for now) */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, commit_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->commit_time);
+}
+
+/*
+ * Read transaction COMMIT from the stream.
+ */
+void
+logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data)
+{
+ /* read flags (unused for now) */
+ uint8 flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in commit message", flags);
+
+ /* read fields */
+ commit_data->commit_lsn = pq_getmsgint64(in);
+ commit_data->end_lsn = pq_getmsgint64(in);
+ commit_data->committime = pq_getmsgint64(in);
+}
+
+/*
+ * Write ORIGIN to the output stream.
+ */
+void
+logicalrep_write_origin(StringInfo out, const char *origin,
+ XLogRecPtr origin_lsn)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
+
+ /* fixed fields */
+ pq_sendint64(out, origin_lsn);
+
+ /* origin string */
+ pq_sendstring(out, origin);
+}
+
+/*
+ * Read ORIGIN from the output stream.
+ */
+char *
+logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
+{
+ /* fixed fields */
+ *origin_lsn = pq_getmsgint64(in);
+
+ /* return origin */
+ return pstrdup(pq_getmsgstring(in));
+}
+
+/*
+ * Write INSERT to the output stream.
+ */
+void
+logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
+ HeapTuple newtuple, bool binary)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ /* use Oid as relation identifier */
+ pq_sendint32(out, RelationGetRelid(rel));
+
+ pq_sendbyte(out, 'N'); /* new tuple follows */
+ logicalrep_write_tuple(out, rel, newtuple, binary);
+}
+
+/*
+ * Read INSERT from stream.
+ *
+ * Fills the new tuple.
+ */
+LogicalRepRelId
+logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
+{
+ char action;
+ LogicalRepRelId relid;
+
+ /* read the relation id */
+ relid = pq_getmsgint(in, 4);
+
+ action = pq_getmsgbyte(in);
+ if (action != 'N')
+ elog(ERROR, "expected new tuple but got %d",
+ action);
+
+ logicalrep_read_tuple(in, newtup);
+
+ return relid;
+}
+
+/*
+ * Write UPDATE to the output stream.
+ */
+void
+logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+ HeapTuple oldtuple, HeapTuple newtuple, bool binary)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
+
+ Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ /* use Oid as relation identifier */
+ pq_sendint32(out, RelationGetRelid(rel));
+
+ if (oldtuple != NULL)
+ {
+ if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ pq_sendbyte(out, 'O'); /* old tuple follows */
+ else
+ pq_sendbyte(out, 'K'); /* old key follows */
+ logicalrep_write_tuple(out, rel, oldtuple, binary);
+ }
+
+ pq_sendbyte(out, 'N'); /* new tuple follows */
+ logicalrep_write_tuple(out, rel, newtuple, binary);
+}
+
+/*
+ * Read UPDATE from stream.
+ */
+LogicalRepRelId
+logicalrep_read_update(StringInfo in, bool *has_oldtuple,
+ LogicalRepTupleData *oldtup,
+ LogicalRepTupleData *newtup)
+{
+ char action;
+ LogicalRepRelId relid;
+
+ /* read the relation id */
+ relid = pq_getmsgint(in, 4);
+
+ /* read and verify action */
+ action = pq_getmsgbyte(in);
+ if (action != 'K' && action != 'O' && action != 'N')
+ elog(ERROR, "expected action 'N', 'O' or 'K', got %c",
+ action);
+
+ /* check for old tuple */
+ if (action == 'K' || action == 'O')
+ {
+ logicalrep_read_tuple(in, oldtup);
+ *has_oldtuple = true;
+
+ action = pq_getmsgbyte(in);
+ }
+ else
+ *has_oldtuple = false;
+
+ /* check for new tuple */
+ if (action != 'N')
+ elog(ERROR, "expected action 'N', got %c",
+ action);
+
+ logicalrep_read_tuple(in, newtup);
+
+ return relid;
+}
+
+/*
+ * Write DELETE to the output stream.
+ */
+void
+logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+ HeapTuple oldtuple, bool binary)
+{
+ Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+ rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+ rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ /* use Oid as relation identifier */
+ pq_sendint32(out, RelationGetRelid(rel));
+
+ if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+ pq_sendbyte(out, 'O'); /* old tuple follows */
+ else
+ pq_sendbyte(out, 'K'); /* old key follows */
+
+ logicalrep_write_tuple(out, rel, oldtuple, binary);
+}
+
+/*
+ * Read DELETE from stream.
+ *
+ * Fills the old tuple.
+ */
+LogicalRepRelId
+logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
+{
+ char action;
+ LogicalRepRelId relid;
+
+ /* read the relation id */
+ relid = pq_getmsgint(in, 4);
+
+ /* read and verify action */
+ action = pq_getmsgbyte(in);
+ if (action != 'K' && action != 'O')
+ elog(ERROR, "expected action 'O' or 'K', got %c", action);
+
+ logicalrep_read_tuple(in, oldtup);
+
+ return relid;
+}
+
+/*
+ * Write TRUNCATE to the output stream.
+ */
+void
+logicalrep_write_truncate(StringInfo out,
+ TransactionId xid,
+ int nrelids,
+ Oid relids[],
+ bool cascade, bool restart_seqs)
+{
+ int i;
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint32(out, nrelids);
+
+ /* encode and send truncate flags */
+ if (cascade)
+ flags |= TRUNCATE_CASCADE;
+ if (restart_seqs)
+ flags |= TRUNCATE_RESTART_SEQS;
+ pq_sendint8(out, flags);
+
+ for (i = 0; i < nrelids; i++)
+ pq_sendint32(out, relids[i]);
+}
+
+/*
+ * Read TRUNCATE from stream.
+ */
+List *
+logicalrep_read_truncate(StringInfo in,
+ bool *cascade, bool *restart_seqs)
+{
+ int i;
+ int nrelids;
+ List *relids = NIL;
+ uint8 flags;
+
+ nrelids = pq_getmsgint(in, 4);
+
+ /* read and decode truncate flags */
+ flags = pq_getmsgint(in, 1);
+ *cascade = (flags & TRUNCATE_CASCADE) > 0;
+ *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0;
+
+ for (i = 0; i < nrelids; i++)
+ relids = lappend_oid(relids, pq_getmsgint(in, 4));
+
+ return relids;
+}
+
+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+ bool transactional, const char *prefix, Size sz,
+ const char *message)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
+ /* encode and send message flags */
+ if (transactional)
+ flags |= MESSAGE_TRANSACTIONAL;
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ pq_sendint8(out, flags);
+ pq_sendint64(out, lsn);
+ pq_sendstring(out, prefix);
+ pq_sendint32(out, sz);
+ pq_sendbytes(out, message, sz);
+}
+
+/*
+ * Write relation description to the output stream.
+ */
+void
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
+{
+ char *relname;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ /* use Oid as relation identifier */
+ pq_sendint32(out, RelationGetRelid(rel));
+
+ /* send qualified relation name */
+ logicalrep_write_namespace(out, RelationGetNamespace(rel));
+ relname = RelationGetRelationName(rel);
+ pq_sendstring(out, relname);
+
+ /* send replica identity */
+ pq_sendbyte(out, rel->rd_rel->relreplident);
+
+ /* send the attribute info */
+ logicalrep_write_attrs(out, rel);
+}
+
+/*
+ * Read the relation info from stream and return as LogicalRepRelation.
+ */
+LogicalRepRelation *
+logicalrep_read_rel(StringInfo in)
+{
+ LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation));
+
+ rel->remoteid = pq_getmsgint(in, 4);
+
+ /* Read relation name from stream */
+ rel->nspname = pstrdup(logicalrep_read_namespace(in));
+ rel->relname = pstrdup(pq_getmsgstring(in));
+
+ /* Read the replica identity. */
+ rel->replident = pq_getmsgbyte(in);
+
+ /* Get attribute description */
+ logicalrep_read_attrs(in, rel);
+
+ return rel;
+}
+
+/*
+ * Write type info to the output stream.
+ *
+ * This function will always write base type info.
+ */
+void
+logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
+{
+ Oid basetypoid = getBaseType(typoid);
+ HeapTuple tup;
+ Form_pg_type typtup;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
+
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
+ tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for type %u", basetypoid);
+ typtup = (Form_pg_type) GETSTRUCT(tup);
+
+ /* use Oid as relation identifier */
+ pq_sendint32(out, typoid);
+
+ /* send qualified type name */
+ logicalrep_write_namespace(out, typtup->typnamespace);
+ pq_sendstring(out, NameStr(typtup->typname));
+
+ ReleaseSysCache(tup);
+}
+
+/*
+ * Read type info from the output stream.
+ */
+void
+logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
+{
+ ltyp->remoteid = pq_getmsgint(in, 4);
+
+ /* Read type name from stream */
+ ltyp->nspname = pstrdup(logicalrep_read_namespace(in));
+ ltyp->typname = pstrdup(pq_getmsgstring(in));
+}
+
+/*
+ * Write a tuple to the outputstream, in the most efficient format possible.
+ */
+static void
+logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary)
+{
+ TupleDesc desc;
+ Datum values[MaxTupleAttributeNumber];
+ bool isnull[MaxTupleAttributeNumber];
+ int i;
+ uint16 nliveatts = 0;
+
+ desc = RelationGetDescr(rel);
+
+ for (i = 0; i < desc->natts; i++)
+ {
+ if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+ continue;
+ nliveatts++;
+ }
+ pq_sendint16(out, nliveatts);
+
+ /* try to allocate enough memory from the get-go */
+ enlargeStringInfo(out, tuple->t_len +
+ nliveatts * (1 + 4));
+
+ heap_deform_tuple(tuple, desc, values, isnull);
+
+ /* Write the values */
+ for (i = 0; i < desc->natts; i++)
+ {
+ HeapTuple typtup;
+ Form_pg_type typclass;
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+
+ if (att->attisdropped || att->attgenerated)
+ continue;
+
+ if (isnull[i])
+ {
+ pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
+ continue;
+ }
+
+ if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
+ {
+ /*
+ * Unchanged toasted datum. (Note that we don't promise to detect
+ * unchanged data in general; this is just a cheap check to avoid
+ * sending large values unnecessarily.)
+ */
+ pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED);
+ continue;
+ }
+
+ typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid));
+ if (!HeapTupleIsValid(typtup))
+ elog(ERROR, "cache lookup failed for type %u", att->atttypid);
+ typclass = (Form_pg_type) GETSTRUCT(typtup);
+
+ /*
+ * Send in binary if requested and type has suitable send function.
+ */
+ if (binary && OidIsValid(typclass->typsend))
+ {
+ bytea *outputbytes;
+ int len;
+
+ pq_sendbyte(out, LOGICALREP_COLUMN_BINARY);
+ outputbytes = OidSendFunctionCall(typclass->typsend, values[i]);
+ len = VARSIZE(outputbytes) - VARHDRSZ;
+ pq_sendint(out, len, 4); /* length */
+ pq_sendbytes(out, VARDATA(outputbytes), len); /* data */
+ pfree(outputbytes);
+ }
+ else
+ {
+ char *outputstr;
+
+ pq_sendbyte(out, LOGICALREP_COLUMN_TEXT);
+ outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]);
+ pq_sendcountedtext(out, outputstr, strlen(outputstr), false);
+ pfree(outputstr);
+ }
+
+ ReleaseSysCache(typtup);
+ }
+}
+
+/*
+ * Read tuple in logical replication format from stream.
+ */
+static void
+logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
+{
+ int i;
+ int natts;
+
+ /* Get number of attributes */
+ natts = pq_getmsgint(in, 2);
+
+ /* Allocate space for per-column values; zero out unused StringInfoDatas */
+ tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData));
+ tuple->colstatus = (char *) palloc(natts * sizeof(char));
+ tuple->ncols = natts;
+
+ /* Read the data */
+ for (i = 0; i < natts; i++)
+ {
+ char kind;
+ int len;
+ StringInfo value = &tuple->colvalues[i];
+
+ kind = pq_getmsgbyte(in);
+ tuple->colstatus[i] = kind;
+
+ switch (kind)
+ {
+ case LOGICALREP_COLUMN_NULL:
+ /* nothing more to do */
+ break;
+ case LOGICALREP_COLUMN_UNCHANGED:
+ /* we don't receive the value of an unchanged column */
+ break;
+ case LOGICALREP_COLUMN_TEXT:
+ len = pq_getmsgint(in, 4); /* read length */
+
+ /* and data */
+ value->data = palloc(len + 1);
+ pq_copymsgbytes(in, value->data, len);
+ value->data[len] = '\0';
+ /* make StringInfo fully valid */
+ value->len = len;
+ value->cursor = 0;
+ value->maxlen = len;
+ break;
+ case LOGICALREP_COLUMN_BINARY:
+ len = pq_getmsgint(in, 4); /* read length */
+
+ /* and data */
+ value->data = palloc(len + 1);
+ pq_copymsgbytes(in, value->data, len);
+ /* not strictly necessary but per StringInfo practice */
+ value->data[len] = '\0';
+ /* make StringInfo fully valid */
+ value->len = len;
+ value->cursor = 0;
+ value->maxlen = len;
+ break;
+ default:
+ elog(ERROR, "unrecognized data representation type '%c'", kind);
+ }
+ }
+}
+
+/*
+ * Write relation attribute metadata to the stream.
+ */
+static void
+logicalrep_write_attrs(StringInfo out, Relation rel)
+{
+ TupleDesc desc;
+ int i;
+ uint16 nliveatts = 0;
+ Bitmapset *idattrs = NULL;
+ bool replidentfull;
+
+ desc = RelationGetDescr(rel);
+
+ /* send number of live attributes */
+ for (i = 0; i < desc->natts; i++)
+ {
+ if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated)
+ continue;
+ nliveatts++;
+ }
+ pq_sendint16(out, nliveatts);
+
+ /* fetch bitmap of REPLICATION IDENTITY attributes */
+ replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL);
+ if (!replidentfull)
+ idattrs = RelationGetIdentityKeyBitmap(rel);
+
+ /* send the attributes */
+ for (i = 0; i < desc->natts; i++)
+ {
+ Form_pg_attribute att = TupleDescAttr(desc, i);
+ uint8 flags = 0;
+
+ if (att->attisdropped || att->attgenerated)
+ continue;
+
+ /* REPLICA IDENTITY FULL means all columns are sent as part of key. */
+ if (replidentfull ||
+ bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ idattrs))
+ flags |= LOGICALREP_IS_REPLICA_IDENTITY;
+
+ pq_sendbyte(out, flags);
+
+ /* attribute name */
+ pq_sendstring(out, NameStr(att->attname));
+
+ /* attribute type id */
+ pq_sendint32(out, (int) att->atttypid);
+
+ /* attribute mode */
+ pq_sendint32(out, att->atttypmod);
+ }
+
+ bms_free(idattrs);
+}
+
+/*
+ * Read relation attribute metadata from the stream.
+ */
+static void
+logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel)
+{
+ int i;
+ int natts;
+ char **attnames;
+ Oid *atttyps;
+ Bitmapset *attkeys = NULL;
+
+ natts = pq_getmsgint(in, 2);
+ attnames = palloc(natts * sizeof(char *));
+ atttyps = palloc(natts * sizeof(Oid));
+
+ /* read the attributes */
+ for (i = 0; i < natts; i++)
+ {
+ uint8 flags;
+
+ /* Check for replica identity column */
+ flags = pq_getmsgbyte(in);
+ if (flags & LOGICALREP_IS_REPLICA_IDENTITY)
+ attkeys = bms_add_member(attkeys, i);
+
+ /* attribute name */
+ attnames[i] = pstrdup(pq_getmsgstring(in));
+
+ /* attribute type id */
+ atttyps[i] = (Oid) pq_getmsgint(in, 4);
+
+ /* we ignore attribute mode for now */
+ (void) pq_getmsgint(in, 4);
+ }
+
+ rel->attnames = attnames;
+ rel->atttyps = atttyps;
+ rel->attkeys = attkeys;
+ rel->natts = natts;
+}
+
+/*
+ * Write the namespace name or empty string for pg_catalog (to save space).
+ */
+static void
+logicalrep_write_namespace(StringInfo out, Oid nspid)
+{
+ if (nspid == PG_CATALOG_NAMESPACE)
+ pq_sendbyte(out, '\0');
+ else
+ {
+ char *nspname = get_namespace_name(nspid);
+
+ if (nspname == NULL)
+ elog(ERROR, "cache lookup failed for namespace %u",
+ nspid);
+
+ pq_sendstring(out, nspname);
+ }
+}
+
+/*
+ * Read the namespace name while treating empty string as pg_catalog.
+ */
+static const char *
+logicalrep_read_namespace(StringInfo in)
+{
+ const char *nspname = pq_getmsgstring(in);
+
+ if (nspname[0] == '\0')
+ nspname = "pg_catalog";
+
+ return nspname;
+}
+
+/*
+ * Write the information for the start stream message to the output stream.
+ */
+void
+logicalrep_write_stream_start(StringInfo out,
+ TransactionId xid, bool first_segment)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, xid);
+
+ /* 1 if this is the first streaming segment for this xid */
+ pq_sendbyte(out, first_segment ? 1 : 0);
+}
+
+/*
+ * Read the information about the start stream message from output stream.
+ */
+TransactionId
+logicalrep_read_stream_start(StringInfo in, bool *first_segment)
+{
+ TransactionId xid;
+
+ Assert(first_segment);
+
+ xid = pq_getmsgint(in, 4);
+ *first_segment = (pq_getmsgbyte(in) == 1);
+
+ return xid;
+}
+
+/*
+ * Write the stop stream message to the output stream.
+ */
+void
+logicalrep_write_stream_stop(StringInfo out)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
+}
+
+/*
+ * Write STREAM COMMIT to the output stream.
+ */
+void
+logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
+
+ Assert(TransactionIdIsValid(txn->xid));
+
+ /* transaction ID */
+ pq_sendint32(out, txn->xid);
+
+ /* send the flags field (unused for now) */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, commit_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->commit_time);
+}
+
+/*
+ * Read STREAM COMMIT from the output stream.
+ */
+TransactionId
+logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
+{
+ TransactionId xid;
+ uint8 flags;
+
+ xid = pq_getmsgint(in, 4);
+
+ /* read flags (unused for now) */
+ flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in commit message", flags);
+
+ /* read fields */
+ commit_data->commit_lsn = pq_getmsgint64(in);
+ commit_data->end_lsn = pq_getmsgint64(in);
+ commit_data->committime = pq_getmsgint64(in);
+
+ return xid;
+}
+
+/*
+ * Write STREAM ABORT to the output stream. Note that xid and subxid will be
+ * same for the top-level transaction abort.
+ */
+void
+logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
+ TransactionId subxid)
+{
+ pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
+
+ Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
+
+ /* transaction ID */
+ pq_sendint32(out, xid);
+ pq_sendint32(out, subxid);
+}
+
+/*
+ * Read STREAM ABORT from the output stream.
+ */
+void
+logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
+ TransactionId *subxid)
+{
+ Assert(xid && subxid);
+
+ *xid = pq_getmsgint(in, 4);
+ *subxid = pq_getmsgint(in, 4);
+}