diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 12:15:05 +0000 |
commit | 46651ce6fe013220ed397add242004d764fc0153 (patch) | |
tree | 6e5299f990f88e60174a1d3ae6e48eedd2688b2b /src/backend/replication/logical/proto.c | |
parent | Initial commit. (diff) | |
download | postgresql-14-upstream.tar.xz postgresql-14-upstream.zip |
Adding upstream version 14.5.upstream/14.5upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/backend/replication/logical/proto.c')
-rw-r--r-- | src/backend/replication/logical/proto.c | 900 |
1 files changed, 900 insertions, 0 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c new file mode 100644 index 0000000..1cf59e0 --- /dev/null +++ b/src/backend/replication/logical/proto.c @@ -0,0 +1,900 @@ +/*------------------------------------------------------------------------- + * + * proto.c + * logical replication protocol functions + * + * Copyright (c) 2015-2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/proto.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/sysattr.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_type.h" +#include "libpq/pqformat.h" +#include "replication/logicalproto.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" + +/* + * Protocol message flags. + */ +#define LOGICALREP_IS_REPLICA_IDENTITY 1 + +#define MESSAGE_TRANSACTIONAL (1<<0) +#define TRUNCATE_CASCADE (1<<0) +#define TRUNCATE_RESTART_SEQS (1<<1) + +static void logicalrep_write_attrs(StringInfo out, Relation rel); +static void logicalrep_write_tuple(StringInfo out, Relation rel, + HeapTuple tuple, bool binary); + +static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); +static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); + +static void logicalrep_write_namespace(StringInfo out, Oid nspid); +static const char *logicalrep_read_namespace(StringInfo in); + +/* + * Write BEGIN to the output stream. + */ +void +logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN); + + /* fixed fields */ + pq_sendint64(out, txn->final_lsn); + pq_sendint64(out, txn->commit_time); + pq_sendint32(out, txn->xid); +} + +/* + * Read transaction BEGIN from the stream. + */ +void +logicalrep_read_begin(StringInfo in, LogicalRepBeginData *begin_data) +{ + /* read fields */ + begin_data->final_lsn = pq_getmsgint64(in); + if (begin_data->final_lsn == InvalidXLogRecPtr) + elog(ERROR, "final_lsn not set in begin message"); + begin_data->committime = pq_getmsgint64(in); + begin_data->xid = pq_getmsgint(in, 4); +} + + +/* + * Write COMMIT to the output stream. + */ +void +logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT); + + /* send the flags field (unused for now) */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); +} + +/* + * Read transaction COMMIT from the stream. + */ +void +logicalrep_read_commit(StringInfo in, LogicalRepCommitData *commit_data) +{ + /* read flags (unused for now) */ + uint8 flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit message", flags); + + /* read fields */ + commit_data->commit_lsn = pq_getmsgint64(in); + commit_data->end_lsn = pq_getmsgint64(in); + commit_data->committime = pq_getmsgint64(in); +} + +/* + * Write ORIGIN to the output stream. + */ +void +logicalrep_write_origin(StringInfo out, const char *origin, + XLogRecPtr origin_lsn) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN); + + /* fixed fields */ + pq_sendint64(out, origin_lsn); + + /* origin string */ + pq_sendstring(out, origin); +} + +/* + * Read ORIGIN from the output stream. + */ +char * +logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) +{ + /* fixed fields */ + *origin_lsn = pq_getmsgint64(in); + + /* return origin */ + return pstrdup(pq_getmsgstring(in)); +} + +/* + * Write INSERT to the output stream. + */ +void +logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, + HeapTuple newtuple, bool binary) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + /* use Oid as relation identifier */ + pq_sendint32(out, RelationGetRelid(rel)); + + pq_sendbyte(out, 'N'); /* new tuple follows */ + logicalrep_write_tuple(out, rel, newtuple, binary); +} + +/* + * Read INSERT from stream. + * + * Fills the new tuple. + */ +LogicalRepRelId +logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) +{ + char action; + LogicalRepRelId relid; + + /* read the relation id */ + relid = pq_getmsgint(in, 4); + + action = pq_getmsgbyte(in); + if (action != 'N') + elog(ERROR, "expected new tuple but got %d", + action); + + logicalrep_read_tuple(in, newtup); + + return relid; +} + +/* + * Write UPDATE to the output stream. + */ +void +logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, + HeapTuple oldtuple, HeapTuple newtuple, bool binary) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); + + Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || + rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || + rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + /* use Oid as relation identifier */ + pq_sendint32(out, RelationGetRelid(rel)); + + if (oldtuple != NULL) + { + if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + pq_sendbyte(out, 'O'); /* old tuple follows */ + else + pq_sendbyte(out, 'K'); /* old key follows */ + logicalrep_write_tuple(out, rel, oldtuple, binary); + } + + pq_sendbyte(out, 'N'); /* new tuple follows */ + logicalrep_write_tuple(out, rel, newtuple, binary); +} + +/* + * Read UPDATE from stream. + */ +LogicalRepRelId +logicalrep_read_update(StringInfo in, bool *has_oldtuple, + LogicalRepTupleData *oldtup, + LogicalRepTupleData *newtup) +{ + char action; + LogicalRepRelId relid; + + /* read the relation id */ + relid = pq_getmsgint(in, 4); + + /* read and verify action */ + action = pq_getmsgbyte(in); + if (action != 'K' && action != 'O' && action != 'N') + elog(ERROR, "expected action 'N', 'O' or 'K', got %c", + action); + + /* check for old tuple */ + if (action == 'K' || action == 'O') + { + logicalrep_read_tuple(in, oldtup); + *has_oldtuple = true; + + action = pq_getmsgbyte(in); + } + else + *has_oldtuple = false; + + /* check for new tuple */ + if (action != 'N') + elog(ERROR, "expected action 'N', got %c", + action); + + logicalrep_read_tuple(in, newtup); + + return relid; +} + +/* + * Write DELETE to the output stream. + */ +void +logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, + HeapTuple oldtuple, bool binary) +{ + Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || + rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || + rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); + + pq_sendbyte(out, LOGICAL_REP_MSG_DELETE); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + /* use Oid as relation identifier */ + pq_sendint32(out, RelationGetRelid(rel)); + + if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + pq_sendbyte(out, 'O'); /* old tuple follows */ + else + pq_sendbyte(out, 'K'); /* old key follows */ + + logicalrep_write_tuple(out, rel, oldtuple, binary); +} + +/* + * Read DELETE from stream. + * + * Fills the old tuple. + */ +LogicalRepRelId +logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) +{ + char action; + LogicalRepRelId relid; + + /* read the relation id */ + relid = pq_getmsgint(in, 4); + + /* read and verify action */ + action = pq_getmsgbyte(in); + if (action != 'K' && action != 'O') + elog(ERROR, "expected action 'O' or 'K', got %c", action); + + logicalrep_read_tuple(in, oldtup); + + return relid; +} + +/* + * Write TRUNCATE to the output stream. + */ +void +logicalrep_write_truncate(StringInfo out, + TransactionId xid, + int nrelids, + Oid relids[], + bool cascade, bool restart_seqs) +{ + int i; + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint32(out, nrelids); + + /* encode and send truncate flags */ + if (cascade) + flags |= TRUNCATE_CASCADE; + if (restart_seqs) + flags |= TRUNCATE_RESTART_SEQS; + pq_sendint8(out, flags); + + for (i = 0; i < nrelids; i++) + pq_sendint32(out, relids[i]); +} + +/* + * Read TRUNCATE from stream. + */ +List * +logicalrep_read_truncate(StringInfo in, + bool *cascade, bool *restart_seqs) +{ + int i; + int nrelids; + List *relids = NIL; + uint8 flags; + + nrelids = pq_getmsgint(in, 4); + + /* read and decode truncate flags */ + flags = pq_getmsgint(in, 1); + *cascade = (flags & TRUNCATE_CASCADE) > 0; + *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0; + + for (i = 0; i < nrelids; i++) + relids = lappend_oid(relids, pq_getmsgint(in, 4)); + + return relids; +} + +/* + * Write MESSAGE to stream + */ +void +logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, + bool transactional, const char *prefix, Size sz, + const char *message) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE); + + /* encode and send message flags */ + if (transactional) + flags |= MESSAGE_TRANSACTIONAL; + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + pq_sendint8(out, flags); + pq_sendint64(out, lsn); + pq_sendstring(out, prefix); + pq_sendint32(out, sz); + pq_sendbytes(out, message, sz); +} + +/* + * Write relation description to the output stream. + */ +void +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) +{ + char *relname; + + pq_sendbyte(out, LOGICAL_REP_MSG_RELATION); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + /* use Oid as relation identifier */ + pq_sendint32(out, RelationGetRelid(rel)); + + /* send qualified relation name */ + logicalrep_write_namespace(out, RelationGetNamespace(rel)); + relname = RelationGetRelationName(rel); + pq_sendstring(out, relname); + + /* send replica identity */ + pq_sendbyte(out, rel->rd_rel->relreplident); + + /* send the attribute info */ + logicalrep_write_attrs(out, rel); +} + +/* + * Read the relation info from stream and return as LogicalRepRelation. + */ +LogicalRepRelation * +logicalrep_read_rel(StringInfo in) +{ + LogicalRepRelation *rel = palloc(sizeof(LogicalRepRelation)); + + rel->remoteid = pq_getmsgint(in, 4); + + /* Read relation name from stream */ + rel->nspname = pstrdup(logicalrep_read_namespace(in)); + rel->relname = pstrdup(pq_getmsgstring(in)); + + /* Read the replica identity. */ + rel->replident = pq_getmsgbyte(in); + + /* Get attribute description */ + logicalrep_read_attrs(in, rel); + + return rel; +} + +/* + * Write type info to the output stream. + * + * This function will always write base type info. + */ +void +logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) +{ + Oid basetypoid = getBaseType(typoid); + HeapTuple tup; + Form_pg_type typtup; + + pq_sendbyte(out, LOGICAL_REP_MSG_TYPE); + + /* transaction ID (if not valid, we're not streaming) */ + if (TransactionIdIsValid(xid)) + pq_sendint32(out, xid); + + tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", basetypoid); + typtup = (Form_pg_type) GETSTRUCT(tup); + + /* use Oid as relation identifier */ + pq_sendint32(out, typoid); + + /* send qualified type name */ + logicalrep_write_namespace(out, typtup->typnamespace); + pq_sendstring(out, NameStr(typtup->typname)); + + ReleaseSysCache(tup); +} + +/* + * Read type info from the output stream. + */ +void +logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) +{ + ltyp->remoteid = pq_getmsgint(in, 4); + + /* Read type name from stream */ + ltyp->nspname = pstrdup(logicalrep_read_namespace(in)); + ltyp->typname = pstrdup(pq_getmsgstring(in)); +} + +/* + * Write a tuple to the outputstream, in the most efficient format possible. + */ +static void +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +{ + TupleDesc desc; + Datum values[MaxTupleAttributeNumber]; + bool isnull[MaxTupleAttributeNumber]; + int i; + uint16 nliveatts = 0; + + desc = RelationGetDescr(rel); + + for (i = 0; i < desc->natts; i++) + { + if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + continue; + nliveatts++; + } + pq_sendint16(out, nliveatts); + + /* try to allocate enough memory from the get-go */ + enlargeStringInfo(out, tuple->t_len + + nliveatts * (1 + 4)); + + heap_deform_tuple(tuple, desc, values, isnull); + + /* Write the values */ + for (i = 0; i < desc->natts; i++) + { + HeapTuple typtup; + Form_pg_type typclass; + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + + if (isnull[i]) + { + pq_sendbyte(out, LOGICALREP_COLUMN_NULL); + continue; + } + + if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i])) + { + /* + * Unchanged toasted datum. (Note that we don't promise to detect + * unchanged data in general; this is just a cheap check to avoid + * sending large values unnecessarily.) + */ + pq_sendbyte(out, LOGICALREP_COLUMN_UNCHANGED); + continue; + } + + typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid)); + if (!HeapTupleIsValid(typtup)) + elog(ERROR, "cache lookup failed for type %u", att->atttypid); + typclass = (Form_pg_type) GETSTRUCT(typtup); + + /* + * Send in binary if requested and type has suitable send function. + */ + if (binary && OidIsValid(typclass->typsend)) + { + bytea *outputbytes; + int len; + + pq_sendbyte(out, LOGICALREP_COLUMN_BINARY); + outputbytes = OidSendFunctionCall(typclass->typsend, values[i]); + len = VARSIZE(outputbytes) - VARHDRSZ; + pq_sendint(out, len, 4); /* length */ + pq_sendbytes(out, VARDATA(outputbytes), len); /* data */ + pfree(outputbytes); + } + else + { + char *outputstr; + + pq_sendbyte(out, LOGICALREP_COLUMN_TEXT); + outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); + pq_sendcountedtext(out, outputstr, strlen(outputstr), false); + pfree(outputstr); + } + + ReleaseSysCache(typtup); + } +} + +/* + * Read tuple in logical replication format from stream. + */ +static void +logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) +{ + int i; + int natts; + + /* Get number of attributes */ + natts = pq_getmsgint(in, 2); + + /* Allocate space for per-column values; zero out unused StringInfoDatas */ + tuple->colvalues = (StringInfoData *) palloc0(natts * sizeof(StringInfoData)); + tuple->colstatus = (char *) palloc(natts * sizeof(char)); + tuple->ncols = natts; + + /* Read the data */ + for (i = 0; i < natts; i++) + { + char kind; + int len; + StringInfo value = &tuple->colvalues[i]; + + kind = pq_getmsgbyte(in); + tuple->colstatus[i] = kind; + + switch (kind) + { + case LOGICALREP_COLUMN_NULL: + /* nothing more to do */ + break; + case LOGICALREP_COLUMN_UNCHANGED: + /* we don't receive the value of an unchanged column */ + break; + case LOGICALREP_COLUMN_TEXT: + len = pq_getmsgint(in, 4); /* read length */ + + /* and data */ + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + value->data[len] = '\0'; + /* make StringInfo fully valid */ + value->len = len; + value->cursor = 0; + value->maxlen = len; + break; + case LOGICALREP_COLUMN_BINARY: + len = pq_getmsgint(in, 4); /* read length */ + + /* and data */ + value->data = palloc(len + 1); + pq_copymsgbytes(in, value->data, len); + /* not strictly necessary but per StringInfo practice */ + value->data[len] = '\0'; + /* make StringInfo fully valid */ + value->len = len; + value->cursor = 0; + value->maxlen = len; + break; + default: + elog(ERROR, "unrecognized data representation type '%c'", kind); + } + } +} + +/* + * Write relation attribute metadata to the stream. + */ +static void +logicalrep_write_attrs(StringInfo out, Relation rel) +{ + TupleDesc desc; + int i; + uint16 nliveatts = 0; + Bitmapset *idattrs = NULL; + bool replidentfull; + + desc = RelationGetDescr(rel); + + /* send number of live attributes */ + for (i = 0; i < desc->natts; i++) + { + if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + continue; + nliveatts++; + } + pq_sendint16(out, nliveatts); + + /* fetch bitmap of REPLICATION IDENTITY attributes */ + replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + if (!replidentfull) + idattrs = RelationGetIdentityKeyBitmap(rel); + + /* send the attributes */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + uint8 flags = 0; + + if (att->attisdropped || att->attgenerated) + continue; + + /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ + if (replidentfull || + bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs)) + flags |= LOGICALREP_IS_REPLICA_IDENTITY; + + pq_sendbyte(out, flags); + + /* attribute name */ + pq_sendstring(out, NameStr(att->attname)); + + /* attribute type id */ + pq_sendint32(out, (int) att->atttypid); + + /* attribute mode */ + pq_sendint32(out, att->atttypmod); + } + + bms_free(idattrs); +} + +/* + * Read relation attribute metadata from the stream. + */ +static void +logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) +{ + int i; + int natts; + char **attnames; + Oid *atttyps; + Bitmapset *attkeys = NULL; + + natts = pq_getmsgint(in, 2); + attnames = palloc(natts * sizeof(char *)); + atttyps = palloc(natts * sizeof(Oid)); + + /* read the attributes */ + for (i = 0; i < natts; i++) + { + uint8 flags; + + /* Check for replica identity column */ + flags = pq_getmsgbyte(in); + if (flags & LOGICALREP_IS_REPLICA_IDENTITY) + attkeys = bms_add_member(attkeys, i); + + /* attribute name */ + attnames[i] = pstrdup(pq_getmsgstring(in)); + + /* attribute type id */ + atttyps[i] = (Oid) pq_getmsgint(in, 4); + + /* we ignore attribute mode for now */ + (void) pq_getmsgint(in, 4); + } + + rel->attnames = attnames; + rel->atttyps = atttyps; + rel->attkeys = attkeys; + rel->natts = natts; +} + +/* + * Write the namespace name or empty string for pg_catalog (to save space). + */ +static void +logicalrep_write_namespace(StringInfo out, Oid nspid) +{ + if (nspid == PG_CATALOG_NAMESPACE) + pq_sendbyte(out, '\0'); + else + { + char *nspname = get_namespace_name(nspid); + + if (nspname == NULL) + elog(ERROR, "cache lookup failed for namespace %u", + nspid); + + pq_sendstring(out, nspname); + } +} + +/* + * Read the namespace name while treating empty string as pg_catalog. + */ +static const char * +logicalrep_read_namespace(StringInfo in) +{ + const char *nspname = pq_getmsgstring(in); + + if (nspname[0] == '\0') + nspname = "pg_catalog"; + + return nspname; +} + +/* + * Write the information for the start stream message to the output stream. + */ +void +logicalrep_write_stream_start(StringInfo out, + TransactionId xid, bool first_segment) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START); + + Assert(TransactionIdIsValid(xid)); + + /* transaction ID (we're starting to stream, so must be valid) */ + pq_sendint32(out, xid); + + /* 1 if this is the first streaming segment for this xid */ + pq_sendbyte(out, first_segment ? 1 : 0); +} + +/* + * Read the information about the start stream message from output stream. + */ +TransactionId +logicalrep_read_stream_start(StringInfo in, bool *first_segment) +{ + TransactionId xid; + + Assert(first_segment); + + xid = pq_getmsgint(in, 4); + *first_segment = (pq_getmsgbyte(in) == 1); + + return xid; +} + +/* + * Write the stop stream message to the output stream. + */ +void +logicalrep_write_stream_stop(StringInfo out) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END); +} + +/* + * Write STREAM COMMIT to the output stream. + */ +void +logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + uint8 flags = 0; + + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT); + + Assert(TransactionIdIsValid(txn->xid)); + + /* transaction ID */ + pq_sendint32(out, txn->xid); + + /* send the flags field (unused for now) */ + pq_sendbyte(out, flags); + + /* send fields */ + pq_sendint64(out, commit_lsn); + pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, txn->commit_time); +} + +/* + * Read STREAM COMMIT from the output stream. + */ +TransactionId +logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data) +{ + TransactionId xid; + uint8 flags; + + xid = pq_getmsgint(in, 4); + + /* read flags (unused for now) */ + flags = pq_getmsgbyte(in); + + if (flags != 0) + elog(ERROR, "unrecognized flags %u in commit message", flags); + + /* read fields */ + commit_data->commit_lsn = pq_getmsgint64(in); + commit_data->end_lsn = pq_getmsgint64(in); + commit_data->committime = pq_getmsgint64(in); + + return xid; +} + +/* + * Write STREAM ABORT to the output stream. Note that xid and subxid will be + * same for the top-level transaction abort. + */ +void +logicalrep_write_stream_abort(StringInfo out, TransactionId xid, + TransactionId subxid) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT); + + Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); + + /* transaction ID */ + pq_sendint32(out, xid); + pq_sendint32(out, subxid); +} + +/* + * Read STREAM ABORT from the output stream. + */ +void +logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, + TransactionId *subxid) +{ + Assert(xid && subxid); + + *xid = pq_getmsgint(in, 4); + *subxid = pq_getmsgint(in, 4); +} |