diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 7605 |
1 files changed, 7605 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c new file mode 100644 index 0000000..b08b314 --- /dev/null +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -0,0 +1,7605 @@ +/*------------------------------------------------------------------------- + * + * postgres_fdw.c + * Foreign-data wrapper for remote PostgreSQL servers + * + * Portions Copyright (c) 2012-2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/postgres_fdw/postgres_fdw.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <limits.h> + +#include "access/htup_details.h" +#include "access/sysattr.h" +#include "access/table.h" +#include "catalog/pg_class.h" +#include "catalog/pg_opfamily.h" +#include "commands/defrem.h" +#include "commands/explain.h" +#include "commands/vacuum.h" +#include "executor/execAsync.h" +#include "foreign/fdwapi.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "optimizer/appendinfo.h" +#include "optimizer/clauses.h" +#include "optimizer/cost.h" +#include "optimizer/inherit.h" +#include "optimizer/optimizer.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" +#include "optimizer/planmain.h" +#include "optimizer/prep.h" +#include "optimizer/restrictinfo.h" +#include "optimizer/tlist.h" +#include "parser/parsetree.h" +#include "postgres_fdw.h" +#include "storage/latch.h" +#include "utils/builtins.h" +#include "utils/float.h" +#include "utils/guc.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/sampling.h" +#include "utils/selfuncs.h" + +PG_MODULE_MAGIC; + +/* Default CPU cost to start up a foreign query. */ +#define DEFAULT_FDW_STARTUP_COST 100.0 + +/* Default CPU cost to process 1 row (above and beyond cpu_tuple_cost). */ +#define DEFAULT_FDW_TUPLE_COST 0.01 + +/* If no remote estimates, assume a sort costs 20% extra */ +#define DEFAULT_FDW_SORT_MULTIPLIER 1.2 + +/* + * Indexes of FDW-private information stored in fdw_private lists. + * + * These items are indexed with the enum FdwScanPrivateIndex, so an item + * can be fetched with list_nth(). For example, to get the SELECT statement: + * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); + */ +enum FdwScanPrivateIndex +{ + /* SQL statement to execute remotely (as a String node) */ + FdwScanPrivateSelectSql, + /* Integer list of attribute numbers retrieved by the SELECT */ + FdwScanPrivateRetrievedAttrs, + /* Integer representing the desired fetch_size */ + FdwScanPrivateFetchSize, + + /* + * String describing join i.e. names of relations being joined and types + * of join, added when the scan is join + */ + FdwScanPrivateRelations +}; + +/* + * Similarly, this enum describes what's kept in the fdw_private list for + * a ModifyTable node referencing a postgres_fdw foreign table. We store: + * + * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server + * 2) Integer list of target attribute numbers for INSERT/UPDATE + * (NIL for a DELETE) + * 3) Length till the end of VALUES clause for INSERT + * (-1 for a DELETE/UPDATE) + * 4) Boolean flag showing if the remote query has a RETURNING clause + * 5) Integer list of attribute numbers retrieved by RETURNING, if any + */ +enum FdwModifyPrivateIndex +{ + /* SQL statement to execute remotely (as a String node) */ + FdwModifyPrivateUpdateSql, + /* Integer list of target attribute numbers for INSERT/UPDATE */ + FdwModifyPrivateTargetAttnums, + /* Length till the end of VALUES clause (as an Integer node) */ + FdwModifyPrivateLen, + /* has-returning flag (as a Boolean node) */ + FdwModifyPrivateHasReturning, + /* Integer list of attribute numbers retrieved by RETURNING */ + FdwModifyPrivateRetrievedAttrs +}; + +/* + * Similarly, this enum describes what's kept in the fdw_private list for + * a ForeignScan node that modifies a foreign table directly. We store: + * + * 1) UPDATE/DELETE statement text to be sent to the remote server + * 2) Boolean flag showing if the remote query has a RETURNING clause + * 3) Integer list of attribute numbers retrieved by RETURNING, if any + * 4) Boolean flag showing if we set the command es_processed + */ +enum FdwDirectModifyPrivateIndex +{ + /* SQL statement to execute remotely (as a String node) */ + FdwDirectModifyPrivateUpdateSql, + /* has-returning flag (as a Boolean node) */ + FdwDirectModifyPrivateHasReturning, + /* Integer list of attribute numbers retrieved by RETURNING */ + FdwDirectModifyPrivateRetrievedAttrs, + /* set-processed flag (as a Boolean node) */ + FdwDirectModifyPrivateSetProcessed +}; + +/* + * Execution state of a foreign scan using postgres_fdw. + */ +typedef struct PgFdwScanState +{ + Relation rel; /* relcache entry for the foreign table. NULL + * for a foreign join scan. */ + TupleDesc tupdesc; /* tuple descriptor of scan */ + AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + + /* extracted fdw_private data */ + char *query; /* text of SELECT command */ + List *retrieved_attrs; /* list of retrieved attribute numbers */ + + /* for remote query execution */ + PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ + unsigned int cursor_number; /* quasi-unique ID for my cursor */ + bool cursor_exists; /* have we created the cursor? */ + int numParams; /* number of parameters passed to query */ + FmgrInfo *param_flinfo; /* output conversion functions for them */ + List *param_exprs; /* executable expressions for param values */ + const char **param_values; /* textual values of query parameters */ + + /* for storing result tuples */ + HeapTuple *tuples; /* array of currently-retrieved tuples */ + int num_tuples; /* # of tuples in array */ + int next_tuple; /* index of next one to return */ + + /* batch-level state, for optimizing rewinds and avoiding useless fetch */ + int fetch_ct_2; /* Min(# of fetches done, 2) */ + bool eof_reached; /* true if last fetch reached EOF */ + + /* for asynchronous execution */ + bool async_capable; /* engage asynchronous-capable logic? */ + + /* working memory contexts */ + MemoryContext batch_cxt; /* context holding current batch of tuples */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ + + int fetch_size; /* number of tuples per fetch */ +} PgFdwScanState; + +/* + * Execution state of a foreign insert/update/delete operation. + */ +typedef struct PgFdwModifyState +{ + Relation rel; /* relcache entry for the foreign table */ + AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + + /* for remote query execution */ + PGconn *conn; /* connection for the scan */ + PgFdwConnState *conn_state; /* extra per-connection state */ + char *p_name; /* name of prepared statement, if created */ + + /* extracted fdw_private data */ + char *query; /* text of INSERT/UPDATE/DELETE command */ + char *orig_query; /* original text of INSERT command */ + List *target_attrs; /* list of target attribute numbers */ + int values_end; /* length up to the end of VALUES */ + int batch_size; /* value of FDW option "batch_size" */ + bool has_returning; /* is there a RETURNING clause? */ + List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + + /* info about parameters for prepared statement */ + AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ + int p_nums; /* number of parameters to transmit */ + FmgrInfo *p_flinfo; /* output conversion functions for them */ + + /* batch operation stuff */ + int num_slots; /* number of slots to insert */ + + /* working memory context */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ + + /* for update row movement if subplan result rel */ + struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if + * created */ +} PgFdwModifyState; + +/* + * Execution state of a foreign scan that modifies a foreign table directly. + */ +typedef struct PgFdwDirectModifyState +{ + Relation rel; /* relcache entry for the foreign table */ + AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + + /* extracted fdw_private data */ + char *query; /* text of UPDATE/DELETE command */ + bool has_returning; /* is there a RETURNING clause? */ + List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + bool set_processed; /* do we set the command es_processed? */ + + /* for remote query execution */ + PGconn *conn; /* connection for the update */ + PgFdwConnState *conn_state; /* extra per-connection state */ + int numParams; /* number of parameters passed to query */ + FmgrInfo *param_flinfo; /* output conversion functions for them */ + List *param_exprs; /* executable expressions for param values */ + const char **param_values; /* textual values of query parameters */ + + /* for storing result tuples */ + PGresult *result; /* result for query */ + int num_tuples; /* # of result tuples */ + int next_tuple; /* index of next one to return */ + Relation resultRel; /* relcache entry for the target relation */ + AttrNumber *attnoMap; /* array of attnums of input user columns */ + AttrNumber ctidAttno; /* attnum of input ctid column */ + AttrNumber oidAttno; /* attnum of input oid column */ + bool hasSystemCols; /* are there system columns of resultRel? */ + + /* working memory context */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ +} PgFdwDirectModifyState; + +/* + * Workspace for analyzing a foreign table. + */ +typedef struct PgFdwAnalyzeState +{ + Relation rel; /* relcache entry for the foreign table */ + AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + List *retrieved_attrs; /* attr numbers retrieved by query */ + + /* collected sample rows */ + HeapTuple *rows; /* array of size targrows */ + int targrows; /* target # of sample rows */ + int numrows; /* # of sample rows collected */ + + /* for random sampling */ + double samplerows; /* # of rows fetched */ + double rowstoskip; /* # of rows to skip before next sample */ + ReservoirStateData rstate; /* state for reservoir sampling */ + + /* working memory contexts */ + MemoryContext anl_cxt; /* context for per-analyze lifespan data */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ +} PgFdwAnalyzeState; + +/* + * This enum describes what's kept in the fdw_private list for a ForeignPath. + * We store: + * + * 1) Boolean flag showing if the remote query has the final sort + * 2) Boolean flag showing if the remote query has the LIMIT clause + */ +enum FdwPathPrivateIndex +{ + /* has-final-sort flag (as a Boolean node) */ + FdwPathPrivateHasFinalSort, + /* has-limit flag (as a Boolean node) */ + FdwPathPrivateHasLimit +}; + +/* Struct for extra information passed to estimate_path_cost_size() */ +typedef struct +{ + PathTarget *target; + bool has_final_sort; + bool has_limit; + double limit_tuples; + int64 count_est; + int64 offset_est; +} PgFdwPathExtraData; + +/* + * Identify the attribute where data conversion fails. + */ +typedef struct ConversionLocation +{ + AttrNumber cur_attno; /* attribute number being processed, or 0 */ + Relation rel; /* foreign table being processed, or NULL */ + ForeignScanState *fsstate; /* plan node being processed, or NULL */ +} ConversionLocation; + +/* Callback argument for ec_member_matches_foreign */ +typedef struct +{ + Expr *current; /* current expr, or NULL if not yet found */ + List *already_used; /* expressions already dealt with */ +} ec_member_foreign_arg; + +/* + * SQL functions + */ +PG_FUNCTION_INFO_V1(postgres_fdw_handler); + +/* + * FDW callback routines + */ +static void postgresGetForeignRelSize(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid); +static void postgresGetForeignPaths(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid); +static ForeignScan *postgresGetForeignPlan(PlannerInfo *root, + RelOptInfo *foreignrel, + Oid foreigntableid, + ForeignPath *best_path, + List *tlist, + List *scan_clauses, + Plan *outer_plan); +static void postgresBeginForeignScan(ForeignScanState *node, int eflags); +static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); +static void postgresReScanForeignScan(ForeignScanState *node); +static void postgresEndForeignScan(ForeignScanState *node); +static void postgresAddForeignUpdateTargets(PlannerInfo *root, + Index rtindex, + RangeTblEntry *target_rte, + Relation target_relation); +static List *postgresPlanForeignModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index); +static void postgresBeginForeignModify(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + List *fdw_private, + int subplan_index, + int eflags); +static TupleTableSlot *postgresExecForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static TupleTableSlot **postgresExecForeignBatchInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); +static int postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo); +static TupleTableSlot *postgresExecForeignUpdate(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static TupleTableSlot *postgresExecForeignDelete(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static void postgresEndForeignModify(EState *estate, + ResultRelInfo *resultRelInfo); +static void postgresBeginForeignInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo); +static void postgresEndForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo); +static int postgresIsForeignRelUpdatable(Relation rel); +static bool postgresPlanDirectModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index); +static void postgresBeginDirectModify(ForeignScanState *node, int eflags); +static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node); +static void postgresEndDirectModify(ForeignScanState *node); +static void postgresExplainForeignScan(ForeignScanState *node, + ExplainState *es); +static void postgresExplainForeignModify(ModifyTableState *mtstate, + ResultRelInfo *rinfo, + List *fdw_private, + int subplan_index, + ExplainState *es); +static void postgresExplainDirectModify(ForeignScanState *node, + ExplainState *es); +static void postgresExecForeignTruncate(List *rels, + DropBehavior behavior, + bool restart_seqs); +static bool postgresAnalyzeForeignTable(Relation relation, + AcquireSampleRowsFunc *func, + BlockNumber *totalpages); +static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, + Oid serverOid); +static void postgresGetForeignJoinPaths(PlannerInfo *root, + RelOptInfo *joinrel, + RelOptInfo *outerrel, + RelOptInfo *innerrel, + JoinType jointype, + JoinPathExtraData *extra); +static bool postgresRecheckForeignScan(ForeignScanState *node, + TupleTableSlot *slot); +static void postgresGetForeignUpperPaths(PlannerInfo *root, + UpperRelationKind stage, + RelOptInfo *input_rel, + RelOptInfo *output_rel, + void *extra); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static void postgresForeignAsyncRequest(AsyncRequest *areq); +static void postgresForeignAsyncConfigureWait(AsyncRequest *areq); +static void postgresForeignAsyncNotify(AsyncRequest *areq); + +/* + * Helper functions + */ +static void estimate_path_cost_size(PlannerInfo *root, + RelOptInfo *foreignrel, + List *param_join_conds, + List *pathkeys, + PgFdwPathExtraData *fpextra, + double *p_rows, int *p_width, + Cost *p_startup_cost, Cost *p_total_cost); +static void get_remote_estimate(const char *sql, + PGconn *conn, + double *rows, + int *width, + Cost *startup_cost, + Cost *total_cost); +static void adjust_foreign_grouping_path_cost(PlannerInfo *root, + List *pathkeys, + double retrieved_rows, + double width, + double limit_tuples, + Cost *p_startup_cost, + Cost *p_run_cost); +static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, + EquivalenceClass *ec, EquivalenceMember *em, + void *arg); +static void create_cursor(ForeignScanState *node); +static void fetch_more_data(ForeignScanState *node); +static void close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state); +static PgFdwModifyState *create_foreign_modify(EState *estate, + RangeTblEntry *rte, + ResultRelInfo *resultRelInfo, + CmdType operation, + Plan *subplan, + char *query, + List *target_attrs, + int len, + bool has_returning, + List *retrieved_attrs); +static TupleTableSlot **execute_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots); +static void prepare_foreign_modify(PgFdwModifyState *fmstate); +static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, + ItemPointer tupleid, + TupleTableSlot **slots, + int numSlots); +static void store_returning_result(PgFdwModifyState *fmstate, + TupleTableSlot *slot, PGresult *res); +static void finish_foreign_modify(PgFdwModifyState *fmstate); +static void deallocate_query(PgFdwModifyState *fmstate); +static List *build_remote_returning(Index rtindex, Relation rel, + List *returningList); +static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); +static void execute_dml_stmt(ForeignScanState *node); +static TupleTableSlot *get_returning_data(ForeignScanState *node); +static void init_returning_filter(PgFdwDirectModifyState *dmstate, + List *fdw_scan_tlist, + Index rtindex); +static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + EState *estate); +static void prepare_query_params(PlanState *node, + List *fdw_exprs, + int numParams, + FmgrInfo **param_flinfo, + List **param_exprs, + const char ***param_values); +static void process_query_params(ExprContext *econtext, + FmgrInfo *param_flinfo, + List *param_exprs, + const char **param_values); +static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, + HeapTuple *rows, int targrows, + double *totalrows, + double *totaldeadrows); +static void analyze_row_processor(PGresult *res, int row, + PgFdwAnalyzeState *astate); +static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch); +static void fetch_more_data_begin(AsyncRequest *areq); +static void complete_pending_request(AsyncRequest *areq); +static HeapTuple make_tuple_from_result_row(PGresult *res, + int row, + Relation rel, + AttInMetadata *attinmeta, + List *retrieved_attrs, + ForeignScanState *fsstate, + MemoryContext temp_context); +static void conversion_error_callback(void *arg); +static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, + JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, + JoinPathExtraData *extra); +static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, + Node *havingQual); +static List *get_useful_pathkeys_for_relation(PlannerInfo *root, + RelOptInfo *rel); +static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel); +static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, + Path *epq_path); +static void add_foreign_grouping_paths(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *grouped_rel, + GroupPathExtraData *extra); +static void add_foreign_ordered_paths(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *ordered_rel); +static void add_foreign_final_paths(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *final_rel, + FinalPathExtraData *extra); +static void apply_server_options(PgFdwRelationInfo *fpinfo); +static void apply_table_options(PgFdwRelationInfo *fpinfo); +static void merge_fdw_options(PgFdwRelationInfo *fpinfo, + const PgFdwRelationInfo *fpinfo_o, + const PgFdwRelationInfo *fpinfo_i); +static int get_batch_size_option(Relation rel); + + +/* + * Foreign-data wrapper handler function: return a struct with pointers + * to my callback routines. + */ +Datum +postgres_fdw_handler(PG_FUNCTION_ARGS) +{ + FdwRoutine *routine = makeNode(FdwRoutine); + + /* Functions for scanning foreign tables */ + routine->GetForeignRelSize = postgresGetForeignRelSize; + routine->GetForeignPaths = postgresGetForeignPaths; + routine->GetForeignPlan = postgresGetForeignPlan; + routine->BeginForeignScan = postgresBeginForeignScan; + routine->IterateForeignScan = postgresIterateForeignScan; + routine->ReScanForeignScan = postgresReScanForeignScan; + routine->EndForeignScan = postgresEndForeignScan; + + /* Functions for updating foreign tables */ + routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; + routine->PlanForeignModify = postgresPlanForeignModify; + routine->BeginForeignModify = postgresBeginForeignModify; + routine->ExecForeignInsert = postgresExecForeignInsert; + routine->ExecForeignBatchInsert = postgresExecForeignBatchInsert; + routine->GetForeignModifyBatchSize = postgresGetForeignModifyBatchSize; + routine->ExecForeignUpdate = postgresExecForeignUpdate; + routine->ExecForeignDelete = postgresExecForeignDelete; + routine->EndForeignModify = postgresEndForeignModify; + routine->BeginForeignInsert = postgresBeginForeignInsert; + routine->EndForeignInsert = postgresEndForeignInsert; + routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; + routine->PlanDirectModify = postgresPlanDirectModify; + routine->BeginDirectModify = postgresBeginDirectModify; + routine->IterateDirectModify = postgresIterateDirectModify; + routine->EndDirectModify = postgresEndDirectModify; + + /* Function for EvalPlanQual rechecks */ + routine->RecheckForeignScan = postgresRecheckForeignScan; + /* Support functions for EXPLAIN */ + routine->ExplainForeignScan = postgresExplainForeignScan; + routine->ExplainForeignModify = postgresExplainForeignModify; + routine->ExplainDirectModify = postgresExplainDirectModify; + + /* Support function for TRUNCATE */ + routine->ExecForeignTruncate = postgresExecForeignTruncate; + + /* Support functions for ANALYZE */ + routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; + + /* Support functions for IMPORT FOREIGN SCHEMA */ + routine->ImportForeignSchema = postgresImportForeignSchema; + + /* Support functions for join push-down */ + routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; + + /* Support functions for upper relation push-down */ + routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + + /* Support functions for asynchronous execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ForeignAsyncRequest = postgresForeignAsyncRequest; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + routine->ForeignAsyncNotify = postgresForeignAsyncNotify; + + PG_RETURN_POINTER(routine); +} + +/* + * postgresGetForeignRelSize + * Estimate # of rows and width of the result of the scan + * + * We should consider the effect of all baserestrictinfo clauses here, but + * not any join clauses. + */ +static void +postgresGetForeignRelSize(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid) +{ + PgFdwRelationInfo *fpinfo; + ListCell *lc; + RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root); + + /* + * We use PgFdwRelationInfo to pass various information to subsequent + * functions. + */ + fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); + baserel->fdw_private = (void *) fpinfo; + + /* Base foreign tables need to be pushed down always. */ + fpinfo->pushdown_safe = true; + + /* Look up foreign-table catalog info. */ + fpinfo->table = GetForeignTable(foreigntableid); + fpinfo->server = GetForeignServer(fpinfo->table->serverid); + + /* + * Extract user-settable option values. Note that per-table settings of + * use_remote_estimate, fetch_size and async_capable override per-server + * settings of them, respectively. + */ + fpinfo->use_remote_estimate = false; + fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST; + fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST; + fpinfo->shippable_extensions = NIL; + fpinfo->fetch_size = 100; + fpinfo->async_capable = false; + + apply_server_options(fpinfo); + apply_table_options(fpinfo); + + /* + * If the table or the server is configured to use remote estimates, + * identify which user to do remote access as during planning. This + * should match what ExecCheckRTEPerms() does. If we fail due to lack of + * permissions, the query would have failed at runtime anyway. + */ + if (fpinfo->use_remote_estimate) + { + Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid); + } + else + fpinfo->user = NULL; + + /* + * Identify which baserestrictinfo clauses can be sent to the remote + * server and which can't. + */ + classifyConditions(root, baserel, baserel->baserestrictinfo, + &fpinfo->remote_conds, &fpinfo->local_conds); + + /* + * Identify which attributes will need to be retrieved from the remote + * server. These include all attrs needed for joins or final output, plus + * all attrs used in the local_conds. (Note: if we end up using a + * parameterized scan, it's possible that some of the join clauses will be + * sent to the remote and thus we wouldn't really need to retrieve the + * columns used in them. Doesn't seem worth detecting that case though.) + */ + fpinfo->attrs_used = NULL; + pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid, + &fpinfo->attrs_used); + foreach(lc, fpinfo->local_conds) + { + RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); + + pull_varattnos((Node *) rinfo->clause, baserel->relid, + &fpinfo->attrs_used); + } + + /* + * Compute the selectivity and cost of the local_conds, so we don't have + * to do it over again for each path. The best we can do for these + * conditions is to estimate selectivity on the basis of local statistics. + */ + fpinfo->local_conds_sel = clauselist_selectivity(root, + fpinfo->local_conds, + baserel->relid, + JOIN_INNER, + NULL); + + cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); + + /* + * Set # of retrieved rows and cached relation costs to some negative + * value, so that we can detect when they are set to some sensible values, + * during one (usually the first) of the calls to estimate_path_cost_size. + */ + fpinfo->retrieved_rows = -1; + fpinfo->rel_startup_cost = -1; + fpinfo->rel_total_cost = -1; + + /* + * If the table or the server is configured to use remote estimates, + * connect to the foreign server and execute EXPLAIN to estimate the + * number of rows selected by the restriction clauses, as well as the + * average row width. Otherwise, estimate using whatever statistics we + * have locally, in a way similar to ordinary tables. + */ + if (fpinfo->use_remote_estimate) + { + /* + * Get cost/size estimates with help of remote server. Save the + * values in fpinfo so we don't need to do it again to generate the + * basic foreign path. + */ + estimate_path_cost_size(root, baserel, NIL, NIL, NULL, + &fpinfo->rows, &fpinfo->width, + &fpinfo->startup_cost, &fpinfo->total_cost); + + /* Report estimated baserel size to planner. */ + baserel->rows = fpinfo->rows; + baserel->reltarget->width = fpinfo->width; + } + else + { + /* + * If the foreign table has never been ANALYZEd, it will have + * reltuples < 0, meaning "unknown". We can't do much if we're not + * allowed to consult the remote server, but we can use a hack similar + * to plancat.c's treatment of empty relations: use a minimum size + * estimate of 10 pages, and divide by the column-datatype-based width + * estimate to get the corresponding number of tuples. + */ + if (baserel->tuples < 0) + { + baserel->pages = 10; + baserel->tuples = + (10 * BLCKSZ) / (baserel->reltarget->width + + MAXALIGN(SizeofHeapTupleHeader)); + } + + /* Estimate baserel size as best we can with local statistics. */ + set_baserel_size_estimates(root, baserel); + + /* Fill in basically-bogus cost estimates for use later. */ + estimate_path_cost_size(root, baserel, NIL, NIL, NULL, + &fpinfo->rows, &fpinfo->width, + &fpinfo->startup_cost, &fpinfo->total_cost); + } + + /* + * fpinfo->relation_name gets the numeric rangetable index of the foreign + * table RTE. (If this query gets EXPLAIN'd, we'll convert that to a + * human-readable string at that time.) + */ + fpinfo->relation_name = psprintf("%u", baserel->relid); + + /* No outer and inner relations. */ + fpinfo->make_outerrel_subquery = false; + fpinfo->make_innerrel_subquery = false; + fpinfo->lower_subquery_rels = NULL; + /* Set the relation index. */ + fpinfo->relation_index = baserel->relid; +} + +/* + * get_useful_ecs_for_relation + * Determine which EquivalenceClasses might be involved in useful + * orderings of this relation. + * + * This function is in some respects a mirror image of the core function + * pathkeys_useful_for_merging: for a regular table, we know what indexes + * we have and want to test whether any of them are useful. For a foreign + * table, we don't know what indexes are present on the remote side but + * want to speculate about which ones we'd like to use if they existed. + * + * This function returns a list of potentially-useful equivalence classes, + * but it does not guarantee that an EquivalenceMember exists which contains + * Vars only from the given relation. For example, given ft1 JOIN t1 ON + * ft1.x + t1.x = 0, this function will say that the equivalence class + * containing ft1.x + t1.x is potentially useful. Supposing ft1 is remote and + * t1 is local (or on a different server), it will turn out that no useful + * ORDER BY clause can be generated. It's not our job to figure that out + * here; we're only interested in identifying relevant ECs. + */ +static List * +get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel) +{ + List *useful_eclass_list = NIL; + ListCell *lc; + Relids relids; + + /* + * First, consider whether any active EC is potentially useful for a merge + * join against this relation. + */ + if (rel->has_eclass_joins) + { + foreach(lc, root->eq_classes) + { + EquivalenceClass *cur_ec = (EquivalenceClass *) lfirst(lc); + + if (eclass_useful_for_merging(root, cur_ec, rel)) + useful_eclass_list = lappend(useful_eclass_list, cur_ec); + } + } + + /* + * Next, consider whether there are any non-EC derivable join clauses that + * are merge-joinable. If the joininfo list is empty, we can exit + * quickly. + */ + if (rel->joininfo == NIL) + return useful_eclass_list; + + /* If this is a child rel, we must use the topmost parent rel to search. */ + if (IS_OTHER_REL(rel)) + { + Assert(!bms_is_empty(rel->top_parent_relids)); + relids = rel->top_parent_relids; + } + else + relids = rel->relids; + + /* Check each join clause in turn. */ + foreach(lc, rel->joininfo) + { + RestrictInfo *restrictinfo = (RestrictInfo *) lfirst(lc); + + /* Consider only mergejoinable clauses */ + if (restrictinfo->mergeopfamilies == NIL) + continue; + + /* Make sure we've got canonical ECs. */ + update_mergeclause_eclasses(root, restrictinfo); + + /* + * restrictinfo->mergeopfamilies != NIL is sufficient to guarantee + * that left_ec and right_ec will be initialized, per comments in + * distribute_qual_to_rels. + * + * We want to identify which side of this merge-joinable clause + * contains columns from the relation produced by this RelOptInfo. We + * test for overlap, not containment, because there could be extra + * relations on either side. For example, suppose we've got something + * like ((A JOIN B ON A.x = B.x) JOIN C ON A.y = C.y) LEFT JOIN D ON + * A.y = D.y. The input rel might be the joinrel between A and B, and + * we'll consider the join clause A.y = D.y. relids contains a + * relation not involved in the join class (B) and the equivalence + * class for the left-hand side of the clause contains a relation not + * involved in the input rel (C). Despite the fact that we have only + * overlap and not containment in either direction, A.y is potentially + * useful as a sort column. + * + * Note that it's even possible that relids overlaps neither side of + * the join clause. For example, consider A LEFT JOIN B ON A.x = B.x + * AND A.x = 1. The clause A.x = 1 will appear in B's joininfo list, + * but overlaps neither side of B. In that case, we just skip this + * join clause, since it doesn't suggest a useful sort order for this + * relation. + */ + if (bms_overlap(relids, restrictinfo->right_ec->ec_relids)) + useful_eclass_list = list_append_unique_ptr(useful_eclass_list, + restrictinfo->right_ec); + else if (bms_overlap(relids, restrictinfo->left_ec->ec_relids)) + useful_eclass_list = list_append_unique_ptr(useful_eclass_list, + restrictinfo->left_ec); + } + + return useful_eclass_list; +} + +/* + * get_useful_pathkeys_for_relation + * Determine which orderings of a relation might be useful. + * + * Getting data in sorted order can be useful either because the requested + * order matches the final output ordering for the overall query we're + * planning, or because it enables an efficient merge join. Here, we try + * to figure out which pathkeys to consider. + */ +static List * +get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel) +{ + List *useful_pathkeys_list = NIL; + List *useful_eclass_list; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; + EquivalenceClass *query_ec = NULL; + ListCell *lc; + + /* + * Pushing the query_pathkeys to the remote server is always worth + * considering, because it might let us avoid a local sort. + */ + fpinfo->qp_is_pushdown_safe = false; + if (root->query_pathkeys) + { + bool query_pathkeys_ok = true; + + foreach(lc, root->query_pathkeys) + { + PathKey *pathkey = (PathKey *) lfirst(lc); + + /* + * The planner and executor don't have any clever strategy for + * taking data sorted by a prefix of the query's pathkeys and + * getting it to be sorted by all of those pathkeys. We'll just + * end up resorting the entire data set. So, unless we can push + * down all of the query pathkeys, forget it. + */ + if (!is_foreign_pathkey(root, rel, pathkey)) + { + query_pathkeys_ok = false; + break; + } + } + + if (query_pathkeys_ok) + { + useful_pathkeys_list = list_make1(list_copy(root->query_pathkeys)); + fpinfo->qp_is_pushdown_safe = true; + } + } + + /* + * Even if we're not using remote estimates, having the remote side do the + * sort generally won't be any worse than doing it locally, and it might + * be much better if the remote side can generate data in the right order + * without needing a sort at all. However, what we're going to do next is + * try to generate pathkeys that seem promising for possible merge joins, + * and that's more speculative. A wrong choice might hurt quite a bit, so + * bail out if we can't use remote estimates. + */ + if (!fpinfo->use_remote_estimate) + return useful_pathkeys_list; + + /* Get the list of interesting EquivalenceClasses. */ + useful_eclass_list = get_useful_ecs_for_relation(root, rel); + + /* Extract unique EC for query, if any, so we don't consider it again. */ + if (list_length(root->query_pathkeys) == 1) + { + PathKey *query_pathkey = linitial(root->query_pathkeys); + + query_ec = query_pathkey->pk_eclass; + } + + /* + * As a heuristic, the only pathkeys we consider here are those of length + * one. It's surely possible to consider more, but since each one we + * choose to consider will generate a round-trip to the remote side, we + * need to be a bit cautious here. It would sure be nice to have a local + * cache of information about remote index definitions... + */ + foreach(lc, useful_eclass_list) + { + EquivalenceClass *cur_ec = lfirst(lc); + PathKey *pathkey; + + /* If redundant with what we did above, skip it. */ + if (cur_ec == query_ec) + continue; + + /* Can't push down the sort if the EC's opfamily is not shippable. */ + if (!is_shippable(linitial_oid(cur_ec->ec_opfamilies), + OperatorFamilyRelationId, fpinfo)) + continue; + + /* If no pushable expression for this rel, skip it. */ + if (find_em_for_rel(root, cur_ec, rel) == NULL) + continue; + + /* Looks like we can generate a pathkey, so let's do it. */ + pathkey = make_canonical_pathkey(root, cur_ec, + linitial_oid(cur_ec->ec_opfamilies), + BTLessStrategyNumber, + false); + useful_pathkeys_list = lappend(useful_pathkeys_list, + list_make1(pathkey)); + } + + return useful_pathkeys_list; +} + +/* + * postgresGetForeignPaths + * Create possible scan paths for a scan on the foreign table + */ +static void +postgresGetForeignPaths(PlannerInfo *root, + RelOptInfo *baserel, + Oid foreigntableid) +{ + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; + ForeignPath *path; + List *ppi_list; + ListCell *lc; + + /* + * Create simplest ForeignScan path node and add it to baserel. This path + * corresponds to SeqScan path of regular tables (though depending on what + * baserestrict conditions we were able to send to remote, there might + * actually be an indexscan happening there). We already did all the work + * to estimate cost and size of this path. + * + * Although this path uses no join clauses, it could still have required + * parameterization due to LATERAL refs in its tlist. + */ + path = create_foreignscan_path(root, baserel, + NULL, /* default pathtarget */ + fpinfo->rows, + fpinfo->startup_cost, + fpinfo->total_cost, + NIL, /* no pathkeys */ + baserel->lateral_relids, + NULL, /* no extra plan */ + NIL); /* no fdw_private list */ + add_path(baserel, (Path *) path); + + /* Add paths with pathkeys */ + add_paths_with_pathkeys_for_rel(root, baserel, NULL); + + /* + * If we're not using remote estimates, stop here. We have no way to + * estimate whether any join clauses would be worth sending across, so + * don't bother building parameterized paths. + */ + if (!fpinfo->use_remote_estimate) + return; + + /* + * Thumb through all join clauses for the rel to identify which outer + * relations could supply one or more safe-to-send-to-remote join clauses. + * We'll build a parameterized path for each such outer relation. + * + * It's convenient to manage this by representing each candidate outer + * relation by the ParamPathInfo node for it. We can then use the + * ppi_clauses list in the ParamPathInfo node directly as a list of the + * interesting join clauses for that rel. This takes care of the + * possibility that there are multiple safe join clauses for such a rel, + * and also ensures that we account for unsafe join clauses that we'll + * still have to enforce locally (since the parameterized-path machinery + * insists that we handle all movable clauses). + */ + ppi_list = NIL; + foreach(lc, baserel->joininfo) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); + Relids required_outer; + ParamPathInfo *param_info; + + /* Check if clause can be moved to this rel */ + if (!join_clause_is_movable_to(rinfo, baserel)) + continue; + + /* See if it is safe to send to remote */ + if (!is_foreign_expr(root, baserel, rinfo->clause)) + continue; + + /* Calculate required outer rels for the resulting path */ + required_outer = bms_union(rinfo->clause_relids, + baserel->lateral_relids); + /* We do not want the foreign rel itself listed in required_outer */ + required_outer = bms_del_member(required_outer, baserel->relid); + + /* + * required_outer probably can't be empty here, but if it were, we + * couldn't make a parameterized path. + */ + if (bms_is_empty(required_outer)) + continue; + + /* Get the ParamPathInfo */ + param_info = get_baserel_parampathinfo(root, baserel, + required_outer); + Assert(param_info != NULL); + + /* + * Add it to list unless we already have it. Testing pointer equality + * is OK since get_baserel_parampathinfo won't make duplicates. + */ + ppi_list = list_append_unique_ptr(ppi_list, param_info); + } + + /* + * The above scan examined only "generic" join clauses, not those that + * were absorbed into EquivalenceClauses. See if we can make anything out + * of EquivalenceClauses. + */ + if (baserel->has_eclass_joins) + { + /* + * We repeatedly scan the eclass list looking for column references + * (or expressions) belonging to the foreign rel. Each time we find + * one, we generate a list of equivalence joinclauses for it, and then + * see if any are safe to send to the remote. Repeat till there are + * no more candidate EC members. + */ + ec_member_foreign_arg arg; + + arg.already_used = NIL; + for (;;) + { + List *clauses; + + /* Make clauses, skipping any that join to lateral_referencers */ + arg.current = NULL; + clauses = generate_implied_equalities_for_column(root, + baserel, + ec_member_matches_foreign, + (void *) &arg, + baserel->lateral_referencers); + + /* Done if there are no more expressions in the foreign rel */ + if (arg.current == NULL) + { + Assert(clauses == NIL); + break; + } + + /* Scan the extracted join clauses */ + foreach(lc, clauses) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); + Relids required_outer; + ParamPathInfo *param_info; + + /* Check if clause can be moved to this rel */ + if (!join_clause_is_movable_to(rinfo, baserel)) + continue; + + /* See if it is safe to send to remote */ + if (!is_foreign_expr(root, baserel, rinfo->clause)) + continue; + + /* Calculate required outer rels for the resulting path */ + required_outer = bms_union(rinfo->clause_relids, + baserel->lateral_relids); + required_outer = bms_del_member(required_outer, baserel->relid); + if (bms_is_empty(required_outer)) + continue; + + /* Get the ParamPathInfo */ + param_info = get_baserel_parampathinfo(root, baserel, + required_outer); + Assert(param_info != NULL); + + /* Add it to list unless we already have it */ + ppi_list = list_append_unique_ptr(ppi_list, param_info); + } + + /* Try again, now ignoring the expression we found this time */ + arg.already_used = lappend(arg.already_used, arg.current); + } + } + + /* + * Now build a path for each useful outer relation. + */ + foreach(lc, ppi_list) + { + ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc); + double rows; + int width; + Cost startup_cost; + Cost total_cost; + + /* Get a cost estimate from the remote */ + estimate_path_cost_size(root, baserel, + param_info->ppi_clauses, NIL, NULL, + &rows, &width, + &startup_cost, &total_cost); + + /* + * ppi_rows currently won't get looked at by anything, but still we + * may as well ensure that it matches our idea of the rowcount. + */ + param_info->ppi_rows = rows; + + /* Make the path */ + path = create_foreignscan_path(root, baserel, + NULL, /* default pathtarget */ + rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + param_info->ppi_req_outer, + NULL, + NIL); /* no fdw_private list */ + add_path(baserel, (Path *) path); + } +} + +/* + * postgresGetForeignPlan + * Create ForeignScan plan node which implements selected best path + */ +static ForeignScan * +postgresGetForeignPlan(PlannerInfo *root, + RelOptInfo *foreignrel, + Oid foreigntableid, + ForeignPath *best_path, + List *tlist, + List *scan_clauses, + Plan *outer_plan) +{ + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; + Index scan_relid; + List *fdw_private; + List *remote_exprs = NIL; + List *local_exprs = NIL; + List *params_list = NIL; + List *fdw_scan_tlist = NIL; + List *fdw_recheck_quals = NIL; + List *retrieved_attrs; + StringInfoData sql; + bool has_final_sort = false; + bool has_limit = false; + ListCell *lc; + + /* + * Get FDW private data created by postgresGetForeignUpperPaths(), if any. + */ + if (best_path->fdw_private) + { + has_final_sort = boolVal(list_nth(best_path->fdw_private, + FdwPathPrivateHasFinalSort)); + has_limit = boolVal(list_nth(best_path->fdw_private, + FdwPathPrivateHasLimit)); + } + + if (IS_SIMPLE_REL(foreignrel)) + { + /* + * For base relations, set scan_relid as the relid of the relation. + */ + scan_relid = foreignrel->relid; + + /* + * In a base-relation scan, we must apply the given scan_clauses. + * + * Separate the scan_clauses into those that can be executed remotely + * and those that can't. baserestrictinfo clauses that were + * previously determined to be safe or unsafe by classifyConditions + * are found in fpinfo->remote_conds and fpinfo->local_conds. Anything + * else in the scan_clauses list will be a join clause, which we have + * to check for remote-safety. + * + * Note: the join clauses we see here should be the exact same ones + * previously examined by postgresGetForeignPaths. Possibly it'd be + * worth passing forward the classification work done then, rather + * than repeating it here. + * + * This code must match "extract_actual_clauses(scan_clauses, false)" + * except for the additional decision about remote versus local + * execution. + */ + foreach(lc, scan_clauses) + { + RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); + + /* Ignore any pseudoconstants, they're dealt with elsewhere */ + if (rinfo->pseudoconstant) + continue; + + if (list_member_ptr(fpinfo->remote_conds, rinfo)) + remote_exprs = lappend(remote_exprs, rinfo->clause); + else if (list_member_ptr(fpinfo->local_conds, rinfo)) + local_exprs = lappend(local_exprs, rinfo->clause); + else if (is_foreign_expr(root, foreignrel, rinfo->clause)) + remote_exprs = lappend(remote_exprs, rinfo->clause); + else + local_exprs = lappend(local_exprs, rinfo->clause); + } + + /* + * For a base-relation scan, we have to support EPQ recheck, which + * should recheck all the remote quals. + */ + fdw_recheck_quals = remote_exprs; + } + else + { + /* + * Join relation or upper relation - set scan_relid to 0. + */ + scan_relid = 0; + + /* + * For a join rel, baserestrictinfo is NIL and we are not considering + * parameterization right now, so there should be no scan_clauses for + * a joinrel or an upper rel either. + */ + Assert(!scan_clauses); + + /* + * Instead we get the conditions to apply from the fdw_private + * structure. + */ + remote_exprs = extract_actual_clauses(fpinfo->remote_conds, false); + local_exprs = extract_actual_clauses(fpinfo->local_conds, false); + + /* + * We leave fdw_recheck_quals empty in this case, since we never need + * to apply EPQ recheck clauses. In the case of a joinrel, EPQ + * recheck is handled elsewhere --- see postgresGetForeignJoinPaths(). + * If we're planning an upperrel (ie, remote grouping or aggregation) + * then there's no EPQ to do because SELECT FOR UPDATE wouldn't be + * allowed, and indeed we *can't* put the remote clauses into + * fdw_recheck_quals because the unaggregated Vars won't be available + * locally. + */ + + /* Build the list of columns to be fetched from the foreign server. */ + fdw_scan_tlist = build_tlist_to_deparse(foreignrel); + + /* + * Ensure that the outer plan produces a tuple whose descriptor + * matches our scan tuple slot. Also, remove the local conditions + * from outer plan's quals, lest they be evaluated twice, once by the + * local plan and once by the scan. + */ + if (outer_plan) + { + ListCell *lc; + + /* + * Right now, we only consider grouping and aggregation beyond + * joins. Queries involving aggregates or grouping do not require + * EPQ mechanism, hence should not have an outer plan here. + */ + Assert(!IS_UPPER_REL(foreignrel)); + + /* + * First, update the plan's qual list if possible. In some cases + * the quals might be enforced below the topmost plan level, in + * which case we'll fail to remove them; it's not worth working + * harder than this. + */ + foreach(lc, local_exprs) + { + Node *qual = lfirst(lc); + + outer_plan->qual = list_delete(outer_plan->qual, qual); + + /* + * For an inner join the local conditions of foreign scan plan + * can be part of the joinquals as well. (They might also be + * in the mergequals or hashquals, but we can't touch those + * without breaking the plan.) + */ + if (IsA(outer_plan, NestLoop) || + IsA(outer_plan, MergeJoin) || + IsA(outer_plan, HashJoin)) + { + Join *join_plan = (Join *) outer_plan; + + if (join_plan->jointype == JOIN_INNER) + join_plan->joinqual = list_delete(join_plan->joinqual, + qual); + } + } + + /* + * Now fix the subplan's tlist --- this might result in inserting + * a Result node atop the plan tree. + */ + outer_plan = change_plan_targetlist(outer_plan, fdw_scan_tlist, + best_path->path.parallel_safe); + } + } + + /* + * Build the query string to be sent for execution, and identify + * expressions to be sent as parameters. + */ + initStringInfo(&sql); + deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, + remote_exprs, best_path->path.pathkeys, + has_final_sort, has_limit, false, + &retrieved_attrs, ¶ms_list); + + /* Remember remote_exprs for possible use by postgresPlanDirectModify */ + fpinfo->final_remote_exprs = remote_exprs; + + /* + * Build the fdw_private list that will be available to the executor. + * Items in the list must match order in enum FdwScanPrivateIndex. + */ + fdw_private = list_make3(makeString(sql.data), + retrieved_attrs, + makeInteger(fpinfo->fetch_size)); + if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) + fdw_private = lappend(fdw_private, + makeString(fpinfo->relation_name)); + + /* + * Create the ForeignScan node for the given relation. + * + * Note that the remote parameter expressions are stored in the fdw_exprs + * field of the finished plan node; we can't keep them in private state + * because then they wouldn't be subject to later planner processing. + */ + return make_foreignscan(tlist, + local_exprs, + scan_relid, + params_list, + fdw_private, + fdw_scan_tlist, + fdw_recheck_quals, + outer_plan); +} + +/* + * Construct a tuple descriptor for the scan tuples handled by a foreign join. + */ +static TupleDesc +get_tupdesc_for_join_scan_tuples(ForeignScanState *node) +{ + ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + TupleDesc tupdesc; + + /* + * The core code has already set up a scan tuple slot based on + * fsplan->fdw_scan_tlist, and this slot's tupdesc is mostly good enough, + * but there's one case where it isn't. If we have any whole-row row + * identifier Vars, they may have vartype RECORD, and we need to replace + * that with the associated table's actual composite type. This ensures + * that when we read those ROW() expression values from the remote server, + * we can convert them to a composite type the local server knows. + */ + tupdesc = CreateTupleDescCopy(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor); + for (int i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, i); + Var *var; + RangeTblEntry *rte; + Oid reltype; + + /* Nothing to do if it's not a generic RECORD attribute */ + if (att->atttypid != RECORDOID || att->atttypmod >= 0) + continue; + + /* + * If we can't identify the referenced table, do nothing. This'll + * likely lead to failure later, but perhaps we can muddle through. + */ + var = (Var *) list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, + i)->expr; + if (!IsA(var, Var) || var->varattno != 0) + continue; + rte = list_nth(estate->es_range_table, var->varno - 1); + if (rte->rtekind != RTE_RELATION) + continue; + reltype = get_rel_type_id(rte->relid); + if (!OidIsValid(reltype)) + continue; + att->atttypid = reltype; + /* shouldn't need to change anything else */ + } + return tupdesc; +} + +/* + * postgresBeginForeignScan + * Initiate an executor scan of a foreign PostgreSQL table. + */ +static void +postgresBeginForeignScan(ForeignScanState *node, int eflags) +{ + ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + PgFdwScanState *fsstate; + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + UserMapping *user; + int rtindex; + int numParams; + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* + * We'll save private state in node->fdw_state. + */ + fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); + node->fdw_state = (void *) fsstate; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. In case of a join or aggregate, use the + * lowest-numbered member RTE as a representative; we would get the same + * result from any. + */ + if (fsplan->scan.scanrelid > 0) + rtindex = fsplan->scan.scanrelid; + else + rtindex = bms_next_member(fsplan->fs_relids, -1); + rte = exec_rt_fetch(rtindex, estate); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + table = GetForeignTable(rte->relid); + user = GetUserMapping(userid, table->serverid); + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + fsstate->conn = GetConnection(user, false, &fsstate->conn_state); + + /* Assign a unique ID for my cursor */ + fsstate->cursor_number = GetCursorNumber(fsstate->conn); + fsstate->cursor_exists = false; + + /* Get private info created by planner functions. */ + fsstate->query = strVal(list_nth(fsplan->fdw_private, + FdwScanPrivateSelectSql)); + fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, + FdwScanPrivateRetrievedAttrs); + fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private, + FdwScanPrivateFetchSize)); + + /* Create contexts for batches of tuples and per-tuple temp workspace. */ + fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw tuple data", + ALLOCSET_DEFAULT_SIZES); + fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_SIZES); + + /* + * Get info we'll need for converting data fetched from the foreign server + * into local representation and error reporting during that process. + */ + if (fsplan->scan.scanrelid > 0) + { + fsstate->rel = node->ss.ss_currentRelation; + fsstate->tupdesc = RelationGetDescr(fsstate->rel); + } + else + { + fsstate->rel = NULL; + fsstate->tupdesc = get_tupdesc_for_join_scan_tuples(node); + } + + fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); + + /* + * Prepare for processing of parameters used in remote query, if any. + */ + numParams = list_length(fsplan->fdw_exprs); + fsstate->numParams = numParams; + if (numParams > 0) + prepare_query_params((PlanState *) node, + fsplan->fdw_exprs, + numParams, + &fsstate->param_flinfo, + &fsstate->param_exprs, + &fsstate->param_values); + + /* Set the async-capable flag */ + fsstate->async_capable = node->ss.ps.async_capable; +} + +/* + * postgresIterateForeignScan + * Retrieve next row from the result set, or clear tuple slot to indicate + * EOF. + */ +static TupleTableSlot * +postgresIterateForeignScan(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + + /* + * In sync mode, if this is the first call after Begin or ReScan, we need + * to create the cursor on the remote side. In async mode, we would have + * already created the cursor before we get here, even if this is the + * first call after Begin or ReScan. + */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* + * Get some more tuples, if we've run out. + */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* In async mode, just clear tuple slot. */ + if (fsstate->async_capable) + return ExecClearTuple(slot); + /* No point in another fetch if we already detected EOF, though. */ + if (!fsstate->eof_reached) + fetch_more_data(node); + /* If we didn't get any tuples, must be end of data. */ + if (fsstate->next_tuple >= fsstate->num_tuples) + return ExecClearTuple(slot); + } + + /* + * Return the next tuple. + */ + ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++], + slot, + false); + + return slot; +} + +/* + * postgresReScanForeignScan + * Restart the scan. + */ +static void +postgresReScanForeignScan(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + char sql[64]; + PGresult *res; + + /* If we haven't created the cursor yet, nothing to do. */ + if (!fsstate->cursor_exists) + return; + + /* + * If the node is async-capable, and an asynchronous fetch for it has been + * begun, the asynchronous fetch might not have yet completed. Check if + * the node is async-capable, and an asynchronous fetch for it is still in + * progress; if so, complete the asynchronous fetch before restarting the + * scan. + */ + if (fsstate->async_capable && + fsstate->conn_state->pendingAreq && + fsstate->conn_state->pendingAreq->requestee == (PlanState *) node) + fetch_more_data(node); + + /* + * If any internal parameters affecting this node have changed, we'd + * better destroy and recreate the cursor. Otherwise, rewinding it should + * be good enough. If we've only fetched zero or one batch, we needn't + * even rewind the cursor, just rescan what we have. + */ + if (node->ss.ps.chgParam != NULL) + { + fsstate->cursor_exists = false; + snprintf(sql, sizeof(sql), "CLOSE c%u", + fsstate->cursor_number); + } + else if (fsstate->fetch_ct_2 > 1) + { + snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u", + fsstate->cursor_number); + } + else + { + /* Easy: just rescan what we already have in memory, if anything */ + fsstate->next_tuple = 0; + return; + } + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fsstate->conn, true, sql); + PQclear(res); + + /* Now force a fresh FETCH. */ + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + fsstate->next_tuple = 0; + fsstate->fetch_ct_2 = 0; + fsstate->eof_reached = false; +} + +/* + * postgresEndForeignScan + * Finish scanning foreign table and dispose objects used for this scan + */ +static void +postgresEndForeignScan(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ + if (fsstate == NULL) + return; + + /* Close the cursor if open, to prevent accumulation of cursors */ + if (fsstate->cursor_exists) + close_cursor(fsstate->conn, fsstate->cursor_number, + fsstate->conn_state); + + /* Release remote connection */ + ReleaseConnection(fsstate->conn); + fsstate->conn = NULL; + + /* MemoryContexts will be deleted automatically. */ +} + +/* + * postgresAddForeignUpdateTargets + * Add resjunk column(s) needed for update/delete on a foreign table + */ +static void +postgresAddForeignUpdateTargets(PlannerInfo *root, + Index rtindex, + RangeTblEntry *target_rte, + Relation target_relation) +{ + Var *var; + + /* + * In postgres_fdw, what we need is the ctid, same as for a regular table. + */ + + /* Make a Var representing the desired value */ + var = makeVar(rtindex, + SelfItemPointerAttributeNumber, + TIDOID, + -1, + InvalidOid, + 0); + + /* Register it as a row-identity column needed by this target rel */ + add_row_identity_var(root, var, rtindex, "ctid"); +} + +/* + * postgresPlanForeignModify + * Plan an insert/update/delete operation on a foreign table + */ +static List * +postgresPlanForeignModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index) +{ + CmdType operation = plan->operation; + RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); + Relation rel; + StringInfoData sql; + List *targetAttrs = NIL; + List *withCheckOptionList = NIL; + List *returningList = NIL; + List *retrieved_attrs = NIL; + bool doNothing = false; + int values_end_len = -1; + + initStringInfo(&sql); + + /* + * Core code already has some lock on each rel being planned, so we can + * use NoLock here. + */ + rel = table_open(rte->relid, NoLock); + + /* + * In an INSERT, we transmit all columns that are defined in the foreign + * table. In an UPDATE, if there are BEFORE ROW UPDATE triggers on the + * foreign table, we transmit all columns like INSERT; else we transmit + * only columns that were explicitly targets of the UPDATE, so as to avoid + * unnecessary data transmission. (We can't do that for INSERT since we + * would miss sending default values for columns not listed in the source + * statement, and for UPDATE if there are BEFORE ROW UPDATE triggers since + * those triggers might change values for non-target columns, in which + * case we would miss sending changed values for those columns.) + */ + if (operation == CMD_INSERT || + (operation == CMD_UPDATE && + rel->trigdesc && + rel->trigdesc->trig_update_before_row)) + { + TupleDesc tupdesc = RelationGetDescr(rel); + int attnum; + + for (attnum = 1; attnum <= tupdesc->natts; attnum++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (!attr->attisdropped) + targetAttrs = lappend_int(targetAttrs, attnum); + } + } + else if (operation == CMD_UPDATE) + { + int col; + RelOptInfo *rel = find_base_rel(root, resultRelation); + Bitmapset *allUpdatedCols = get_rel_all_updated_cols(root, rel); + + col = -1; + while ((col = bms_next_member(allUpdatedCols, col)) >= 0) + { + /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */ + AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; + + if (attno <= InvalidAttrNumber) /* shouldn't happen */ + elog(ERROR, "system-column update is not supported"); + targetAttrs = lappend_int(targetAttrs, attno); + } + } + + /* + * Extract the relevant WITH CHECK OPTION list if any. + */ + if (plan->withCheckOptionLists) + withCheckOptionList = (List *) list_nth(plan->withCheckOptionLists, + subplan_index); + + /* + * Extract the relevant RETURNING list if any. + */ + if (plan->returningLists) + returningList = (List *) list_nth(plan->returningLists, subplan_index); + + /* + * ON CONFLICT DO UPDATE and DO NOTHING case with inference specification + * should have already been rejected in the optimizer, as presently there + * is no way to recognize an arbiter index on a foreign table. Only DO + * NOTHING is supported without an inference specification. + */ + if (plan->onConflictAction == ONCONFLICT_NOTHING) + doNothing = true; + else if (plan->onConflictAction != ONCONFLICT_NONE) + elog(ERROR, "unexpected ON CONFLICT specification: %d", + (int) plan->onConflictAction); + + /* + * Construct the SQL command string. + */ + switch (operation) + { + case CMD_INSERT: + deparseInsertSql(&sql, rte, resultRelation, rel, + targetAttrs, doNothing, + withCheckOptionList, returningList, + &retrieved_attrs, &values_end_len); + break; + case CMD_UPDATE: + deparseUpdateSql(&sql, rte, resultRelation, rel, + targetAttrs, + withCheckOptionList, returningList, + &retrieved_attrs); + break; + case CMD_DELETE: + deparseDeleteSql(&sql, rte, resultRelation, rel, + returningList, + &retrieved_attrs); + break; + default: + elog(ERROR, "unexpected operation: %d", (int) operation); + break; + } + + table_close(rel, NoLock); + + /* + * Build the fdw_private list that will be available to the executor. + * Items in the list must match enum FdwModifyPrivateIndex, above. + */ + return list_make5(makeString(sql.data), + targetAttrs, + makeInteger(values_end_len), + makeBoolean((retrieved_attrs != NIL)), + retrieved_attrs); +} + +/* + * postgresBeginForeignModify + * Begin an insert/update/delete operation on a foreign table + */ +static void +postgresBeginForeignModify(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + List *fdw_private, + int subplan_index, + int eflags) +{ + PgFdwModifyState *fmstate; + char *query; + List *target_attrs; + bool has_returning; + int values_end_len; + List *retrieved_attrs; + RangeTblEntry *rte; + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState + * stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* Deconstruct fdw_private data. */ + query = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); + target_attrs = (List *) list_nth(fdw_private, + FdwModifyPrivateTargetAttnums); + values_end_len = intVal(list_nth(fdw_private, + FdwModifyPrivateLen)); + has_returning = boolVal(list_nth(fdw_private, + FdwModifyPrivateHasReturning)); + retrieved_attrs = (List *) list_nth(fdw_private, + FdwModifyPrivateRetrievedAttrs); + + /* Find RTE. */ + rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, + mtstate->ps.state); + + /* Construct an execution state. */ + fmstate = create_foreign_modify(mtstate->ps.state, + rte, + resultRelInfo, + mtstate->operation, + outerPlanState(mtstate)->plan, + query, + target_attrs, + values_end_len, + has_returning, + retrieved_attrs); + + resultRelInfo->ri_FdwState = fmstate; +} + +/* + * postgresExecForeignInsert + * Insert one row into a foreign table + */ +static TupleTableSlot * +postgresExecForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + TupleTableSlot **rslot; + int numSlots = 1; + + /* + * If the fmstate has aux_fmstate set, use the aux_fmstate (see + * postgresBeginForeignInsert()) + */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate->aux_fmstate; + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, + &slot, &planSlot, &numSlots); + /* Revert that change */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate; + + return rslot ? *rslot : NULL; +} + +/* + * postgresExecForeignBatchInsert + * Insert multiple rows into a foreign table + */ +static TupleTableSlot ** +postgresExecForeignBatchInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + TupleTableSlot **rslot; + + /* + * If the fmstate has aux_fmstate set, use the aux_fmstate (see + * postgresBeginForeignInsert()) + */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate->aux_fmstate; + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_INSERT, + slots, planSlots, numSlots); + /* Revert that change */ + if (fmstate->aux_fmstate) + resultRelInfo->ri_FdwState = fmstate; + + return rslot; +} + +/* + * postgresGetForeignModifyBatchSize + * Determine the maximum number of tuples that can be inserted in bulk + * + * Returns the batch size specified for server or table. When batching is not + * allowed (e.g. for tables with BEFORE/AFTER ROW triggers or with RETURNING + * clause), returns 1. + */ +static int +postgresGetForeignModifyBatchSize(ResultRelInfo *resultRelInfo) +{ + int batch_size; + PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState ? + (PgFdwModifyState *) resultRelInfo->ri_FdwState : + NULL; + + /* should be called only once */ + Assert(resultRelInfo->ri_BatchSize == 0); + + /* + * Should never get called when the insert is being performed as part of a + * row movement operation. + */ + Assert(fmstate == NULL || fmstate->aux_fmstate == NULL); + + /* + * In EXPLAIN without ANALYZE, ri_FdwState is NULL, so we have to lookup + * the option directly in server/table options. Otherwise just use the + * value we determined earlier. + */ + if (fmstate) + batch_size = fmstate->batch_size; + else + batch_size = get_batch_size_option(resultRelInfo->ri_RelationDesc); + + /* + * Disable batching when we have to use RETURNING, there are any + * BEFORE/AFTER ROW INSERT triggers on the foreign table, or there are any + * WITH CHECK OPTION constraints from parent views. + * + * When there are any BEFORE ROW INSERT triggers on the table, we can't + * support it, because such triggers might query the table we're inserting + * into and act differently if the tuples that have already been processed + * and prepared for insertion are not there. + */ + if (resultRelInfo->ri_projectReturning != NULL || + resultRelInfo->ri_WithCheckOptions != NIL || + (resultRelInfo->ri_TrigDesc && + (resultRelInfo->ri_TrigDesc->trig_insert_before_row || + resultRelInfo->ri_TrigDesc->trig_insert_after_row))) + return 1; + + /* + * Otherwise use the batch size specified for server/table. The number of + * parameters in a batch is limited to 65535 (uint16), so make sure we + * don't exceed this limit by using the maximum batch_size possible. + */ + if (fmstate && fmstate->p_nums > 0) + batch_size = Min(batch_size, PQ_QUERY_PARAM_MAX_LIMIT / fmstate->p_nums); + + return batch_size; +} + +/* + * postgresExecForeignUpdate + * Update one row in a foreign table + */ +static TupleTableSlot * +postgresExecForeignUpdate(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_UPDATE, + &slot, &planSlot, &numSlots); + + return rslot ? rslot[0] : NULL; +} + +/* + * postgresExecForeignDelete + * Delete one row from a foreign table + */ +static TupleTableSlot * +postgresExecForeignDelete(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + TupleTableSlot **rslot; + int numSlots = 1; + + rslot = execute_foreign_modify(estate, resultRelInfo, CMD_DELETE, + &slot, &planSlot, &numSlots); + + return rslot ? rslot[0] : NULL; +} + +/* + * postgresEndForeignModify + * Finish an insert/update/delete operation on a foreign table + */ +static void +postgresEndForeignModify(EState *estate, + ResultRelInfo *resultRelInfo) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + + /* If fmstate is NULL, we are in EXPLAIN; nothing to do */ + if (fmstate == NULL) + return; + + /* Destroy the execution state */ + finish_foreign_modify(fmstate); +} + +/* + * postgresBeginForeignInsert + * Begin an insert operation on a foreign table + */ +static void +postgresBeginForeignInsert(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo) +{ + PgFdwModifyState *fmstate; + ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan); + EState *estate = mtstate->ps.state; + Index resultRelation; + Relation rel = resultRelInfo->ri_RelationDesc; + RangeTblEntry *rte; + TupleDesc tupdesc = RelationGetDescr(rel); + int attnum; + int values_end_len; + StringInfoData sql; + List *targetAttrs = NIL; + List *retrieved_attrs = NIL; + bool doNothing = false; + + /* + * If the foreign table we are about to insert routed rows into is also an + * UPDATE subplan result rel that will be updated later, proceeding with + * the INSERT will result in the later UPDATE incorrectly modifying those + * routed rows, so prevent the INSERT --- it would be nice if we could + * handle this case; but for now, throw an error for safety. + */ + if (plan && plan->operation == CMD_UPDATE && + (resultRelInfo->ri_usesFdwDirectModify || + resultRelInfo->ri_FdwState)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot route tuples into foreign table to be updated \"%s\"", + RelationGetRelationName(rel)))); + + initStringInfo(&sql); + + /* We transmit all columns that are defined in the foreign table. */ + for (attnum = 1; attnum <= tupdesc->natts; attnum++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (!attr->attisdropped) + targetAttrs = lappend_int(targetAttrs, attnum); + } + + /* Check if we add the ON CONFLICT clause to the remote query. */ + if (plan) + { + OnConflictAction onConflictAction = plan->onConflictAction; + + /* We only support DO NOTHING without an inference specification. */ + if (onConflictAction == ONCONFLICT_NOTHING) + doNothing = true; + else if (onConflictAction != ONCONFLICT_NONE) + elog(ERROR, "unexpected ON CONFLICT specification: %d", + (int) onConflictAction); + } + + /* + * If the foreign table is a partition that doesn't have a corresponding + * RTE entry, we need to create a new RTE describing the foreign table for + * use by deparseInsertSql and create_foreign_modify() below, after first + * copying the parent's RTE and modifying some fields to describe the + * foreign partition to work on. However, if this is invoked by UPDATE, + * the existing RTE may already correspond to this partition if it is one + * of the UPDATE subplan target rels; in that case, we can just use the + * existing RTE as-is. + */ + if (resultRelInfo->ri_RangeTableIndex == 0) + { + ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo; + + rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate); + rte = copyObject(rte); + rte->relid = RelationGetRelid(rel); + rte->relkind = RELKIND_FOREIGN_TABLE; + + /* + * For UPDATE, we must use the RT index of the first subplan target + * rel's RTE, because the core code would have built expressions for + * the partition, such as RETURNING, using that RT index as varno of + * Vars contained in those expressions. + */ + if (plan && plan->operation == CMD_UPDATE && + rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation) + resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex; + else + resultRelation = rootResultRelInfo->ri_RangeTableIndex; + } + else + { + resultRelation = resultRelInfo->ri_RangeTableIndex; + rte = exec_rt_fetch(resultRelation, estate); + } + + /* Construct the SQL command string. */ + deparseInsertSql(&sql, rte, resultRelation, rel, targetAttrs, doNothing, + resultRelInfo->ri_WithCheckOptions, + resultRelInfo->ri_returningList, + &retrieved_attrs, &values_end_len); + + /* Construct an execution state. */ + fmstate = create_foreign_modify(mtstate->ps.state, + rte, + resultRelInfo, + CMD_INSERT, + NULL, + sql.data, + targetAttrs, + values_end_len, + retrieved_attrs != NIL, + retrieved_attrs); + + /* + * If the given resultRelInfo already has PgFdwModifyState set, it means + * the foreign table is an UPDATE subplan result rel; in which case, store + * the resulting state into the aux_fmstate of the PgFdwModifyState. + */ + if (resultRelInfo->ri_FdwState) + { + Assert(plan && plan->operation == CMD_UPDATE); + Assert(resultRelInfo->ri_usesFdwDirectModify == false); + ((PgFdwModifyState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate; + } + else + resultRelInfo->ri_FdwState = fmstate; +} + +/* + * postgresEndForeignInsert + * Finish an insert operation on a foreign table + */ +static void +postgresEndForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + + Assert(fmstate != NULL); + + /* + * If the fmstate has aux_fmstate set, get the aux_fmstate (see + * postgresBeginForeignInsert()) + */ + if (fmstate->aux_fmstate) + fmstate = fmstate->aux_fmstate; + + /* Destroy the execution state */ + finish_foreign_modify(fmstate); +} + +/* + * postgresIsForeignRelUpdatable + * Determine whether a foreign table supports INSERT, UPDATE and/or + * DELETE. + */ +static int +postgresIsForeignRelUpdatable(Relation rel) +{ + bool updatable; + ForeignTable *table; + ForeignServer *server; + ListCell *lc; + + /* + * By default, all postgres_fdw foreign tables are assumed updatable. This + * can be overridden by a per-server setting, which in turn can be + * overridden by a per-table setting. + */ + updatable = true; + + table = GetForeignTable(RelationGetRelid(rel)); + server = GetForeignServer(table->serverid); + + foreach(lc, server->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "updatable") == 0) + updatable = defGetBoolean(def); + } + foreach(lc, table->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "updatable") == 0) + updatable = defGetBoolean(def); + } + + /* + * Currently "updatable" means support for INSERT, UPDATE and DELETE. + */ + return updatable ? + (1 << CMD_INSERT) | (1 << CMD_UPDATE) | (1 << CMD_DELETE) : 0; +} + +/* + * postgresRecheckForeignScan + * Execute a local join execution plan for a foreign join + */ +static bool +postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot) +{ + Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid; + PlanState *outerPlan = outerPlanState(node); + TupleTableSlot *result; + + /* For base foreign relations, it suffices to set fdw_recheck_quals */ + if (scanrelid > 0) + return true; + + Assert(outerPlan != NULL); + + /* Execute a local join execution plan */ + result = ExecProcNode(outerPlan); + if (TupIsNull(result)) + return false; + + /* Store result in the given slot */ + ExecCopySlot(slot, result); + + return true; +} + +/* + * find_modifytable_subplan + * Helper routine for postgresPlanDirectModify to find the + * ModifyTable subplan node that scans the specified RTI. + * + * Returns NULL if the subplan couldn't be identified. That's not a fatal + * error condition, we just abandon trying to do the update directly. + */ +static ForeignScan * +find_modifytable_subplan(PlannerInfo *root, + ModifyTable *plan, + Index rtindex, + int subplan_index) +{ + Plan *subplan = outerPlan(plan); + + /* + * The cases we support are (1) the desired ForeignScan is the immediate + * child of ModifyTable, or (2) it is the subplan_index'th child of an + * Append node that is the immediate child of ModifyTable. There is no + * point in looking further down, as that would mean that local joins are + * involved, so we can't do the update directly. + * + * There could be a Result atop the Append too, acting to compute the + * UPDATE targetlist values. We ignore that here; the tlist will be + * checked by our caller. + * + * In principle we could examine all the children of the Append, but it's + * currently unlikely that the core planner would generate such a plan + * with the children out-of-order. Moreover, such a search risks costing + * O(N^2) time when there are a lot of children. + */ + if (IsA(subplan, Append)) + { + Append *appendplan = (Append *) subplan; + + if (subplan_index < list_length(appendplan->appendplans)) + subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index); + } + else if (IsA(subplan, Result) && + outerPlan(subplan) != NULL && + IsA(outerPlan(subplan), Append)) + { + Append *appendplan = (Append *) outerPlan(subplan); + + if (subplan_index < list_length(appendplan->appendplans)) + subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index); + } + + /* Now, have we got a ForeignScan on the desired rel? */ + if (IsA(subplan, ForeignScan)) + { + ForeignScan *fscan = (ForeignScan *) subplan; + + if (bms_is_member(rtindex, fscan->fs_relids)) + return fscan; + } + + return NULL; +} + +/* + * postgresPlanDirectModify + * Consider a direct foreign table modification + * + * Decide whether it is safe to modify a foreign table directly, and if so, + * rewrite subplan accordingly. + */ +static bool +postgresPlanDirectModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index) +{ + CmdType operation = plan->operation; + RelOptInfo *foreignrel; + RangeTblEntry *rte; + PgFdwRelationInfo *fpinfo; + Relation rel; + StringInfoData sql; + ForeignScan *fscan; + List *processed_tlist = NIL; + List *targetAttrs = NIL; + List *remote_exprs; + List *params_list = NIL; + List *returningList = NIL; + List *retrieved_attrs = NIL; + + /* + * Decide whether it is safe to modify a foreign table directly. + */ + + /* + * The table modification must be an UPDATE or DELETE. + */ + if (operation != CMD_UPDATE && operation != CMD_DELETE) + return false; + + /* + * Try to locate the ForeignScan subplan that's scanning resultRelation. + */ + fscan = find_modifytable_subplan(root, plan, resultRelation, subplan_index); + if (!fscan) + return false; + + /* + * It's unsafe to modify a foreign table directly if there are any quals + * that should be evaluated locally. + */ + if (fscan->scan.plan.qual != NIL) + return false; + + /* Safe to fetch data about the target foreign rel */ + if (fscan->scan.scanrelid == 0) + { + foreignrel = find_join_rel(root, fscan->fs_relids); + /* We should have a rel for this foreign join. */ + Assert(foreignrel); + } + else + foreignrel = root->simple_rel_array[resultRelation]; + rte = root->simple_rte_array[resultRelation]; + fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; + + /* + * It's unsafe to update a foreign table directly, if any expressions to + * assign to the target columns are unsafe to evaluate remotely. + */ + if (operation == CMD_UPDATE) + { + ListCell *lc, + *lc2; + + /* + * The expressions of concern are the first N columns of the processed + * targetlist, where N is the length of the rel's update_colnos. + */ + get_translated_update_targetlist(root, resultRelation, + &processed_tlist, &targetAttrs); + forboth(lc, processed_tlist, lc2, targetAttrs) + { + TargetEntry *tle = lfirst_node(TargetEntry, lc); + AttrNumber attno = lfirst_int(lc2); + + /* update's new-value expressions shouldn't be resjunk */ + Assert(!tle->resjunk); + + if (attno <= InvalidAttrNumber) /* shouldn't happen */ + elog(ERROR, "system-column update is not supported"); + + if (!is_foreign_expr(root, foreignrel, (Expr *) tle->expr)) + return false; + } + } + + /* + * Ok, rewrite subplan so as to modify the foreign table directly. + */ + initStringInfo(&sql); + + /* + * Core code already has some lock on each rel being planned, so we can + * use NoLock here. + */ + rel = table_open(rte->relid, NoLock); + + /* + * Recall the qual clauses that must be evaluated remotely. (These are + * bare clauses not RestrictInfos, but deparse.c's appendConditions() + * doesn't care.) + */ + remote_exprs = fpinfo->final_remote_exprs; + + /* + * Extract the relevant RETURNING list if any. + */ + if (plan->returningLists) + { + returningList = (List *) list_nth(plan->returningLists, subplan_index); + + /* + * When performing an UPDATE/DELETE .. RETURNING on a join directly, + * we fetch from the foreign server any Vars specified in RETURNING + * that refer not only to the target relation but to non-target + * relations. So we'll deparse them into the RETURNING clause of the + * remote query; use a targetlist consisting of them instead, which + * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan + * node below. + */ + if (fscan->scan.scanrelid == 0) + returningList = build_remote_returning(resultRelation, rel, + returningList); + } + + /* + * Construct the SQL command string. + */ + switch (operation) + { + case CMD_UPDATE: + deparseDirectUpdateSql(&sql, root, resultRelation, rel, + foreignrel, + processed_tlist, + targetAttrs, + remote_exprs, ¶ms_list, + returningList, &retrieved_attrs); + break; + case CMD_DELETE: + deparseDirectDeleteSql(&sql, root, resultRelation, rel, + foreignrel, + remote_exprs, ¶ms_list, + returningList, &retrieved_attrs); + break; + default: + elog(ERROR, "unexpected operation: %d", (int) operation); + break; + } + + /* + * Update the operation and target relation info. + */ + fscan->operation = operation; + fscan->resultRelation = resultRelation; + + /* + * Update the fdw_exprs list that will be available to the executor. + */ + fscan->fdw_exprs = params_list; + + /* + * Update the fdw_private list that will be available to the executor. + * Items in the list must match enum FdwDirectModifyPrivateIndex, above. + */ + fscan->fdw_private = list_make4(makeString(sql.data), + makeBoolean((retrieved_attrs != NIL)), + retrieved_attrs, + makeBoolean(plan->canSetTag)); + + /* + * Update the foreign-join-related fields. + */ + if (fscan->scan.scanrelid == 0) + { + /* No need for the outer subplan. */ + fscan->scan.plan.lefttree = NULL; + + /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */ + if (returningList) + rebuild_fdw_scan_tlist(fscan, returningList); + } + + /* + * Finally, unset the async-capable flag if it is set, as we currently + * don't support asynchronous execution of direct modifications. + */ + if (fscan->scan.plan.async_capable) + fscan->scan.plan.async_capable = false; + + table_close(rel, NoLock); + return true; +} + +/* + * postgresBeginDirectModify + * Prepare a direct foreign table modification + */ +static void +postgresBeginDirectModify(ForeignScanState *node, int eflags) +{ + ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + PgFdwDirectModifyState *dmstate; + Index rtindex; + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + UserMapping *user; + int numParams; + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* + * We'll save private state in node->fdw_state. + */ + dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState)); + node->fdw_state = (void *) dmstate; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rtindex = node->resultRelInfo->ri_RangeTableIndex; + rte = exec_rt_fetch(rtindex, estate); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + if (fsplan->scan.scanrelid == 0) + dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags); + else + dmstate->rel = node->ss.ss_currentRelation; + table = GetForeignTable(RelationGetRelid(dmstate->rel)); + user = GetUserMapping(userid, table->serverid); + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + dmstate->conn = GetConnection(user, false, &dmstate->conn_state); + + /* Update the foreign-join-related fields. */ + if (fsplan->scan.scanrelid == 0) + { + /* Save info about foreign table. */ + dmstate->resultRel = dmstate->rel; + + /* + * Set dmstate->rel to NULL to teach get_returning_data() and + * make_tuple_from_result_row() that columns fetched from the remote + * server are described by fdw_scan_tlist of the foreign-scan plan + * node, not the tuple descriptor for the target relation. + */ + dmstate->rel = NULL; + } + + /* Initialize state variable */ + dmstate->num_tuples = -1; /* -1 means not set yet */ + + /* Get private info created by planner functions. */ + dmstate->query = strVal(list_nth(fsplan->fdw_private, + FdwDirectModifyPrivateUpdateSql)); + dmstate->has_returning = boolVal(list_nth(fsplan->fdw_private, + FdwDirectModifyPrivateHasReturning)); + dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, + FdwDirectModifyPrivateRetrievedAttrs); + dmstate->set_processed = boolVal(list_nth(fsplan->fdw_private, + FdwDirectModifyPrivateSetProcessed)); + + /* Create context for per-tuple temp workspace. */ + dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_SIZES); + + /* Prepare for input conversion of RETURNING results. */ + if (dmstate->has_returning) + { + TupleDesc tupdesc; + + if (fsplan->scan.scanrelid == 0) + tupdesc = get_tupdesc_for_join_scan_tuples(node); + else + tupdesc = RelationGetDescr(dmstate->rel); + + dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + + /* + * When performing an UPDATE/DELETE .. RETURNING on a join directly, + * initialize a filter to extract an updated/deleted tuple from a scan + * tuple. + */ + if (fsplan->scan.scanrelid == 0) + init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex); + } + + /* + * Prepare for processing of parameters used in remote query, if any. + */ + numParams = list_length(fsplan->fdw_exprs); + dmstate->numParams = numParams; + if (numParams > 0) + prepare_query_params((PlanState *) node, + fsplan->fdw_exprs, + numParams, + &dmstate->param_flinfo, + &dmstate->param_exprs, + &dmstate->param_values); +} + +/* + * postgresIterateDirectModify + * Execute a direct foreign table modification + */ +static TupleTableSlot * +postgresIterateDirectModify(ForeignScanState *node) +{ + PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; + EState *estate = node->ss.ps.state; + ResultRelInfo *resultRelInfo = node->resultRelInfo; + + /* + * If this is the first call after Begin, execute the statement. + */ + if (dmstate->num_tuples == -1) + execute_dml_stmt(node); + + /* + * If the local query doesn't specify RETURNING, just clear tuple slot. + */ + if (!resultRelInfo->ri_projectReturning) + { + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + Instrumentation *instr = node->ss.ps.instrument; + + Assert(!dmstate->has_returning); + + /* Increment the command es_processed count if necessary. */ + if (dmstate->set_processed) + estate->es_processed += dmstate->num_tuples; + + /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */ + if (instr) + instr->tuplecount += dmstate->num_tuples; + + return ExecClearTuple(slot); + } + + /* + * Get the next RETURNING tuple. + */ + return get_returning_data(node); +} + +/* + * postgresEndDirectModify + * Finish a direct foreign table modification + */ +static void +postgresEndDirectModify(ForeignScanState *node) +{ + PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; + + /* if dmstate is NULL, we are in EXPLAIN; nothing to do */ + if (dmstate == NULL) + return; + + /* Release PGresult */ + if (dmstate->result) + PQclear(dmstate->result); + + /* Release remote connection */ + ReleaseConnection(dmstate->conn); + dmstate->conn = NULL; + + /* MemoryContext will be deleted automatically. */ +} + +/* + * postgresExplainForeignScan + * Produce extra output for EXPLAIN of a ForeignScan on a foreign table + */ +static void +postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) +{ + ForeignScan *plan = castNode(ForeignScan, node->ss.ps.plan); + List *fdw_private = plan->fdw_private; + + /* + * Identify foreign scans that are really joins or upper relations. The + * input looks something like "(1) LEFT JOIN (2)", and we must replace the + * digit string(s), which are RT indexes, with the correct relation names. + * We do that here, not when the plan is created, because we can't know + * what aliases ruleutils.c will assign at plan creation time. + */ + if (list_length(fdw_private) > FdwScanPrivateRelations) + { + StringInfo relations; + char *rawrelations; + char *ptr; + int minrti, + rtoffset; + + rawrelations = strVal(list_nth(fdw_private, FdwScanPrivateRelations)); + + /* + * A difficulty with using a string representation of RT indexes is + * that setrefs.c won't update the string when flattening the + * rangetable. To find out what rtoffset was applied, identify the + * minimum RT index appearing in the string and compare it to the + * minimum member of plan->fs_relids. (We expect all the relids in + * the join will have been offset by the same amount; the Asserts + * below should catch it if that ever changes.) + */ + minrti = INT_MAX; + ptr = rawrelations; + while (*ptr) + { + if (isdigit((unsigned char) *ptr)) + { + int rti = strtol(ptr, &ptr, 10); + + if (rti < minrti) + minrti = rti; + } + else + ptr++; + } + rtoffset = bms_next_member(plan->fs_relids, -1) - minrti; + + /* Now we can translate the string */ + relations = makeStringInfo(); + ptr = rawrelations; + while (*ptr) + { + if (isdigit((unsigned char) *ptr)) + { + int rti = strtol(ptr, &ptr, 10); + RangeTblEntry *rte; + char *relname; + char *refname; + + rti += rtoffset; + Assert(bms_is_member(rti, plan->fs_relids)); + rte = rt_fetch(rti, es->rtable); + Assert(rte->rtekind == RTE_RELATION); + /* This logic should agree with explain.c's ExplainTargetRel */ + relname = get_rel_name(rte->relid); + if (es->verbose) + { + char *namespace; + + namespace = get_namespace_name_or_temp(get_rel_namespace(rte->relid)); + appendStringInfo(relations, "%s.%s", + quote_identifier(namespace), + quote_identifier(relname)); + } + else + appendStringInfoString(relations, + quote_identifier(relname)); + refname = (char *) list_nth(es->rtable_names, rti - 1); + if (refname == NULL) + refname = rte->eref->aliasname; + if (strcmp(refname, relname) != 0) + appendStringInfo(relations, " %s", + quote_identifier(refname)); + } + else + appendStringInfoChar(relations, *ptr++); + } + ExplainPropertyText("Relations", relations->data, es); + } + + /* + * Add remote query, when VERBOSE option is specified. + */ + if (es->verbose) + { + char *sql; + + sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); + ExplainPropertyText("Remote SQL", sql, es); + } +} + +/* + * postgresExplainForeignModify + * Produce extra output for EXPLAIN of a ModifyTable on a foreign table + */ +static void +postgresExplainForeignModify(ModifyTableState *mtstate, + ResultRelInfo *rinfo, + List *fdw_private, + int subplan_index, + ExplainState *es) +{ + if (es->verbose) + { + char *sql = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); + + ExplainPropertyText("Remote SQL", sql, es); + + /* + * For INSERT we should always have batch size >= 1, but UPDATE and + * DELETE don't support batching so don't show the property. + */ + if (rinfo->ri_BatchSize > 0) + ExplainPropertyInteger("Batch Size", NULL, rinfo->ri_BatchSize, es); + } +} + +/* + * postgresExplainDirectModify + * Produce extra output for EXPLAIN of a ForeignScan that modifies a + * foreign table directly + */ +static void +postgresExplainDirectModify(ForeignScanState *node, ExplainState *es) +{ + List *fdw_private; + char *sql; + + if (es->verbose) + { + fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; + sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql)); + ExplainPropertyText("Remote SQL", sql, es); + } +} + +/* + * postgresExecForeignTruncate + * Truncate one or more foreign tables + */ +static void +postgresExecForeignTruncate(List *rels, + DropBehavior behavior, + bool restart_seqs) +{ + Oid serverid = InvalidOid; + UserMapping *user = NULL; + PGconn *conn = NULL; + StringInfoData sql; + ListCell *lc; + bool server_truncatable = true; + + /* + * By default, all postgres_fdw foreign tables are assumed truncatable. + * This can be overridden by a per-server setting, which in turn can be + * overridden by a per-table setting. + */ + foreach(lc, rels) + { + ForeignServer *server = NULL; + Relation rel = lfirst(lc); + ForeignTable *table = GetForeignTable(RelationGetRelid(rel)); + ListCell *cell; + bool truncatable; + + /* + * First time through, determine whether the foreign server allows + * truncates. Since all specified foreign tables are assumed to belong + * to the same foreign server, this result can be used for other + * foreign tables. + */ + if (!OidIsValid(serverid)) + { + serverid = table->serverid; + server = GetForeignServer(serverid); + + foreach(cell, server->options) + { + DefElem *defel = (DefElem *) lfirst(cell); + + if (strcmp(defel->defname, "truncatable") == 0) + { + server_truncatable = defGetBoolean(defel); + break; + } + } + } + + /* + * Confirm that all specified foreign tables belong to the same + * foreign server. + */ + Assert(table->serverid == serverid); + + /* Determine whether this foreign table allows truncations */ + truncatable = server_truncatable; + foreach(cell, table->options) + { + DefElem *defel = (DefElem *) lfirst(cell); + + if (strcmp(defel->defname, "truncatable") == 0) + { + truncatable = defGetBoolean(defel); + break; + } + } + + if (!truncatable) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("foreign table \"%s\" does not allow truncates", + RelationGetRelationName(rel)))); + } + Assert(OidIsValid(serverid)); + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + user = GetUserMapping(GetUserId(), serverid); + conn = GetConnection(user, false, NULL); + + /* Construct the TRUNCATE command string */ + initStringInfo(&sql); + deparseTruncateSql(&sql, rels, behavior, restart_seqs); + + /* Issue the TRUNCATE command to remote server */ + do_sql_command(conn, sql.data); + + pfree(sql.data); +} + +/* + * estimate_path_cost_size + * Get cost and size estimates for a foreign scan on given foreign relation + * either a base relation or a join between foreign relations or an upper + * relation containing foreign relations. + * + * param_join_conds are the parameterization clauses with outer relations. + * pathkeys specify the expected sort order if any for given path being costed. + * fpextra specifies additional post-scan/join-processing steps such as the + * final sort and the LIMIT restriction. + * + * The function returns the cost and size estimates in p_rows, p_width, + * p_startup_cost and p_total_cost variables. + */ +static void +estimate_path_cost_size(PlannerInfo *root, + RelOptInfo *foreignrel, + List *param_join_conds, + List *pathkeys, + PgFdwPathExtraData *fpextra, + double *p_rows, int *p_width, + Cost *p_startup_cost, Cost *p_total_cost) +{ + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; + double rows; + double retrieved_rows; + int width; + Cost startup_cost; + Cost total_cost; + + /* Make sure the core code has set up the relation's reltarget */ + Assert(foreignrel->reltarget); + + /* + * If the table or the server is configured to use remote estimates, + * connect to the foreign server and execute EXPLAIN to estimate the + * number of rows selected by the restriction+join clauses. Otherwise, + * estimate rows using whatever statistics we have locally, in a way + * similar to ordinary tables. + */ + if (fpinfo->use_remote_estimate) + { + List *remote_param_join_conds; + List *local_param_join_conds; + StringInfoData sql; + PGconn *conn; + Selectivity local_sel; + QualCost local_cost; + List *fdw_scan_tlist = NIL; + List *remote_conds; + + /* Required only to be passed to deparseSelectStmtForRel */ + List *retrieved_attrs; + + /* + * param_join_conds might contain both clauses that are safe to send + * across, and clauses that aren't. + */ + classifyConditions(root, foreignrel, param_join_conds, + &remote_param_join_conds, &local_param_join_conds); + + /* Build the list of columns to be fetched from the foreign server. */ + if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel)) + fdw_scan_tlist = build_tlist_to_deparse(foreignrel); + else + fdw_scan_tlist = NIL; + + /* + * The complete list of remote conditions includes everything from + * baserestrictinfo plus any extra join_conds relevant to this + * particular path. + */ + remote_conds = list_concat(remote_param_join_conds, + fpinfo->remote_conds); + + /* + * Construct EXPLAIN query including the desired SELECT, FROM, and + * WHERE clauses. Params and other-relation Vars are replaced by dummy + * values, so don't request params_list. + */ + initStringInfo(&sql); + appendStringInfoString(&sql, "EXPLAIN "); + deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, + remote_conds, pathkeys, + fpextra ? fpextra->has_final_sort : false, + fpextra ? fpextra->has_limit : false, + false, &retrieved_attrs, NULL); + + /* Get the remote estimate */ + conn = GetConnection(fpinfo->user, false, NULL); + get_remote_estimate(sql.data, conn, &rows, &width, + &startup_cost, &total_cost); + ReleaseConnection(conn); + + retrieved_rows = rows; + + /* Factor in the selectivity of the locally-checked quals */ + local_sel = clauselist_selectivity(root, + local_param_join_conds, + foreignrel->relid, + JOIN_INNER, + NULL); + local_sel *= fpinfo->local_conds_sel; + + rows = clamp_row_est(rows * local_sel); + + /* Add in the eval cost of the locally-checked quals */ + startup_cost += fpinfo->local_conds_cost.startup; + total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; + cost_qual_eval(&local_cost, local_param_join_conds, root); + startup_cost += local_cost.startup; + total_cost += local_cost.per_tuple * retrieved_rows; + + /* + * Add in tlist eval cost for each output row. In case of an + * aggregate, some of the tlist expressions such as grouping + * expressions will be evaluated remotely, so adjust the costs. + */ + startup_cost += foreignrel->reltarget->cost.startup; + total_cost += foreignrel->reltarget->cost.startup; + total_cost += foreignrel->reltarget->cost.per_tuple * rows; + if (IS_UPPER_REL(foreignrel)) + { + QualCost tlist_cost; + + cost_qual_eval(&tlist_cost, fdw_scan_tlist, root); + startup_cost -= tlist_cost.startup; + total_cost -= tlist_cost.startup; + total_cost -= tlist_cost.per_tuple * rows; + } + } + else + { + Cost run_cost = 0; + + /* + * We don't support join conditions in this mode (hence, no + * parameterized paths can be made). + */ + Assert(param_join_conds == NIL); + + /* + * We will come here again and again with different set of pathkeys or + * additional post-scan/join-processing steps that caller wants to + * cost. We don't need to calculate the cost/size estimates for the + * underlying scan, join, or grouping each time. Instead, use those + * estimates if we have cached them already. + */ + if (fpinfo->rel_startup_cost >= 0 && fpinfo->rel_total_cost >= 0) + { + Assert(fpinfo->retrieved_rows >= 0); + + rows = fpinfo->rows; + retrieved_rows = fpinfo->retrieved_rows; + width = fpinfo->width; + startup_cost = fpinfo->rel_startup_cost; + run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost; + + /* + * If we estimate the costs of a foreign scan or a foreign join + * with additional post-scan/join-processing steps, the scan or + * join costs obtained from the cache wouldn't yet contain the + * eval costs for the final scan/join target, which would've been + * updated by apply_scanjoin_target_to_paths(); add the eval costs + * now. + */ + if (fpextra && !IS_UPPER_REL(foreignrel)) + { + /* Shouldn't get here unless we have LIMIT */ + Assert(fpextra->has_limit); + Assert(foreignrel->reloptkind == RELOPT_BASEREL || + foreignrel->reloptkind == RELOPT_JOINREL); + startup_cost += foreignrel->reltarget->cost.startup; + run_cost += foreignrel->reltarget->cost.per_tuple * rows; + } + } + else if (IS_JOIN_REL(foreignrel)) + { + PgFdwRelationInfo *fpinfo_i; + PgFdwRelationInfo *fpinfo_o; + QualCost join_cost; + QualCost remote_conds_cost; + double nrows; + + /* Use rows/width estimates made by the core code. */ + rows = foreignrel->rows; + width = foreignrel->reltarget->width; + + /* For join we expect inner and outer relations set */ + Assert(fpinfo->innerrel && fpinfo->outerrel); + + fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private; + fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* Estimate of number of rows in cross product */ + nrows = fpinfo_i->rows * fpinfo_o->rows; + + /* + * Back into an estimate of the number of retrieved rows. Just in + * case this is nuts, clamp to at most nrows. + */ + retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); + retrieved_rows = Min(retrieved_rows, nrows); + + /* + * The cost of foreign join is estimated as cost of generating + * rows for the joining relations + cost for applying quals on the + * rows. + */ + + /* + * Calculate the cost of clauses pushed down to the foreign server + */ + cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root); + /* Calculate the cost of applying join clauses */ + cost_qual_eval(&join_cost, fpinfo->joinclauses, root); + + /* + * Startup cost includes startup cost of joining relations and the + * startup cost for join and other clauses. We do not include the + * startup cost specific to join strategy (e.g. setting up hash + * tables) since we do not know what strategy the foreign server + * is going to use. + */ + startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost; + startup_cost += join_cost.startup; + startup_cost += remote_conds_cost.startup; + startup_cost += fpinfo->local_conds_cost.startup; + + /* + * Run time cost includes: + * + * 1. Run time cost (total_cost - startup_cost) of relations being + * joined + * + * 2. Run time cost of applying join clauses on the cross product + * of the joining relations. + * + * 3. Run time cost of applying pushed down other clauses on the + * result of join + * + * 4. Run time cost of applying nonpushable other clauses locally + * on the result fetched from the foreign server. + */ + run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost; + run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost; + run_cost += nrows * join_cost.per_tuple; + nrows = clamp_row_est(nrows * fpinfo->joinclause_sel); + run_cost += nrows * remote_conds_cost.per_tuple; + run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; + + /* Add in tlist eval cost for each output row */ + startup_cost += foreignrel->reltarget->cost.startup; + run_cost += foreignrel->reltarget->cost.per_tuple * rows; + } + else if (IS_UPPER_REL(foreignrel)) + { + RelOptInfo *outerrel = fpinfo->outerrel; + PgFdwRelationInfo *ofpinfo; + AggClauseCosts aggcosts; + double input_rows; + int numGroupCols; + double numGroups = 1; + + /* The upper relation should have its outer relation set */ + Assert(outerrel); + /* and that outer relation should have its reltarget set */ + Assert(outerrel->reltarget); + + /* + * This cost model is mixture of costing done for sorted and + * hashed aggregates in cost_agg(). We are not sure which + * strategy will be considered at remote side, thus for + * simplicity, we put all startup related costs in startup_cost + * and all finalization and run cost are added in total_cost. + */ + + ofpinfo = (PgFdwRelationInfo *) outerrel->fdw_private; + + /* Get rows from input rel */ + input_rows = ofpinfo->rows; + + /* Collect statistics about aggregates for estimating costs. */ + MemSet(&aggcosts, 0, sizeof(AggClauseCosts)); + if (root->parse->hasAggs) + { + get_agg_clause_costs(root, AGGSPLIT_SIMPLE, &aggcosts); + } + + /* Get number of grouping columns and possible number of groups */ + numGroupCols = list_length(root->parse->groupClause); + numGroups = estimate_num_groups(root, + get_sortgrouplist_exprs(root->parse->groupClause, + fpinfo->grouped_tlist), + input_rows, NULL, NULL); + + /* + * Get the retrieved_rows and rows estimates. If there are HAVING + * quals, account for their selectivity. + */ + if (root->parse->havingQual) + { + /* Factor in the selectivity of the remotely-checked quals */ + retrieved_rows = + clamp_row_est(numGroups * + clauselist_selectivity(root, + fpinfo->remote_conds, + 0, + JOIN_INNER, + NULL)); + /* Factor in the selectivity of the locally-checked quals */ + rows = clamp_row_est(retrieved_rows * fpinfo->local_conds_sel); + } + else + { + rows = retrieved_rows = numGroups; + } + + /* Use width estimate made by the core code. */ + width = foreignrel->reltarget->width; + + /*----- + * Startup cost includes: + * 1. Startup cost for underneath input relation, adjusted for + * tlist replacement by apply_scanjoin_target_to_paths() + * 2. Cost of performing aggregation, per cost_agg() + *----- + */ + startup_cost = ofpinfo->rel_startup_cost; + startup_cost += outerrel->reltarget->cost.startup; + startup_cost += aggcosts.transCost.startup; + startup_cost += aggcosts.transCost.per_tuple * input_rows; + startup_cost += aggcosts.finalCost.startup; + startup_cost += (cpu_operator_cost * numGroupCols) * input_rows; + + /*----- + * Run time cost includes: + * 1. Run time cost of underneath input relation, adjusted for + * tlist replacement by apply_scanjoin_target_to_paths() + * 2. Run time cost of performing aggregation, per cost_agg() + *----- + */ + run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost; + run_cost += outerrel->reltarget->cost.per_tuple * input_rows; + run_cost += aggcosts.finalCost.per_tuple * numGroups; + run_cost += cpu_tuple_cost * numGroups; + + /* Account for the eval cost of HAVING quals, if any */ + if (root->parse->havingQual) + { + QualCost remote_cost; + + /* Add in the eval cost of the remotely-checked quals */ + cost_qual_eval(&remote_cost, fpinfo->remote_conds, root); + startup_cost += remote_cost.startup; + run_cost += remote_cost.per_tuple * numGroups; + /* Add in the eval cost of the locally-checked quals */ + startup_cost += fpinfo->local_conds_cost.startup; + run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; + } + + /* Add in tlist eval cost for each output row */ + startup_cost += foreignrel->reltarget->cost.startup; + run_cost += foreignrel->reltarget->cost.per_tuple * rows; + } + else + { + Cost cpu_per_tuple; + + /* Use rows/width estimates made by set_baserel_size_estimates. */ + rows = foreignrel->rows; + width = foreignrel->reltarget->width; + + /* + * Back into an estimate of the number of retrieved rows. Just in + * case this is nuts, clamp to at most foreignrel->tuples. + */ + retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); + retrieved_rows = Min(retrieved_rows, foreignrel->tuples); + + /* + * Cost as though this were a seqscan, which is pessimistic. We + * effectively imagine the local_conds are being evaluated + * remotely, too. + */ + startup_cost = 0; + run_cost = 0; + run_cost += seq_page_cost * foreignrel->pages; + + startup_cost += foreignrel->baserestrictcost.startup; + cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; + run_cost += cpu_per_tuple * foreignrel->tuples; + + /* Add in tlist eval cost for each output row */ + startup_cost += foreignrel->reltarget->cost.startup; + run_cost += foreignrel->reltarget->cost.per_tuple * rows; + } + + /* + * Without remote estimates, we have no real way to estimate the cost + * of generating sorted output. It could be free if the query plan + * the remote side would have chosen generates properly-sorted output + * anyway, but in most cases it will cost something. Estimate a value + * high enough that we won't pick the sorted path when the ordering + * isn't locally useful, but low enough that we'll err on the side of + * pushing down the ORDER BY clause when it's useful to do so. + */ + if (pathkeys != NIL) + { + if (IS_UPPER_REL(foreignrel)) + { + Assert(foreignrel->reloptkind == RELOPT_UPPER_REL && + fpinfo->stage == UPPERREL_GROUP_AGG); + adjust_foreign_grouping_path_cost(root, pathkeys, + retrieved_rows, width, + fpextra->limit_tuples, + &startup_cost, &run_cost); + } + else + { + startup_cost *= DEFAULT_FDW_SORT_MULTIPLIER; + run_cost *= DEFAULT_FDW_SORT_MULTIPLIER; + } + } + + total_cost = startup_cost + run_cost; + + /* Adjust the cost estimates if we have LIMIT */ + if (fpextra && fpextra->has_limit) + { + adjust_limit_rows_costs(&rows, &startup_cost, &total_cost, + fpextra->offset_est, fpextra->count_est); + retrieved_rows = rows; + } + } + + /* + * If this includes the final sort step, the given target, which will be + * applied to the resulting path, might have different expressions from + * the foreignrel's reltarget (see make_sort_input_target()); adjust tlist + * eval costs. + */ + if (fpextra && fpextra->has_final_sort && + fpextra->target != foreignrel->reltarget) + { + QualCost oldcost = foreignrel->reltarget->cost; + QualCost newcost = fpextra->target->cost; + + startup_cost += newcost.startup - oldcost.startup; + total_cost += newcost.startup - oldcost.startup; + total_cost += (newcost.per_tuple - oldcost.per_tuple) * rows; + } + + /* + * Cache the retrieved rows and cost estimates for scans, joins, or + * groupings without any parameterization, pathkeys, or additional + * post-scan/join-processing steps, before adding the costs for + * transferring data from the foreign server. These estimates are useful + * for costing remote joins involving this relation or costing other + * remote operations on this relation such as remote sorts and remote + * LIMIT restrictions, when the costs can not be obtained from the foreign + * server. This function will be called at least once for every foreign + * relation without any parameterization, pathkeys, or additional + * post-scan/join-processing steps. + */ + if (pathkeys == NIL && param_join_conds == NIL && fpextra == NULL) + { + fpinfo->retrieved_rows = retrieved_rows; + fpinfo->rel_startup_cost = startup_cost; + fpinfo->rel_total_cost = total_cost; + } + + /* + * Add some additional cost factors to account for connection overhead + * (fdw_startup_cost), transferring data across the network + * (fdw_tuple_cost per retrieved row), and local manipulation of the data + * (cpu_tuple_cost per retrieved row). + */ + startup_cost += fpinfo->fdw_startup_cost; + total_cost += fpinfo->fdw_startup_cost; + total_cost += fpinfo->fdw_tuple_cost * retrieved_rows; + total_cost += cpu_tuple_cost * retrieved_rows; + + /* + * If we have LIMIT, we should prefer performing the restriction remotely + * rather than locally, as the former avoids extra row fetches from the + * remote that the latter might cause. But since the core code doesn't + * account for such fetches when estimating the costs of the local + * restriction (see create_limit_path()), there would be no difference + * between the costs of the local restriction and the costs of the remote + * restriction estimated above if we don't use remote estimates (except + * for the case where the foreignrel is a grouping relation, the given + * pathkeys is not NIL, and the effects of a bounded sort for that rel is + * accounted for in costing the remote restriction). Tweak the costs of + * the remote restriction to ensure we'll prefer it if LIMIT is a useful + * one. + */ + if (!fpinfo->use_remote_estimate && + fpextra && fpextra->has_limit && + fpextra->limit_tuples > 0 && + fpextra->limit_tuples < fpinfo->rows) + { + Assert(fpinfo->rows > 0); + total_cost -= (total_cost - startup_cost) * 0.05 * + (fpinfo->rows - fpextra->limit_tuples) / fpinfo->rows; + } + + /* Return results. */ + *p_rows = rows; + *p_width = width; + *p_startup_cost = startup_cost; + *p_total_cost = total_cost; +} + +/* + * Estimate costs of executing a SQL statement remotely. + * The given "sql" must be an EXPLAIN command. + */ +static void +get_remote_estimate(const char *sql, PGconn *conn, + double *rows, int *width, + Cost *startup_cost, Cost *total_cost) +{ + PGresult *volatile res = NULL; + + /* PGresult must be released before leaving this function. */ + PG_TRY(); + { + char *line; + char *p; + int n; + + /* + * Execute EXPLAIN remotely. + */ + res = pgfdw_exec_query(conn, sql, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, sql); + + /* + * Extract cost numbers for topmost plan node. Note we search for a + * left paren from the end of the line to avoid being confused by + * other uses of parentheses. + */ + line = PQgetvalue(res, 0, 0); + p = strrchr(line, '('); + if (p == NULL) + elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); + n = sscanf(p, "(cost=%lf..%lf rows=%lf width=%d)", + startup_cost, total_cost, rows, width); + if (n != 4) + elog(ERROR, "could not interpret EXPLAIN output: \"%s\"", line); + } + PG_FINALLY(); + { + if (res) + PQclear(res); + } + PG_END_TRY(); +} + +/* + * Adjust the cost estimates of a foreign grouping path to include the cost of + * generating properly-sorted output. + */ +static void +adjust_foreign_grouping_path_cost(PlannerInfo *root, + List *pathkeys, + double retrieved_rows, + double width, + double limit_tuples, + Cost *p_startup_cost, + Cost *p_run_cost) +{ + /* + * If the GROUP BY clause isn't sort-able, the plan chosen by the remote + * side is unlikely to generate properly-sorted output, so it would need + * an explicit sort; adjust the given costs with cost_sort(). Likewise, + * if the GROUP BY clause is sort-able but isn't a superset of the given + * pathkeys, adjust the costs with that function. Otherwise, adjust the + * costs by applying the same heuristic as for the scan or join case. + */ + if (!grouping_is_sortable(root->parse->groupClause) || + !pathkeys_contained_in(pathkeys, root->group_pathkeys)) + { + Path sort_path; /* dummy for result of cost_sort */ + + cost_sort(&sort_path, + root, + pathkeys, + *p_startup_cost + *p_run_cost, + retrieved_rows, + width, + 0.0, + work_mem, + limit_tuples); + + *p_startup_cost = sort_path.startup_cost; + *p_run_cost = sort_path.total_cost - sort_path.startup_cost; + } + else + { + /* + * The default extra cost seems too large for foreign-grouping cases; + * add 1/4th of that default. + */ + double sort_multiplier = 1.0 + (DEFAULT_FDW_SORT_MULTIPLIER + - 1.0) * 0.25; + + *p_startup_cost *= sort_multiplier; + *p_run_cost *= sort_multiplier; + } +} + +/* + * Detect whether we want to process an EquivalenceClass member. + * + * This is a callback for use by generate_implied_equalities_for_column. + */ +static bool +ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, + EquivalenceClass *ec, EquivalenceMember *em, + void *arg) +{ + ec_member_foreign_arg *state = (ec_member_foreign_arg *) arg; + Expr *expr = em->em_expr; + + /* + * If we've identified what we're processing in the current scan, we only + * want to match that expression. + */ + if (state->current != NULL) + return equal(expr, state->current); + + /* + * Otherwise, ignore anything we've already processed. + */ + if (list_member(state->already_used, expr)) + return false; + + /* This is the new target to process. */ + state->current = expr; + return true; +} + +/* + * Create cursor for node's query with current parameter values. + */ +static void +create_cursor(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + ExprContext *econtext = node->ss.ps.ps_ExprContext; + int numParams = fsstate->numParams; + const char **values = fsstate->param_values; + PGconn *conn = fsstate->conn; + StringInfoData buf; + PGresult *res; + + /* First, process a pending asynchronous request, if any. */ + if (fsstate->conn_state->pendingAreq) + process_pending_request(fsstate->conn_state->pendingAreq); + + /* + * Construct array of query parameter values in text format. We do the + * conversions in the short-lived per-tuple context, so as not to cause a + * memory leak over repeated scans. + */ + if (numParams > 0) + { + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + process_query_params(econtext, + fsstate->param_flinfo, + fsstate->param_exprs, + values); + + MemoryContextSwitchTo(oldcontext); + } + + /* Construct the DECLARE CURSOR command */ + initStringInfo(&buf); + appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", + fsstate->cursor_number, fsstate->query); + + /* + * Notice that we pass NULL for paramTypes, thus forcing the remote server + * to infer types for all parameters. Since we explicitly cast every + * parameter (see deparse.c), the "inference" is trivial and will produce + * the desired result. This allows us to avoid assuming that the remote + * server has the same OIDs we do for the parameters' types. + */ + if (!PQsendQueryParams(conn, buf.data, numParams, + NULL, values, NULL, NULL, 0)) + pgfdw_report_error(ERROR, NULL, conn, false, buf.data); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(conn, buf.data); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, conn, true, fsstate->query); + PQclear(res); + + /* Mark the cursor as created, and show no tuples have been retrieved */ + fsstate->cursor_exists = true; + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + fsstate->next_tuple = 0; + fsstate->fetch_ct_2 = 0; + fsstate->eof_reached = false; + + /* Clean up */ + pfree(buf.data); +} + +/* + * Fetch some more rows from the node's cursor. + */ +static void +fetch_more_data(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PGresult *volatile res = NULL; + MemoryContext oldcontext; + + /* + * We'll store the tuples in the batch_cxt. First, flush the previous + * batch. + */ + fsstate->tuples = NULL; + MemoryContextReset(fsstate->batch_cxt); + oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); + + /* PGresult must be released before leaving this function. */ + PG_TRY(); + { + PGconn *conn = fsstate->conn; + int numrows; + int i; + + if (fsstate->async_capable) + { + Assert(fsstate->conn_state->pendingAreq); + + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = pgfdw_get_result(conn, fsstate->query); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + + /* Reset per-connection state */ + fsstate->conn_state->pendingAreq = NULL; + } + else + { + char sql[64]; + + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + res = pgfdw_exec_query(conn, sql, fsstate->conn_state); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } + + /* Convert the data into HeapTuples */ + numrows = PQntuples(res); + fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); + fsstate->num_tuples = numrows; + fsstate->next_tuple = 0; + + for (i = 0; i < numrows; i++) + { + Assert(IsA(node->ss.ps.plan, ForeignScan)); + + fsstate->tuples[i] = + make_tuple_from_result_row(res, i, + fsstate->rel, + fsstate->attinmeta, + fsstate->retrieved_attrs, + node, + fsstate->temp_cxt); + } + + /* Update fetch_ct_2 */ + if (fsstate->fetch_ct_2 < 2) + fsstate->fetch_ct_2++; + + /* Must be EOF if we didn't get as many tuples as we asked for. */ + fsstate->eof_reached = (numrows < fsstate->fetch_size); + } + PG_FINALLY(); + { + if (res) + PQclear(res); + } + PG_END_TRY(); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Force assorted GUC parameters to settings that ensure that we'll output + * data values in a form that is unambiguous to the remote server. + * + * This is rather expensive and annoying to do once per row, but there's + * little choice if we want to be sure values are transmitted accurately; + * we can't leave the settings in place between rows for fear of affecting + * user-visible computations. + * + * We use the equivalent of a function SET option to allow the settings to + * persist only until the caller calls reset_transmission_modes(). If an + * error is thrown in between, guc.c will take care of undoing the settings. + * + * The return value is the nestlevel that must be passed to + * reset_transmission_modes() to undo things. + */ +int +set_transmission_modes(void) +{ + int nestlevel = NewGUCNestLevel(); + + /* + * The values set here should match what pg_dump does. See also + * configure_remote_session in connection.c. + */ + if (DateStyle != USE_ISO_DATES) + (void) set_config_option("datestyle", "ISO", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); + if (IntervalStyle != INTSTYLE_POSTGRES) + (void) set_config_option("intervalstyle", "postgres", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); + if (extra_float_digits < 3) + (void) set_config_option("extra_float_digits", "3", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); + + /* + * In addition force restrictive search_path, in case there are any + * regproc or similar constants to be printed. + */ + (void) set_config_option("search_path", "pg_catalog", + PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_SAVE, true, 0, false); + + return nestlevel; +} + +/* + * Undo the effects of set_transmission_modes(). + */ +void +reset_transmission_modes(int nestlevel) +{ + AtEOXact_GUC(true, nestlevel); +} + +/* + * Utility routine to close a cursor. + */ +static void +close_cursor(PGconn *conn, unsigned int cursor_number, + PgFdwConnState *conn_state) +{ + char sql[64]; + PGresult *res; + + snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(conn, sql, conn_state); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, conn, true, sql); + PQclear(res); +} + +/* + * create_foreign_modify + * Construct an execution state of a foreign insert/update/delete + * operation + */ +static PgFdwModifyState * +create_foreign_modify(EState *estate, + RangeTblEntry *rte, + ResultRelInfo *resultRelInfo, + CmdType operation, + Plan *subplan, + char *query, + List *target_attrs, + int values_end, + bool has_returning, + List *retrieved_attrs) +{ + PgFdwModifyState *fmstate; + Relation rel = resultRelInfo->ri_RelationDesc; + TupleDesc tupdesc = RelationGetDescr(rel); + Oid userid; + ForeignTable *table; + UserMapping *user; + AttrNumber n_params; + Oid typefnoid; + bool isvarlena; + ListCell *lc; + + /* Begin constructing PgFdwModifyState. */ + fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); + fmstate->rel = rel; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + table = GetForeignTable(RelationGetRelid(rel)); + user = GetUserMapping(userid, table->serverid); + + /* Open connection; report that we'll create a prepared statement. */ + fmstate->conn = GetConnection(user, true, &fmstate->conn_state); + fmstate->p_name = NULL; /* prepared statement not made yet */ + + /* Set up remote query information. */ + fmstate->query = query; + if (operation == CMD_INSERT) + { + fmstate->query = pstrdup(fmstate->query); + fmstate->orig_query = pstrdup(fmstate->query); + } + fmstate->target_attrs = target_attrs; + fmstate->values_end = values_end; + fmstate->has_returning = has_returning; + fmstate->retrieved_attrs = retrieved_attrs; + + /* Create context for per-tuple temp workspace. */ + fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_SIZES); + + /* Prepare for input conversion of RETURNING results. */ + if (fmstate->has_returning) + fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + + /* Prepare for output conversion of parameters used in prepared stmt. */ + n_params = list_length(fmstate->target_attrs) + 1; + fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); + fmstate->p_nums = 0; + + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + Assert(subplan != NULL); + + /* Find the ctid resjunk column in the subplan's result */ + fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, + "ctid"); + if (!AttributeNumberIsValid(fmstate->ctidAttno)) + elog(ERROR, "could not find junk ctid column"); + + /* First transmittable parameter will be ctid */ + getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + + if (operation == CMD_INSERT || operation == CMD_UPDATE) + { + /* Set up for remaining transmittable parameters */ + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + Assert(!attr->attisdropped); + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + } + + Assert(fmstate->p_nums <= n_params); + + /* Set batch_size from foreign server/table options. */ + if (operation == CMD_INSERT) + fmstate->batch_size = get_batch_size_option(rel); + + fmstate->num_slots = 1; + + /* Initialize auxiliary state */ + fmstate->aux_fmstate = NULL; + + return fmstate; +} + +/* + * execute_foreign_modify + * Perform foreign-table modification as required, and fetch RETURNING + * result if any. (This is the shared guts of postgresExecForeignInsert, + * postgresExecForeignBatchInsert, postgresExecForeignUpdate, and + * postgresExecForeignDelete.) + */ +static TupleTableSlot ** +execute_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + TupleTableSlot **slots, + TupleTableSlot **planSlots, + int *numSlots) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + ItemPointer ctid = NULL; + const char **p_values; + PGresult *res; + int n_rows; + StringInfoData sql; + + /* The operation should be INSERT, UPDATE, or DELETE */ + Assert(operation == CMD_INSERT || + operation == CMD_UPDATE || + operation == CMD_DELETE); + + /* First, process a pending asynchronous request, if any. */ + if (fmstate->conn_state->pendingAreq) + process_pending_request(fmstate->conn_state->pendingAreq); + + /* + * If the existing query was deparsed and prepared for a different number + * of rows, rebuild it for the proper number. + */ + if (operation == CMD_INSERT && fmstate->num_slots != *numSlots) + { + /* Destroy the prepared statement created previously */ + if (fmstate->p_name) + deallocate_query(fmstate); + + /* Build INSERT string with numSlots records in its VALUES clause. */ + initStringInfo(&sql); + rebuildInsertSql(&sql, fmstate->rel, + fmstate->orig_query, fmstate->target_attrs, + fmstate->values_end, fmstate->p_nums, + *numSlots - 1); + pfree(fmstate->query); + fmstate->query = sql.data; + fmstate->num_slots = *numSlots; + } + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!fmstate->p_name) + prepare_foreign_modify(fmstate); + + /* + * For UPDATE/DELETE, get the ctid that was passed up as a resjunk column + */ + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + Datum datum; + bool isNull; + + datum = ExecGetJunkAttribute(planSlots[0], + fmstate->ctidAttno, + &isNull); + /* shouldn't ever get a null result... */ + if (isNull) + elog(ERROR, "ctid is NULL"); + ctid = (ItemPointer) DatumGetPointer(datum); + } + + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(fmstate, ctid, slots, *numSlots); + + /* + * Execute the prepared statement. + */ + if (!PQsendQueryPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums * (*numSlots), + p_values, + NULL, + NULL, + 0)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + Assert(*numSlots == 1); + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slots[0], res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + *numSlots = n_rows; + + /* + * Return NULL if nothing was inserted/updated/deleted on the remote end + */ + return (n_rows > 0) ? slots : NULL; +} + +/* + * prepare_foreign_modify + * Establish a prepared statement for execution of INSERT/UPDATE/DELETE + */ +static void +prepare_foreign_modify(PgFdwModifyState *fmstate) +{ + char prep_name[NAMEDATALEN]; + char *p_name; + PGresult *res; + + /* + * The caller would already have processed a pending asynchronous request + * if any, so no need to do it here. + */ + + /* Construct name we'll use for the prepared statement. */ + snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", + GetPrepStmtNumber(fmstate->conn)); + p_name = pstrdup(prep_name); + + /* + * We intentionally do not specify parameter types here, but leave the + * remote server to derive them by default. This avoids possible problems + * with the remote server using different type OIDs than we do. All of + * the prepared statements we use in this module are simple enough that + * the remote server will make the right choices. + */ + if (!PQsendPrepare(fmstate->conn, + p_name, + fmstate->query, + 0, + NULL)) + pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(fmstate->conn, fmstate->query); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query); + PQclear(res); + + /* This action shows that the prepare has been done. */ + fmstate->p_name = p_name; +} + +/* + * convert_prep_stmt_params + * Create array of text strings representing parameter values + * + * tupleid is ctid to send, or NULL if none + * slot is slot to get remaining parameters from, or NULL if none + * + * Data is constructed in temp_cxt; caller should reset that after use. + */ +static const char ** +convert_prep_stmt_params(PgFdwModifyState *fmstate, + ItemPointer tupleid, + TupleTableSlot **slots, + int numSlots) +{ + const char **p_values; + int i; + int j; + int pindex = 0; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); + + p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums * numSlots); + + /* ctid is provided only for UPDATE/DELETE, which don't allow batching */ + Assert(!(tupleid != NULL && numSlots > 1)); + + /* 1st parameter should be ctid, if it's in use */ + if (tupleid != NULL) + { + Assert(numSlots == 1); + /* don't need set_transmission_modes for TID output */ + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], + PointerGetDatum(tupleid)); + pindex++; + } + + /* get following parameters from slots */ + if (slots != NULL && fmstate->target_attrs != NIL) + { + TupleDesc tupdesc = RelationGetDescr(fmstate->rel); + int nestlevel; + ListCell *lc; + + nestlevel = set_transmission_modes(); + + for (i = 0; i < numSlots; i++) + { + j = (tupleid != NULL) ? 1 : 0; + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + Datum value; + bool isnull; + + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; + value = slot_getattr(slots[i], attnum, &isnull); + if (isnull) + p_values[pindex] = NULL; + else + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[j], + value); + pindex++; + j++; + } + } + + reset_transmission_modes(nestlevel); + } + + Assert(pindex == fmstate->p_nums * numSlots); + + MemoryContextSwitchTo(oldcontext); + + return p_values; +} + +/* + * store_returning_result + * Store the result of a RETURNING clause + * + * On error, be sure to release the PGresult on the way out. Callers do not + * have PG_TRY blocks to ensure this happens. + */ +static void +store_returning_result(PgFdwModifyState *fmstate, + TupleTableSlot *slot, PGresult *res) +{ + PG_TRY(); + { + HeapTuple newtup; + + newtup = make_tuple_from_result_row(res, 0, + fmstate->rel, + fmstate->attinmeta, + fmstate->retrieved_attrs, + NULL, + fmstate->temp_cxt); + + /* + * The returning slot will not necessarily be suitable to store + * heaptuples directly, so allow for conversion. + */ + ExecForceStoreHeapTuple(newtup, slot, true); + } + PG_CATCH(); + { + if (res) + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* + * finish_foreign_modify + * Release resources for a foreign insert/update/delete operation + */ +static void +finish_foreign_modify(PgFdwModifyState *fmstate) +{ + Assert(fmstate != NULL); + + /* If we created a prepared statement, destroy it */ + deallocate_query(fmstate); + + /* Release remote connection */ + ReleaseConnection(fmstate->conn); + fmstate->conn = NULL; +} + +/* + * deallocate_query + * Deallocate a prepared statement for a foreign insert/update/delete + * operation + */ +static void +deallocate_query(PgFdwModifyState *fmstate) +{ + char sql[64]; + PGresult *res; + + /* do nothing if the query is not allocated */ + if (!fmstate->p_name) + return; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + PQclear(res); + pfree(fmstate->p_name); + fmstate->p_name = NULL; +} + +/* + * build_remote_returning + * Build a RETURNING targetlist of a remote query for performing an + * UPDATE/DELETE .. RETURNING on a join directly + */ +static List * +build_remote_returning(Index rtindex, Relation rel, List *returningList) +{ + bool have_wholerow = false; + List *tlist = NIL; + List *vars; + ListCell *lc; + + Assert(returningList); + + vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS); + + /* + * If there's a whole-row reference to the target relation, then we'll + * need all the columns of the relation. + */ + foreach(lc, vars) + { + Var *var = (Var *) lfirst(lc); + + if (IsA(var, Var) && + var->varno == rtindex && + var->varattno == InvalidAttrNumber) + { + have_wholerow = true; + break; + } + } + + if (have_wholerow) + { + TupleDesc tupdesc = RelationGetDescr(rel); + int i; + + for (i = 1; i <= tupdesc->natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1); + Var *var; + + /* Ignore dropped attributes. */ + if (attr->attisdropped) + continue; + + var = makeVar(rtindex, + i, + attr->atttypid, + attr->atttypmod, + attr->attcollation, + 0); + + tlist = lappend(tlist, + makeTargetEntry((Expr *) var, + list_length(tlist) + 1, + NULL, + false)); + } + } + + /* Now add any remaining columns to tlist. */ + foreach(lc, vars) + { + Var *var = (Var *) lfirst(lc); + + /* + * No need for whole-row references to the target relation. We don't + * need system columns other than ctid and oid either, since those are + * set locally. + */ + if (IsA(var, Var) && + var->varno == rtindex && + var->varattno <= InvalidAttrNumber && + var->varattno != SelfItemPointerAttributeNumber) + continue; /* don't need it */ + + if (tlist_member((Expr *) var, tlist)) + continue; /* already got it */ + + tlist = lappend(tlist, + makeTargetEntry((Expr *) var, + list_length(tlist) + 1, + NULL, + false)); + } + + list_free(vars); + + return tlist; +} + +/* + * rebuild_fdw_scan_tlist + * Build new fdw_scan_tlist of given foreign-scan plan node from given + * tlist + * + * There might be columns that the fdw_scan_tlist of the given foreign-scan + * plan node contains that the given tlist doesn't. The fdw_scan_tlist would + * have contained resjunk columns such as 'ctid' of the target relation and + * 'wholerow' of non-target relations, but the tlist might not contain them, + * for example. So, adjust the tlist so it contains all the columns specified + * in the fdw_scan_tlist; else setrefs.c will get confused. + */ +static void +rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist) +{ + List *new_tlist = tlist; + List *old_tlist = fscan->fdw_scan_tlist; + ListCell *lc; + + foreach(lc, old_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (tlist_member(tle->expr, new_tlist)) + continue; /* already got it */ + + new_tlist = lappend(new_tlist, + makeTargetEntry(tle->expr, + list_length(new_tlist) + 1, + NULL, + false)); + } + fscan->fdw_scan_tlist = new_tlist; +} + +/* + * Execute a direct UPDATE/DELETE statement. + */ +static void +execute_dml_stmt(ForeignScanState *node) +{ + PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; + ExprContext *econtext = node->ss.ps.ps_ExprContext; + int numParams = dmstate->numParams; + const char **values = dmstate->param_values; + + /* First, process a pending asynchronous request, if any. */ + if (dmstate->conn_state->pendingAreq) + process_pending_request(dmstate->conn_state->pendingAreq); + + /* + * Construct array of query parameter values in text format. + */ + if (numParams > 0) + process_query_params(econtext, + dmstate->param_flinfo, + dmstate->param_exprs, + values); + + /* + * Notice that we pass NULL for paramTypes, thus forcing the remote server + * to infer types for all parameters. Since we explicitly cast every + * parameter (see deparse.c), the "inference" is trivial and will produce + * the desired result. This allows us to avoid assuming that the remote + * server has the same OIDs we do for the parameters' types. + */ + if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams, + NULL, values, NULL, NULL, 0)) + pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query); + if (PQresultStatus(dmstate->result) != + (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, + dmstate->query); + + /* Get the number of rows affected. */ + if (dmstate->has_returning) + dmstate->num_tuples = PQntuples(dmstate->result); + else + dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result)); +} + +/* + * Get the result of a RETURNING clause. + */ +static TupleTableSlot * +get_returning_data(ForeignScanState *node) +{ + PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; + EState *estate = node->ss.ps.state; + ResultRelInfo *resultRelInfo = node->resultRelInfo; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + TupleTableSlot *resultSlot; + + Assert(resultRelInfo->ri_projectReturning); + + /* If we didn't get any tuples, must be end of data. */ + if (dmstate->next_tuple >= dmstate->num_tuples) + return ExecClearTuple(slot); + + /* Increment the command es_processed count if necessary. */ + if (dmstate->set_processed) + estate->es_processed += 1; + + /* + * Store a RETURNING tuple. If has_returning is false, just emit a dummy + * tuple. (has_returning is false when the local query is of the form + * "UPDATE/DELETE .. RETURNING 1" for example.) + */ + if (!dmstate->has_returning) + { + ExecStoreAllNullTuple(slot); + resultSlot = slot; + } + else + { + /* + * On error, be sure to release the PGresult on the way out. Callers + * do not have PG_TRY blocks to ensure this happens. + */ + PG_TRY(); + { + HeapTuple newtup; + + newtup = make_tuple_from_result_row(dmstate->result, + dmstate->next_tuple, + dmstate->rel, + dmstate->attinmeta, + dmstate->retrieved_attrs, + node, + dmstate->temp_cxt); + ExecStoreHeapTuple(newtup, slot, false); + } + PG_CATCH(); + { + if (dmstate->result) + PQclear(dmstate->result); + PG_RE_THROW(); + } + PG_END_TRY(); + + /* Get the updated/deleted tuple. */ + if (dmstate->rel) + resultSlot = slot; + else + resultSlot = apply_returning_filter(dmstate, resultRelInfo, slot, estate); + } + dmstate->next_tuple++; + + /* Make slot available for evaluation of the local query RETURNING list. */ + resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = + resultSlot; + + return slot; +} + +/* + * Initialize a filter to extract an updated/deleted tuple from a scan tuple. + */ +static void +init_returning_filter(PgFdwDirectModifyState *dmstate, + List *fdw_scan_tlist, + Index rtindex) +{ + TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); + ListCell *lc; + int i; + + /* + * Calculate the mapping between the fdw_scan_tlist's entries and the + * result tuple's attributes. + * + * The "map" is an array of indexes of the result tuple's attributes in + * fdw_scan_tlist, i.e., one entry for every attribute of the result + * tuple. We store zero for any attributes that don't have the + * corresponding entries in that list, marking that a NULL is needed in + * the result tuple. + * + * Also get the indexes of the entries for ctid and oid if any. + */ + dmstate->attnoMap = (AttrNumber *) + palloc0(resultTupType->natts * sizeof(AttrNumber)); + + dmstate->ctidAttno = dmstate->oidAttno = 0; + + i = 1; + dmstate->hasSystemCols = false; + foreach(lc, fdw_scan_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Var *var = (Var *) tle->expr; + + Assert(IsA(var, Var)); + + /* + * If the Var is a column of the target relation to be retrieved from + * the foreign server, get the index of the entry. + */ + if (var->varno == rtindex && + list_member_int(dmstate->retrieved_attrs, i)) + { + int attrno = var->varattno; + + if (attrno < 0) + { + /* + * We don't retrieve system columns other than ctid and oid. + */ + if (attrno == SelfItemPointerAttributeNumber) + dmstate->ctidAttno = i; + else + Assert(false); + dmstate->hasSystemCols = true; + } + else + { + /* + * We don't retrieve whole-row references to the target + * relation either. + */ + Assert(attrno > 0); + + dmstate->attnoMap[attrno - 1] = i; + } + } + i++; + } +} + +/* + * Extract and return an updated/deleted tuple from a scan tuple. + */ +static TupleTableSlot * +apply_returning_filter(PgFdwDirectModifyState *dmstate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + EState *estate) +{ + TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); + TupleTableSlot *resultSlot; + Datum *values; + bool *isnull; + Datum *old_values; + bool *old_isnull; + int i; + + /* + * Use the return tuple slot as a place to store the result tuple. + */ + resultSlot = ExecGetReturningSlot(estate, resultRelInfo); + + /* + * Extract all the values of the scan tuple. + */ + slot_getallattrs(slot); + old_values = slot->tts_values; + old_isnull = slot->tts_isnull; + + /* + * Prepare to build the result tuple. + */ + ExecClearTuple(resultSlot); + values = resultSlot->tts_values; + isnull = resultSlot->tts_isnull; + + /* + * Transpose data into proper fields of the result tuple. + */ + for (i = 0; i < resultTupType->natts; i++) + { + int j = dmstate->attnoMap[i]; + + if (j == 0) + { + values[i] = (Datum) 0; + isnull[i] = true; + } + else + { + values[i] = old_values[j - 1]; + isnull[i] = old_isnull[j - 1]; + } + } + + /* + * Build the virtual tuple. + */ + ExecStoreVirtualTuple(resultSlot); + + /* + * If we have any system columns to return, materialize a heap tuple in + * the slot from column values set above and install system columns in + * that tuple. + */ + if (dmstate->hasSystemCols) + { + HeapTuple resultTup = ExecFetchSlotHeapTuple(resultSlot, true, NULL); + + /* ctid */ + if (dmstate->ctidAttno) + { + ItemPointer ctid = NULL; + + ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]); + resultTup->t_self = *ctid; + } + + /* + * And remaining columns + * + * Note: since we currently don't allow the target relation to appear + * on the nullable side of an outer join, any system columns wouldn't + * go to NULL. + * + * Note: no need to care about tableoid here because it will be + * initialized in ExecProcessReturning(). + */ + HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId); + HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId); + HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId); + } + + /* + * And return the result tuple. + */ + return resultSlot; +} + +/* + * Prepare for processing of parameters used in remote query. + */ +static void +prepare_query_params(PlanState *node, + List *fdw_exprs, + int numParams, + FmgrInfo **param_flinfo, + List **param_exprs, + const char ***param_values) +{ + int i; + ListCell *lc; + + Assert(numParams > 0); + + /* Prepare for output conversion of parameters used in remote query. */ + *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams); + + i = 0; + foreach(lc, fdw_exprs) + { + Node *param_expr = (Node *) lfirst(lc); + Oid typefnoid; + bool isvarlena; + + getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); + fmgr_info(typefnoid, &(*param_flinfo)[i]); + i++; + } + + /* + * Prepare remote-parameter expressions for evaluation. (Note: in + * practice, we expect that all these expressions will be just Params, so + * we could possibly do something more efficient than using the full + * expression-eval machinery for this. But probably there would be little + * benefit, and it'd require postgres_fdw to know more than is desirable + * about Param evaluation.) + */ + *param_exprs = ExecInitExprList(fdw_exprs, node); + + /* Allocate buffer for text form of query parameters. */ + *param_values = (const char **) palloc0(numParams * sizeof(char *)); +} + +/* + * Construct array of query parameter values in text format. + */ +static void +process_query_params(ExprContext *econtext, + FmgrInfo *param_flinfo, + List *param_exprs, + const char **param_values) +{ + int nestlevel; + int i; + ListCell *lc; + + nestlevel = set_transmission_modes(); + + i = 0; + foreach(lc, param_exprs) + { + ExprState *expr_state = (ExprState *) lfirst(lc); + Datum expr_value; + bool isNull; + + /* Evaluate the parameter expression */ + expr_value = ExecEvalExpr(expr_state, econtext, &isNull); + + /* + * Get string representation of each parameter value by invoking + * type-specific output function, unless the value is null. + */ + if (isNull) + param_values[i] = NULL; + else + param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value); + + i++; + } + + reset_transmission_modes(nestlevel); +} + +/* + * postgresAnalyzeForeignTable + * Test whether analyzing this foreign table is supported + */ +static bool +postgresAnalyzeForeignTable(Relation relation, + AcquireSampleRowsFunc *func, + BlockNumber *totalpages) +{ + ForeignTable *table; + UserMapping *user; + PGconn *conn; + StringInfoData sql; + PGresult *volatile res = NULL; + + /* Return the row-analysis function pointer */ + *func = postgresAcquireSampleRowsFunc; + + /* + * Now we have to get the number of pages. It's annoying that the ANALYZE + * API requires us to return that now, because it forces some duplication + * of effort between this routine and postgresAcquireSampleRowsFunc. But + * it's probably not worth redefining that API at this point. + */ + + /* + * Get the connection to use. We do the remote access as the table's + * owner, even if the ANALYZE was started by some other user. + */ + table = GetForeignTable(RelationGetRelid(relation)); + user = GetUserMapping(relation->rd_rel->relowner, table->serverid); + conn = GetConnection(user, false, NULL); + + /* + * Construct command to get page count for relation. + */ + initStringInfo(&sql); + deparseAnalyzeSizeSql(&sql, relation); + + /* In what follows, do not risk leaking any PGresults. */ + PG_TRY(); + { + res = pgfdw_exec_query(conn, sql.data, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, sql.data); + + if (PQntuples(res) != 1 || PQnfields(res) != 1) + elog(ERROR, "unexpected result from deparseAnalyzeSizeSql query"); + *totalpages = strtoul(PQgetvalue(res, 0, 0), NULL, 10); + } + PG_FINALLY(); + { + if (res) + PQclear(res); + } + PG_END_TRY(); + + ReleaseConnection(conn); + + return true; +} + +/* + * Acquire a random sample of rows from foreign table managed by postgres_fdw. + * + * We fetch the whole table from the remote side and pick out some sample rows. + * + * Selected rows are returned in the caller-allocated array rows[], + * which must have at least targrows entries. + * The actual number of rows selected is returned as the function result. + * We also count the total number of rows in the table and return it into + * *totalrows. Note that *totaldeadrows is always set to 0. + * + * Note that the returned list of rows is not always in order by physical + * position in the table. Therefore, correlation estimates derived later + * may be meaningless, but it's OK because we don't use the estimates + * currently (the planner only pays attention to correlation for indexscans). + */ +static int +postgresAcquireSampleRowsFunc(Relation relation, int elevel, + HeapTuple *rows, int targrows, + double *totalrows, + double *totaldeadrows) +{ + PgFdwAnalyzeState astate; + ForeignTable *table; + ForeignServer *server; + UserMapping *user; + PGconn *conn; + unsigned int cursor_number; + StringInfoData sql; + PGresult *volatile res = NULL; + + /* Initialize workspace state */ + astate.rel = relation; + astate.attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(relation)); + + astate.rows = rows; + astate.targrows = targrows; + astate.numrows = 0; + astate.samplerows = 0; + astate.rowstoskip = -1; /* -1 means not set yet */ + reservoir_init_selection_state(&astate.rstate, targrows); + + /* Remember ANALYZE context, and create a per-tuple temp context */ + astate.anl_cxt = CurrentMemoryContext; + astate.temp_cxt = AllocSetContextCreate(CurrentMemoryContext, + "postgres_fdw temporary data", + ALLOCSET_SMALL_SIZES); + + /* + * Get the connection to use. We do the remote access as the table's + * owner, even if the ANALYZE was started by some other user. + */ + table = GetForeignTable(RelationGetRelid(relation)); + server = GetForeignServer(table->serverid); + user = GetUserMapping(relation->rd_rel->relowner, table->serverid); + conn = GetConnection(user, false, NULL); + + /* + * Construct cursor that retrieves whole rows from remote. + */ + cursor_number = GetCursorNumber(conn); + initStringInfo(&sql); + appendStringInfo(&sql, "DECLARE c%u CURSOR FOR ", cursor_number); + deparseAnalyzeSql(&sql, relation, &astate.retrieved_attrs); + + /* In what follows, do not risk leaking any PGresults. */ + PG_TRY(); + { + char fetch_sql[64]; + int fetch_size; + ListCell *lc; + + res = pgfdw_exec_query(conn, sql.data, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, conn, false, sql.data); + PQclear(res); + res = NULL; + + /* + * Determine the fetch size. The default is arbitrary, but shouldn't + * be enormous. + */ + fetch_size = 100; + foreach(lc, server->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "fetch_size") == 0) + { + (void) parse_int(defGetString(def), &fetch_size, 0, NULL); + break; + } + } + foreach(lc, table->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "fetch_size") == 0) + { + (void) parse_int(defGetString(def), &fetch_size, 0, NULL); + break; + } + } + + /* Construct command to fetch rows from remote. */ + snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u", + fetch_size, cursor_number); + + /* Retrieve and process rows a batch at a time. */ + for (;;) + { + int numrows; + int i; + + /* Allow users to cancel long query */ + CHECK_FOR_INTERRUPTS(); + + /* + * XXX possible future improvement: if rowstoskip is large, we + * could issue a MOVE rather than physically fetching the rows, + * then just adjust rowstoskip and samplerows appropriately. + */ + + /* Fetch some rows */ + res = pgfdw_exec_query(conn, fetch_sql, NULL); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, sql.data); + + /* Process whatever we got. */ + numrows = PQntuples(res); + for (i = 0; i < numrows; i++) + analyze_row_processor(res, i, &astate); + + PQclear(res); + res = NULL; + + /* Must be EOF if we didn't get all the rows requested. */ + if (numrows < fetch_size) + break; + } + + /* Close the cursor, just to be tidy. */ + close_cursor(conn, cursor_number, NULL); + } + PG_CATCH(); + { + if (res) + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); + + ReleaseConnection(conn); + + /* We assume that we have no dead tuple. */ + *totaldeadrows = 0.0; + + /* We've retrieved all living tuples from foreign server. */ + *totalrows = astate.samplerows; + + /* + * Emit some interesting relation info + */ + ereport(elevel, + (errmsg("\"%s\": table contains %.0f rows, %d rows in sample", + RelationGetRelationName(relation), + astate.samplerows, astate.numrows))); + + return astate.numrows; +} + +/* + * Collect sample rows from the result of query. + * - Use all tuples in sample until target # of samples are collected. + * - Subsequently, replace already-sampled tuples randomly. + */ +static void +analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) +{ + int targrows = astate->targrows; + int pos; /* array index to store tuple in */ + MemoryContext oldcontext; + + /* Always increment sample row counter. */ + astate->samplerows += 1; + + /* + * Determine the slot where this sample row should be stored. Set pos to + * negative value to indicate the row should be skipped. + */ + if (astate->numrows < targrows) + { + /* First targrows rows are always included into the sample */ + pos = astate->numrows++; + } + else + { + /* + * Now we start replacing tuples in the sample until we reach the end + * of the relation. Same algorithm as in acquire_sample_rows in + * analyze.c; see Jeff Vitter's paper. + */ + if (astate->rowstoskip < 0) + astate->rowstoskip = reservoir_get_next_S(&astate->rstate, astate->samplerows, targrows); + + if (astate->rowstoskip <= 0) + { + /* Choose a random reservoir element to replace. */ + pos = (int) (targrows * sampler_random_fract(&astate->rstate.randstate)); + Assert(pos >= 0 && pos < targrows); + heap_freetuple(astate->rows[pos]); + } + else + { + /* Skip this tuple. */ + pos = -1; + } + + astate->rowstoskip -= 1; + } + + if (pos >= 0) + { + /* + * Create sample tuple from current result row, and store it in the + * position determined above. The tuple has to be created in anl_cxt. + */ + oldcontext = MemoryContextSwitchTo(astate->anl_cxt); + + astate->rows[pos] = make_tuple_from_result_row(res, row, + astate->rel, + astate->attinmeta, + astate->retrieved_attrs, + NULL, + astate->temp_cxt); + + MemoryContextSwitchTo(oldcontext); + } +} + +/* + * Import a foreign schema + */ +static List * +postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) +{ + List *commands = NIL; + bool import_collate = true; + bool import_default = false; + bool import_generated = true; + bool import_not_null = true; + ForeignServer *server; + UserMapping *mapping; + PGconn *conn; + StringInfoData buf; + PGresult *volatile res = NULL; + int numrows, + i; + ListCell *lc; + + /* Parse statement options */ + foreach(lc, stmt->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "import_collate") == 0) + import_collate = defGetBoolean(def); + else if (strcmp(def->defname, "import_default") == 0) + import_default = defGetBoolean(def); + else if (strcmp(def->defname, "import_generated") == 0) + import_generated = defGetBoolean(def); + else if (strcmp(def->defname, "import_not_null") == 0) + import_not_null = defGetBoolean(def); + else + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("invalid option \"%s\"", def->defname))); + } + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + server = GetForeignServer(serverOid); + mapping = GetUserMapping(GetUserId(), server->serverid); + conn = GetConnection(mapping, false, NULL); + + /* Don't attempt to import collation if remote server hasn't got it */ + if (PQserverVersion(conn) < 90100) + import_collate = false; + + /* Create workspace for strings */ + initStringInfo(&buf); + + /* In what follows, do not risk leaking any PGresults. */ + PG_TRY(); + { + /* Check that the schema really exists */ + appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = "); + deparseStringLiteral(&buf, stmt->remote_schema); + + res = pgfdw_exec_query(conn, buf.data, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, buf.data); + + if (PQntuples(res) != 1) + ereport(ERROR, + (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND), + errmsg("schema \"%s\" is not present on foreign server \"%s\"", + stmt->remote_schema, server->servername))); + + PQclear(res); + res = NULL; + resetStringInfo(&buf); + + /* + * Fetch all table data from this schema, possibly restricted by + * EXCEPT or LIMIT TO. (We don't actually need to pay any attention + * to EXCEPT/LIMIT TO here, because the core code will filter the + * statements we return according to those lists anyway. But it + * should save a few cycles to not process excluded tables in the + * first place.) + * + * Import table data for partitions only when they are explicitly + * specified in LIMIT TO clause. Otherwise ignore them and only + * include the definitions of the root partitioned tables to allow + * access to the complete remote data set locally in the schema + * imported. + * + * Note: because we run the connection with search_path restricted to + * pg_catalog, the format_type() and pg_get_expr() outputs will always + * include a schema name for types/functions in other schemas, which + * is what we want. + */ + appendStringInfoString(&buf, + "SELECT relname, " + " attname, " + " format_type(atttypid, atttypmod), " + " attnotnull, " + " pg_get_expr(adbin, adrelid), "); + + /* Generated columns are supported since Postgres 12 */ + if (PQserverVersion(conn) >= 120000) + appendStringInfoString(&buf, + " attgenerated, "); + else + appendStringInfoString(&buf, + " NULL, "); + + if (import_collate) + appendStringInfoString(&buf, + " collname, " + " collnsp.nspname "); + else + appendStringInfoString(&buf, + " NULL, NULL "); + + appendStringInfoString(&buf, + "FROM pg_class c " + " JOIN pg_namespace n ON " + " relnamespace = n.oid " + " LEFT JOIN pg_attribute a ON " + " attrelid = c.oid AND attnum > 0 " + " AND NOT attisdropped " + " LEFT JOIN pg_attrdef ad ON " + " adrelid = c.oid AND adnum = attnum "); + + if (import_collate) + appendStringInfoString(&buf, + " LEFT JOIN pg_collation coll ON " + " coll.oid = attcollation " + " LEFT JOIN pg_namespace collnsp ON " + " collnsp.oid = collnamespace "); + + appendStringInfoString(&buf, + "WHERE c.relkind IN (" + CppAsString2(RELKIND_RELATION) "," + CppAsString2(RELKIND_VIEW) "," + CppAsString2(RELKIND_FOREIGN_TABLE) "," + CppAsString2(RELKIND_MATVIEW) "," + CppAsString2(RELKIND_PARTITIONED_TABLE) ") " + " AND n.nspname = "); + deparseStringLiteral(&buf, stmt->remote_schema); + + /* Partitions are supported since Postgres 10 */ + if (PQserverVersion(conn) >= 100000 && + stmt->list_type != FDW_IMPORT_SCHEMA_LIMIT_TO) + appendStringInfoString(&buf, " AND NOT c.relispartition "); + + /* Apply restrictions for LIMIT TO and EXCEPT */ + if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO || + stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) + { + bool first_item = true; + + appendStringInfoString(&buf, " AND c.relname "); + if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) + appendStringInfoString(&buf, "NOT "); + appendStringInfoString(&buf, "IN ("); + + /* Append list of table names within IN clause */ + foreach(lc, stmt->table_list) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + + if (first_item) + first_item = false; + else + appendStringInfoString(&buf, ", "); + deparseStringLiteral(&buf, rv->relname); + } + appendStringInfoChar(&buf, ')'); + } + + /* Append ORDER BY at the end of query to ensure output ordering */ + appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum"); + + /* Fetch the data */ + res = pgfdw_exec_query(conn, buf.data, NULL); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, buf.data); + + /* Process results */ + numrows = PQntuples(res); + /* note: incrementation of i happens in inner loop's while() test */ + for (i = 0; i < numrows;) + { + char *tablename = PQgetvalue(res, i, 0); + bool first_item = true; + + resetStringInfo(&buf); + appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n", + quote_identifier(tablename)); + + /* Scan all rows for this table */ + do + { + char *attname; + char *typename; + char *attnotnull; + char *attgenerated; + char *attdefault; + char *collname; + char *collnamespace; + + /* If table has no columns, we'll see nulls here */ + if (PQgetisnull(res, i, 1)) + continue; + + attname = PQgetvalue(res, i, 1); + typename = PQgetvalue(res, i, 2); + attnotnull = PQgetvalue(res, i, 3); + attdefault = PQgetisnull(res, i, 4) ? (char *) NULL : + PQgetvalue(res, i, 4); + attgenerated = PQgetisnull(res, i, 5) ? (char *) NULL : + PQgetvalue(res, i, 5); + collname = PQgetisnull(res, i, 6) ? (char *) NULL : + PQgetvalue(res, i, 6); + collnamespace = PQgetisnull(res, i, 7) ? (char *) NULL : + PQgetvalue(res, i, 7); + + if (first_item) + first_item = false; + else + appendStringInfoString(&buf, ",\n"); + + /* Print column name and type */ + appendStringInfo(&buf, " %s %s", + quote_identifier(attname), + typename); + + /* + * Add column_name option so that renaming the foreign table's + * column doesn't break the association to the underlying + * column. + */ + appendStringInfoString(&buf, " OPTIONS (column_name "); + deparseStringLiteral(&buf, attname); + appendStringInfoChar(&buf, ')'); + + /* Add COLLATE if needed */ + if (import_collate && collname != NULL && collnamespace != NULL) + appendStringInfo(&buf, " COLLATE %s.%s", + quote_identifier(collnamespace), + quote_identifier(collname)); + + /* Add DEFAULT if needed */ + if (import_default && attdefault != NULL && + (!attgenerated || !attgenerated[0])) + appendStringInfo(&buf, " DEFAULT %s", attdefault); + + /* Add GENERATED if needed */ + if (import_generated && attgenerated != NULL && + attgenerated[0] == ATTRIBUTE_GENERATED_STORED) + { + Assert(attdefault != NULL); + appendStringInfo(&buf, + " GENERATED ALWAYS AS (%s) STORED", + attdefault); + } + + /* Add NOT NULL if needed */ + if (import_not_null && attnotnull[0] == 't') + appendStringInfoString(&buf, " NOT NULL"); + } + while (++i < numrows && + strcmp(PQgetvalue(res, i, 0), tablename) == 0); + + /* + * Add server name and table-level options. We specify remote + * schema and table name as options (the latter to ensure that + * renaming the foreign table doesn't break the association). + */ + appendStringInfo(&buf, "\n) SERVER %s\nOPTIONS (", + quote_identifier(server->servername)); + + appendStringInfoString(&buf, "schema_name "); + deparseStringLiteral(&buf, stmt->remote_schema); + appendStringInfoString(&buf, ", table_name "); + deparseStringLiteral(&buf, tablename); + + appendStringInfoString(&buf, ");"); + + commands = lappend(commands, pstrdup(buf.data)); + } + } + PG_FINALLY(); + { + if (res) + PQclear(res); + } + PG_END_TRY(); + + ReleaseConnection(conn); + + return commands; +} + +/* + * Assess whether the join between inner and outer relations can be pushed down + * to the foreign server. As a side effect, save information we obtain in this + * function to PgFdwRelationInfo passed in. + */ +static bool +foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, + RelOptInfo *outerrel, RelOptInfo *innerrel, + JoinPathExtraData *extra) +{ + PgFdwRelationInfo *fpinfo; + PgFdwRelationInfo *fpinfo_o; + PgFdwRelationInfo *fpinfo_i; + ListCell *lc; + List *joinclauses; + + /* + * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins. + * Constructing queries representing SEMI and ANTI joins is hard, hence + * not considered right now. + */ + if (jointype != JOIN_INNER && jointype != JOIN_LEFT && + jointype != JOIN_RIGHT && jointype != JOIN_FULL) + return false; + + /* + * If either of the joining relations is marked as unsafe to pushdown, the + * join can not be pushed down. + */ + fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private; + fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private; + fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private; + if (!fpinfo_o || !fpinfo_o->pushdown_safe || + !fpinfo_i || !fpinfo_i->pushdown_safe) + return false; + + /* + * If joining relations have local conditions, those conditions are + * required to be applied before joining the relations. Hence the join can + * not be pushed down. + */ + if (fpinfo_o->local_conds || fpinfo_i->local_conds) + return false; + + /* + * Merge FDW options. We might be tempted to do this after we have deemed + * the foreign join to be OK. But we must do this beforehand so that we + * know which quals can be evaluated on the foreign server, which might + * depend on shippable_extensions. + */ + fpinfo->server = fpinfo_o->server; + merge_fdw_options(fpinfo, fpinfo_o, fpinfo_i); + + /* + * Separate restrict list into join quals and pushed-down (other) quals. + * + * Join quals belonging to an outer join must all be shippable, else we + * cannot execute the join remotely. Add such quals to 'joinclauses'. + * + * Add other quals to fpinfo->remote_conds if they are shippable, else to + * fpinfo->local_conds. In an inner join it's okay to execute conditions + * either locally or remotely; the same is true for pushed-down conditions + * at an outer join. + * + * Note we might return failure after having already scribbled on + * fpinfo->remote_conds and fpinfo->local_conds. That's okay because we + * won't consult those lists again if we deem the join unshippable. + */ + joinclauses = NIL; + foreach(lc, extra->restrictlist) + { + RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); + bool is_remote_clause = is_foreign_expr(root, joinrel, + rinfo->clause); + + if (IS_OUTER_JOIN(jointype) && + !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids)) + { + if (!is_remote_clause) + return false; + joinclauses = lappend(joinclauses, rinfo); + } + else + { + if (is_remote_clause) + fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); + else + fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); + } + } + + /* + * deparseExplicitTargetList() isn't smart enough to handle anything other + * than a Var. In particular, if there's some PlaceHolderVar that would + * need to be evaluated within this join tree (because there's an upper + * reference to a quantity that may go to NULL as a result of an outer + * join), then we can't try to push the join down because we'll fail when + * we get to deparseExplicitTargetList(). However, a PlaceHolderVar that + * needs to be evaluated *at the top* of this join tree is OK, because we + * can do that locally after fetching the results from the remote side. + */ + foreach(lc, root->placeholder_list) + { + PlaceHolderInfo *phinfo = lfirst(lc); + Relids relids; + + /* PlaceHolderInfo refers to parent relids, not child relids. */ + relids = IS_OTHER_REL(joinrel) ? + joinrel->top_parent_relids : joinrel->relids; + + if (bms_is_subset(phinfo->ph_eval_at, relids) && + bms_nonempty_difference(relids, phinfo->ph_eval_at)) + return false; + } + + /* Save the join clauses, for later use. */ + fpinfo->joinclauses = joinclauses; + + fpinfo->outerrel = outerrel; + fpinfo->innerrel = innerrel; + fpinfo->jointype = jointype; + + /* + * By default, both the input relations are not required to be deparsed as + * subqueries, but there might be some relations covered by the input + * relations that are required to be deparsed as subqueries, so save the + * relids of those relations for later use by the deparser. + */ + fpinfo->make_outerrel_subquery = false; + fpinfo->make_innerrel_subquery = false; + Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids)); + Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids)); + fpinfo->lower_subquery_rels = bms_union(fpinfo_o->lower_subquery_rels, + fpinfo_i->lower_subquery_rels); + + /* + * Pull the other remote conditions from the joining relations into join + * clauses or other remote clauses (remote_conds) of this relation + * wherever possible. This avoids building subqueries at every join step. + * + * For an inner join, clauses from both the relations are added to the + * other remote clauses. For LEFT and RIGHT OUTER join, the clauses from + * the outer side are added to remote_conds since those can be evaluated + * after the join is evaluated. The clauses from inner side are added to + * the joinclauses, since they need to be evaluated while constructing the + * join. + * + * For a FULL OUTER JOIN, the other clauses from either relation can not + * be added to the joinclauses or remote_conds, since each relation acts + * as an outer relation for the other. + * + * The joining sides can not have local conditions, thus no need to test + * shippability of the clauses being pulled up. + */ + switch (jointype) + { + case JOIN_INNER: + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_i->remote_conds); + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_o->remote_conds); + break; + + case JOIN_LEFT: + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_i->remote_conds); + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_o->remote_conds); + break; + + case JOIN_RIGHT: + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_o->remote_conds); + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_i->remote_conds); + break; + + case JOIN_FULL: + + /* + * In this case, if any of the input relations has conditions, we + * need to deparse that relation as a subquery so that the + * conditions can be evaluated before the join. Remember it in + * the fpinfo of this relation so that the deparser can take + * appropriate action. Also, save the relids of base relations + * covered by that relation for later use by the deparser. + */ + if (fpinfo_o->remote_conds) + { + fpinfo->make_outerrel_subquery = true; + fpinfo->lower_subquery_rels = + bms_add_members(fpinfo->lower_subquery_rels, + outerrel->relids); + } + if (fpinfo_i->remote_conds) + { + fpinfo->make_innerrel_subquery = true; + fpinfo->lower_subquery_rels = + bms_add_members(fpinfo->lower_subquery_rels, + innerrel->relids); + } + break; + + default: + /* Should not happen, we have just checked this above */ + elog(ERROR, "unsupported join type %d", jointype); + } + + /* + * For an inner join, all restrictions can be treated alike. Treating the + * pushed down conditions as join conditions allows a top level full outer + * join to be deparsed without requiring subqueries. + */ + if (jointype == JOIN_INNER) + { + Assert(!fpinfo->joinclauses); + fpinfo->joinclauses = fpinfo->remote_conds; + fpinfo->remote_conds = NIL; + } + + /* Mark that this join can be pushed down safely */ + fpinfo->pushdown_safe = true; + + /* Get user mapping */ + if (fpinfo->use_remote_estimate) + { + if (fpinfo_o->use_remote_estimate) + fpinfo->user = fpinfo_o->user; + else + fpinfo->user = fpinfo_i->user; + } + else + fpinfo->user = NULL; + + /* + * Set # of retrieved rows and cached relation costs to some negative + * value, so that we can detect when they are set to some sensible values, + * during one (usually the first) of the calls to estimate_path_cost_size. + */ + fpinfo->retrieved_rows = -1; + fpinfo->rel_startup_cost = -1; + fpinfo->rel_total_cost = -1; + + /* + * Set the string describing this join relation to be used in EXPLAIN + * output of corresponding ForeignScan. Note that the decoration we add + * to the base relation names mustn't include any digits, or it'll confuse + * postgresExplainForeignScan. + */ + fpinfo->relation_name = psprintf("(%s) %s JOIN (%s)", + fpinfo_o->relation_name, + get_jointype_name(fpinfo->jointype), + fpinfo_i->relation_name); + + /* + * Set the relation index. This is defined as the position of this + * joinrel in the join_rel_list list plus the length of the rtable list. + * Note that since this joinrel is at the end of the join_rel_list list + * when we are called, we can get the position by list_length. + */ + Assert(fpinfo->relation_index == 0); /* shouldn't be set yet */ + fpinfo->relation_index = + list_length(root->parse->rtable) + list_length(root->join_rel_list); + + return true; +} + +static void +add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, + Path *epq_path) +{ + List *useful_pathkeys_list = NIL; /* List of all pathkeys */ + ListCell *lc; + + useful_pathkeys_list = get_useful_pathkeys_for_relation(root, rel); + + /* + * Before creating sorted paths, arrange for the passed-in EPQ path, if + * any, to return columns needed by the parent ForeignScan node so that + * they will propagate up through Sort nodes injected below, if necessary. + */ + if (epq_path != NULL && useful_pathkeys_list != NIL) + { + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; + PathTarget *target = copy_pathtarget(epq_path->pathtarget); + + /* Include columns required for evaluating PHVs in the tlist. */ + add_new_columns_to_pathtarget(target, + pull_var_clause((Node *) target->exprs, + PVC_RECURSE_PLACEHOLDERS)); + + /* Include columns required for evaluating the local conditions. */ + foreach(lc, fpinfo->local_conds) + { + RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); + + add_new_columns_to_pathtarget(target, + pull_var_clause((Node *) rinfo->clause, + PVC_RECURSE_PLACEHOLDERS)); + } + + /* + * If we have added any new columns, adjust the tlist of the EPQ path. + * + * Note: the plan created using this path will only be used to execute + * EPQ checks, where accuracy of the plan cost and width estimates + * would not be important, so we do not do set_pathtarget_cost_width() + * for the new pathtarget here. See also postgresGetForeignPlan(). + */ + if (list_length(target->exprs) > list_length(epq_path->pathtarget->exprs)) + { + /* The EPQ path is a join path, so it is projection-capable. */ + Assert(is_projection_capable_path(epq_path)); + + /* + * Use create_projection_path() here, so as to avoid modifying it + * in place. + */ + epq_path = (Path *) create_projection_path(root, + rel, + epq_path, + target); + } + } + + /* Create one path for each set of pathkeys we found above. */ + foreach(lc, useful_pathkeys_list) + { + double rows; + int width; + Cost startup_cost; + Cost total_cost; + List *useful_pathkeys = lfirst(lc); + Path *sorted_epq_path; + + estimate_path_cost_size(root, rel, NIL, useful_pathkeys, NULL, + &rows, &width, &startup_cost, &total_cost); + + /* + * The EPQ path must be at least as well sorted as the path itself, in + * case it gets used as input to a mergejoin. + */ + sorted_epq_path = epq_path; + if (sorted_epq_path != NULL && + !pathkeys_contained_in(useful_pathkeys, + sorted_epq_path->pathkeys)) + sorted_epq_path = (Path *) + create_sort_path(root, + rel, + sorted_epq_path, + useful_pathkeys, + -1.0); + + if (IS_SIMPLE_REL(rel)) + add_path(rel, (Path *) + create_foreignscan_path(root, rel, + NULL, + rows, + startup_cost, + total_cost, + useful_pathkeys, + rel->lateral_relids, + sorted_epq_path, + NIL)); + else + add_path(rel, (Path *) + create_foreign_join_path(root, rel, + NULL, + rows, + startup_cost, + total_cost, + useful_pathkeys, + rel->lateral_relids, + sorted_epq_path, + NIL)); + } +} + +/* + * Parse options from foreign server and apply them to fpinfo. + * + * New options might also require tweaking merge_fdw_options(). + */ +static void +apply_server_options(PgFdwRelationInfo *fpinfo) +{ + ListCell *lc; + + foreach(lc, fpinfo->server->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "use_remote_estimate") == 0) + fpinfo->use_remote_estimate = defGetBoolean(def); + else if (strcmp(def->defname, "fdw_startup_cost") == 0) + (void) parse_real(defGetString(def), &fpinfo->fdw_startup_cost, 0, + NULL); + else if (strcmp(def->defname, "fdw_tuple_cost") == 0) + (void) parse_real(defGetString(def), &fpinfo->fdw_tuple_cost, 0, + NULL); + else if (strcmp(def->defname, "extensions") == 0) + fpinfo->shippable_extensions = + ExtractExtensionList(defGetString(def), false); + else if (strcmp(def->defname, "fetch_size") == 0) + (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); + } +} + +/* + * Parse options from foreign table and apply them to fpinfo. + * + * New options might also require tweaking merge_fdw_options(). + */ +static void +apply_table_options(PgFdwRelationInfo *fpinfo) +{ + ListCell *lc; + + foreach(lc, fpinfo->table->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "use_remote_estimate") == 0) + fpinfo->use_remote_estimate = defGetBoolean(def); + else if (strcmp(def->defname, "fetch_size") == 0) + (void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL); + else if (strcmp(def->defname, "async_capable") == 0) + fpinfo->async_capable = defGetBoolean(def); + } +} + +/* + * Merge FDW options from input relations into a new set of options for a join + * or an upper rel. + * + * For a join relation, FDW-specific information about the inner and outer + * relations is provided using fpinfo_i and fpinfo_o. For an upper relation, + * fpinfo_o provides the information for the input relation; fpinfo_i is + * expected to NULL. + */ +static void +merge_fdw_options(PgFdwRelationInfo *fpinfo, + const PgFdwRelationInfo *fpinfo_o, + const PgFdwRelationInfo *fpinfo_i) +{ + /* We must always have fpinfo_o. */ + Assert(fpinfo_o); + + /* fpinfo_i may be NULL, but if present the servers must both match. */ + Assert(!fpinfo_i || + fpinfo_i->server->serverid == fpinfo_o->server->serverid); + + /* + * Copy the server specific FDW options. (For a join, both relations come + * from the same server, so the server options should have the same value + * for both relations.) + */ + fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost; + fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost; + fpinfo->shippable_extensions = fpinfo_o->shippable_extensions; + fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate; + fpinfo->fetch_size = fpinfo_o->fetch_size; + fpinfo->async_capable = fpinfo_o->async_capable; + + /* Merge the table level options from either side of the join. */ + if (fpinfo_i) + { + /* + * We'll prefer to use remote estimates for this join if any table + * from either side of the join is using remote estimates. This is + * most likely going to be preferred since they're already willing to + * pay the price of a round trip to get the remote EXPLAIN. In any + * case it's not entirely clear how we might otherwise handle this + * best. + */ + fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate || + fpinfo_i->use_remote_estimate; + + /* + * Set fetch size to maximum of the joining sides, since we are + * expecting the rows returned by the join to be proportional to the + * relation sizes. + */ + fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size); + + /* + * We'll prefer to consider this join async-capable if any table from + * either side of the join is considered async-capable. This would be + * reasonable because in that case the foreign server would have its + * own resources to scan that table asynchronously, and the join could + * also be computed asynchronously using the resources. + */ + fpinfo->async_capable = fpinfo_o->async_capable || + fpinfo_i->async_capable; + } +} + +/* + * postgresGetForeignJoinPaths + * Add possible ForeignPath to joinrel, if join is safe to push down. + */ +static void +postgresGetForeignJoinPaths(PlannerInfo *root, + RelOptInfo *joinrel, + RelOptInfo *outerrel, + RelOptInfo *innerrel, + JoinType jointype, + JoinPathExtraData *extra) +{ + PgFdwRelationInfo *fpinfo; + ForeignPath *joinpath; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + Path *epq_path; /* Path to create plan to be executed when + * EvalPlanQual gets triggered. */ + + /* + * Skip if this join combination has been considered already. + */ + if (joinrel->fdw_private) + return; + + /* + * This code does not work for joins with lateral references, since those + * must have parameterized paths, which we don't generate yet. + */ + if (!bms_is_empty(joinrel->lateral_relids)) + return; + + /* + * Create unfinished PgFdwRelationInfo entry which is used to indicate + * that the join relation is already considered, so that we won't waste + * time in judging safety of join pushdown and adding the same paths again + * if found safe. Once we know that this join can be pushed down, we fill + * the entry. + */ + fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); + fpinfo->pushdown_safe = false; + joinrel->fdw_private = fpinfo; + /* attrs_used is only for base relations. */ + fpinfo->attrs_used = NULL; + + /* + * If there is a possibility that EvalPlanQual will be executed, we need + * to be able to reconstruct the row using scans of the base relations. + * GetExistingLocalJoinPath will find a suitable path for this purpose in + * the path list of the joinrel, if one exists. We must be careful to + * call it before adding any ForeignPath, since the ForeignPath might + * dominate the only suitable local path available. We also do it before + * calling foreign_join_ok(), since that function updates fpinfo and marks + * it as pushable if the join is found to be pushable. + */ + if (root->parse->commandType == CMD_DELETE || + root->parse->commandType == CMD_UPDATE || + root->rowMarks) + { + epq_path = GetExistingLocalJoinPath(joinrel); + if (!epq_path) + { + elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found"); + return; + } + } + else + epq_path = NULL; + + if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra)) + { + /* Free path required for EPQ if we copied one; we don't need it now */ + if (epq_path) + pfree(epq_path); + return; + } + + /* + * Compute the selectivity and cost of the local_conds, so we don't have + * to do it over again for each path. The best we can do for these + * conditions is to estimate selectivity on the basis of local statistics. + * The local conditions are applied after the join has been computed on + * the remote side like quals in WHERE clause, so pass jointype as + * JOIN_INNER. + */ + fpinfo->local_conds_sel = clauselist_selectivity(root, + fpinfo->local_conds, + 0, + JOIN_INNER, + NULL); + cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); + + /* + * If we are going to estimate costs locally, estimate the join clause + * selectivity here while we have special join info. + */ + if (!fpinfo->use_remote_estimate) + fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses, + 0, fpinfo->jointype, + extra->sjinfo); + + /* Estimate costs for bare join relation */ + estimate_path_cost_size(root, joinrel, NIL, NIL, NULL, + &rows, &width, &startup_cost, &total_cost); + /* Now update this information in the joinrel */ + joinrel->rows = rows; + joinrel->reltarget->width = width; + fpinfo->rows = rows; + fpinfo->width = width; + fpinfo->startup_cost = startup_cost; + fpinfo->total_cost = total_cost; + + /* + * Create a new join path and add it to the joinrel which represents a + * join between foreign tables. + */ + joinpath = create_foreign_join_path(root, + joinrel, + NULL, /* default pathtarget */ + rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + joinrel->lateral_relids, + epq_path, + NIL); /* no fdw_private */ + + /* Add generated path into joinrel by add_path(). */ + add_path(joinrel, (Path *) joinpath); + + /* Consider pathkeys for the join relation */ + add_paths_with_pathkeys_for_rel(root, joinrel, epq_path); + + /* XXX Consider parameterized paths for the join relation */ +} + +/* + * Assess whether the aggregation, grouping and having operations can be pushed + * down to the foreign server. As a side effect, save information we obtain in + * this function to PgFdwRelationInfo of the input relation. + */ +static bool +foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, + Node *havingQual) +{ + Query *query = root->parse; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private; + PathTarget *grouping_target = grouped_rel->reltarget; + PgFdwRelationInfo *ofpinfo; + ListCell *lc; + int i; + List *tlist = NIL; + + /* We currently don't support pushing Grouping Sets. */ + if (query->groupingSets) + return false; + + /* Get the fpinfo of the underlying scan relation. */ + ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* + * If underlying scan relation has any local conditions, those conditions + * are required to be applied before performing aggregation. Hence the + * aggregate cannot be pushed down. + */ + if (ofpinfo->local_conds) + return false; + + /* + * Examine grouping expressions, as well as other expressions we'd need to + * compute, and check whether they are safe to push down to the foreign + * server. All GROUP BY expressions will be part of the grouping target + * and thus there is no need to search for them separately. Add grouping + * expressions into target list which will be passed to foreign server. + * + * A tricky fine point is that we must not put any expression into the + * target list that is just a foreign param (that is, something that + * deparse.c would conclude has to be sent to the foreign server). If we + * do, the expression will also appear in the fdw_exprs list of the plan + * node, and setrefs.c will get confused and decide that the fdw_exprs + * entry is actually a reference to the fdw_scan_tlist entry, resulting in + * a broken plan. Somewhat oddly, it's OK if the expression contains such + * a node, as long as it's not at top level; then no match is possible. + */ + i = 0; + foreach(lc, grouping_target->exprs) + { + Expr *expr = (Expr *) lfirst(lc); + Index sgref = get_pathtarget_sortgroupref(grouping_target, i); + ListCell *l; + + /* Check whether this expression is part of GROUP BY clause */ + if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) + { + TargetEntry *tle; + + /* + * If any GROUP BY expression is not shippable, then we cannot + * push down aggregation to the foreign server. + */ + if (!is_foreign_expr(root, grouped_rel, expr)) + return false; + + /* + * If it would be a foreign param, we can't put it into the tlist, + * so we have to fail. + */ + if (is_foreign_param(root, grouped_rel, expr)) + return false; + + /* + * Pushable, so add to tlist. We need to create a TLE for this + * expression and apply the sortgroupref to it. We cannot use + * add_to_flat_tlist() here because that avoids making duplicate + * entries in the tlist. If there are duplicate entries with + * distinct sortgrouprefs, we have to duplicate that situation in + * the output tlist. + */ + tle = makeTargetEntry(expr, list_length(tlist) + 1, NULL, false); + tle->ressortgroupref = sgref; + tlist = lappend(tlist, tle); + } + else + { + /* + * Non-grouping expression we need to compute. Can we ship it + * as-is to the foreign server? + */ + if (is_foreign_expr(root, grouped_rel, expr) && + !is_foreign_param(root, grouped_rel, expr)) + { + /* Yes, so add to tlist as-is; OK to suppress duplicates */ + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + else + { + /* Not pushable as a whole; extract its Vars and aggregates */ + List *aggvars; + + aggvars = pull_var_clause((Node *) expr, + PVC_INCLUDE_AGGREGATES); + + /* + * If any aggregate expression is not shippable, then we + * cannot push down aggregation to the foreign server. (We + * don't have to check is_foreign_param, since that certainly + * won't return true for any such expression.) + */ + if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) + return false; + + /* + * Add aggregates, if any, into the targetlist. Plain Vars + * outside an aggregate can be ignored, because they should be + * either same as some GROUP BY column or part of some GROUP + * BY expression. In either case, they are already part of + * the targetlist and thus no need to add them again. In fact + * including plain Vars in the tlist when they do not match a + * GROUP BY column would cause the foreign server to complain + * that the shipped query is invalid. + */ + foreach(l, aggvars) + { + Expr *expr = (Expr *) lfirst(l); + + if (IsA(expr, Aggref)) + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + } + } + + i++; + } + + /* + * Classify the pushable and non-pushable HAVING clauses and save them in + * remote_conds and local_conds of the grouped rel's fpinfo. + */ + if (havingQual) + { + ListCell *lc; + + foreach(lc, (List *) havingQual) + { + Expr *expr = (Expr *) lfirst(lc); + RestrictInfo *rinfo; + + /* + * Currently, the core code doesn't wrap havingQuals in + * RestrictInfos, so we must make our own. + */ + Assert(!IsA(expr, RestrictInfo)); + rinfo = make_restrictinfo(root, + expr, + true, + false, + false, + root->qual_security_level, + grouped_rel->relids, + NULL, + NULL); + if (is_foreign_expr(root, grouped_rel, expr)) + fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo); + else + fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo); + } + } + + /* + * If there are any local conditions, pull Vars and aggregates from it and + * check whether they are safe to pushdown or not. + */ + if (fpinfo->local_conds) + { + List *aggvars = NIL; + ListCell *lc; + + foreach(lc, fpinfo->local_conds) + { + RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc); + + aggvars = list_concat(aggvars, + pull_var_clause((Node *) rinfo->clause, + PVC_INCLUDE_AGGREGATES)); + } + + foreach(lc, aggvars) + { + Expr *expr = (Expr *) lfirst(lc); + + /* + * If aggregates within local conditions are not safe to push + * down, then we cannot push down the query. Vars are already + * part of GROUP BY clause which are checked above, so no need to + * access them again here. Again, we need not check + * is_foreign_param for a foreign aggregate. + */ + if (IsA(expr, Aggref)) + { + if (!is_foreign_expr(root, grouped_rel, expr)) + return false; + + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + } + } + + /* Store generated targetlist */ + fpinfo->grouped_tlist = tlist; + + /* Safe to pushdown */ + fpinfo->pushdown_safe = true; + + /* + * Set # of retrieved rows and cached relation costs to some negative + * value, so that we can detect when they are set to some sensible values, + * during one (usually the first) of the calls to estimate_path_cost_size. + */ + fpinfo->retrieved_rows = -1; + fpinfo->rel_startup_cost = -1; + fpinfo->rel_total_cost = -1; + + /* + * Set the string describing this grouped relation to be used in EXPLAIN + * output of corresponding ForeignScan. Note that the decoration we add + * to the base relation name mustn't include any digits, or it'll confuse + * postgresExplainForeignScan. + */ + fpinfo->relation_name = psprintf("Aggregate on (%s)", + ofpinfo->relation_name); + + return true; +} + +/* + * postgresGetForeignUpperPaths + * Add paths for post-join operations like aggregation, grouping etc. if + * corresponding operations are safe to push down. + */ +static void +postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, + RelOptInfo *input_rel, RelOptInfo *output_rel, + void *extra) +{ + PgFdwRelationInfo *fpinfo; + + /* + * If input rel is not safe to pushdown, then simply return as we cannot + * perform any post-join operations on the foreign server. + */ + if (!input_rel->fdw_private || + !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe) + return; + + /* Ignore stages we don't support; and skip any duplicate calls. */ + if ((stage != UPPERREL_GROUP_AGG && + stage != UPPERREL_ORDERED && + stage != UPPERREL_FINAL) || + output_rel->fdw_private) + return; + + fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); + fpinfo->pushdown_safe = false; + fpinfo->stage = stage; + output_rel->fdw_private = fpinfo; + + switch (stage) + { + case UPPERREL_GROUP_AGG: + add_foreign_grouping_paths(root, input_rel, output_rel, + (GroupPathExtraData *) extra); + break; + case UPPERREL_ORDERED: + add_foreign_ordered_paths(root, input_rel, output_rel); + break; + case UPPERREL_FINAL: + add_foreign_final_paths(root, input_rel, output_rel, + (FinalPathExtraData *) extra); + break; + default: + elog(ERROR, "unexpected upper relation: %d", (int) stage); + break; + } +} + +/* + * add_foreign_grouping_paths + * Add foreign path for grouping and/or aggregation. + * + * Given input_rel represents the underlying scan. The paths are added to the + * given grouped_rel. + */ +static void +add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, + RelOptInfo *grouped_rel, + GroupPathExtraData *extra) +{ + Query *parse = root->parse; + PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; + PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private; + ForeignPath *grouppath; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + + /* Nothing to be done, if there is no grouping or aggregation required. */ + if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && + !root->hasHavingQual) + return; + + Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE || + extra->patype == PARTITIONWISE_AGGREGATE_FULL); + + /* save the input_rel as outerrel in fpinfo */ + fpinfo->outerrel = input_rel; + + /* + * Copy foreign table, foreign server, user mapping, FDW options etc. + * details from the input relation's fpinfo. + */ + fpinfo->table = ifpinfo->table; + fpinfo->server = ifpinfo->server; + fpinfo->user = ifpinfo->user; + merge_fdw_options(fpinfo, ifpinfo, NULL); + + /* + * Assess if it is safe to push down aggregation and grouping. + * + * Use HAVING qual from extra. In case of child partition, it will have + * translated Vars. + */ + if (!foreign_grouping_ok(root, grouped_rel, extra->havingQual)) + return; + + /* + * Compute the selectivity and cost of the local_conds, so we don't have + * to do it over again for each path. (Currently we create just a single + * path here, but in future it would be possible that we build more paths + * such as pre-sorted paths as in postgresGetForeignPaths and + * postgresGetForeignJoinPaths.) The best we can do for these conditions + * is to estimate selectivity on the basis of local statistics. + */ + fpinfo->local_conds_sel = clauselist_selectivity(root, + fpinfo->local_conds, + 0, + JOIN_INNER, + NULL); + + cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); + + /* Estimate the cost of push down */ + estimate_path_cost_size(root, grouped_rel, NIL, NIL, NULL, + &rows, &width, &startup_cost, &total_cost); + + /* Now update this information in the fpinfo */ + fpinfo->rows = rows; + fpinfo->width = width; + fpinfo->startup_cost = startup_cost; + fpinfo->total_cost = total_cost; + + /* Create and add foreign path to the grouping relation. */ + grouppath = create_foreign_upper_path(root, + grouped_rel, + grouped_rel->reltarget, + rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + NULL, + NIL); /* no fdw_private */ + + /* Add generated path into grouped_rel by add_path(). */ + add_path(grouped_rel, (Path *) grouppath); +} + +/* + * add_foreign_ordered_paths + * Add foreign paths for performing the final sort remotely. + * + * Given input_rel contains the source-data Paths. The paths are added to the + * given ordered_rel. + */ +static void +add_foreign_ordered_paths(PlannerInfo *root, RelOptInfo *input_rel, + RelOptInfo *ordered_rel) +{ + Query *parse = root->parse; + PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; + PgFdwRelationInfo *fpinfo = ordered_rel->fdw_private; + PgFdwPathExtraData *fpextra; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + List *fdw_private; + ForeignPath *ordered_path; + ListCell *lc; + + /* Shouldn't get here unless the query has ORDER BY */ + Assert(parse->sortClause); + + /* We don't support cases where there are any SRFs in the targetlist */ + if (parse->hasTargetSRFs) + return; + + /* Save the input_rel as outerrel in fpinfo */ + fpinfo->outerrel = input_rel; + + /* + * Copy foreign table, foreign server, user mapping, FDW options etc. + * details from the input relation's fpinfo. + */ + fpinfo->table = ifpinfo->table; + fpinfo->server = ifpinfo->server; + fpinfo->user = ifpinfo->user; + merge_fdw_options(fpinfo, ifpinfo, NULL); + + /* + * If the input_rel is a base or join relation, we would already have + * considered pushing down the final sort to the remote server when + * creating pre-sorted foreign paths for that relation, because the + * query_pathkeys is set to the root->sort_pathkeys in that case (see + * standard_qp_callback()). + */ + if (input_rel->reloptkind == RELOPT_BASEREL || + input_rel->reloptkind == RELOPT_JOINREL) + { + Assert(root->query_pathkeys == root->sort_pathkeys); + + /* Safe to push down if the query_pathkeys is safe to push down */ + fpinfo->pushdown_safe = ifpinfo->qp_is_pushdown_safe; + + return; + } + + /* The input_rel should be a grouping relation */ + Assert(input_rel->reloptkind == RELOPT_UPPER_REL && + ifpinfo->stage == UPPERREL_GROUP_AGG); + + /* + * We try to create a path below by extending a simple foreign path for + * the underlying grouping relation to perform the final sort remotely, + * which is stored into the fdw_private list of the resulting path. + */ + + /* Assess if it is safe to push down the final sort */ + foreach(lc, root->sort_pathkeys) + { + PathKey *pathkey = (PathKey *) lfirst(lc); + EquivalenceClass *pathkey_ec = pathkey->pk_eclass; + + /* + * is_foreign_expr would detect volatile expressions as well, but + * checking ec_has_volatile here saves some cycles. + */ + if (pathkey_ec->ec_has_volatile) + return; + + /* + * Can't push down the sort if pathkey's opfamily is not shippable. + */ + if (!is_shippable(pathkey->pk_opfamily, OperatorFamilyRelationId, + fpinfo)) + return; + + /* + * The EC must contain a shippable EM that is computed in input_rel's + * reltarget, else we can't push down the sort. + */ + if (find_em_for_rel_target(root, + pathkey_ec, + input_rel) == NULL) + return; + } + + /* Safe to push down */ + fpinfo->pushdown_safe = true; + + /* Construct PgFdwPathExtraData */ + fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData)); + fpextra->target = root->upper_targets[UPPERREL_ORDERED]; + fpextra->has_final_sort = true; + + /* Estimate the costs of performing the final sort remotely */ + estimate_path_cost_size(root, input_rel, NIL, root->sort_pathkeys, fpextra, + &rows, &width, &startup_cost, &total_cost); + + /* + * Build the fdw_private list that will be used by postgresGetForeignPlan. + * Items in the list must match order in enum FdwPathPrivateIndex. + */ + fdw_private = list_make2(makeBoolean(true), makeBoolean(false)); + + /* Create foreign ordering path */ + ordered_path = create_foreign_upper_path(root, + input_rel, + root->upper_targets[UPPERREL_ORDERED], + rows, + startup_cost, + total_cost, + root->sort_pathkeys, + NULL, /* no extra plan */ + fdw_private); + + /* and add it to the ordered_rel */ + add_path(ordered_rel, (Path *) ordered_path); +} + +/* + * add_foreign_final_paths + * Add foreign paths for performing the final processing remotely. + * + * Given input_rel contains the source-data Paths. The paths are added to the + * given final_rel. + */ +static void +add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, + RelOptInfo *final_rel, + FinalPathExtraData *extra) +{ + Query *parse = root->parse; + PgFdwRelationInfo *ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) final_rel->fdw_private; + bool has_final_sort = false; + List *pathkeys = NIL; + PgFdwPathExtraData *fpextra; + bool save_use_remote_estimate = false; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + List *fdw_private; + ForeignPath *final_path; + + /* + * Currently, we only support this for SELECT commands + */ + if (parse->commandType != CMD_SELECT) + return; + + /* + * No work if there is no FOR UPDATE/SHARE clause and if there is no need + * to add a LIMIT node + */ + if (!parse->rowMarks && !extra->limit_needed) + return; + + /* We don't support cases where there are any SRFs in the targetlist */ + if (parse->hasTargetSRFs) + return; + + /* Save the input_rel as outerrel in fpinfo */ + fpinfo->outerrel = input_rel; + + /* + * Copy foreign table, foreign server, user mapping, FDW options etc. + * details from the input relation's fpinfo. + */ + fpinfo->table = ifpinfo->table; + fpinfo->server = ifpinfo->server; + fpinfo->user = ifpinfo->user; + merge_fdw_options(fpinfo, ifpinfo, NULL); + + /* + * If there is no need to add a LIMIT node, there might be a ForeignPath + * in the input_rel's pathlist that implements all behavior of the query. + * Note: we would already have accounted for the query's FOR UPDATE/SHARE + * (if any) before we get here. + */ + if (!extra->limit_needed) + { + ListCell *lc; + + Assert(parse->rowMarks); + + /* + * Grouping and aggregation are not supported with FOR UPDATE/SHARE, + * so the input_rel should be a base, join, or ordered relation; and + * if it's an ordered relation, its input relation should be a base or + * join relation. + */ + Assert(input_rel->reloptkind == RELOPT_BASEREL || + input_rel->reloptkind == RELOPT_JOINREL || + (input_rel->reloptkind == RELOPT_UPPER_REL && + ifpinfo->stage == UPPERREL_ORDERED && + (ifpinfo->outerrel->reloptkind == RELOPT_BASEREL || + ifpinfo->outerrel->reloptkind == RELOPT_JOINREL))); + + foreach(lc, input_rel->pathlist) + { + Path *path = (Path *) lfirst(lc); + + /* + * apply_scanjoin_target_to_paths() uses create_projection_path() + * to adjust each of its input paths if needed, whereas + * create_ordered_paths() uses apply_projection_to_path() to do + * that. So the former might have put a ProjectionPath on top of + * the ForeignPath; look through ProjectionPath and see if the + * path underneath it is ForeignPath. + */ + if (IsA(path, ForeignPath) || + (IsA(path, ProjectionPath) && + IsA(((ProjectionPath *) path)->subpath, ForeignPath))) + { + /* + * Create foreign final path; this gets rid of a + * no-longer-needed outer plan (if any), which makes the + * EXPLAIN output look cleaner + */ + final_path = create_foreign_upper_path(root, + path->parent, + path->pathtarget, + path->rows, + path->startup_cost, + path->total_cost, + path->pathkeys, + NULL, /* no extra plan */ + NULL); /* no fdw_private */ + + /* and add it to the final_rel */ + add_path(final_rel, (Path *) final_path); + + /* Safe to push down */ + fpinfo->pushdown_safe = true; + + return; + } + } + + /* + * If we get here it means no ForeignPaths; since we would already + * have considered pushing down all operations for the query to the + * remote server, give up on it. + */ + return; + } + + Assert(extra->limit_needed); + + /* + * If the input_rel is an ordered relation, replace the input_rel with its + * input relation + */ + if (input_rel->reloptkind == RELOPT_UPPER_REL && + ifpinfo->stage == UPPERREL_ORDERED) + { + input_rel = ifpinfo->outerrel; + ifpinfo = (PgFdwRelationInfo *) input_rel->fdw_private; + has_final_sort = true; + pathkeys = root->sort_pathkeys; + } + + /* The input_rel should be a base, join, or grouping relation */ + Assert(input_rel->reloptkind == RELOPT_BASEREL || + input_rel->reloptkind == RELOPT_JOINREL || + (input_rel->reloptkind == RELOPT_UPPER_REL && + ifpinfo->stage == UPPERREL_GROUP_AGG)); + + /* + * We try to create a path below by extending a simple foreign path for + * the underlying base, join, or grouping relation to perform the final + * sort (if has_final_sort) and the LIMIT restriction remotely, which is + * stored into the fdw_private list of the resulting path. (We + * re-estimate the costs of sorting the underlying relation, if + * has_final_sort.) + */ + + /* + * Assess if it is safe to push down the LIMIT and OFFSET to the remote + * server + */ + + /* + * If the underlying relation has any local conditions, the LIMIT/OFFSET + * cannot be pushed down. + */ + if (ifpinfo->local_conds) + return; + + /* + * Also, the LIMIT/OFFSET cannot be pushed down, if their expressions are + * not safe to remote. + */ + if (!is_foreign_expr(root, input_rel, (Expr *) parse->limitOffset) || + !is_foreign_expr(root, input_rel, (Expr *) parse->limitCount)) + return; + + /* Safe to push down */ + fpinfo->pushdown_safe = true; + + /* Construct PgFdwPathExtraData */ + fpextra = (PgFdwPathExtraData *) palloc0(sizeof(PgFdwPathExtraData)); + fpextra->target = root->upper_targets[UPPERREL_FINAL]; + fpextra->has_final_sort = has_final_sort; + fpextra->has_limit = extra->limit_needed; + fpextra->limit_tuples = extra->limit_tuples; + fpextra->count_est = extra->count_est; + fpextra->offset_est = extra->offset_est; + + /* + * Estimate the costs of performing the final sort and the LIMIT + * restriction remotely. If has_final_sort is false, we wouldn't need to + * execute EXPLAIN anymore if use_remote_estimate, since the costs can be + * roughly estimated using the costs we already have for the underlying + * relation, in the same way as when use_remote_estimate is false. Since + * it's pretty expensive to execute EXPLAIN, force use_remote_estimate to + * false in that case. + */ + if (!fpextra->has_final_sort) + { + save_use_remote_estimate = ifpinfo->use_remote_estimate; + ifpinfo->use_remote_estimate = false; + } + estimate_path_cost_size(root, input_rel, NIL, pathkeys, fpextra, + &rows, &width, &startup_cost, &total_cost); + if (!fpextra->has_final_sort) + ifpinfo->use_remote_estimate = save_use_remote_estimate; + + /* + * Build the fdw_private list that will be used by postgresGetForeignPlan. + * Items in the list must match order in enum FdwPathPrivateIndex. + */ + fdw_private = list_make2(makeBoolean(has_final_sort), + makeBoolean(extra->limit_needed)); + + /* + * Create foreign final path; this gets rid of a no-longer-needed outer + * plan (if any), which makes the EXPLAIN output look cleaner + */ + final_path = create_foreign_upper_path(root, + input_rel, + root->upper_targets[UPPERREL_FINAL], + rows, + startup_cost, + total_cost, + pathkeys, + NULL, /* no extra plan */ + fdw_private); + + /* and add it to the final_rel */ + add_path(final_rel, (Path *) final_path); +} + +/* + * postgresIsForeignPathAsyncCapable + * Check whether a given ForeignPath node is async-capable. + */ +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + RelOptInfo *rel = ((Path *) path)->parent; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private; + + return fpinfo->async_capable; +} + +/* + * postgresForeignAsyncRequest + * Asynchronously request next tuple from a foreign PostgreSQL table. + */ +static void +postgresForeignAsyncRequest(AsyncRequest *areq) +{ + produce_tuple_asynchronously(areq, true); +} + +/* + * postgresForeignAsyncConfigureWait + * Configure a file descriptor event for which we wish to wait. + */ +static void +postgresForeignAsyncConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + AppendState *requestor = (AppendState *) areq->requestor; + WaitEventSet *set = requestor->as_eventset; + + /* This should not be called unless callback_pending */ + Assert(areq->callback_pending); + + /* + * If process_pending_request() has been invoked on the given request + * before we get here, we might have some tuples already; in which case + * complete the request + */ + if (fsstate->next_tuple < fsstate->num_tuples) + { + complete_pending_request(areq); + if (areq->request_complete) + return; + Assert(areq->callback_pending); + } + + /* We must have run out of tuples */ + Assert(fsstate->next_tuple >= fsstate->num_tuples); + + /* The core code would have registered postmaster death event */ + Assert(GetNumRegisteredWaitEvents(set) >= 1); + + /* Begin an asynchronous data fetch if not already done */ + if (!pendingAreq) + fetch_more_data_begin(areq); + else if (pendingAreq->requestor != areq->requestor) + { + /* + * This is the case when the in-process request was made by another + * Append. Note that it might be useless to process the request, + * because the query might not need tuples from that Append anymore. + * If there are any child subplans of the same parent that are ready + * for new requests, skip the given request. Likewise, if there are + * any configured events other than the postmaster death event, skip + * it. Otherwise, process the in-process request, then begin a fetch + * to configure the event below, because we might otherwise end up + * with no configured events other than the postmaster death event. + */ + if (!bms_is_empty(requestor->as_needrequest)) + return; + if (GetNumRegisteredWaitEvents(set) > 1) + return; + process_pending_request(pendingAreq); + fetch_more_data_begin(areq); + } + else if (pendingAreq->requestee != areq->requestee) + { + /* + * This is the case when the in-process request was made by the same + * parent but for a different child. Since we configure only the + * event for the request made for that child, skip the given request. + */ + return; + } + else + Assert(pendingAreq == areq); + + AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn), + NULL, areq); +} + +/* + * postgresForeignAsyncNotify + * Fetch some more tuples from a file descriptor that becomes ready, + * requesting next tuple. + */ +static void +postgresForeignAsyncNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + /* The core code would have initialized the callback_pending flag */ + Assert(!areq->callback_pending); + + /* + * If process_pending_request() has been invoked on the given request + * before we get here, we might have some tuples already; in which case + * produce the next tuple + */ + if (fsstate->next_tuple < fsstate->num_tuples) + { + produce_tuple_asynchronously(areq, true); + return; + } + + /* We must have run out of tuples */ + Assert(fsstate->next_tuple >= fsstate->num_tuples); + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + /* On error, report the original query, not the FETCH. */ + if (!PQconsumeInput(fsstate->conn)) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + fetch_more_data(node); + + produce_tuple_asynchronously(areq, true); +} + +/* + * Asynchronously produce next tuple from a foreign PostgreSQL table. + */ +static void +produce_tuple_asynchronously(AsyncRequest *areq, bool fetch) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; + TupleTableSlot *result; + + /* This should not be called if the request is currently in-process */ + Assert(areq != pendingAreq); + + /* Fetch some more tuples, if we've run out */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* No point in another fetch if we already detected EOF, though */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } + return; + } + + /* Get a tuple from the ForeignScan node */ + result = areq->requestee->ExecProcNodeReal(areq->requestee); + if (!TupIsNull(result)) + { + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + return; + } + + /* We must have run out of tuples */ + Assert(fsstate->next_tuple >= fsstate->num_tuples); + + /* Fetch some more tuples, if we've not detected EOF yet */ + if (!fsstate->eof_reached) + { + /* Mark the request as pending for a callback */ + ExecAsyncRequestPending(areq); + /* Begin another fetch if requested and if no pending request */ + if (fetch && !pendingAreq) + fetch_more_data_begin(areq); + } + else + { + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + } +} + +/* + * Begin an asynchronous data fetch. + * + * Note: this function assumes there is no currently-in-progress asynchronous + * data fetch. + * + * Note: fetch_more_data must be called to fetch the result. + */ +static void +fetch_more_data_begin(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + char sql[64]; + + Assert(!fsstate->conn_state->pendingAreq); + + /* Create the cursor synchronously. */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* We will send this query, but not wait for the response. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (!PQsendQuery(fsstate->conn, sql)) + pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query); + + /* Remember that the request is in process */ + fsstate->conn_state->pendingAreq = areq; +} + +/* + * Process a pending asynchronous request. + */ +void +process_pending_request(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); + + /* The request should be currently in-process */ + Assert(fsstate->conn_state->pendingAreq == areq); + + fetch_more_data(node); + + /* + * If we didn't get any tuples, must be end of data; complete the request + * now. Otherwise, we postpone completing the request until we are called + * from postgresForeignAsyncConfigureWait()/postgresForeignAsyncNotify(). + */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, NULL); + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + } +} + +/* + * Complete a pending asynchronous request. + */ +static void +complete_pending_request(AsyncRequest *areq) +{ + /* The request would have been pending for a callback */ + Assert(areq->callback_pending); + + /* Unlike AsyncNotify, we unset callback_pending ourselves */ + areq->callback_pending = false; + + /* We begin a fetch afterwards if necessary; don't fetch */ + produce_tuple_asynchronously(areq, false); + + /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */ + ExecAsyncResponse(areq); + + /* Also, we do instrumentation ourselves, if required */ + if (areq->requestee->instrument) + InstrUpdateTupleCount(areq->requestee->instrument, + TupIsNull(areq->result) ? 0.0 : 1.0); +} + +/* + * Create a tuple from the specified row of the PGresult. + * + * rel is the local representation of the foreign table, attinmeta is + * conversion data for the rel's tupdesc, and retrieved_attrs is an + * integer list of the table column numbers present in the PGresult. + * fsstate is the ForeignScan plan node's execution state. + * temp_context is a working context that can be reset after each tuple. + * + * Note: either rel or fsstate, but not both, can be NULL. rel is NULL + * if we're processing a remote join, while fsstate is NULL in a non-query + * context such as ANALYZE, or if we're processing a non-scan query node. + */ +static HeapTuple +make_tuple_from_result_row(PGresult *res, + int row, + Relation rel, + AttInMetadata *attinmeta, + List *retrieved_attrs, + ForeignScanState *fsstate, + MemoryContext temp_context) +{ + HeapTuple tuple; + TupleDesc tupdesc; + Datum *values; + bool *nulls; + ItemPointer ctid = NULL; + ConversionLocation errpos; + ErrorContextCallback errcallback; + MemoryContext oldcontext; + ListCell *lc; + int j; + + Assert(row < PQntuples(res)); + + /* + * Do the following work in a temp context that we reset after each tuple. + * This cleans up not only the data we have direct access to, but any + * cruft the I/O functions might leak. + */ + oldcontext = MemoryContextSwitchTo(temp_context); + + /* + * Get the tuple descriptor for the row. Use the rel's tupdesc if rel is + * provided, otherwise look to the scan node's ScanTupleSlot. + */ + if (rel) + tupdesc = RelationGetDescr(rel); + else + { + Assert(fsstate); + tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; + } + + values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); + nulls = (bool *) palloc(tupdesc->natts * sizeof(bool)); + /* Initialize to nulls for any columns not present in result */ + memset(nulls, true, tupdesc->natts * sizeof(bool)); + + /* + * Set up and install callback to report where conversion error occurs. + */ + errpos.cur_attno = 0; + errpos.rel = rel; + errpos.fsstate = fsstate; + errcallback.callback = conversion_error_callback; + errcallback.arg = (void *) &errpos; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* + * i indexes columns in the relation, j indexes columns in the PGresult. + */ + j = 0; + foreach(lc, retrieved_attrs) + { + int i = lfirst_int(lc); + char *valstr; + + /* fetch next column's textual value */ + if (PQgetisnull(res, row, j)) + valstr = NULL; + else + valstr = PQgetvalue(res, row, j); + + /* + * convert value to internal representation + * + * Note: we ignore system columns other than ctid and oid in result + */ + errpos.cur_attno = i; + if (i > 0) + { + /* ordinary column */ + Assert(i <= tupdesc->natts); + nulls[i - 1] = (valstr == NULL); + /* Apply the input function even to nulls, to support domains */ + values[i - 1] = InputFunctionCall(&attinmeta->attinfuncs[i - 1], + valstr, + attinmeta->attioparams[i - 1], + attinmeta->atttypmods[i - 1]); + } + else if (i == SelfItemPointerAttributeNumber) + { + /* ctid */ + if (valstr != NULL) + { + Datum datum; + + datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); + ctid = (ItemPointer) DatumGetPointer(datum); + } + } + errpos.cur_attno = 0; + + j++; + } + + /* Uninstall error context callback. */ + error_context_stack = errcallback.previous; + + /* + * Check we got the expected number of columns. Note: j == 0 and + * PQnfields == 1 is expected, since deparse emits a NULL if no columns. + */ + if (j > 0 && j != PQnfields(res)) + elog(ERROR, "remote query result does not match the foreign table"); + + /* + * Build the result tuple in caller's memory context. + */ + MemoryContextSwitchTo(oldcontext); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + /* + * If we have a CTID to return, install it in both t_self and t_ctid. + * t_self is the normal place, but if the tuple is converted to a + * composite Datum, t_self will be lost; setting t_ctid allows CTID to be + * preserved during EvalPlanQual re-evaluations (see ROW_MARK_COPY code). + */ + if (ctid) + tuple->t_self = tuple->t_data->t_ctid = *ctid; + + /* + * Stomp on the xmin, xmax, and cmin fields from the tuple created by + * heap_form_tuple. heap_form_tuple actually creates the tuple with + * DatumTupleFields, not HeapTupleFields, but the executor expects + * HeapTupleFields and will happily extract system columns on that + * assumption. If we don't do this then, for example, the tuple length + * ends up in the xmin field, which isn't what we want. + */ + HeapTupleHeaderSetXmax(tuple->t_data, InvalidTransactionId); + HeapTupleHeaderSetXmin(tuple->t_data, InvalidTransactionId); + HeapTupleHeaderSetCmin(tuple->t_data, InvalidTransactionId); + + /* Clean up */ + MemoryContextReset(temp_context); + + return tuple; +} + +/* + * Callback function which is called when error occurs during column value + * conversion. Print names of column and relation. + * + * Note that this function mustn't do any catalog lookups, since we are in + * an already-failed transaction. Fortunately, we can get the needed info + * from the relation or the query's rangetable instead. + */ +static void +conversion_error_callback(void *arg) +{ + ConversionLocation *errpos = (ConversionLocation *) arg; + Relation rel = errpos->rel; + ForeignScanState *fsstate = errpos->fsstate; + const char *attname = NULL; + const char *relname = NULL; + bool is_wholerow = false; + + /* + * If we're in a scan node, always use aliases from the rangetable, for + * consistency between the simple-relation and remote-join cases. Look at + * the relation's tupdesc only if we're not in a scan node. + */ + if (fsstate) + { + /* ForeignScan case */ + ForeignScan *fsplan = castNode(ForeignScan, fsstate->ss.ps.plan); + int varno = 0; + AttrNumber colno = 0; + + if (fsplan->scan.scanrelid > 0) + { + /* error occurred in a scan against a foreign table */ + varno = fsplan->scan.scanrelid; + colno = errpos->cur_attno; + } + else + { + /* error occurred in a scan against a foreign join */ + TargetEntry *tle; + + tle = list_nth_node(TargetEntry, fsplan->fdw_scan_tlist, + errpos->cur_attno - 1); + + /* + * Target list can have Vars and expressions. For Vars, we can + * get some information, however for expressions we can't. Thus + * for expressions, just show generic context message. + */ + if (IsA(tle->expr, Var)) + { + Var *var = (Var *) tle->expr; + + varno = var->varno; + colno = var->varattno; + } + } + + if (varno > 0) + { + EState *estate = fsstate->ss.ps.state; + RangeTblEntry *rte = exec_rt_fetch(varno, estate); + + relname = rte->eref->aliasname; + + if (colno == 0) + is_wholerow = true; + else if (colno > 0 && colno <= list_length(rte->eref->colnames)) + attname = strVal(list_nth(rte->eref->colnames, colno - 1)); + else if (colno == SelfItemPointerAttributeNumber) + attname = "ctid"; + } + } + else if (rel) + { + /* Non-ForeignScan case (we should always have a rel here) */ + TupleDesc tupdesc = RelationGetDescr(rel); + + relname = RelationGetRelationName(rel); + if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, + errpos->cur_attno - 1); + + attname = NameStr(attr->attname); + } + else if (errpos->cur_attno == SelfItemPointerAttributeNumber) + attname = "ctid"; + } + + if (relname && is_wholerow) + errcontext("whole-row reference to foreign table \"%s\"", relname); + else if (relname && attname) + errcontext("column \"%s\" of foreign table \"%s\"", attname, relname); + else + errcontext("processing expression at position %d in select list", + errpos->cur_attno); +} + +/* + * Given an EquivalenceClass and a foreign relation, find an EC member + * that can be used to sort the relation remotely according to a pathkey + * using this EC. + * + * If there is more than one suitable candidate, return an arbitrary + * one of them. If there is none, return NULL. + * + * This checks that the EC member expression uses only Vars from the given + * rel and is shippable. Caller must separately verify that the pathkey's + * ordering operator is shippable. + */ +EquivalenceMember * +find_em_for_rel(PlannerInfo *root, EquivalenceClass *ec, RelOptInfo *rel) +{ + ListCell *lc; + + foreach(lc, ec->ec_members) + { + EquivalenceMember *em = (EquivalenceMember *) lfirst(lc); + + /* + * Note we require !bms_is_empty, else we'd accept constant + * expressions which are not suitable for the purpose. + */ + if (bms_is_subset(em->em_relids, rel->relids) && + !bms_is_empty(em->em_relids) && + is_foreign_expr(root, rel, em->em_expr)) + return em; + } + + return NULL; +} + +/* + * Find an EquivalenceClass member that is to be computed as a sort column + * in the given rel's reltarget, and is shippable. + * + * If there is more than one suitable candidate, return an arbitrary + * one of them. If there is none, return NULL. + * + * This checks that the EC member expression uses only Vars from the given + * rel and is shippable. Caller must separately verify that the pathkey's + * ordering operator is shippable. + */ +EquivalenceMember * +find_em_for_rel_target(PlannerInfo *root, EquivalenceClass *ec, + RelOptInfo *rel) +{ + PathTarget *target = rel->reltarget; + ListCell *lc1; + int i; + + i = 0; + foreach(lc1, target->exprs) + { + Expr *expr = (Expr *) lfirst(lc1); + Index sgref = get_pathtarget_sortgroupref(target, i); + ListCell *lc2; + + /* Ignore non-sort expressions */ + if (sgref == 0 || + get_sortgroupref_clause_noerr(sgref, + root->parse->sortClause) == NULL) + { + i++; + continue; + } + + /* We ignore binary-compatible relabeling on both ends */ + while (expr && IsA(expr, RelabelType)) + expr = ((RelabelType *) expr)->arg; + + /* Locate an EquivalenceClass member matching this expr, if any */ + foreach(lc2, ec->ec_members) + { + EquivalenceMember *em = (EquivalenceMember *) lfirst(lc2); + Expr *em_expr; + + /* Don't match constants */ + if (em->em_is_const) + continue; + + /* Ignore child members */ + if (em->em_is_child) + continue; + + /* Match if same expression (after stripping relabel) */ + em_expr = em->em_expr; + while (em_expr && IsA(em_expr, RelabelType)) + em_expr = ((RelabelType *) em_expr)->arg; + + if (!equal(em_expr, expr)) + continue; + + /* Check that expression (including relabels!) is shippable */ + if (is_foreign_expr(root, rel, em->em_expr)) + return em; + } + + i++; + } + + return NULL; +} + +/* + * Determine batch size for a given foreign table. The option specified for + * a table has precedence. + */ +static int +get_batch_size_option(Relation rel) +{ + Oid foreigntableid = RelationGetRelid(rel); + ForeignTable *table; + ForeignServer *server; + List *options; + ListCell *lc; + + /* we use 1 by default, which means "no batching" */ + int batch_size = 1; + + /* + * Load options for table and server. We append server options after table + * options, because table options take precedence. + */ + table = GetForeignTable(foreigntableid); + server = GetForeignServer(table->serverid); + + options = NIL; + options = list_concat(options, table->options); + options = list_concat(options, server->options); + + /* See if either table or server specifies batch_size. */ + foreach(lc, options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "batch_size") == 0) + { + (void) parse_int(defGetString(def), &batch_size, 0, NULL); + break; + } + } + + return batch_size; +} |