/*-------------------------------------------------------------------------
 *
 * matview.c
 *	  materialized view support
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/commands/matview.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/multixact.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_am.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_operator.h"
#include "commands/cluster.h"
#include "commands/matview.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "miscadmin.h"
#include "parser/parse_relation.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"


/*
 * Private state of the DestReceiver that stores query output rows into the
 * transient heap.  The DestReceiver struct must come first so that a pointer
 * to this struct can be cast to and from a DestReceiver pointer.
 */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	Oid			transientoid;	/* OID of new heap into which to store */
	/* These fields are filled by transientrel_startup: */
	Relation	transientrel;	/* relation to write to */
	CommandId	output_cid;		/* cmin to insert in output tuples */
	int			ti_options;		/* table_tuple_insert performance options */
	BulkInsertState bistate;	/* bulk insert state */
} DR_transientrel;

/*
 * Nesting depth of internally driven matview maintenance.  It is bumped by
 * OpenMatViewIncrementalMaintenance() and dropped again by
 * CloseMatViewIncrementalMaintenance(); MatViewIncrementalMaintenanceIsEnabled()
 * reports true while it is greater than zero.
 */
static int	matview_maintenance_depth = 0;

static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self); static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, const char *queryString); static char *make_temptable_name_n(char *tempname, int n); static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, int save_sec_context); static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence); static bool is_usable_unique_index(Relation indexRel); static void OpenMatViewIncrementalMaintenance(void); static void CloseMatViewIncrementalMaintenance(void); /* * SetMatViewPopulatedState * Mark a materialized view as populated, or not. * * NOTE: caller must be holding an appropriate lock on the relation. */ void SetMatViewPopulatedState(Relation relation, bool newstate) { Relation pgrel; HeapTuple tuple; Assert(relation->rd_rel->relkind == RELKIND_MATVIEW); /* * Update relation's pg_class entry. Crucial side-effect: other backends * (and this one too!) are sent SI message to make them rebuild relcache * entries. */ pgrel = table_open(RelationRelationId, RowExclusiveLock); tuple = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(RelationGetRelid(relation))); if (!HeapTupleIsValid(tuple)) elog(ERROR, "cache lookup failed for relation %u", RelationGetRelid(relation)); ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate; CatalogTupleUpdate(pgrel, &tuple->t_self, tuple); heap_freetuple(tuple); table_close(pgrel, RowExclusiveLock); /* * Advance command counter to make the updated pg_class row locally * visible. */ CommandCounterIncrement(); } /* * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command * * This refreshes the materialized view by creating a new table and swapping * the relfilenodes of the new table and the old materialized view, so the OID * of the original materialized view is preserved. Thus we do not lose GRANT * nor references to this materialized view. 
* * If WITH NO DATA was specified, this is effectively like a TRUNCATE; * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT * statement associated with the materialized view. The statement node's * skipData field shows whether the clause was used. * * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading * the new heap, it's better to create the indexes afterwards than to fill them * incrementally while we load. * * The matview's "populated" state is changed based on whether the contents * reflect the result set of the materialized view's query. */ ObjectAddress ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString, ParamListInfo params, QueryCompletion *qc) { Oid matviewOid; Relation matviewRel; RewriteRule *rule; List *actions; Query *dataQuery; Oid tableSpace; Oid relowner; Oid OIDNewHeap; DestReceiver *dest; uint64 processed = 0; bool concurrent; LOCKMODE lockmode; char relpersistence; Oid save_userid; int save_sec_context; int save_nestlevel; ObjectAddress address; /* Determine strength of lock needed. */ concurrent = stmt->concurrent; lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock; /* * Get a lock until end of transaction. */ matviewOid = RangeVarGetRelidExtended(stmt->relation, lockmode, 0, RangeVarCallbackOwnsTable, NULL); matviewRel = table_open(matviewOid, NoLock); relowner = matviewRel->rd_rel->relowner; /* * Switch to the owner's userid, so that any functions are run as that * user. Also lock down security-restricted operations and arrange to * make GUC variable changes local to this command. */ GetUserIdAndSecContext(&save_userid, &save_sec_context); SetUserIdAndSecContext(relowner, save_sec_context | SECURITY_RESTRICTED_OPERATION); save_nestlevel = NewGUCNestLevel(); /* Make sure it is a materialized view. 
*/ if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("\"%s\" is not a materialized view", RelationGetRelationName(matviewRel)))); /* Check that CONCURRENTLY is not specified if not populated. */ if (concurrent && !RelationIsPopulated(matviewRel)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CONCURRENTLY cannot be used when the materialized view is not populated"))); /* Check that conflicting options have not been specified. */ if (concurrent && stmt->skipData) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s options cannot be used together", "CONCURRENTLY", "WITH NO DATA"))); /* * Check that everything is correct for a refresh. Problems at this point * are internal errors, so elog is sufficient. */ if (matviewRel->rd_rel->relhasrules == false || matviewRel->rd_rules->numLocks < 1) elog(ERROR, "materialized view \"%s\" is missing rewrite information", RelationGetRelationName(matviewRel)); if (matviewRel->rd_rules->numLocks > 1) elog(ERROR, "materialized view \"%s\" has too many rules", RelationGetRelationName(matviewRel)); rule = matviewRel->rd_rules->rules[0]; if (rule->event != CMD_SELECT || !(rule->isInstead)) elog(ERROR, "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule", RelationGetRelationName(matviewRel)); actions = rule->actions; if (list_length(actions) != 1) elog(ERROR, "the rule for materialized view \"%s\" is not a single action", RelationGetRelationName(matviewRel)); /* * Check that there is a unique index with no WHERE clause on one or more * columns of the materialized view if CONCURRENTLY is specified. 
*/ if (concurrent) { List *indexoidlist = RelationGetIndexList(matviewRel); ListCell *indexoidscan; bool hasUniqueIndex = false; foreach(indexoidscan, indexoidlist) { Oid indexoid = lfirst_oid(indexoidscan); Relation indexRel; indexRel = index_open(indexoid, AccessShareLock); hasUniqueIndex = is_usable_unique_index(indexRel); index_close(indexRel, AccessShareLock); if (hasUniqueIndex) break; } list_free(indexoidlist); if (!hasUniqueIndex) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot refresh materialized view \"%s\" concurrently", quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)), RelationGetRelationName(matviewRel))), errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view."))); } /* * The stored query was rewritten at the time of the MV definition, but * has not been scribbled on by the planner. */ dataQuery = linitial_node(Query, actions); /* * Check for active uses of the relation in the current transaction, such * as open scans. * * NB: We count on this to protect us against problems with refreshing the * data using TABLE_INSERT_FROZEN. */ CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW"); /* * Tentatively mark the matview as populated or not (this will roll back * if we fail later). */ SetMatViewPopulatedState(matviewRel, !stmt->skipData); /* Concurrent refresh builds new data in temp tablespace, and does diff. */ if (concurrent) { tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false); relpersistence = RELPERSISTENCE_TEMP; } else { tableSpace = matviewRel->rd_rel->reltablespace; relpersistence = matviewRel->rd_rel->relpersistence; } /* * Create the transient table that will receive the regenerated data. Lock * it against access by any other process until commit (by which time it * will be gone). 
*/ OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence, ExclusiveLock); LockRelationOid(OIDNewHeap, AccessExclusiveLock); dest = CreateTransientRelDestReceiver(OIDNewHeap); /* Generate the data, if wanted. */ if (!stmt->skipData) processed = refresh_matview_datafill(dest, dataQuery, queryString); /* Make the matview match the newly generated data. */ if (concurrent) { int old_depth = matview_maintenance_depth; PG_TRY(); { refresh_by_match_merge(matviewOid, OIDNewHeap, relowner, save_sec_context); } PG_CATCH(); { matview_maintenance_depth = old_depth; PG_RE_THROW(); } PG_END_TRY(); Assert(matview_maintenance_depth == old_depth); } else { refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence); /* * Inform stats collector about our activity: basically, we truncated * the matview and inserted some new data. (The concurrent code path * above doesn't need to worry about this because the inserts and * deletes it issues get counted by lower-level code.) */ pgstat_count_truncate(matviewRel); if (!stmt->skipData) pgstat_count_heap_insert(matviewRel, processed); } table_close(matviewRel, NoLock); /* Roll back any GUC changes */ AtEOXact_GUC(false, save_nestlevel); /* Restore userid and security context */ SetUserIdAndSecContext(save_userid, save_sec_context); ObjectAddressSet(address, RelationRelationId, matviewOid); /* * Save the rowcount so that pg_stat_statements can track the total number * of rows processed by REFRESH MATERIALIZED VIEW command. Note that we * still don't display the rowcount in the command completion tag output, * i.e., the display_rowcount flag of CMDTAG_REFRESH_MATERIALIZED_VIEW * command tag is left false in cmdtaglist.h. Otherwise, the change of * completion tag output might break applications using it. 
*/ if (qc) SetQueryCompletion(qc, CMDTAG_REFRESH_MATERIALIZED_VIEW, processed); return address; } /* * refresh_matview_datafill * * Execute the given query, sending result rows to "dest" (which will * insert them into the target matview). * * Returns number of rows inserted. */ static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query, const char *queryString) { List *rewritten; PlannedStmt *plan; QueryDesc *queryDesc; Query *copied_query; uint64 processed; /* Lock and rewrite, using a copy to preserve the original query. */ copied_query = copyObject(query); AcquireRewriteLocks(copied_query, true, false); rewritten = QueryRewrite(copied_query); /* SELECT should never rewrite to more or less than one SELECT query */ if (list_length(rewritten) != 1) elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW"); query = (Query *) linitial(rewritten); /* Check for user-requested abort. */ CHECK_FOR_INTERRUPTS(); /* Plan the query which will generate data for the refresh. */ plan = pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL); /* * Use a snapshot with an updated command ID to ensure this query sees * results of any previously executed queries. (This could only matter if * the planner executed an allegedly-stable function that changed the * database contents, but let's do it anyway to be safe.) 
*/ PushCopiedSnapshot(GetActiveSnapshot()); UpdateActiveSnapshotCommandId(); /* Create a QueryDesc, redirecting output to our tuple receiver */ queryDesc = CreateQueryDesc(plan, queryString, GetActiveSnapshot(), InvalidSnapshot, dest, NULL, NULL, 0); /* call ExecutorStart to prepare the plan for execution */ ExecutorStart(queryDesc, 0); /* run the plan */ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); processed = queryDesc->estate->es_processed; /* and clean up */ ExecutorFinish(queryDesc); ExecutorEnd(queryDesc); FreeQueryDesc(queryDesc); PopActiveSnapshot(); return processed; } DestReceiver * CreateTransientRelDestReceiver(Oid transientoid) { DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel)); self->pub.receiveSlot = transientrel_receive; self->pub.rStartup = transientrel_startup; self->pub.rShutdown = transientrel_shutdown; self->pub.rDestroy = transientrel_destroy; self->pub.mydest = DestTransientRel; self->transientoid = transientoid; return (DestReceiver *) self; } /* * transientrel_startup --- executor startup */ static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) { DR_transientrel *myState = (DR_transientrel *) self; Relation transientrel; transientrel = table_open(myState->transientoid, NoLock); /* * Fill private fields of myState for use by later routines */ myState->transientrel = transientrel; myState->output_cid = GetCurrentCommandId(true); myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN; myState->bistate = GetBulkInsertState(); /* * Valid smgr_targblock implies something already wrote to the relation. * This may be harmless, but this function hasn't planned for it. 
*/ Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber); } /* * transientrel_receive --- receive one tuple */ static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; /* * Note that the input slot might not be of the type of the target * relation. That's supported by table_tuple_insert(), but slightly less * efficient than inserting with the right slot - but the alternative * would be to copy into a slot of the right type, which would not be * cheap either. This also doesn't allow accessing per-AM data (say a * tuple's xmin), but since we don't do that here... */ table_tuple_insert(myState->transientrel, slot, myState->output_cid, myState->ti_options, myState->bistate); /* We know this is a newly created relation, so there are no indexes */ return true; } /* * transientrel_shutdown --- executor end */ static void transientrel_shutdown(DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; FreeBulkInsertState(myState->bistate); table_finish_bulk_insert(myState->transientrel, myState->ti_options); /* close transientrel, but keep lock until commit */ table_close(myState->transientrel, NoLock); myState->transientrel = NULL; } /* * transientrel_destroy --- release DestReceiver object */ static void transientrel_destroy(DestReceiver *self) { pfree(self); } /* * Given a qualified temporary table name, append an underscore followed by * the given integer, to make a new table name based on the old one. * The result is a palloc'd string. * * As coded, this would fail to make a valid SQL name if the given name were, * say, "FOO"."BAR". Currently, the table name portion of the input will * never be double-quoted because it's of the form "pg_temp_NNN", cf * make_new_heap(). But we might have to work harder someday. 
*/ static char * make_temptable_name_n(char *tempname, int n) { StringInfoData namebuf; initStringInfo(&namebuf); appendStringInfoString(&namebuf, tempname); appendStringInfo(&namebuf, "_%d", n); return namebuf.data; } /* * refresh_by_match_merge * * Refresh a materialized view with transactional semantics, while allowing * concurrent reads. * * This is called after a new version of the data has been created in a * temporary table. It performs a full outer join against the old version of * the data, producing "diff" results. This join cannot work if there are any * duplicated rows in either the old or new versions, in the sense that every * column would compare as equal between the two rows. It does work correctly * in the face of rows which have at least one NULL value, with all non-NULL * columns equal. The behavior of NULLs on equality tests and on UNIQUE * indexes turns out to be quite convenient here; the tests we need to make * are consistent with default behavior. If there is at least one UNIQUE * index on the materialized view, we have exactly the guarantee we need. * * The temporary table used to hold the diff results contains just the TID of * the old record (if matched) and the ROW from the new table as a single * column of complex record type (if matched). * * Once we have the diff table, we perform set-based DELETE and INSERT * operations against the materialized view, and discard both temporary * tables. * * Everything from the generation of the new data to applying the differences * takes place under cover of an ExclusiveLock, since it seems as though we * would want to prohibit not only concurrent REFRESH operations, but also * incremental maintenance. It also doesn't seem reasonable or safe to allow * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by * this command. 
*/ static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, int save_sec_context) { StringInfoData querybuf; Relation matviewRel; Relation tempRel; char *matviewname; char *tempname; char *diffname; TupleDesc tupdesc; bool foundUniqueIndex; List *indexoidlist; ListCell *indexoidscan; int16 relnatts; Oid *opUsedForQual; initStringInfo(&querybuf); matviewRel = table_open(matviewOid, NoLock); matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)), RelationGetRelationName(matviewRel)); tempRel = table_open(tempOid, NoLock); tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)), RelationGetRelationName(tempRel)); diffname = make_temptable_name_n(tempname, 2); relnatts = RelationGetNumberOfAttributes(matviewRel); /* Open SPI context. */ if (SPI_connect() != SPI_OK_CONNECT) elog(ERROR, "SPI_connect failed"); /* Analyze the temp table with the new contents. */ appendStringInfo(&querybuf, "ANALYZE %s", tempname); if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) elog(ERROR, "SPI_exec failed: %s", querybuf.data); /* * We need to ensure that there are not duplicate rows without NULLs in * the new data set before we can count on the "diff" results. Check for * that in a way that allows showing the first duplicated row found. Even * after we pass this test, a unique index on the materialized view may * find a duplicate key problem. * * Note: here and below, we use "tablename.*::tablerowtype" as a hack to * keep ".*" from being expanded into multiple columns in a SELECT list. * Compare ruleutils.c's get_variable(). 
*/ resetStringInfo(&querybuf); appendStringInfo(&querybuf, "SELECT newdata.*::%s FROM %s newdata " "WHERE newdata.* IS NOT NULL AND EXISTS " "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL " "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* " "AND newdata2.ctid OPERATOR(pg_catalog.<>) " "newdata.ctid)", tempname, tempname, tempname); if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT) elog(ERROR, "SPI_exec failed: %s", querybuf.data); if (SPI_processed > 0) { /* * Note that this ereport() is returning data to the user. Generally, * we would want to make sure that the user has been granted access to * this data. However, REFRESH MAT VIEW is only able to be run by the * owner of the mat view (or a superuser) and therefore there is no * need to check for access to data in the mat view. */ ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns", RelationGetRelationName(matviewRel)), errdetail("Row: %s", SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1)))); } SetUserIdAndSecContext(relowner, save_sec_context | SECURITY_LOCAL_USERID_CHANGE); /* Start building the query for creating the diff table. */ resetStringInfo(&querybuf); appendStringInfo(&querybuf, "CREATE TEMP TABLE %s AS " "SELECT mv.ctid AS tid, newdata.*::%s AS newdata " "FROM %s mv FULL JOIN %s newdata ON (", diffname, tempname, matviewname, tempname); /* * Get the list of index OIDs for the table from the relcache, and look up * each one in the pg_index syscache. We will test for equality on all * columns present in all unique indexes which only reference columns and * include all rows. 
*/ tupdesc = matviewRel->rd_att; opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts); foundUniqueIndex = false; indexoidlist = RelationGetIndexList(matviewRel); foreach(indexoidscan, indexoidlist) { Oid indexoid = lfirst_oid(indexoidscan); Relation indexRel; indexRel = index_open(indexoid, RowExclusiveLock); if (is_usable_unique_index(indexRel)) { Form_pg_index indexStruct = indexRel->rd_index; int indnkeyatts = indexStruct->indnkeyatts; oidvector *indclass; Datum indclassDatum; bool isnull; int i; /* Must get indclass the hard way. */ indclassDatum = SysCacheGetAttr(INDEXRELID, indexRel->rd_indextuple, Anum_pg_index_indclass, &isnull); Assert(!isnull); indclass = (oidvector *) DatumGetPointer(indclassDatum); /* Add quals for all columns from this index. */ for (i = 0; i < indnkeyatts; i++) { int attnum = indexStruct->indkey.values[i]; Oid opclass = indclass->values[i]; Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); Oid attrtype = attr->atttypid; HeapTuple cla_ht; Form_pg_opclass cla_tup; Oid opfamily; Oid opcintype; Oid op; const char *leftop; const char *rightop; /* * Identify the equality operator associated with this index * column. First we need to look up the column's opclass. */ cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass)); if (!HeapTupleIsValid(cla_ht)) elog(ERROR, "cache lookup failed for opclass %u", opclass); cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht); Assert(cla_tup->opcmethod == BTREE_AM_OID); opfamily = cla_tup->opcfamily; opcintype = cla_tup->opcintype; ReleaseSysCache(cla_ht); op = get_opfamily_member(opfamily, opcintype, opcintype, BTEqualStrategyNumber); if (!OidIsValid(op)) elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", BTEqualStrategyNumber, opcintype, opcintype, opfamily); /* * If we find the same column with the same equality semantics * in more than one index, we only need to emit the equality * clause once. 
* * Since we only remember the last equality operator, this * code could be fooled into emitting duplicate clauses given * multiple indexes with several different opclasses ... but * that's so unlikely it doesn't seem worth spending extra * code to avoid. */ if (opUsedForQual[attnum - 1] == op) continue; opUsedForQual[attnum - 1] = op; /* * Actually add the qual, ANDed with any others. */ if (foundUniqueIndex) appendStringInfoString(&querybuf, " AND "); leftop = quote_qualified_identifier("newdata", NameStr(attr->attname)); rightop = quote_qualified_identifier("mv", NameStr(attr->attname)); generate_operator_clause(&querybuf, leftop, attrtype, op, rightop, attrtype); foundUniqueIndex = true; } } /* Keep the locks, since we're about to run DML which needs them. */ index_close(indexRel, NoLock); } list_free(indexoidlist); /* * There must be at least one usable unique index on the matview. * * ExecRefreshMatView() checks that after taking the exclusive lock on the * matview. So at least one unique index is guaranteed to exist here * because the lock is still being held; so an Assert seems sufficient. */ Assert(foundUniqueIndex); appendStringInfoString(&querybuf, " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) " "WHERE newdata.* IS NULL OR mv.* IS NULL " "ORDER BY tid"); /* Create the temporary "diff" table. */ if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) elog(ERROR, "SPI_exec failed: %s", querybuf.data); SetUserIdAndSecContext(relowner, save_sec_context | SECURITY_RESTRICTED_OPERATION); /* * We have no further use for data from the "full-data" temp table, but we * must keep it around because its type is referenced from the diff table. */ /* Analyze the diff table. */ resetStringInfo(&querybuf); appendStringInfo(&querybuf, "ANALYZE %s", diffname); if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) elog(ERROR, "SPI_exec failed: %s", querybuf.data); OpenMatViewIncrementalMaintenance(); /* Deletes must come before inserts; do them first. 
*/ resetStringInfo(&querybuf); appendStringInfo(&querybuf, "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY " "(SELECT diff.tid FROM %s diff " "WHERE diff.tid IS NOT NULL " "AND diff.newdata IS NULL)", matviewname, diffname); if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE) elog(ERROR, "SPI_exec failed: %s", querybuf.data); /* Inserts go last. */ resetStringInfo(&querybuf); appendStringInfo(&querybuf, "INSERT INTO %s SELECT (diff.newdata).* " "FROM %s diff WHERE tid IS NULL", matviewname, diffname); if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT) elog(ERROR, "SPI_exec failed: %s", querybuf.data); /* We're done maintaining the materialized view. */ CloseMatViewIncrementalMaintenance(); table_close(tempRel, NoLock); table_close(matviewRel, NoLock); /* Clean up temp tables. */ resetStringInfo(&querybuf); appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname); if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY) elog(ERROR, "SPI_exec failed: %s", querybuf.data); /* Close SPI context. */ if (SPI_finish() != SPI_OK_FINISH) elog(ERROR, "SPI_finish failed"); } /* * Swap the physical files of the target and transient tables, then rebuild * the target's indexes and throw away the transient table. Security context * swapping is handled by the called function, so it is not needed here. */ static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence) { finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, RecentXmin, ReadNextMultiXactId(), relpersistence); } /* * Check whether specified index is usable for match merge. */ static bool is_usable_unique_index(Relation indexRel) { Form_pg_index indexStruct = indexRel->rd_index; /* * Must be unique, valid, immediate, non-partial, and be defined over * plain user columns (not expressions). We also require it to be a * btree. 
Even if we had any other unique index kinds, we'd not know how * to identify the corresponding equality operator, nor could we be sure * that the planner could implement the required FULL JOIN with non-btree * operators. */ if (indexStruct->indisunique && indexStruct->indimmediate && indexRel->rd_rel->relam == BTREE_AM_OID && indexStruct->indisvalid && RelationGetIndexPredicate(indexRel) == NIL && indexStruct->indnatts > 0) { /* * The point of groveling through the index columns individually is to * reject both index expressions and system columns. Currently, * matviews couldn't have OID columns so there's no way to create an * index on a system column; but maybe someday that wouldn't be true, * so let's be safe. */ int numatts = indexStruct->indnatts; int i; for (i = 0; i < numatts; i++) { int attnum = indexStruct->indkey.values[i]; if (attnum <= 0) return false; } return true; } return false; } /* * This should be used to test whether the backend is in a context where it is * OK to allow DML statements to modify materialized views. We only want to * allow that for internal code driven by the materialized view definition, * not for arbitrary user-supplied code. * * While the function names reflect the fact that their main intended use is * incremental maintenance of materialized views (in response to changes to * the data in referenced relations), they are initially used to allow REFRESH * without blocking concurrent reads. */ bool MatViewIncrementalMaintenanceIsEnabled(void) { return matview_maintenance_depth > 0; } static void OpenMatViewIncrementalMaintenance(void) { matview_maintenance_depth++; } static void CloseMatViewIncrementalMaintenance(void) { matview_maintenance_depth--; Assert(matview_maintenance_depth >= 0); }