diff options
Diffstat (limited to 'src/backend/commands/matview.c')
-rw-r--r-- | src/backend/commands/matview.c | 955 |
1 files changed, 955 insertions, 0 deletions
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c new file mode 100644 index 0000000..9451a8c --- /dev/null +++ b/src/backend/commands/matview.c @@ -0,0 +1,955 @@ +/*------------------------------------------------------------------------- + * + * matview.c + * materialized view support + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/commands/matview.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/genam.h" +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/multixact.h" +#include "access/tableam.h" +#include "access/xact.h" +#include "access/xlog.h" +#include "catalog/catalog.h" +#include "catalog/indexing.h" +#include "catalog/namespace.h" +#include "catalog/pg_am.h" +#include "catalog/pg_opclass.h" +#include "catalog/pg_operator.h" +#include "commands/cluster.h" +#include "commands/matview.h" +#include "commands/tablecmds.h" +#include "commands/tablespace.h" +#include "executor/executor.h" +#include "executor/spi.h" +#include "miscadmin.h" +#include "parser/parse_relation.h" +#include "pgstat.h" +#include "rewrite/rewriteHandler.h" +#include "storage/lmgr.h" +#include "storage/smgr.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" + + +typedef struct +{ + DestReceiver pub; /* publicly-known function pointers */ + Oid transientoid; /* OID of new heap into which to store */ + /* These fields are filled by transientrel_startup: */ + Relation transientrel; /* relation to write to */ + CommandId output_cid; /* cmin to insert in output tuples */ + int ti_options; /* table_tuple_insert performance options */ + BulkInsertState bistate; /* bulk insert state */ +} DR_transientrel; + +static int matview_maintenance_depth = 0; + +static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); +static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self); +static void transientrel_shutdown(DestReceiver *self); +static void transientrel_destroy(DestReceiver *self); +static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, + const char *queryString); +static char *make_temptable_name_n(char *tempname, int n); +static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, + int save_sec_context); +static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence); +static bool is_usable_unique_index(Relation indexRel); +static void OpenMatViewIncrementalMaintenance(void); +static void CloseMatViewIncrementalMaintenance(void); + +/* + * SetMatViewPopulatedState + * Mark a materialized view as populated, or not. + * + * NOTE: caller must be holding an appropriate lock on the relation. + */ +void +SetMatViewPopulatedState(Relation relation, bool newstate) +{ + Relation pgrel; + HeapTuple tuple; + + Assert(relation->rd_rel->relkind == RELKIND_MATVIEW); + + /* + * Update relation's pg_class entry. Crucial side-effect: other backends + * (and this one too!) are sent SI message to make them rebuild relcache + * entries. + */ + pgrel = table_open(RelationRelationId, RowExclusiveLock); + tuple = SearchSysCacheCopy1(RELOID, + ObjectIdGetDatum(RelationGetRelid(relation))); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for relation %u", + RelationGetRelid(relation)); + + ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate; + + CatalogTupleUpdate(pgrel, &tuple->t_self, tuple); + + heap_freetuple(tuple); + table_close(pgrel, RowExclusiveLock); + + /* + * Advance command counter to make the updated pg_class row locally + * visible. + */ + CommandCounterIncrement(); +} + +/* + * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command + * + * This refreshes the materialized view by creating a new table and swapping + * the relfilenumbers of the new table and the old materialized view, so the OID + * of the original materialized view is preserved. Thus we do not lose GRANT + * nor references to this materialized view. + * + * If WITH NO DATA was specified, this is effectively like a TRUNCATE; + * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT + * statement associated with the materialized view. The statement node's + * skipData field shows whether the clause was used. + * + * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading + * the new heap, it's better to create the indexes afterwards than to fill them + * incrementally while we load. + * + * The matview's "populated" state is changed based on whether the contents + * reflect the result set of the materialized view's query. + */ +ObjectAddress +ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, + ParamListInfo params, QueryCompletion *qc) +{ + Oid matviewOid; + Relation matviewRel; + RewriteRule *rule; + List *actions; + Query *dataQuery; + Oid tableSpace; + Oid relowner; + Oid OIDNewHeap; + DestReceiver *dest; + uint64 processed = 0; + bool concurrent; + LOCKMODE lockmode; + char relpersistence; + Oid save_userid; + int save_sec_context; + int save_nestlevel; + ObjectAddress address; + + /* Determine strength of lock needed. */ + concurrent = stmt->concurrent; + lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock; + + /* + * Get a lock until end of transaction. + */ + matviewOid = RangeVarGetRelidExtended(stmt->relation, + lockmode, 0, + RangeVarCallbackOwnsTable, NULL); + matviewRel = table_open(matviewOid, NoLock); + relowner = matviewRel->rd_rel->relowner; + + /* + * Switch to the owner's userid, so that any functions are run as that + * user. Also lock down security-restricted operations and arrange to + * make GUC variable changes local to this command. + */ + GetUserIdAndSecContext(&save_userid, &save_sec_context); + SetUserIdAndSecContext(relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + save_nestlevel = NewGUCNestLevel(); + + /* Make sure it is a materialized view. */ + if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("\"%s\" is not a materialized view", + RelationGetRelationName(matviewRel)))); + + /* Check that CONCURRENTLY is not specified if not populated. */ + if (concurrent && !RelationIsPopulated(matviewRel)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("CONCURRENTLY cannot be used when the materialized view is not populated"))); + + /* Check that conflicting options have not been specified. */ + if (concurrent && stmt->skipData) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s and %s options cannot be used together", + "CONCURRENTLY", "WITH NO DATA"))); + + /* + * Check that everything is correct for a refresh. Problems at this point + * are internal errors, so elog is sufficient. + */ + if (matviewRel->rd_rel->relhasrules == false || + matviewRel->rd_rules->numLocks < 1) + elog(ERROR, + "materialized view \"%s\" is missing rewrite information", + RelationGetRelationName(matviewRel)); + + if (matviewRel->rd_rules->numLocks > 1) + elog(ERROR, + "materialized view \"%s\" has too many rules", + RelationGetRelationName(matviewRel)); + + rule = matviewRel->rd_rules->rules[0]; + if (rule->event != CMD_SELECT || !(rule->isInstead)) + elog(ERROR, + "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule", + RelationGetRelationName(matviewRel)); + + actions = rule->actions; + if (list_length(actions) != 1) + elog(ERROR, + "the rule for materialized view \"%s\" is not a single action", + RelationGetRelationName(matviewRel)); + + /* + * Check that there is a unique index with no WHERE clause on one or more + * columns of the materialized view if CONCURRENTLY is specified. + */ + if (concurrent) + { + List *indexoidlist = RelationGetIndexList(matviewRel); + ListCell *indexoidscan; + bool hasUniqueIndex = false; + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + Relation indexRel; + + indexRel = index_open(indexoid, AccessShareLock); + hasUniqueIndex = is_usable_unique_index(indexRel); + index_close(indexRel, AccessShareLock); + if (hasUniqueIndex) + break; + } + + list_free(indexoidlist); + + if (!hasUniqueIndex) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot refresh materialized view \"%s\" concurrently", + quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)), + RelationGetRelationName(matviewRel))), + errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view."))); + } + + /* + * The stored query was rewritten at the time of the MV definition, but + * has not been scribbled on by the planner. + */ + dataQuery = linitial_node(Query, actions); + + /* + * Check for active uses of the relation in the current transaction, such + * as open scans. + * + * NB: We count on this to protect us against problems with refreshing the + * data using TABLE_INSERT_FROZEN. + */ + CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW"); + + /* + * Tentatively mark the matview as populated or not (this will roll back + * if we fail later). + */ + SetMatViewPopulatedState(matviewRel, !stmt->skipData); + + /* Concurrent refresh builds new data in temp tablespace, and does diff. */ + if (concurrent) + { + tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false); + relpersistence = RELPERSISTENCE_TEMP; + } + else + { + tableSpace = matviewRel->rd_rel->reltablespace; + relpersistence = matviewRel->rd_rel->relpersistence; + } + + /* + * Create the transient table that will receive the regenerated data. Lock + * it against access by any other process until commit (by which time it + * will be gone). + */ + OIDNewHeap = make_new_heap(matviewOid, tableSpace, + matviewRel->rd_rel->relam, + relpersistence, ExclusiveLock); + LockRelationOid(OIDNewHeap, AccessExclusiveLock); + dest = CreateTransientRelDestReceiver(OIDNewHeap); + + /* Generate the data, if wanted. */ + if (!stmt->skipData) + processed = refresh_matview_datafill(dest, dataQuery, queryString); + + /* Make the matview match the newly generated data. */ + if (concurrent) + { + int old_depth = matview_maintenance_depth; + + PG_TRY(); + { + refresh_by_match_merge(matviewOid, OIDNewHeap, relowner, + save_sec_context); + } + PG_CATCH(); + { + matview_maintenance_depth = old_depth; + PG_RE_THROW(); + } + PG_END_TRY(); + Assert(matview_maintenance_depth == old_depth); + } + else + { + refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence); + + /* + * Inform cumulative stats system about our activity: basically, we + * truncated the matview and inserted some new data. (The concurrent + * code path above doesn't need to worry about this because the + * inserts and deletes it issues get counted by lower-level code.) + */ + pgstat_count_truncate(matviewRel); + if (!stmt->skipData) + pgstat_count_heap_insert(matviewRel, processed); + } + + table_close(matviewRel, NoLock); + + /* Roll back any GUC changes */ + AtEOXact_GUC(false, save_nestlevel); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + + ObjectAddressSet(address, RelationRelationId, matviewOid); + + /* + * Save the rowcount so that pg_stat_statements can track the total number + * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we + * still don't display the rowcount in the command completion tag output, + * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW + * command tag is left false in cmdtaglist.h. Otherwise, the change of + * completion tag output might break applications using it. + */ + if (qc) + SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed); + + return address; +} + +/* + * refresh_matview_datafill + * + * Execute the given query, sending result rows to "dest" (which will + * insert them into the target matview). + * + * Returns number of rows inserted. + */ +static uint64 +refresh_matview_datafill(DestReceiver *dest, Query *query, + const char *queryString) +{ + List *rewritten; + PlannedStmt *plan; + QueryDesc *queryDesc; + Query *copied_query; + uint64 processed; + + /* Lock and rewrite, using a copy to preserve the original query. */ + copied_query = copyObject(query); + AcquireRewriteLocks(copied_query, true, false); + rewritten = QueryRewrite(copied_query); + + /* SELECT should never rewrite to more or less than one SELECT query */ + if (list_length(rewritten) != 1) + elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW"); + query = (Query *) linitial(rewritten); + + /* Check for user-requested abort. */ + CHECK_FOR_INTERRUPTS(); + + /* Plan the query which will generate data for the refresh. */ + plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL); + + /* + * Use a snapshot with an updated command ID to ensure this query sees + * results of any previously executed queries. (This could only matter if + * the planner executed an allegedly-stable function that changed the + * database contents, but let's do it anyway to be safe.) + */ + PushCopiedSnapshot(GetActiveSnapshot()); + UpdateActiveSnapshotCommandId(); + + /* Create a QueryDesc, redirecting output to our tuple receiver */ + queryDesc = CreateQueryDesc(plan, queryString, + GetActiveSnapshot(), InvalidSnapshot, + dest, NULL, NULL, 0); + + /* call ExecutorStart to prepare the plan for execution */ + ExecutorStart(queryDesc, 0); + + /* run the plan */ + ExecutorRun(queryDesc, ForwardScanDirection, 0, true); + + processed = queryDesc->estate->es_processed; + + /* and clean up */ + ExecutorFinish(queryDesc); + ExecutorEnd(queryDesc); + + FreeQueryDesc(queryDesc); + + PopActiveSnapshot(); + + return processed; +} + +DestReceiver * +CreateTransientRelDestReceiver(Oid transientoid) +{ + DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel)); + + self->pub.receiveSlot = transientrel_receive; + self->pub.rStartup = transientrel_startup; + self->pub.rShutdown = transientrel_shutdown; + self->pub.rDestroy = transientrel_destroy; + self->pub.mydest = DestTransientRel; + self->transientoid = transientoid; + + return (DestReceiver *) self; +} + +/* + * transientrel_startup --- executor startup + */ +static void +transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + DR_transientrel *myState = (DR_transientrel *) self; + Relation transientrel; + + transientrel = table_open(myState->transientoid, NoLock); + + /* + * Fill private fields of myState for use by later routines + */ + myState->transientrel = transientrel; + myState->output_cid = GetCurrentCommandId(true); + myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; + myState->bistate = GetBulkInsertState(); + + /* + * Valid smgr_targblock implies something already wrote to the relation. + * This may be harmless, but this function hasn't planned for it. + */ + Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber); +} + +/* + * transientrel_receive --- receive one tuple + */ +static bool +transientrel_receive(TupleTableSlot *slot, DestReceiver *self) +{ + DR_transientrel *myState = (DR_transientrel *) self; + + /* + * Note that the input slot might not be of the type of the target + * relation. That's supported by table_tuple_insert(), but slightly less + * efficient than inserting with the right slot - but the alternative + * would be to copy into a slot of the right type, which would not be + * cheap either. This also doesn't allow accessing per-AM data (say a + * tuple's xmin), but since we don't do that here... + */ + + table_tuple_insert(myState->transientrel, + slot, + myState->output_cid, + myState->ti_options, + myState->bistate); + + /* We know this is a newly created relation, so there are no indexes */ + + return true; +} + +/* + * transientrel_shutdown --- executor end + */ +static void +transientrel_shutdown(DestReceiver *self) +{ + DR_transientrel *myState = (DR_transientrel *) self; + + FreeBulkInsertState(myState->bistate); + + table_finish_bulk_insert(myState->transientrel, myState->ti_options); + + /* close transientrel, but keep lock until commit */ + table_close(myState->transientrel, NoLock); + myState->transientrel = NULL; +} + +/* + * transientrel_destroy --- release DestReceiver object + */ +static void +transientrel_destroy(DestReceiver *self) +{ + pfree(self); +} + + +/* + * Given a qualified temporary table name, append an underscore followed by + * the given integer, to make a new table name based on the old one. + * The result is a palloc'd string. + * + * As coded, this would fail to make a valid SQL name if the given name were, + * say, "FOO"."BAR". Currently, the table name portion of the input will + * never be double-quoted because it's of the form "pg_temp_NNN", cf + * make_new_heap(). But we might have to work harder someday. + */ +static char * +make_temptable_name_n(char *tempname, int n) +{ + StringInfoData namebuf; + + initStringInfo(&namebuf); + appendStringInfoString(&namebuf, tempname); + appendStringInfo(&namebuf, "_%d", n); + return namebuf.data; +} + +/* + * refresh_by_match_merge + * + * Refresh a materialized view with transactional semantics, while allowing + * concurrent reads. + * + * This is called after a new version of the data has been created in a + * temporary table. It performs a full outer join against the old version of + * the data, producing "diff" results. This join cannot work if there are any + * duplicated rows in either the old or new versions, in the sense that every + * column would compare as equal between the two rows. It does work correctly + * in the face of rows which have at least one NULL value, with all non-NULL + * columns equal. The behavior of NULLs on equality tests and on UNIQUE + * indexes turns out to be quite convenient here; the tests we need to make + * are consistent with default behavior. If there is at least one UNIQUE + * index on the materialized view, we have exactly the guarantee we need. + * + * The temporary table used to hold the diff results contains just the TID of + * the old record (if matched) and the ROW from the new table as a single + * column of complex record type (if matched). + * + * Once we have the diff table, we perform set-based DELETE and INSERT + * operations against the materialized view, and discard both temporary + * tables. + * + * Everything from the generation of the new data to applying the differences + * takes place under cover of an ExclusiveLock, since it seems as though we + * would want to prohibit not only concurrent REFRESH operations, but also + * incremental maintenance. It also doesn't seem reasonable or safe to allow + * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by + * this command. + */ +static void +refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, + int save_sec_context) +{ + StringInfoData querybuf; + Relation matviewRel; + Relation tempRel; + char *matviewname; + char *tempname; + char *diffname; + TupleDesc tupdesc; + bool foundUniqueIndex; + List *indexoidlist; + ListCell *indexoidscan; + int16 relnatts; + Oid *opUsedForQual; + + initStringInfo(&querybuf); + matviewRel = table_open(matviewOid, NoLock); + matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)), + RelationGetRelationName(matviewRel)); + tempRel = table_open(tempOid, NoLock); + tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)), + RelationGetRelationName(tempRel)); + diffname = make_temptable_name_n(tempname, 2); + + relnatts = RelationGetNumberOfAttributes(matviewRel); + + /* Open SPI context. */ + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* Analyze the temp table with the new contents. */ + appendStringInfo(&querybuf, "ANALYZE %s", tempname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* + * We need to ensure that there are not duplicate rows without NULLs in + * the new data set before we can count on the "diff" results. Check for + * that in a way that allows showing the first duplicated row found. Even + * after we pass this test, a unique index on the materialized view may + * find a duplicate key problem. + * + * Note: here and below, we use "tablename.*::tablerowtype" as a hack to + * keep ".*" from being expanded into multiple columns in a SELECT list. + * Compare ruleutils.c's get_variable(). + */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "SELECT newdata.*::%s FROM %s newdata " + "WHERE newdata.* IS NOT NULL AND EXISTS " + "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL " + "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* " + "AND newdata2.ctid OPERATOR(pg_catalog.<>) " + "newdata.ctid)", + tempname, tempname, tempname); + if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + if (SPI_processed > 0) + { + /* + * Note that this ereport() is returning data to the user. Generally, + * we would want to make sure that the user has been granted access to + * this data. However, REFRESH MAT VIEW is only able to be run by the + * owner of the mat view (or a superuser) and therefore there is no + * need to check for access to data in the mat view. + */ + ereport(ERROR, + (errcode(ERRCODE_CARDINALITY_VIOLATION), + errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns", + RelationGetRelationName(matviewRel)), + errdetail("Row: %s", + SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1)))); + } + + /* + * Create the temporary "diff" table. + * + * Temporarily switch out of the SECURITY_RESTRICTED_OPERATION context, + * because you cannot create temp tables in SRO context. For extra + * paranoia, add the composite type column only after switching back to + * SRO context. + */ + SetUserIdAndSecContext(relowner, + save_sec_context | SECURITY_LOCAL_USERID_CHANGE); + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "CREATE TEMP TABLE %s (tid pg_catalog.tid)", + diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + SetUserIdAndSecContext(relowner, + save_sec_context | SECURITY_RESTRICTED_OPERATION); + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "ALTER TABLE %s ADD COLUMN newdata %s", + diffname, tempname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* Start building the query for populating the diff table. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "INSERT INTO %s " + "SELECT mv.ctid AS tid, newdata.*::%s AS newdata " + "FROM %s mv FULL JOIN %s newdata ON (", + diffname, tempname, matviewname, tempname); + + /* + * Get the list of index OIDs for the table from the relcache, and look up + * each one in the pg_index syscache. We will test for equality on all + * columns present in all unique indexes which only reference columns and + * include all rows. + */ + tupdesc = matviewRel->rd_att; + opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts); + foundUniqueIndex = false; + + indexoidlist = RelationGetIndexList(matviewRel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + Relation indexRel; + + indexRel = index_open(indexoid, RowExclusiveLock); + if (is_usable_unique_index(indexRel)) + { + Form_pg_index indexStruct = indexRel->rd_index; + int indnkeyatts = indexStruct->indnkeyatts; + oidvector *indclass; + Datum indclassDatum; + int i; + + /* Must get indclass the hard way. */ + indclassDatum = SysCacheGetAttrNotNull(INDEXRELID, + indexRel->rd_indextuple, + Anum_pg_index_indclass); + indclass = (oidvector *) DatumGetPointer(indclassDatum); + + /* Add quals for all columns from this index. */ + for (i = 0; i < indnkeyatts; i++) + { + int attnum = indexStruct->indkey.values[i]; + Oid opclass = indclass->values[i]; + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + Oid attrtype = attr->atttypid; + HeapTuple cla_ht; + Form_pg_opclass cla_tup; + Oid opfamily; + Oid opcintype; + Oid op; + const char *leftop; + const char *rightop; + + /* + * Identify the equality operator associated with this index + * column. First we need to look up the column's opclass. + */ + cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass)); + if (!HeapTupleIsValid(cla_ht)) + elog(ERROR, "cache lookup failed for opclass %u", opclass); + cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht); + Assert(cla_tup->opcmethod == BTREE_AM_OID); + opfamily = cla_tup->opcfamily; + opcintype = cla_tup->opcintype; + ReleaseSysCache(cla_ht); + + op = get_opfamily_member(opfamily, opcintype, opcintype, + BTEqualStrategyNumber); + if (!OidIsValid(op)) + elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", + BTEqualStrategyNumber, opcintype, opcintype, opfamily); + + /* + * If we find the same column with the same equality semantics + * in more than one index, we only need to emit the equality + * clause once. + * + * Since we only remember the last equality operator, this + * code could be fooled into emitting duplicate clauses given + * multiple indexes with several different opclasses ... but + * that's so unlikely it doesn't seem worth spending extra + * code to avoid. + */ + if (opUsedForQual[attnum - 1] == op) + continue; + opUsedForQual[attnum - 1] = op; + + /* + * Actually add the qual, ANDed with any others. + */ + if (foundUniqueIndex) + appendStringInfoString(&querybuf, " AND "); + + leftop = quote_qualified_identifier("newdata", + NameStr(attr->attname)); + rightop = quote_qualified_identifier("mv", + NameStr(attr->attname)); + + generate_operator_clause(&querybuf, + leftop, attrtype, + op, + rightop, attrtype); + + foundUniqueIndex = true; + } + } + + /* Keep the locks, since we're about to run DML which needs them. */ + index_close(indexRel, NoLock); + } + + list_free(indexoidlist); + + /* + * There must be at least one usable unique index on the matview. + * + * ExecRefreshMatView() checks that after taking the exclusive lock on the + * matview. So at least one unique index is guaranteed to exist here + * because the lock is still being held. (One known exception is if a + * function called as part of refreshing the matview drops the index. + * That's a pretty silly thing to do.) + */ + if (!foundUniqueIndex) + elog(ERROR, "could not find suitable unique index on materialized view"); + + appendStringInfoString(&querybuf, + " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) " + "WHERE newdata.* IS NULL OR mv.* IS NULL " + "ORDER BY tid"); + + /* Populate the temporary "diff" table. */ + if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* + * We have no further use for data from the "full-data" temp table, but we + * must keep it around because its type is referenced from the diff table. + */ + + /* Analyze the diff table. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, "ANALYZE %s", diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + OpenMatViewIncrementalMaintenance(); + + /* Deletes must come before inserts; do them first. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY " + "(SELECT diff.tid FROM %s diff " + "WHERE diff.tid IS NOT NULL " + "AND diff.newdata IS NULL)", + matviewname, diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* Inserts go last. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "INSERT INTO %s SELECT (diff.newdata).* " + "FROM %s diff WHERE tid IS NULL", + matviewname, diffname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* We're done maintaining the materialized view. */ + CloseMatViewIncrementalMaintenance(); + table_close(tempRel, NoLock); + table_close(matviewRel, NoLock); + + /* Clean up temp tables. */ + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname); + if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + + /* Close SPI context. */ + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); +} + +/* + * Swap the physical files of the target and transient tables, then rebuild + * the target's indexes and throw away the transient table. Security context + * swapping is handled by the called function, so it is not needed here. + */ +static void +refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence) +{ + finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, + RecentXmin, ReadNextMultiXactId(), relpersistence); +} + +/* + * Check whether specified index is usable for match merge. + */ +static bool +is_usable_unique_index(Relation indexRel) +{ + Form_pg_index indexStruct = indexRel->rd_index; + + /* + * Must be unique, valid, immediate, non-partial, and be defined over + * plain user columns (not expressions). We also require it to be a + * btree. Even if we had any other unique index kinds, we'd not know how + * to identify the corresponding equality operator, nor could we be sure + * that the planner could implement the required FULL JOIN with non-btree + * operators. + */ + if (indexStruct->indisunique && + indexStruct->indimmediate && + indexRel->rd_rel->relam == BTREE_AM_OID && + indexStruct->indisvalid && + RelationGetIndexPredicate(indexRel) == NIL && + indexStruct->indnatts > 0) + { + /* + * The point of groveling through the index columns individually is to + * reject both index expressions and system columns. Currently, + * matviews couldn't have OID columns so there's no way to create an + * index on a system column; but maybe someday that wouldn't be true, + * so let's be safe. + */ + int numatts = indexStruct->indnatts; + int i; + + for (i = 0; i < numatts; i++) + { + int attnum = indexStruct->indkey.values[i]; + + if (attnum <= 0) + return false; + } + return true; + } + return false; +} + + +/* + * This should be used to test whether the backend is in a context where it is + * OK to allow DML statements to modify materialized views. We only want to + * allow that for internal code driven by the materialized view definition, + * not for arbitrary user-supplied code. + * + * While the function names reflect the fact that their main intended use is + * incremental maintenance of materialized views (in response to changes to + * the data in referenced relations), they are initially used to allow REFRESH + * without blocking concurrent reads. + */ +bool +MatViewIncrementalMaintenanceIsEnabled(void) +{ + return matview_maintenance_depth > 0; +} + +static void +OpenMatViewIncrementalMaintenance(void) +{ + matview_maintenance_depth++; +} + +static void +CloseMatViewIncrementalMaintenance(void) +{ + matview_maintenance_depth--; + Assert(matview_maintenance_depth >= 0); +} |