summaryrefslogtreecommitdiffstats
path: root/src/backend/executor/execAsync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execAsync.c')
-rw-r--r--src/backend/executor/execAsync.c154
1 files changed, 154 insertions, 0 deletions
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000..94a284a
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,154 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/executor.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+
+/*
+ * Asynchronously request a tuple from a designed async-capable node.
+ */
+void
+ExecAsyncRequest(AsyncRequest *areq)
+{
+ if (areq->requestee->chgParam != NULL) /* something changed? */
+ ExecReScan(areq->requestee); /* let ReScan handle this */
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStartNode(areq->requestee->instrument);
+
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanRequest(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStopNode(areq->requestee->instrument,
+ TupIsNull(areq->result) ? 0.0 : 1.0);
+}
+
+/*
+ * Give the asynchronous node a chance to configure the file descriptor event
+ * for which it wishes to wait. We expect the node-type specific callback to
+ * make a single call of the following form:
+ *
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ */
+void
+ExecAsyncConfigureWait(AsyncRequest *areq)
+{
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStartNode(areq->requestee->instrument);
+
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanConfigureWait(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStopNode(areq->requestee->instrument, 0.0);
+}
+
+/*
+ * Call the asynchronous node back when a relevant event has occurred.
+ */
+void
+ExecAsyncNotify(AsyncRequest *areq)
+{
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStartNode(areq->requestee->instrument);
+
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanNotify(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+
+ /* must provide our own instrumentation support */
+ if (areq->requestee->instrument)
+ InstrStopNode(areq->requestee->instrument,
+ TupIsNull(areq->result) ? 0.0 : 1.0);
+}
+
+/*
+ * Call the requestor back when an asynchronous node has produced a result.
+ */
+void
+ExecAsyncResponse(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestor))
+ {
+ case T_AppendState:
+ ExecAsyncAppendResponse(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestor));
+ }
+}
+
+/*
+ * A requestee node should call this function to deliver the tuple to its
+ * requestor node. The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
+{
+ areq->request_complete = true;
+ areq->result = result;
+}
+
+/*
+ * A requestee node should call this function to indicate that it is pending
+ * for a callback. The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestPending(AsyncRequest *areq)
+{
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ areq->result = NULL;
+}