diff options
Diffstat (limited to '')
-rw-r--r-- | contrib/tcn/Makefile | 18 | ||||
-rw-r--r-- | contrib/tcn/tcn--1.0.sql | 9 | ||||
-rw-r--r-- | contrib/tcn/tcn.c | 180 | ||||
-rw-r--r-- | contrib/tcn/tcn.control | 6 |
4 files changed, 213 insertions, 0 deletions
diff --git a/contrib/tcn/Makefile b/contrib/tcn/Makefile new file mode 100644 index 0000000..2de3425 --- /dev/null +++ b/contrib/tcn/Makefile @@ -0,0 +1,18 @@ +# contrib/tcn/Makefile + +MODULES = tcn + +EXTENSION = tcn +DATA = tcn--1.0.sql +PGFILEDESC = "tcn - trigger function notifying listeners" + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/tcn +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/tcn/tcn--1.0.sql b/contrib/tcn/tcn--1.0.sql new file mode 100644 index 0000000..027a4ef --- /dev/null +++ b/contrib/tcn/tcn--1.0.sql @@ -0,0 +1,9 @@ +/* contrib/tcn/tcn--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION tcn" to load this file. \quit + +CREATE FUNCTION triggered_change_notification() +RETURNS pg_catalog.trigger +AS 'MODULE_PATHNAME' +LANGUAGE C; diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c new file mode 100644 index 0000000..552f107 --- /dev/null +++ b/contrib/tcn/tcn.c @@ -0,0 +1,180 @@ +/*------------------------------------------------------------------------- + * + * tcn.c + * triggered change notification support for PostgreSQL + * + * Portions Copyright (c) 2011-2020, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * contrib/tcn/tcn.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "commands/async.h" +#include "commands/trigger.h" +#include "executor/spi.h" +#include "lib/stringinfo.h" +#include "utils/rel.h" +#include "utils/syscache.h" + +PG_MODULE_MAGIC; + +/* + * Copy from s (for source) to r (for result), wrapping with q (quote) + * characters and doubling any quote characters found. + */ +static void +strcpy_quoted(StringInfo r, const char *s, const char q) +{ + appendStringInfoCharMacro(r, q); + while (*s) + { + if (*s == q) + appendStringInfoCharMacro(r, q); + appendStringInfoCharMacro(r, *s); + s++; + } + appendStringInfoCharMacro(r, q); +} + +/* + * triggered_change_notification + * + * This trigger function will send a notification of data modification with + * primary key values. The channel will be "tcn" unless the trigger is + * created with a parameter, in which case that parameter will be used. + */ +PG_FUNCTION_INFO_V1(triggered_change_notification); + +Datum +triggered_change_notification(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + Trigger *trigger; + int nargs; + HeapTuple trigtuple; + Relation rel; + TupleDesc tupdesc; + char *channel; + char operation; + StringInfo payload = makeStringInfo(); + bool foundPK; + + List *indexoidlist; + ListCell *indexoidscan; + + /* make sure it's called as a trigger */ + if (!CALLED_AS_TRIGGER(fcinfo)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called as trigger"))); + + /* and that it's called after the change */ + if (!TRIGGER_FIRED_AFTER(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called after the change"))); + + /* and that it's called for each row */ + if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called for each row"))); + + if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + operation = 'I'; + else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + operation = 'U'; + else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) + operation = 'D'; + else + { + elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation"); + operation = 'X'; /* silence compiler warning */ + } + + trigger = trigdata->tg_trigger; + nargs = trigger->tgnargs; + if (nargs > 1) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must not be called with more than one parameter"))); + + if (nargs == 0) + channel = "tcn"; + else + channel = trigger->tgargs[0]; + + /* get tuple data */ + trigtuple = trigdata->tg_trigtuple; + rel = trigdata->tg_relation; + tupdesc = rel->rd_att; + + foundPK = false; + + /* + * Get the list of index OIDs for the table from the relcache, and look up + * each one in the pg_index syscache until we find one marked primary key + * (hopefully there isn't more than one such). + */ + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + HeapTuple indexTuple; + Form_pg_index index; + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for index %u", indexoid); + index = (Form_pg_index) GETSTRUCT(indexTuple); + /* we're only interested if it is the primary key and valid */ + if (index->indisprimary && index->indisvalid) + { + int indnkeyatts = index->indnkeyatts; + + if (indnkeyatts > 0) + { + int i; + + foundPK = true; + + strcpy_quoted(payload, RelationGetRelationName(rel), '"'); + appendStringInfoCharMacro(payload, ','); + appendStringInfoCharMacro(payload, operation); + + for (i = 0; i < indnkeyatts; i++) + { + int colno = index->indkey.values[i]; + Form_pg_attribute attr = TupleDescAttr(tupdesc, colno - 1); + + appendStringInfoCharMacro(payload, ','); + strcpy_quoted(payload, NameStr(attr->attname), '"'); + appendStringInfoCharMacro(payload, '='); + strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\''); + } + + Async_Notify(channel, payload->data); + } + ReleaseSysCache(indexTuple); + break; + } + ReleaseSysCache(indexTuple); + } + + list_free(indexoidlist); + + if (!foundPK) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called on a table with a primary key"))); + + return PointerGetDatum(NULL); /* after trigger; value doesn't matter */ +} diff --git a/contrib/tcn/tcn.control b/contrib/tcn/tcn.control new file mode 100644 index 0000000..6972e11 --- /dev/null +++ b/contrib/tcn/tcn.control @@ -0,0 +1,6 @@ +# tcn extension +comment = 'Triggered change notifications' +default_version = '1.0' +module_pathname = '$libdir/tcn' +relocatable = true +trusted = true |