/* * interface to SPI functions * * src/pl/plpython/plpy_spi.c */ #include "postgres.h" #include #include "access/htup_details.h" #include "access/xact.h" #include "catalog/pg_type.h" #include "executor/spi.h" #include "mb/pg_wchar.h" #include "parser/parse_type.h" #include "plpy_elog.h" #include "plpy_main.h" #include "plpy_planobject.h" #include "plpy_plpymodule.h" #include "plpy_procedure.h" #include "plpy_resultobject.h" #include "plpy_spi.h" #include "plpython.h" #include "utils/memutils.h" #include "utils/syscache.h" static PyObject *PLy_spi_execute_query(char *query, long limit); static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status); static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata); /* prepare(query="select * from foo") * prepare(query="select * from foo where bar = $1", params=["text"]) * prepare(query="select * from foo where bar = $1", params=["text"], limit=5) */ PyObject * PLy_spi_prepare(PyObject *self, PyObject *args) { PLyPlanObject *plan; PyObject *list = NULL; PyObject *volatile optr = NULL; char *query; PLyExecutionContext *exec_ctx = PLy_current_execution_context(); volatile MemoryContext oldcontext; volatile ResourceOwner oldowner; volatile int nargs; if (!PyArg_ParseTuple(args, "s|O:prepare", &query, &list)) return NULL; if (list && (!PySequence_Check(list))) { PLy_exception_set(PyExc_TypeError, "second argument of plpy.prepare must be a sequence"); return NULL; } if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL) return NULL; plan->mcxt = AllocSetContextCreate(TopMemoryContext, "PL/Python plan context", ALLOCSET_DEFAULT_SIZES); oldcontext = MemoryContextSwitchTo(plan->mcxt); nargs = list ? PySequence_Length(list) : 0; plan->nargs = nargs; plan->types = nargs ? palloc0(sizeof(Oid) * nargs) : NULL; plan->values = nargs ? palloc0(sizeof(Datum) * nargs) : NULL; plan->args = nargs ? palloc0(sizeof(PLyObToDatum) * nargs) : NULL; MemoryContextSwitchTo(oldcontext); oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; PLy_spi_subtransaction_begin(oldcontext, oldowner); PG_TRY(); { int i; for (i = 0; i < nargs; i++) { char *sptr; Oid typeId; int32 typmod; optr = PySequence_GetItem(list, i); if (PyUnicode_Check(optr)) sptr = PLyUnicode_AsString(optr); else { ereport(ERROR, (errmsg("plpy.prepare: type name at ordinal position %d is not a string", i))); sptr = NULL; /* keep compiler quiet */ } /******************************************************** * Resolve argument type names and then look them up by * oid in the system cache, and remember the required *information for input conversion. ********************************************************/ (void) parseTypeString(sptr, &typeId, &typmod, NULL); Py_DECREF(optr); /* * set optr to NULL, so we won't try to unref it again in case of * an error */ optr = NULL; plan->types[i] = typeId; PLy_output_setup_func(&plan->args[i], plan->mcxt, typeId, typmod, exec_ctx->curr_proc); } pg_verifymbstr(query, strlen(query), false); plan->plan = SPI_prepare(query, plan->nargs, plan->types); if (plan->plan == NULL) elog(ERROR, "SPI_prepare failed: %s", SPI_result_code_string(SPI_result)); /* transfer plan from procCxt to topCxt */ if (SPI_keepplan(plan->plan)) elog(ERROR, "SPI_keepplan failed"); PLy_spi_subtransaction_commit(oldcontext, oldowner); } PG_CATCH(); { Py_DECREF(plan); Py_XDECREF(optr); PLy_spi_subtransaction_abort(oldcontext, oldowner); return NULL; } PG_END_TRY(); Assert(plan->plan != NULL); return (PyObject *) plan; } /* execute(query="select * from foo", limit=5) * execute(plan=plan, values=(foo, bar), limit=5) */ PyObject * PLy_spi_execute(PyObject *self, PyObject *args) { char *query; PyObject *plan; PyObject *list = NULL; long limit = 0; if (PyArg_ParseTuple(args, "s|l", &query, &limit)) return PLy_spi_execute_query(query, limit); PyErr_Clear(); if (PyArg_ParseTuple(args, "O|Ol", &plan, &list, &limit) && is_PLyPlanObject(plan)) return PLy_spi_execute_plan(plan, list, limit); PLy_exception_set(PLy_exc_error, "plpy.execute expected a query or a plan"); return NULL; } PyObject * PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) { volatile int nargs; int i, rv; PLyPlanObject *plan; volatile MemoryContext oldcontext; volatile ResourceOwner oldowner; PyObject *ret; if (list != NULL) { if (!PySequence_Check(list) || PyUnicode_Check(list)) { PLy_exception_set(PyExc_TypeError, "plpy.execute takes a sequence as its second argument"); return NULL; } nargs = PySequence_Length(list); } else nargs = 0; plan = (PLyPlanObject *) ob; if (nargs != plan->nargs) { char *sv; PyObject *so = PyObject_Str(list); if (!so) PLy_elog(ERROR, "could not execute plan"); sv = PLyUnicode_AsString(so); PLy_exception_set_plural(PyExc_TypeError, "Expected sequence of %d argument, got %d: %s", "Expected sequence of %d arguments, got %d: %s", plan->nargs, plan->nargs, nargs, sv); Py_DECREF(so); return NULL; } oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; PLy_spi_subtransaction_begin(oldcontext, oldowner); PG_TRY(); { PLyExecutionContext *exec_ctx = PLy_current_execution_context(); char *volatile nulls; volatile int j; if (nargs > 0) nulls = palloc(nargs * sizeof(char)); else nulls = NULL; for (j = 0; j < nargs; j++) { PLyObToDatum *arg = &plan->args[j]; PyObject *elem; elem = PySequence_GetItem(list, j); PG_TRY(2); { bool isnull; plan->values[j] = PLy_output_convert(arg, elem, &isnull); nulls[j] = isnull ? 'n' : ' '; } PG_FINALLY(2); { Py_DECREF(elem); } PG_END_TRY(2); } rv = SPI_execute_plan(plan->plan, plan->values, nulls, exec_ctx->curr_proc->fn_readonly, limit); ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); if (nargs > 0) pfree(nulls); PLy_spi_subtransaction_commit(oldcontext, oldowner); } PG_CATCH(); { int k; /* * cleanup plan->values array */ for (k = 0; k < nargs; k++) { if (!plan->args[k].typbyval && (plan->values[k] != PointerGetDatum(NULL))) { pfree(DatumGetPointer(plan->values[k])); plan->values[k] = PointerGetDatum(NULL); } } PLy_spi_subtransaction_abort(oldcontext, oldowner); return NULL; } PG_END_TRY(); for (i = 0; i < nargs; i++) { if (!plan->args[i].typbyval && (plan->values[i] != PointerGetDatum(NULL))) { pfree(DatumGetPointer(plan->values[i])); plan->values[i] = PointerGetDatum(NULL); } } if (rv < 0) { PLy_exception_set(PLy_exc_spi_error, "SPI_execute_plan failed: %s", SPI_result_code_string(rv)); return NULL; } return ret; } static PyObject * PLy_spi_execute_query(char *query, long limit) { int rv; volatile MemoryContext oldcontext; volatile ResourceOwner oldowner; PyObject *ret = NULL; oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; PLy_spi_subtransaction_begin(oldcontext, oldowner); PG_TRY(); { PLyExecutionContext *exec_ctx = PLy_current_execution_context(); pg_verifymbstr(query, strlen(query), false); rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit); ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); PLy_spi_subtransaction_commit(oldcontext, oldowner); } PG_CATCH(); { PLy_spi_subtransaction_abort(oldcontext, oldowner); return NULL; } PG_END_TRY(); if (rv < 0) { Py_XDECREF(ret); PLy_exception_set(PLy_exc_spi_error, "SPI_execute failed: %s", SPI_result_code_string(rv)); return NULL; } return ret; } static PyObject * PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) { PLyResultObject *result; PLyExecutionContext *exec_ctx = PLy_current_execution_context(); volatile MemoryContext oldcontext; result = (PLyResultObject *) PLy_result_new(); if (!result) { SPI_freetuptable(tuptable); return NULL; } Py_DECREF(result->status); result->status = PyLong_FromLong(status); if (status > 0 && tuptable == NULL) { Py_DECREF(result->nrows); result->nrows = PyLong_FromUnsignedLongLong(rows); } else if (status > 0 && tuptable != NULL) { PLyDatumToOb ininfo; MemoryContext cxt; Py_DECREF(result->nrows); result->nrows = PyLong_FromUnsignedLongLong(rows); cxt = AllocSetContextCreate(CurrentMemoryContext, "PL/Python temp context", ALLOCSET_DEFAULT_SIZES); /* Initialize for converting result tuples to Python */ PLy_input_setup_func(&ininfo, cxt, RECORDOID, -1, exec_ctx->curr_proc); oldcontext = CurrentMemoryContext; PG_TRY(); { MemoryContext oldcontext2; if (rows) { uint64 i; /* * PyList_New() and PyList_SetItem() use Py_ssize_t for list * size and list indices; so we cannot support a result larger * than PY_SSIZE_T_MAX. */ if (rows > (uint64) PY_SSIZE_T_MAX) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("query result has too many rows to fit in a Python list"))); Py_DECREF(result->rows); result->rows = PyList_New(rows); if (result->rows) { PLy_input_setup_tuple(&ininfo, tuptable->tupdesc, exec_ctx->curr_proc); for (i = 0; i < rows; i++) { PyObject *row = PLy_input_from_tuple(&ininfo, tuptable->vals[i], tuptable->tupdesc, true); PyList_SetItem(result->rows, i, row); } } } /* * Save tuple descriptor for later use by result set metadata * functions. Save it in TopMemoryContext so that it survives * outside of an SPI context. We trust that PLy_result_dealloc() * will clean it up when the time is right. (Do this as late as * possible, to minimize the number of ways the tupdesc could get * leaked due to errors.) */ oldcontext2 = MemoryContextSwitchTo(TopMemoryContext); result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc); MemoryContextSwitchTo(oldcontext2); } PG_CATCH(); { MemoryContextSwitchTo(oldcontext); MemoryContextDelete(cxt); Py_DECREF(result); PG_RE_THROW(); } PG_END_TRY(); MemoryContextDelete(cxt); SPI_freetuptable(tuptable); /* in case PyList_New() failed above */ if (!result->rows) { Py_DECREF(result); result = NULL; } } return (PyObject *) result; } PyObject * PLy_commit(PyObject *self, PyObject *args) { MemoryContext oldcontext = CurrentMemoryContext; PLyExecutionContext *exec_ctx = PLy_current_execution_context(); PG_TRY(); { SPI_commit(); /* was cleared at transaction end, reset pointer */ exec_ctx->scratch_ctx = NULL; } PG_CATCH(); { ErrorData *edata; PLyExceptionEntry *entry; PyObject *exc; /* Save error info */ MemoryContextSwitchTo(oldcontext); edata = CopyErrorData(); FlushErrorState(); /* was cleared at transaction end, reset pointer */ exec_ctx->scratch_ctx = NULL; /* Look up the correct exception */ entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), HASH_FIND, NULL); /* * This could be a custom error code, if that's the case fallback to * SPIError */ exc = entry ? entry->exc : PLy_exc_spi_error; /* Make Python raise the exception */ PLy_spi_exception_set(exc, edata); FreeErrorData(edata); return NULL; } PG_END_TRY(); Py_RETURN_NONE; } PyObject * PLy_rollback(PyObject *self, PyObject *args) { MemoryContext oldcontext = CurrentMemoryContext; PLyExecutionContext *exec_ctx = PLy_current_execution_context(); PG_TRY(); { SPI_rollback(); /* was cleared at transaction end, reset pointer */ exec_ctx->scratch_ctx = NULL; } PG_CATCH(); { ErrorData *edata; PLyExceptionEntry *entry; PyObject *exc; /* Save error info */ MemoryContextSwitchTo(oldcontext); edata = CopyErrorData(); FlushErrorState(); /* was cleared at transaction end, reset pointer */ exec_ctx->scratch_ctx = NULL; /* Look up the correct exception */ entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), HASH_FIND, NULL); /* * This could be a custom error code, if that's the case fallback to * SPIError */ exc = entry ? entry->exc : PLy_exc_spi_error; /* Make Python raise the exception */ PLy_spi_exception_set(exc, edata); FreeErrorData(edata); return NULL; } PG_END_TRY(); Py_RETURN_NONE; } /* * Utilities for running SPI functions in subtransactions. * * Usage: * * MemoryContext oldcontext = CurrentMemoryContext; * ResourceOwner oldowner = CurrentResourceOwner; * * PLy_spi_subtransaction_begin(oldcontext, oldowner); * PG_TRY(); * { * * PLy_spi_subtransaction_commit(oldcontext, oldowner); * } * PG_CATCH(); * { * * PLy_spi_subtransaction_abort(oldcontext, oldowner); * return NULL; * } * PG_END_TRY(); * * These utilities take care of restoring connection to the SPI manager and * setting a Python exception in case of an abort. */ void PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner) { BeginInternalSubTransaction(NULL); /* Want to run inside function's memory context */ MemoryContextSwitchTo(oldcontext); } void PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner) { /* Commit the inner transaction, return to outer xact context */ ReleaseCurrentSubTransaction(); MemoryContextSwitchTo(oldcontext); CurrentResourceOwner = oldowner; } void PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner) { ErrorData *edata; PLyExceptionEntry *entry; PyObject *exc; /* Save error info */ MemoryContextSwitchTo(oldcontext); edata = CopyErrorData(); FlushErrorState(); /* Abort the inner transaction */ RollbackAndReleaseCurrentSubTransaction(); MemoryContextSwitchTo(oldcontext); CurrentResourceOwner = oldowner; /* Look up the correct exception */ entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), HASH_FIND, NULL); /* * This could be a custom error code, if that's the case fallback to * SPIError */ exc = entry ? entry->exc : PLy_exc_spi_error; /* Make Python raise the exception */ PLy_spi_exception_set(exc, edata); FreeErrorData(edata); } /* * Raise a SPIError, passing in it more error details, like the * internal query and error position. */ static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata) { PyObject *args = NULL; PyObject *spierror = NULL; PyObject *spidata = NULL; args = Py_BuildValue("(s)", edata->message); if (!args) goto failure; /* create a new SPI exception with the error message as the parameter */ spierror = PyObject_CallObject(excclass, args); if (!spierror) goto failure; spidata = Py_BuildValue("(izzzizzzzz)", edata->sqlerrcode, edata->detail, edata->hint, edata->internalquery, edata->internalpos, edata->schema_name, edata->table_name, edata->column_name, edata->datatype_name, edata->constraint_name); if (!spidata) goto failure; if (PyObject_SetAttrString(spierror, "spidata", spidata) == -1) goto failure; PyErr_SetObject(excclass, spierror); Py_DECREF(args); Py_DECREF(spierror); Py_DECREF(spidata); return; failure: Py_XDECREF(args); Py_XDECREF(spierror); Py_XDECREF(spidata); elog(ERROR, "could not convert SPI error to Python exception"); }