diff options
Diffstat (limited to 'contrib/tcn/tcn.c')
-rw-r--r-- | contrib/tcn/tcn.c | 180 |
1 files changed, 180 insertions, 0 deletions
diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c new file mode 100644 index 0000000..0684702 --- /dev/null +++ b/contrib/tcn/tcn.c @@ -0,0 +1,180 @@ +/*------------------------------------------------------------------------- + * + * tcn.c + * triggered change notification support for PostgreSQL + * + * Portions Copyright (c) 2011-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * contrib/tcn/tcn.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "commands/async.h" +#include "commands/trigger.h" +#include "executor/spi.h" +#include "lib/stringinfo.h" +#include "utils/rel.h" +#include "utils/syscache.h" + +PG_MODULE_MAGIC; + +/* + * Copy from s (for source) to r (for result), wrapping with q (quote) + * characters and doubling any quote characters found. + */ +static void +strcpy_quoted(StringInfo r, const char *s, const char q) +{ + appendStringInfoCharMacro(r, q); + while (*s) + { + if (*s == q) + appendStringInfoCharMacro(r, q); + appendStringInfoCharMacro(r, *s); + s++; + } + appendStringInfoCharMacro(r, q); +} + +/* + * triggered_change_notification + * + * This trigger function will send a notification of data modification with + * primary key values. The channel will be "tcn" unless the trigger is + * created with a parameter, in which case that parameter will be used. + */ +PG_FUNCTION_INFO_V1(triggered_change_notification); + +Datum +triggered_change_notification(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + Trigger *trigger; + int nargs; + HeapTuple trigtuple; + Relation rel; + TupleDesc tupdesc; + char *channel; + char operation; + StringInfo payload = makeStringInfo(); + bool foundPK; + + List *indexoidlist; + ListCell *indexoidscan; + + /* make sure it's called as a trigger */ + if (!CALLED_AS_TRIGGER(fcinfo)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called as trigger"))); + + /* and that it's called after the change */ + if (!TRIGGER_FIRED_AFTER(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called after the change"))); + + /* and that it's called for each row */ + if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called for each row"))); + + if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + operation = 'I'; + else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + operation = 'U'; + else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) + operation = 'D'; + else + { + elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation"); + operation = 'X'; /* silence compiler warning */ + } + + trigger = trigdata->tg_trigger; + nargs = trigger->tgnargs; + if (nargs > 1) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must not be called with more than one parameter"))); + + if (nargs == 0) + channel = "tcn"; + else + channel = trigger->tgargs[0]; + + /* get tuple data */ + trigtuple = trigdata->tg_trigtuple; + rel = trigdata->tg_relation; + tupdesc = rel->rd_att; + + foundPK = false; + + /* + * Get the list of index OIDs for the table from the relcache, and look up + * each one in the pg_index syscache until we find one marked primary key + * (hopefully there isn't more than one such). + */ + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + HeapTuple indexTuple; + Form_pg_index index; + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) /* should not happen */ + elog(ERROR, "cache lookup failed for index %u", indexoid); + index = (Form_pg_index) GETSTRUCT(indexTuple); + /* we're only interested if it is the primary key and valid */ + if (index->indisprimary && index->indisvalid) + { + int indnkeyatts = index->indnkeyatts; + + if (indnkeyatts > 0) + { + int i; + + foundPK = true; + + strcpy_quoted(payload, RelationGetRelationName(rel), '"'); + appendStringInfoCharMacro(payload, ','); + appendStringInfoCharMacro(payload, operation); + + for (i = 0; i < indnkeyatts; i++) + { + int colno = index->indkey.values[i]; + Form_pg_attribute attr = TupleDescAttr(tupdesc, colno - 1); + + appendStringInfoCharMacro(payload, ','); + strcpy_quoted(payload, NameStr(attr->attname), '"'); + appendStringInfoCharMacro(payload, '='); + strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\''); + } + + Async_Notify(channel, payload->data); + } + ReleaseSysCache(indexTuple); + break; + } + ReleaseSysCache(indexTuple); + } + + list_free(indexoidlist); + + if (!foundPK) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called on a table with a primary key"))); + + return PointerGetDatum(NULL); /* after trigger; value doesn't matter */ +} |