summaryrefslogtreecommitdiffstats
path: root/src/test/modules/libpq_pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/modules/libpq_pipeline')
-rw-r--r--src/test/modules/libpq_pipeline/.gitignore5
-rw-r--r--src/test/modules/libpq_pipeline/Makefile25
-rw-r--r--src/test/modules/libpq_pipeline/README1
-rw-r--r--src/test/modules/libpq_pipeline/libpq_pipeline.c1818
-rw-r--r--src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl78
-rw-r--r--src/test/modules/libpq_pipeline/traces/disallowed_in_pipeline.trace6
-rw-r--r--src/test/modules/libpq_pipeline/traces/multi_pipelines.trace23
-rw-r--r--src/test/modules/libpq_pipeline/traces/nosync.trace92
-rw-r--r--src/test/modules/libpq_pipeline/traces/pipeline_abort.trace62
-rw-r--r--src/test/modules/libpq_pipeline/traces/pipeline_idle.trace32
-rw-r--r--src/test/modules/libpq_pipeline/traces/prepared.trace18
-rw-r--r--src/test/modules/libpq_pipeline/traces/simple_pipeline.trace12
-rw-r--r--src/test/modules/libpq_pipeline/traces/singlerow.trace59
-rw-r--r--src/test/modules/libpq_pipeline/traces/transaction.trace61
14 files changed, 2292 insertions, 0 deletions
diff --git a/src/test/modules/libpq_pipeline/.gitignore b/src/test/modules/libpq_pipeline/.gitignore
new file mode 100644
index 0000000..3a11e78
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
+/libpq_pipeline
diff --git a/src/test/modules/libpq_pipeline/Makefile b/src/test/modules/libpq_pipeline/Makefile
new file mode 100644
index 0000000..65acc3e
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/Makefile
@@ -0,0 +1,25 @@
+# src/test/modules/libpq_pipeline/Makefile
+
+PGFILEDESC = "libpq_pipeline - test program for pipeline execution"
+PGAPPICON = win32
+
+PROGRAM = libpq_pipeline
+OBJS = $(WIN32RES) libpq_pipeline.o
+
+NO_INSTALL = 1
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS_INTERNAL += $(libpq_pgport)
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/libpq_pipeline
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/libpq_pipeline/README b/src/test/modules/libpq_pipeline/README
new file mode 100644
index 0000000..d8174dd
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/README
@@ -0,0 +1 @@
+Test programs and libraries for libpq
diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c
new file mode 100644
index 0000000..b30a97d
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c
@@ -0,0 +1,1818 @@
+/*-------------------------------------------------------------------------
+ *
+ * libpq_pipeline.c
+ * Verify libpq pipeline execution functionality
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/test/modules/libpq_pipeline/libpq_pipeline.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/time.h>
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+#include "catalog/pg_type_d.h"
+#include "common/fe_memutils.h"
+#include "libpq-fe.h"
+#include "pg_getopt.h"
+#include "portability/instr_time.h"
+
+
+static void exit_nicely(PGconn *conn);
+static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
+ pg_attribute_printf(2, 3);
+static bool process_result(PGconn *conn, PGresult *res, int results,
+ int numsent);
+
+const char *const progname = "libpq_pipeline";
+
+/* Options and defaults */
+char *tracefile = NULL; /* path to PQtrace() file */
+
+
+#ifdef DEBUG_OUTPUT
+#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
+#else
+#define pg_debug(...)
+#endif
+
+static const char *const drop_table_sql =
+"DROP TABLE IF EXISTS pq_pipeline_demo";
+static const char *const create_table_sql =
+"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
+"int8filler int8);";
+static const char *const insert_sql =
+"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
+static const char *const insert_sql2 =
+"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
+
+/* max char length of an int32/64, plus sign and null terminator */
+#define MAXINTLEN 12
+#define MAXINT8LEN 20
+
+static void
+exit_nicely(PGconn *conn)
+{
+ PQfinish(conn);
+ exit(1);
+}
+
+/*
+ * Print an error to stderr and terminate the program.
+ */
+#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
+static void
+pg_attribute_noreturn()
+pg_fatal_impl(int line, const char *fmt,...)
+{
+ va_list args;
+
+
+ fflush(stdout);
+
+ fprintf(stderr, "\n%s:%d: ", progname, line);
+ va_start(args, fmt);
+ vfprintf(stderr, fmt, args);
+ va_end(args);
+ Assert(fmt[strlen(fmt) - 1] != '\n');
+ fprintf(stderr, "\n");
+ exit(1);
+}
+
+static void
+test_disallowed_in_pipeline(PGconn *conn)
+{
+ PGresult *res = NULL;
+
+ fprintf(stderr, "test error cases... ");
+
+ if (PQisnonblocking(conn))
+ pg_fatal("Expected blocking connection mode");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("Unable to enter pipeline mode");
+
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Pipeline mode not activated properly");
+
+ /* PQexec should fail in pipeline mode */
+ res = PQexec(conn, "SELECT 1");
+ if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+ pg_fatal("PQexec should fail in pipeline mode but succeeded");
+ if (strcmp(PQerrorMessage(conn),
+ "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
+ pg_fatal("did not get expected error message; got: \"%s\"",
+ PQerrorMessage(conn));
+
+ /* PQsendQuery should fail in pipeline mode */
+ if (PQsendQuery(conn, "SELECT 1") != 0)
+ pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
+ if (strcmp(PQerrorMessage(conn),
+ "PQsendQuery not allowed in pipeline mode\n") != 0)
+ pg_fatal("did not get expected error message; got: \"%s\"",
+ PQerrorMessage(conn));
+
+ /* Entering pipeline mode when already in pipeline mode is OK */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("re-entering pipeline mode should be a no-op but failed");
+
+ if (PQisBusy(conn) != 0)
+ pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
+
+ /* ok, back to normal command mode */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("couldn't exit idle empty pipeline mode");
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+ pg_fatal("Pipeline mode not terminated properly");
+
+ /* exiting pipeline mode when not in pipeline mode should be a no-op */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
+
+ /* can now PQexec again */
+ res = PQexec(conn, "SELECT 1");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
+ PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+test_multi_pipelines(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+
+ fprintf(stderr, "multi pipeline... ");
+
+ /*
+ * Queue up a couple of small pipelines and process each without returning
+ * to command mode first.
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ /* OK, start processing the results */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after first result");
+
+ if (PQexitPipelineMode(conn) != 0)
+ pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when sync result expected: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s instead of sync result, error: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ PQclear(res);
+
+ /* second pipeline */
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from second pipeline item",
+ PQresStatus(PQresultStatus(res)));
+
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("Expected null result, got %s",
+ PQresStatus(PQresultStatus(res)));
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s from second pipeline sync",
+ PQresStatus(PQresultStatus(res)));
+
+ /* We're still in pipeline mode ... */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /* until we end it, which we can safely do now */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+ pg_fatal("exiting pipeline mode didn't seem to work");
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
+ * Test behavior when a pipeline dispatches a number of commands that are
+ * not flushed by a sync point.
+ */
+static void
+test_nosync(PGconn *conn)
+{
+ int numqueries = 10;
+ int results = 0;
+ int sock = PQsocket(conn);
+
+ fprintf(stderr, "nosync... ");
+
+ if (sock < 0)
+ pg_fatal("invalid socket");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("could not enter pipeline mode");
+ for (int i = 0; i < numqueries; i++)
+ {
+ fd_set input_mask;
+ struct timeval tv;
+
+ if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("error sending select: %s", PQerrorMessage(conn));
+ PQflush(conn);
+
+ /*
+ * If the server has written anything to us, read (some of) it now.
+ */
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+ {
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ exit_nicely(conn);
+ }
+ if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
+ pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
+ }
+
+ /* tell server to flush its output buffer */
+ if (PQsendFlushRequest(conn) != 1)
+ pg_fatal("failed to send flush request");
+ PQflush(conn);
+
+ /* Now read all results */
+ for (;;)
+ {
+ PGresult *res;
+
+ res = PQgetResult(conn);
+
+ /* NULL results are only expected after TUPLES_OK */
+ if (res == NULL)
+ pg_fatal("got unexpected NULL result after %d results", results);
+
+ /* We expect exactly one TUPLES_OK result for each query we sent */
+ if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ {
+ PGresult *res2;
+
+ /* and one NULL result should follow each */
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ PQclear(res);
+ results++;
+
+ /* if we're done, we're done */
+ if (results == numqueries)
+ break;
+
+ continue;
+ }
+
+ /* anything else is unexpected */
+ pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
+ }
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
+ * When an operation in a pipeline fails the rest of the pipeline is flushed. We
+ * still have to get results for each pipeline item, but the item will just be
+ * a PGRES_PIPELINE_ABORTED code.
+ *
+ * This intentionally doesn't use a transaction to wrap the pipeline. You should
+ * usually use an xact, but in this case we want to observe the effects of each
+ * statement.
+ */
+static void
+test_pipeline_abort(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+ int i;
+ int gotrows;
+ bool goterror;
+
+ fprintf(stderr, "aborted pipeline... ");
+
+ res = PQexec(conn, drop_table_sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
+
+ res = PQexec(conn, create_table_sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
+
+ /*
+ * Queue up a couple of small pipelines and process each without returning
+ * to command mode first. Make sure the second operation in the first
+ * pipeline ERRORs.
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ dummy_params[0] = "1";
+ if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
+ 1, dummy_param_oids, dummy_params,
+ NULL, NULL, 0) != 1)
+ pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
+
+ dummy_params[0] = "2";
+ if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ dummy_params[0] = "3";
+ if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching second-pipeline insert failed: %s",
+ PQerrorMessage(conn));
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ /*
+ * OK, start processing the pipeline results.
+ *
+ * We should get a command-ok for the first query, then a fatal error and
+ * a pipeline aborted message for the second insert, a pipeline-end, then
+ * a command-ok and a pipeline-ok for the second pipeline operation.
+ */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("Unexpected result status %s: %s",
+ PQresStatus(PQresultStatus(res)),
+ PQresultErrorMessage(res));
+ PQclear(res);
+
+ /* NULL result to signal end-of-results for this command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s",
+ PQresStatus(PQresultStatus(res)));
+
+ /* Second query caused error, so we expect an error next */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+ pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* NULL result to signal end-of-results for this command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s",
+ PQresStatus(PQresultStatus(res)));
+
+ /*
+ * pipeline should now be aborted.
+ *
+ * Note that we could still queue more queries at this point if we wanted;
+ * they'd get added to a new third pipeline since we've already sent a
+ * second. The aborted flag relates only to the pipeline being received.
+ */
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
+ pg_fatal("pipeline should be flagged as aborted but isn't");
+
+ /* third query in pipeline, the second insert */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
+ pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* NULL result to signal end-of-results for this command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
+ pg_fatal("pipeline should be flagged as aborted but isn't");
+
+ /* Ensure we're still in pipeline */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /*
+ * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
+ *
+ * (This is so clients know to start processing results normally again and
+ * can tell the difference between skipped commands and the sync.)
+ */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code from first pipeline sync\n"
+ "Expected PGRES_PIPELINE_SYNC, got %s",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
+ pg_fatal("sync should've cleared the aborted flag but didn't");
+
+ /* We're still in pipeline mode... */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /* the insert from the second pipeline */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("Unexpected result code %s from first item in second pipeline",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* Read the NULL result at the end of the command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
+
+ /* the second pipeline sync */
+ if ((res = PQgetResult(conn)) == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s from second pipeline sync",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s: %s",
+ PQresStatus(PQresultStatus(res)),
+ PQerrorMessage(conn));
+
+ /* Try to send two queries in one command */
+ if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ goterror = false;
+ while ((res = PQgetResult(conn)) != NULL)
+ {
+ switch (PQresultStatus(res))
+ {
+ case PGRES_FATAL_ERROR:
+ if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
+ pg_fatal("expected error about multiple commands, got %s",
+ PQerrorMessage(conn));
+ printf("got expected %s", PQerrorMessage(conn));
+ goterror = true;
+ break;
+ default:
+ pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
+ break;
+ }
+ }
+ if (!goterror)
+ pg_fatal("did not get cannot-insert-multiple-commands error");
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("got NULL result");
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s from pipeline sync",
+ PQresStatus(PQresultStatus(res)));
+ fprintf(stderr, "ok\n");
+
+ /* Test single-row mode with an error partways */
+ if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ PQsetSingleRowMode(conn);
+ goterror = false;
+ gotrows = 0;
+ while ((res = PQgetResult(conn)) != NULL)
+ {
+ switch (PQresultStatus(res))
+ {
+ case PGRES_SINGLE_TUPLE:
+ printf("got row: %s\n", PQgetvalue(res, 0, 0));
+ gotrows++;
+ break;
+ case PGRES_FATAL_ERROR:
+ if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
+ pg_fatal("expected division-by-zero, got: %s (%s)",
+ PQerrorMessage(conn),
+ PQresultErrorField(res, PG_DIAG_SQLSTATE));
+ printf("got expected division-by-zero\n");
+ goterror = true;
+ break;
+ default:
+ pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
+ }
+ PQclear(res);
+ }
+ if (!goterror)
+ pg_fatal("did not get division-by-zero error");
+ if (gotrows != 3)
+ pg_fatal("did not get three rows");
+ /* the third pipeline sync */
+ if ((res = PQgetResult(conn)) == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s from third pipeline sync",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* We're still in pipeline mode... */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /* until we end it, which we can safely do now */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+ pg_fatal("exiting pipeline mode didn't seem to work");
+
+ /*-
+ * Since we fired the pipelines off without a surrounding xact, the results
+ * should be:
+ *
+ * - Implicit xact started by server around 1st pipeline
+ * - First insert applied
+ * - Second statement aborted xact
+ * - Third insert skipped
+ * - Sync rolled back first implicit xact
+ * - Implicit xact created by server around 2nd pipeline
+ * - insert applied from 2nd pipeline
+ * - Sync commits 2nd xact
+ *
+ * So we should only have the value 3 that we inserted.
+ */
+ res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Expected tuples, got %s: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ if (PQntuples(res) != 1)
+ pg_fatal("expected 1 result, got %d", PQntuples(res));
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ const char *val = PQgetvalue(res, i, 0);
+
+ if (strcmp(val, "3") != 0)
+ pg_fatal("expected only insert with value 3, got %s", val);
+ }
+
+ PQclear(res);
+
+ fprintf(stderr, "ok\n");
+}
+
+/* State machine enum for test_pipelined_insert */
+enum PipelineInsertStep
+{
+ BI_BEGIN_TX,
+ BI_DROP_TABLE,
+ BI_CREATE_TABLE,
+ BI_PREPARE,
+ BI_INSERT_ROWS,
+ BI_COMMIT_TX,
+ BI_SYNC,
+ BI_DONE
+};
+
+static void
+test_pipelined_insert(PGconn *conn, int n_rows)
+{
+ Oid insert_param_oids[2] = {INT4OID, INT8OID};
+ const char *insert_params[2];
+ char insert_param_0[MAXINTLEN];
+ char insert_param_1[MAXINT8LEN];
+ enum PipelineInsertStep send_step = BI_BEGIN_TX,
+ recv_step = BI_BEGIN_TX;
+ int rows_to_send,
+ rows_to_receive;
+
+ insert_params[0] = insert_param_0;
+ insert_params[1] = insert_param_1;
+
+ rows_to_send = rows_to_receive = n_rows;
+
+ /*
+ * Do a pipelined insert into a table created at the start of the pipeline
+ */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ while (send_step != BI_PREPARE)
+ {
+ const char *sql;
+
+ switch (send_step)
+ {
+ case BI_BEGIN_TX:
+ sql = "BEGIN TRANSACTION";
+ send_step = BI_DROP_TABLE;
+ break;
+
+ case BI_DROP_TABLE:
+ sql = drop_table_sql;
+ send_step = BI_CREATE_TABLE;
+ break;
+
+ case BI_CREATE_TABLE:
+ sql = create_table_sql;
+ send_step = BI_PREPARE;
+ break;
+
+ default:
+ pg_fatal("invalid state");
+ sql = NULL; /* keep compiler quiet */
+ }
+
+ pg_debug("sending: %s\n", sql);
+ if (PQsendQueryParams(conn, sql,
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
+ }
+
+ Assert(send_step == BI_PREPARE);
+ pg_debug("sending: %s\n", insert_sql2);
+ if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
+ pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
+ send_step = BI_INSERT_ROWS;
+
+ /*
+ * Now we start inserting. We'll be sending enough data that we could fill
+ * our output buffer, so to avoid deadlocking we need to enter nonblocking
+ * mode and consume input while we send more output. As results of each
+ * query are processed we should pop them to allow processing of the next
+ * query. There's no need to finish the pipeline before processing
+ * results.
+ */
+ if (PQsetnonblocking(conn, 1) != 0)
+ pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
+
+ while (recv_step != BI_DONE)
+ {
+ int sock;
+ fd_set input_mask;
+ fd_set output_mask;
+
+ sock = PQsocket(conn);
+
+ if (sock < 0)
+ break; /* shouldn't happen */
+
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ FD_ZERO(&output_mask);
+ FD_SET(sock, &output_mask);
+
+ if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+ {
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ exit_nicely(conn);
+ }
+
+ /*
+ * Process any results, so we keep the server's output buffer free
+ * flowing and it can continue to process input
+ */
+ if (FD_ISSET(sock, &input_mask))
+ {
+ PQconsumeInput(conn);
+
+ /* Read until we'd block if we tried to read */
+ while (!PQisBusy(conn) && recv_step < BI_DONE)
+ {
+ PGresult *res;
+ const char *cmdtag = "";
+ const char *description = "";
+ int status;
+
+ /*
+ * Read next result. If no more results from this query,
+ * advance to the next query
+ */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ continue;
+
+ status = PGRES_COMMAND_OK;
+ switch (recv_step)
+ {
+ case BI_BEGIN_TX:
+ cmdtag = "BEGIN";
+ recv_step++;
+ break;
+ case BI_DROP_TABLE:
+ cmdtag = "DROP TABLE";
+ recv_step++;
+ break;
+ case BI_CREATE_TABLE:
+ cmdtag = "CREATE TABLE";
+ recv_step++;
+ break;
+ case BI_PREPARE:
+ cmdtag = "";
+ description = "PREPARE";
+ recv_step++;
+ break;
+ case BI_INSERT_ROWS:
+ cmdtag = "INSERT";
+ rows_to_receive--;
+ if (rows_to_receive == 0)
+ recv_step++;
+ break;
+ case BI_COMMIT_TX:
+ cmdtag = "COMMIT";
+ recv_step++;
+ break;
+ case BI_SYNC:
+ cmdtag = "";
+ description = "SYNC";
+ status = PGRES_PIPELINE_SYNC;
+ recv_step++;
+ break;
+ case BI_DONE:
+ /* unreachable */
+ pg_fatal("unreachable state");
+ }
+
+ if (PQresultStatus(res) != status)
+ pg_fatal("%s reported status %s, expected %s\n"
+ "Error message: \"%s\"",
+ description, PQresStatus(PQresultStatus(res)),
+ PQresStatus(status), PQerrorMessage(conn));
+
+ if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
+ pg_fatal("%s expected command tag '%s', got '%s'",
+ description, cmdtag, PQcmdStatus(res));
+
+ pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
+
+ PQclear(res);
+ }
+ }
+
+ /* Write more rows and/or the end pipeline message, if needed */
+ if (FD_ISSET(sock, &output_mask))
+ {
+ PQflush(conn);
+
+ if (send_step == BI_INSERT_ROWS)
+ {
+ snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
+ /* use up some buffer space with a wide value */
+ snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
+
+ if (PQsendQueryPrepared(conn, "my_insert",
+ 2, insert_params, NULL, NULL, 0) == 1)
+ {
+ pg_debug("sent row %d\n", rows_to_send);
+
+ rows_to_send--;
+ if (rows_to_send == 0)
+ send_step++;
+ }
+ else
+ {
+ /*
+ * in nonblocking mode, so it's OK for an insert to fail
+ * to send
+ */
+ fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+ rows_to_send, PQerrorMessage(conn));
+ }
+ }
+ else if (send_step == BI_COMMIT_TX)
+ {
+ if (PQsendQueryParams(conn, "COMMIT",
+ 0, NULL, NULL, NULL, NULL, 0) == 1)
+ {
+ pg_debug("sent COMMIT\n");
+ send_step++;
+ }
+ else
+ {
+ fprintf(stderr, "WARNING: failed to send commit: %s\n",
+ PQerrorMessage(conn));
+ }
+ }
+ else if (send_step == BI_SYNC)
+ {
+ if (PQpipelineSync(conn) == 1)
+ {
+ fprintf(stdout, "pipeline sync sent\n");
+ send_step++;
+ }
+ else
+ {
+ fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
+ PQerrorMessage(conn));
+ }
+ }
+ }
+ }
+
+ /* We've got the sync message and the pipeline should be done */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ if (PQsetnonblocking(conn, 0) != 0)
+ pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+test_prepared(PGconn *conn)
+{
+ PGresult *res = NULL;
+ Oid param_oids[1] = {INT4OID};
+ Oid expected_oids[4];
+ Oid typ;
+
+ fprintf(stderr, "prepared... ");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
+ "interval '1 sec'",
+ 1, param_oids) != 1)
+ pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
+ expected_oids[0] = INT4OID;
+ expected_oids[1] = TEXTOID;
+ expected_oids[2] = NUMERICOID;
+ expected_oids[3] = INTERVALOID;
+ if (PQsendDescribePrepared(conn, "select_one") != 1)
+ pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("expected NULL result");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned NULL");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
+ if (PQnfields(res) != lengthof(expected_oids))
+ pg_fatal("expected %zd columns, got %d",
+ lengthof(expected_oids), PQnfields(res));
+ for (int i = 0; i < PQnfields(res); i++)
+ {
+ typ = PQftype(res, i);
+ if (typ != expected_oids[i])
+ pg_fatal("field %d: expected type %u, got %u",
+ i, expected_oids[i], typ);
+ }
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("expected NULL result");
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
+
+ PQexec(conn, "BEGIN");
+ PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
+ PQenterPipelineMode(conn);
+ if (PQsendDescribePortal(conn, "cursor_one") != 1)
+ pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
+
+ typ = PQftype(res, 0);
+ if (typ != INT4OID)
+ pg_fatal("portal: expected type %u, got %u",
+ INT4OID, typ);
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("expected NULL result");
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
+}
+
+/* Notice processor: print notices, and count how many we got */
+static void
+notice_processor(void *arg, const char *message)
+{
+ int *n_notices = (int *) arg;
+
+ (*n_notices)++;
+ fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
+}
+
+/* Verify behavior in "idle" state */
+static void
+test_pipeline_idle(PGconn *conn)
+{
+ PGresult *res;
+ int n_notices = 0;
+
+ fprintf(stderr, "\npipeline idle...\n");
+
+ PQsetNoticeProcessor(conn, notice_processor, &n_notices);
+
+ /* Try to exit pipeline mode in pipeline-idle state */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+ if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ if (PQexitPipelineMode(conn) == 1)
+ pg_fatal("exiting pipeline succeeded when it shouldn't");
+ if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
+ strlen("cannot exit pipeline mode")) != 0)
+ pg_fatal("did not get expected error; got: %s",
+ PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s from second pipeline item",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("did not receive terminating NULL");
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
+
+ if (n_notices > 0)
+ pg_fatal("got %d notice(s)", n_notices);
+ fprintf(stderr, "ok - 1\n");
+
+ /* Have a WARNING in the middle of a resultset */
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
+ if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s", PQerrorMessage(conn));
+ PQsendFlushRequest(conn);
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("unexpected NULL result received");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
+ fprintf(stderr, "ok - 2\n");
+}
+
+static void
+test_simple_pipeline(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+
+ fprintf(stderr, "simple pipeline... ");
+
+ /*
+ * Enter pipeline mode and dispatch a set of operations, which we'll then
+ * process the results of as they come in.
+ *
+ * For a simple case we should be able to do this without interim
+ * processing of results since our output buffer will give us enough slush
+ * to work with and we won't block on sending. So blocking mode is fine.
+ */
+ if (PQisnonblocking(conn))
+ pg_fatal("Expected blocking connection mode");
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT $1",
+ 1, dummy_param_oids, dummy_params,
+ NULL, NULL, 0) != 1)
+ pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
+
+ if (PQexitPipelineMode(conn) != 0)
+ pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
+
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from first pipeline item",
+ PQresStatus(PQresultStatus(res)));
+
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after first query result.");
+
+ /*
+ * Even though we've processed the result there's still a sync to come and
+ * we can't exit pipeline mode yet
+ */
+ if (PQexitPipelineMode(conn) != 0)
+ pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
+ pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after pipeline end: %s",
+ PQresStatus(PQresultStatus(res)));
+
+ /* We're still in pipeline mode... */
+ if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
+ pg_fatal("Fell out of pipeline mode somehow");
+
+ /* ... until we end it, which we can safely do now */
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
+ PQerrorMessage(conn));
+
+ if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
+ pg_fatal("Exiting pipeline mode didn't seem to work");
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+test_singlerowmode(PGconn *conn)
+{
+ PGresult *res;
+ int i;
+ bool pipeline_ended = false;
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s",
+ PQerrorMessage(conn));
+
+ /* One series of three commands, using single-row mode for the first two. */
+ for (i = 0; i < 3; i++)
+ {
+ char *param[1];
+
+ param[0] = psprintf("%d", 44 + i);
+
+ if (PQsendQueryParams(conn,
+ "SELECT generate_series(42, $1)",
+ 1,
+ NULL,
+ (const char **) param,
+ NULL,
+ NULL,
+ 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ pfree(param[0]);
+ }
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+
+ for (i = 0; !pipeline_ended; i++)
+ {
+ bool first = true;
+ bool saw_ending_tuplesok;
+ bool isSingleTuple = false;
+
+ /* Set single row mode for only first 2 SELECT queries */
+ if (i < 2)
+ {
+ if (PQsetSingleRowMode(conn) != 1)
+ pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
+ }
+
+ /* Consume rows for this query */
+ saw_ending_tuplesok = false;
+ while ((res = PQgetResult(conn)) != NULL)
+ {
+ ExecStatusType est = PQresultStatus(res);
+
+ if (est == PGRES_PIPELINE_SYNC)
+ {
+ fprintf(stderr, "end of pipeline reached\n");
+ pipeline_ended = true;
+ PQclear(res);
+ if (i != 3)
+ pg_fatal("Expected three results, got %d", i);
+ break;
+ }
+
+ /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
+ if (first)
+ {
+ if (i <= 1 && est != PGRES_SINGLE_TUPLE)
+ pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
+ i, PQresStatus(est));
+ if (i >= 2 && est != PGRES_TUPLES_OK)
+ pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
+ i, PQresStatus(est));
+ first = false;
+ }
+
+ fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
+ switch (est)
+ {
+ case PGRES_TUPLES_OK:
+ fprintf(stderr, ", tuples: %d\n", PQntuples(res));
+ saw_ending_tuplesok = true;
+ if (isSingleTuple)
+ {
+ if (PQntuples(res) == 0)
+ fprintf(stderr, "all tuples received in query %d\n", i);
+ else
+ pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
+ }
+ break;
+
+ case PGRES_SINGLE_TUPLE:
+ isSingleTuple = true;
+ fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
+ break;
+
+ default:
+ pg_fatal("unexpected");
+ }
+ PQclear(res);
+ }
+ if (!pipeline_ended && !saw_ending_tuplesok)
+ pg_fatal("didn't get expected terminating TUPLES_OK");
+ }
+
+ /*
+ * Now issue one command, get its results in with single-row mode, then
+ * issue another command, and get its results in normal mode; make sure
+ * the single-row mode flag is reset as expected.
+ */
+ if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ if (PQsendFlushRequest(conn) != 1)
+ pg_fatal("failed to send flush request");
+ if (PQsetSingleRowMode(conn) != 1)
+ pg_fatal("PQsetSingleRowMode() failed");
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("unexpected NULL");
+ if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
+ pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
+ PQresStatus(PQresultStatus(res)));
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("unexpected NULL");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Expected PGRES_TUPLES_OK, got %s",
+ PQresStatus(PQresultStatus(res)));
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("expected NULL result");
+
+ if (PQsendQueryParams(conn, "SELECT 1",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ if (PQsendFlushRequest(conn) != 1)
+ pg_fatal("failed to send flush request");
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("unexpected NULL");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Expected PGRES_TUPLES_OK, got %s",
+ PQresStatus(PQresultStatus(res)));
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("expected NULL result");
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
+ * Simple test to verify that a pipeline is discarded as a whole when there's
+ * an error, ignoring transaction commands.
+ */
+static void
+test_transaction(PGconn *conn)
+{
+ PGresult *res;
+ bool expect_null;
+ int num_syncs = 0;
+
+ res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
+ "CREATE TABLE pq_pipeline_tst (id int)");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to create test table: %s",
+ PQerrorMessage(conn));
+ PQclear(res);
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode: %s",
+ PQerrorMessage(conn));
+ if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
+ pg_fatal("could not send prepare on pipeline: %s",
+ PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn,
+ "BEGIN",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ if (PQsendQueryParams(conn,
+ "SELECT 0/0",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+
+ /*
+ * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
+ * get out of the pipeline-aborted state first.
+ */
+ if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
+ pg_fatal("failed to execute prepared: %s",
+ PQerrorMessage(conn));
+
+ /* This insert fails because we're in pipeline-aborted state */
+ if (PQsendQueryParams(conn,
+ "INSERT INTO pq_pipeline_tst VALUES (1)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ num_syncs++;
+
+ /*
+ * This insert fails even though the pipeline got a SYNC, because we're in
+ * an aborted transaction
+ */
+ if (PQsendQueryParams(conn,
+ "INSERT INTO pq_pipeline_tst VALUES (2)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ num_syncs++;
+
+ /*
+ * Send ROLLBACK using prepared stmt. This one works because we just did
+ * PQpipelineSync above.
+ */
+ if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
+ pg_fatal("failed to execute prepared: %s",
+ PQerrorMessage(conn));
+
+ /*
+ * Now that we're out of a transaction and in pipeline-good mode, this
+ * insert works
+ */
+ if (PQsendQueryParams(conn,
+ "INSERT INTO pq_pipeline_tst VALUES (3)",
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("failed to send query: %s",
+ PQerrorMessage(conn));
+ /* Send two syncs now -- match up to SYNC messages below */
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ num_syncs++;
+ if (PQpipelineSync(conn) != 1)
+ pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
+ num_syncs++;
+
+ expect_null = false;
+ for (int i = 0;; i++)
+ {
+ ExecStatusType restype;
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ {
+ printf("%d: got NULL result\n", i);
+ if (!expect_null)
+ pg_fatal("did not expect NULL here");
+ expect_null = false;
+ continue;
+ }
+ restype = PQresultStatus(res);
+ printf("%d: got status %s", i, PQresStatus(restype));
+ if (expect_null)
+ pg_fatal("expected NULL");
+ if (restype == PGRES_FATAL_ERROR)
+ printf("; error: %s", PQerrorMessage(conn));
+ else if (restype == PGRES_PIPELINE_ABORTED)
+ {
+ printf(": command didn't run because pipeline aborted\n");
+ }
+ else
+ printf("\n");
+ PQclear(res);
+
+ if (restype == PGRES_PIPELINE_SYNC)
+ num_syncs--;
+ else
+ expect_null = true;
+ if (num_syncs <= 0)
+ break;
+ }
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("returned something extra after all the syncs: %s",
+ PQresStatus(PQresultStatus(res)));
+
+ if (PQexitPipelineMode(conn) != 1)
+ pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
+
+ /* We expect to find one tuple containing the value "3" */
+ res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
+ if (PQntuples(res) != 1)
+ pg_fatal("did not get 1 tuple");
+ if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
+ pg_fatal("did not get expected tuple");
+ PQclear(res);
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
+ * In this test mode we send a stream of queries, with one in the middle
+ * causing an error. Verify that we can still send some more after the
+ * error and have libpq work properly.
+ */
+static void
+test_uniqviol(PGconn *conn)
+{
+ int sock = PQsocket(conn);
+ PGresult *res;
+ Oid paramTypes[2] = {INT8OID, INT8OID};
+ const char *paramValues[2];
+ char paramValue0[MAXINT8LEN];
+ char paramValue1[MAXINT8LEN];
+ int ctr = 0;
+ int numsent = 0;
+ int results = 0;
+ bool read_done = false;
+ bool write_done = false;
+ bool error_sent = false;
+ bool got_error = false;
+ int switched = 0;
+ int socketful = 0;
+ fd_set in_fds;
+ fd_set out_fds;
+
+ fprintf(stderr, "uniqviol ...");
+
+ PQsetnonblocking(conn, 1);
+
+ paramValues[0] = paramValue0;
+ paramValues[1] = paramValue1;
+ sprintf(paramValue1, "42");
+
+ res = PQexec(conn, "drop table if exists ppln_uniqviol;"
+ "create table ppln_uniqviol(id bigint primary key, idata bigint)");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to create table: %s", PQerrorMessage(conn));
+
+ res = PQexec(conn, "begin");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
+
+ res = PQprepare(conn, "insertion",
+ "insert into ppln_uniqviol values ($1, $2) returning id",
+ 2, paramTypes);
+ if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
+
+ if (PQenterPipelineMode(conn) != 1)
+ pg_fatal("failed to enter pipeline mode");
+
+ while (!read_done)
+ {
+ /*
+ * Avoid deadlocks by reading everything the server has sent before
+ * sending anything. (Special precaution is needed here to process
+ * PQisBusy before testing the socket for read-readiness, because the
+ * socket does not turn read-ready after "sending" queries in aborted
+ * pipeline mode.)
+ */
+ while (PQisBusy(conn) == 0)
+ {
+ bool new_error;
+
+ if (results >= numsent)
+ {
+ if (write_done)
+ read_done = true;
+ break;
+ }
+
+ res = PQgetResult(conn);
+ new_error = process_result(conn, res, results, numsent);
+ if (new_error && got_error)
+ pg_fatal("got two errors");
+ got_error |= new_error;
+ if (results++ >= numsent - 1)
+ {
+ if (write_done)
+ read_done = true;
+ break;
+ }
+ }
+
+ if (read_done)
+ break;
+
+ FD_ZERO(&out_fds);
+ FD_SET(sock, &out_fds);
+
+ FD_ZERO(&in_fds);
+ FD_SET(sock, &in_fds);
+
+ if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
+ {
+ if (errno == EINTR)
+ continue;
+ pg_fatal("select() failed: %m");
+ }
+
+ if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
+ pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
+
+ /*
+ * If the socket is writable and we haven't finished sending queries,
+ * send some.
+ */
+ if (!write_done && FD_ISSET(sock, &out_fds))
+ {
+ for (;;)
+ {
+ int flush;
+
+ /*
+ * provoke uniqueness violation exactly once after having
+ * switched to read mode.
+ */
+ if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
+ {
+ sprintf(paramValue0, "%d", numsent / 2);
+ fprintf(stderr, "E");
+ error_sent = true;
+ }
+ else
+ {
+ fprintf(stderr, ".");
+ sprintf(paramValue0, "%d", ctr++);
+ }
+
+ if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
+ pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
+ numsent++;
+
+ /* Are we done writing? */
+ if (socketful != 0 && numsent % socketful == 42 && error_sent)
+ {
+ if (PQsendFlushRequest(conn) != 1)
+ pg_fatal("failed to send flush request");
+ write_done = true;
+ fprintf(stderr, "\ndone writing\n");
+ PQflush(conn);
+ break;
+ }
+
+ /* is the outgoing socket full? */
+ flush = PQflush(conn);
+ if (flush == -1)
+ pg_fatal("failed to flush: %s", PQerrorMessage(conn));
+ if (flush == 1)
+ {
+ if (socketful == 0)
+ socketful = numsent;
+ fprintf(stderr, "\nswitch to reading\n");
+ switched++;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!got_error)
+ pg_fatal("did not get expected error");
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
+ * Subroutine for test_uniqviol; given a PGresult, print it out and consume
+ * the expected NULL that should follow it.
+ *
+ * Returns true if we read a fatal error message, otherwise false.
+ */
+static bool
+process_result(PGconn *conn, PGresult *res, int results, int numsent)
+{
+ PGresult *res2;
+ bool got_error = false;
+
+ if (res == NULL)
+ pg_fatal("got unexpected NULL");
+
+ switch (PQresultStatus(res))
+ {
+ case PGRES_FATAL_ERROR:
+ got_error = true;
+ fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
+ PQclear(res);
+
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ break;
+
+ case PGRES_TUPLES_OK:
+ fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
+ PQclear(res);
+
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ break;
+
+ case PGRES_PIPELINE_ABORTED:
+ fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
+ res2 = PQgetResult(conn);
+ if (res2 != NULL)
+ pg_fatal("expected NULL, got %s",
+ PQresStatus(PQresultStatus(res2)));
+ break;
+
+ default:
+ pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
+ }
+
+ return got_error;
+}
+
+
+static void
+usage(const char *progname)
+{
+ fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
+ fprintf(stderr, "Usage:\n");
+ fprintf(stderr, " %s [OPTION] tests\n", progname);
+ fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
+ fprintf(stderr, "\nOptions:\n");
+ fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
+ fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
+}
+
+static void
+print_test_list(void)
+{
+ printf("disallowed_in_pipeline\n");
+ printf("multi_pipelines\n");
+ printf("nosync\n");
+ printf("pipeline_abort\n");
+ printf("pipeline_idle\n");
+ printf("pipelined_insert\n");
+ printf("prepared\n");
+ printf("simple_pipeline\n");
+ printf("singlerow\n");
+ printf("transaction\n");
+ printf("uniqviol\n");
+}
+
+int
+main(int argc, char **argv)
+{
+ const char *conninfo = "";
+ PGconn *conn;
+ FILE *trace;
+ char *testname;
+ int numrows = 10000;
+ PGresult *res;
+ int c;
+
+ while ((c = getopt(argc, argv, "t:r:")) != -1)
+ {
+ switch (c)
+ {
+ case 't': /* trace file */
+ tracefile = pg_strdup(optarg);
+ break;
+ case 'r': /* numrows */
+ errno = 0;
+ numrows = strtol(optarg, NULL, 10);
+ if (errno != 0 || numrows <= 0)
+ {
+ fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
+ optarg);
+ exit(1);
+ }
+ break;
+ }
+ }
+
+ if (optind < argc)
+ {
+ testname = pg_strdup(argv[optind]);
+ optind++;
+ }
+ else
+ {
+ usage(argv[0]);
+ exit(1);
+ }
+
+ if (strcmp(testname, "tests") == 0)
+ {
+ print_test_list();
+ exit(0);
+ }
+
+ if (optind < argc)
+ {
+ conninfo = pg_strdup(argv[optind]);
+ optind++;
+ }
+
+ /* Make a connection to the database */
+ conn = PQconnectdb(conninfo);
+ if (PQstatus(conn) != CONNECTION_OK)
+ {
+ fprintf(stderr, "Connection to database failed: %s\n",
+ PQerrorMessage(conn));
+ exit_nicely(conn);
+ }
+
+ res = PQexec(conn, "SET lc_messages TO \"C\"");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
+ res = PQexec(conn, "SET force_parallel_mode = off");
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("failed to set force_parallel_mode: %s", PQerrorMessage(conn));
+
+ /* Set the trace file, if requested */
+ if (tracefile != NULL)
+ {
+ if (strcmp(tracefile, "-") == 0)
+ trace = stdout;
+ else
+ trace = fopen(tracefile, "w");
+ if (trace == NULL)
+ pg_fatal("could not open file \"%s\": %m", tracefile);
+
+ /* Make it line-buffered */
+ setvbuf(trace, NULL, PG_IOLBF, 0);
+
+ PQtrace(conn, trace);
+ PQsetTraceFlags(conn,
+ PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
+ }
+
+ if (strcmp(testname, "disallowed_in_pipeline") == 0)
+ test_disallowed_in_pipeline(conn);
+ else if (strcmp(testname, "multi_pipelines") == 0)
+ test_multi_pipelines(conn);
+ else if (strcmp(testname, "nosync") == 0)
+ test_nosync(conn);
+ else if (strcmp(testname, "pipeline_abort") == 0)
+ test_pipeline_abort(conn);
+ else if (strcmp(testname, "pipeline_idle") == 0)
+ test_pipeline_idle(conn);
+ else if (strcmp(testname, "pipelined_insert") == 0)
+ test_pipelined_insert(conn, numrows);
+ else if (strcmp(testname, "prepared") == 0)
+ test_prepared(conn);
+ else if (strcmp(testname, "simple_pipeline") == 0)
+ test_simple_pipeline(conn);
+ else if (strcmp(testname, "singlerow") == 0)
+ test_singlerowmode(conn);
+ else if (strcmp(testname, "transaction") == 0)
+ test_transaction(conn);
+ else if (strcmp(testname, "uniqviol") == 0)
+ test_uniqviol(conn);
+ else
+ {
+ fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
+ exit(1);
+ }
+
+ /* close the connection to the database and cleanup */
+ PQfinish(conn);
+ return 0;
+}
diff --git a/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
new file mode 100644
index 0000000..0821329
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/t/001_libpq_pipeline.pl
@@ -0,0 +1,78 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('main');
+$node->init;
+$node->start;
+
+my $numrows = 700;
+
+my ($out, $err) = run_command([ 'libpq_pipeline', 'tests' ]);
+die "oops: $err" unless $err eq '';
+my @tests = split(/\s+/, $out);
+
+mkdir "$PostgreSQL::Test::Utils::tmp_check/traces";
+
+for my $testname (@tests)
+{
+ my @extraargs = ('-r', $numrows);
+ my $cmptrace = grep(/^$testname$/,
+ qw(simple_pipeline nosync multi_pipelines prepared singlerow
+ pipeline_abort pipeline_idle transaction
+ disallowed_in_pipeline)) > 0;
+
+ # For a bunch of tests, generate a libpq trace file too.
+ my $traceout =
+ "$PostgreSQL::Test::Utils::tmp_check/traces/$testname.trace";
+ if ($cmptrace)
+ {
+ push @extraargs, "-t", $traceout;
+ }
+
+ # Execute the test
+ $node->command_ok(
+ [
+ 'libpq_pipeline', @extraargs,
+ $testname, $node->connstr('postgres')
+ ],
+ "libpq_pipeline $testname");
+
+ # Compare the trace, if requested
+ if ($cmptrace)
+ {
+ my $expected;
+ my $result;
+
+ $expected = slurp_file_eval("traces/$testname.trace");
+ next unless $expected ne "";
+ $result = slurp_file_eval($traceout);
+ next unless $result ne "";
+
+ is($result, $expected, "$testname trace match");
+ }
+}
+
+$node->stop('fast');
+
+done_testing();
+
+sub slurp_file_eval
+{
+ my $filepath = shift;
+ my $contents;
+
+ eval { $contents = slurp_file($filepath); };
+ if ($@)
+ {
+ fail "reading $filepath: $@";
+ return "";
+ }
+ return $contents;
+}
diff --git a/src/test/modules/libpq_pipeline/traces/disallowed_in_pipeline.trace b/src/test/modules/libpq_pipeline/traces/disallowed_in_pipeline.trace
new file mode 100644
index 0000000..dd6df03
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/disallowed_in_pipeline.trace
@@ -0,0 +1,6 @@
+F 13 Query "SELECT 1"
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 5 ReadyForQuery I
+F 4 Terminate
diff --git a/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
new file mode 100644
index 0000000..4b9ab07
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/multi_pipelines.trace
@@ -0,0 +1,23 @@
+F 21 Parse "" "SELECT $1" 1 NNNN
+F 19 Bind "" "" 0 1 1 '1' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+F 21 Parse "" "SELECT $1" 1 NNNN
+F 19 Bind "" "" 0 1 1 '1' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 5 ReadyForQuery I
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 5 ReadyForQuery I
+F 4 Terminate
diff --git a/src/test/modules/libpq_pipeline/traces/nosync.trace b/src/test/modules/libpq_pipeline/traces/nosync.trace
new file mode 100644
index 0000000..d99aac6
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/nosync.trace
@@ -0,0 +1,92 @@
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 34 Parse "" "SELECT repeat('xyzxz', 12)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+B 4 ParseComplete
+B 4 BindComplete
+B 31 RowDescription 1 "repeat" NNNN 0 NNNN 65535 -1 0
+B 70 DataRow 1 60 'xyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxzxyzxz'
+B 13 CommandComplete "SELECT 1"
+F 4 Terminate
diff --git a/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace b/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace
new file mode 100644
index 0000000..cf6ccec
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/pipeline_abort.trace
@@ -0,0 +1,62 @@
+F 42 Query "DROP TABLE IF EXISTS pq_pipeline_demo"
+B NN NoticeResponse S "NOTICE" V "NOTICE" C "00000" M "table "pq_pipeline_demo" does not exist, skipping" F "SSSS" L "SSSS" R "SSSS" \x00
+B 15 CommandComplete "DROP TABLE"
+B 5 ReadyForQuery I
+F 99 Query "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,int8filler int8);"
+B 17 CommandComplete "CREATE TABLE"
+B 5 ReadyForQuery I
+F 60 Parse "" "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)" 1 NNNN
+F 19 Bind "" "" 0 1 1 '1' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 39 Parse "" "SELECT no_such_function($1)" 1 NNNN
+F 19 Bind "" "" 0 1 1 '1' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 60 Parse "" "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)" 1 NNNN
+F 19 Bind "" "" 0 1 1 '2' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+F 60 Parse "" "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)" 1 NNNN
+F 19 Bind "" "" 0 1 1 '3' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+B 4 ParseComplete
+B 4 BindComplete
+B 4 NoData
+B 15 CommandComplete "INSERT 0 1"
+B NN ErrorResponse S "ERROR" V "ERROR" C "42883" M "function no_such_function(integer) does not exist" H "No function matches the given name and argument types. You might need to add explicit type casts." P "8" F "SSSS" L "SSSS" R "SSSS" \x00
+B 5 ReadyForQuery I
+B 4 ParseComplete
+B 4 BindComplete
+B 4 NoData
+B 15 CommandComplete "INSERT 0 1"
+B 5 ReadyForQuery I
+F 26 Parse "" "SELECT 1; SELECT 2" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+B NN ErrorResponse S "ERROR" V "ERROR" C "42601" M "cannot insert multiple commands into a prepared statement" F "SSSS" L "SSSS" R "SSSS" \x00
+B 5 ReadyForQuery I
+F 54 Parse "" "SELECT 1.0/g FROM generate_series(3, -1, -1) g" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 65535 -1 0
+B 32 DataRow 1 22 '0.33333333333333333333'
+B 32 DataRow 1 22 '0.50000000000000000000'
+B 32 DataRow 1 22 '1.00000000000000000000'
+B NN ErrorResponse S "ERROR" V "ERROR" C "22012" M "division by zero" F "SSSS" L "SSSS" R "SSSS" \x00
+B 5 ReadyForQuery I
+F 40 Query "SELECT itemno FROM pq_pipeline_demo"
+B 31 RowDescription 1 "itemno" NNNN 2 NNNN 4 -1 0
+B 11 DataRow 1 1 '3'
+B 13 CommandComplete "SELECT 1"
+B 5 ReadyForQuery I
+F 4 Terminate
diff --git a/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
new file mode 100644
index 0000000..83ee415
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/pipeline_idle.trace
@@ -0,0 +1,32 @@
+F 16 Parse "" "SELECT 1" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+F 16 Parse "" "SELECT 2" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '2'
+B 13 CommandComplete "SELECT 1"
+F 49 Parse "" "SELECT pg_catalog.pg_advisory_unlock(1,1)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 43 RowDescription 1 "pg_advisory_unlock" NNNN 0 NNNN 1 -1 0
+B NN NoticeResponse S "WARNING" V "WARNING" C "01000" M "you don't own a lock of type ExclusiveLock" F "SSSS" L "SSSS" R "SSSS" \x00
+B 11 DataRow 1 1 'f'
+B 13 CommandComplete "SELECT 1"
+F 4 Terminate
diff --git a/src/test/modules/libpq_pipeline/traces/prepared.trace b/src/test/modules/libpq_pipeline/traces/prepared.trace
new file mode 100644
index 0000000..1a7de5c
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/prepared.trace
@@ -0,0 +1,18 @@
+F 68 Parse "select_one" "SELECT $1, '42', $1::numeric, interval '1 sec'" 1 NNNN
+F 16 Describe S "select_one"
+F 4 Sync
+B 4 ParseComplete
+B 10 ParameterDescription 1 NNNN
+B 113 RowDescription 4 "?column?" NNNN 0 NNNN 4 -1 0 "?column?" NNNN 0 NNNN 65535 -1 0 "numeric" NNNN 0 NNNN 65535 -1 0 "interval" NNNN 0 NNNN 16 -1 0
+B 5 ReadyForQuery I
+F 10 Query "BEGIN"
+B 10 CommandComplete "BEGIN"
+B 5 ReadyForQuery T
+F 43 Query "DECLARE cursor_one CURSOR FOR SELECT 1"
+B 19 CommandComplete "DECLARE CURSOR"
+B 5 ReadyForQuery T
+F 16 Describe P "cursor_one"
+F 4 Sync
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 5 ReadyForQuery T
+F 4 Terminate
diff --git a/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
new file mode 100644
index 0000000..5c94749
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/simple_pipeline.trace
@@ -0,0 +1,12 @@
+F 21 Parse "" "SELECT $1" 1 NNNN
+F 19 Bind "" "" 0 1 1 '1' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+B 5 ReadyForQuery I
+F 4 Terminate
diff --git a/src/test/modules/libpq_pipeline/traces/singlerow.trace b/src/test/modules/libpq_pipeline/traces/singlerow.trace
new file mode 100644
index 0000000..83043e1
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/singlerow.trace
@@ -0,0 +1,59 @@
+F 38 Parse "" "SELECT generate_series(42, $1)" 0
+F 20 Bind "" "" 0 1 2 '44' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 38 Parse "" "SELECT generate_series(42, $1)" 0
+F 20 Bind "" "" 0 1 2 '45' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 38 Parse "" "SELECT generate_series(42, $1)" 0
+F 20 Bind "" "" 0 1 2 '46' 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+B 4 ParseComplete
+B 4 BindComplete
+B 40 RowDescription 1 "generate_series" NNNN 0 NNNN 4 -1 0
+B 12 DataRow 1 2 '42'
+B 12 DataRow 1 2 '43'
+B 12 DataRow 1 2 '44'
+B 13 CommandComplete "SELECT 3"
+B 4 ParseComplete
+B 4 BindComplete
+B 40 RowDescription 1 "generate_series" NNNN 0 NNNN 4 -1 0
+B 12 DataRow 1 2 '42'
+B 12 DataRow 1 2 '43'
+B 12 DataRow 1 2 '44'
+B 12 DataRow 1 2 '45'
+B 13 CommandComplete "SELECT 4"
+B 4 ParseComplete
+B 4 BindComplete
+B 40 RowDescription 1 "generate_series" NNNN 0 NNNN 4 -1 0
+B 12 DataRow 1 2 '42'
+B 12 DataRow 1 2 '43'
+B 12 DataRow 1 2 '44'
+B 12 DataRow 1 2 '45'
+B 12 DataRow 1 2 '46'
+B 13 CommandComplete "SELECT 5"
+B 5 ReadyForQuery I
+F 36 Parse "" "SELECT generate_series(0, 0)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 40 RowDescription 1 "generate_series" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '0'
+B 13 CommandComplete "SELECT 1"
+F 16 Parse "" "SELECT 1" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Flush
+B 4 ParseComplete
+B 4 BindComplete
+B 33 RowDescription 1 "?column?" NNNN 0 NNNN 4 -1 0
+B 11 DataRow 1 1 '1'
+B 13 CommandComplete "SELECT 1"
+F 4 Terminate
diff --git a/src/test/modules/libpq_pipeline/traces/transaction.trace b/src/test/modules/libpq_pipeline/traces/transaction.trace
new file mode 100644
index 0000000..1dcc237
--- /dev/null
+++ b/src/test/modules/libpq_pipeline/traces/transaction.trace
@@ -0,0 +1,61 @@
+F 79 Query "DROP TABLE IF EXISTS pq_pipeline_tst;CREATE TABLE pq_pipeline_tst (id int)"
+B NN NoticeResponse S "NOTICE" V "NOTICE" C "00000" M "table "pq_pipeline_tst" does not exist, skipping" F "SSSS" L "SSSS" R "SSSS" \x00
+B 15 CommandComplete "DROP TABLE"
+B 17 CommandComplete "CREATE TABLE"
+B 5 ReadyForQuery I
+F 24 Parse "rollback" "ROLLBACK" 0
+F 13 Parse "" "BEGIN" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 18 Parse "" "SELECT 0/0" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 22 Bind "" "rollback" 0 0 1 1
+F 6 Describe P ""
+F 9 Execute "" 0
+F 46 Parse "" "INSERT INTO pq_pipeline_tst VALUES (1)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+F 46 Parse "" "INSERT INTO pq_pipeline_tst VALUES (2)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+F 22 Bind "" "rollback" 0 0 1 1
+F 6 Describe P ""
+F 9 Execute "" 0
+F 46 Parse "" "INSERT INTO pq_pipeline_tst VALUES (3)" 0
+F 14 Bind "" "" 0 0 1 0
+F 6 Describe P ""
+F 9 Execute "" 0
+F 4 Sync
+F 4 Sync
+B 4 ParseComplete
+B 4 ParseComplete
+B 4 BindComplete
+B 4 NoData
+B 10 CommandComplete "BEGIN"
+B 4 ParseComplete
+B NN ErrorResponse S "ERROR" V "ERROR" C "22012" M "division by zero" F "SSSS" L "SSSS" R "SSSS" \x00
+B 5 ReadyForQuery E
+B NN ErrorResponse S "ERROR" V "ERROR" C "25P02" M "current transaction is aborted, commands ignored until end of transaction block" F "SSSS" L "SSSS" R "SSSS" \x00
+B 5 ReadyForQuery E
+B 4 BindComplete
+B 4 NoData
+B 13 CommandComplete "ROLLBACK"
+B 4 ParseComplete
+B 4 BindComplete
+B 4 NoData
+B 15 CommandComplete "INSERT 0 1"
+B 5 ReadyForQuery I
+B 5 ReadyForQuery I
+F 34 Query "SELECT * FROM pq_pipeline_tst"
+B 27 RowDescription 1 "id" NNNN 1 NNNN 4 -1 0
+B 11 DataRow 1 1 '3'
+B 13 CommandComplete "SELECT 1"
+B 5 ReadyForQuery I
+F 4 Terminate