diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-16 19:46:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-16 19:46:48 +0000 |
commit | 311bcfc6b3acdd6fd152798c7f287ddf74fa2a98 (patch) | |
tree | 0ec307299b1dada3701e42f4ca6eda57d708261e /src/test/isolation/isolationtester.c | |
parent | Initial commit. (diff) | |
download | postgresql-15-311bcfc6b3acdd6fd152798c7f287ddf74fa2a98.tar.xz postgresql-15-311bcfc6b3acdd6fd152798c7f287ddf74fa2a98.zip |
Adding upstream version 15.4.upstream/15.4upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/test/isolation/isolationtester.c')
-rw-r--r-- | src/test/isolation/isolationtester.c | 1149 |
1 files changed, 1149 insertions, 0 deletions
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c new file mode 100644 index 0000000..12179f2 --- /dev/null +++ b/src/test/isolation/isolationtester.c @@ -0,0 +1,1149 @@ +/* + * src/test/isolation/isolationtester.c + * + * isolationtester.c + * Runs an isolation test specified by a spec file. + */ + +#include "postgres_fe.h" + +#include <sys/time.h> +#ifdef HAVE_SYS_SELECT_H +#include <sys/select.h> +#endif + +#include "datatype/timestamp.h" +#include "isolationtester.h" +#include "libpq-fe.h" +#include "pg_getopt.h" +#include "pqexpbuffer.h" + +#define PREP_WAITING "isolationtester_waiting" + +/* + * conns[0] is the global setup, teardown, and watchdog connection. Additional + * connections represent spec-defined sessions. + */ +typedef struct IsoConnInfo +{ + /* The libpq connection object for this connection. */ + PGconn *conn; + /* The backend PID, in numeric and string formats. */ + int backend_pid; + const char *backend_pid_str; + /* Name of the associated session. */ + const char *sessionname; + /* Active step on this connection, or NULL if idle. */ + PermutationStep *active_step; + /* Number of NOTICE messages received from connection. */ + int total_notices; +} IsoConnInfo; + +static IsoConnInfo *conns = NULL; +static int nconns = 0; + +/* Flag indicating some new NOTICE has arrived */ +static bool any_new_notice = false; + +/* Maximum time to wait before giving up on a step (in usec) */ +static int64 max_step_wait = 300 * USECS_PER_SEC; + + +static void check_testspec(TestSpec *testspec); +static void run_testspec(TestSpec *testspec); +static void run_all_permutations(TestSpec *testspec); +static void run_all_permutations_recurse(TestSpec *testspec, int *piles, + int nsteps, PermutationStep **steps); +static void run_named_permutations(TestSpec *testspec); +static void run_permutation(TestSpec *testspec, int nsteps, + PermutationStep **steps); + +/* Flag bits for try_complete_step(s) */ +#define STEP_NONBLOCK 0x1 /* return as soon as cmd waits for a lock */ +#define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */ + +static int try_complete_steps(TestSpec *testspec, PermutationStep **waiting, + int nwaiting, int flags); +static bool try_complete_step(TestSpec *testspec, PermutationStep *pstep, + int flags); + +static int step_qsort_cmp(const void *a, const void *b); +static int step_bsearch_cmp(const void *a, const void *b); + +static bool step_has_blocker(PermutationStep *pstep); +static void printResultSet(PGresult *res); +static void isotesterNoticeProcessor(void *arg, const char *message); +static void blackholeNoticeProcessor(void *arg, const char *message); + +static void +disconnect_atexit(void) +{ + int i; + + for (i = 0; i < nconns; i++) + if (conns[i].conn) + PQfinish(conns[i].conn); +} + +int +main(int argc, char **argv) +{ + const char *conninfo; + const char *env_wait; + TestSpec *testspec; + PGresult *res; + PQExpBufferData wait_query; + int opt; + int i; + + while ((opt = getopt(argc, argv, "V")) != -1) + { + switch (opt) + { + case 'V': + puts("isolationtester (PostgreSQL) " PG_VERSION); + exit(0); + default: + fprintf(stderr, "Usage: isolationtester [CONNINFO]\n"); + return EXIT_FAILURE; + } + } + + /* + * Make stdout unbuffered to match stderr; and ensure stderr is unbuffered + * too, which it should already be everywhere except sometimes in Windows. + */ + setbuf(stdout, NULL); + setbuf(stderr, NULL); + + /* + * If the user supplies a non-option parameter on the command line, use it + * as the conninfo string; otherwise default to setting dbname=postgres + * and using environment variables or defaults for all other connection + * parameters. + */ + if (argc > optind) + conninfo = argv[optind]; + else + conninfo = "dbname = postgres"; + + /* + * If PGISOLATIONTIMEOUT is set in the environment, adopt its value (given + * in seconds) as the max time to wait for any one step to complete. + */ + env_wait = getenv("PGISOLATIONTIMEOUT"); + if (env_wait != NULL) + max_step_wait = ((int64) atoi(env_wait)) * USECS_PER_SEC; + + /* Read the test spec from stdin */ + spec_yyparse(); + testspec = &parseresult; + + /* Perform post-parse checking, and fill in linking fields */ + check_testspec(testspec); + + printf("Parsed test spec with %d sessions\n", testspec->nsessions); + + /* + * Establish connections to the database, one for each session and an + * extra for lock wait detection and global work. + */ + nconns = 1 + testspec->nsessions; + conns = (IsoConnInfo *) pg_malloc0(nconns * sizeof(IsoConnInfo)); + atexit(disconnect_atexit); + + for (i = 0; i < nconns; i++) + { + const char *sessionname; + + if (i == 0) + sessionname = "control connection"; + else + sessionname = testspec->sessions[i - 1]->name; + + conns[i].sessionname = sessionname; + + conns[i].conn = PQconnectdb(conninfo); + if (PQstatus(conns[i].conn) != CONNECTION_OK) + { + fprintf(stderr, "Connection %d failed: %s", + i, PQerrorMessage(conns[i].conn)); + exit(1); + } + + /* + * Set up notice processors for the user-defined connections, so that + * messages can get printed prefixed with the session names. The + * control connection gets a "blackhole" processor instead (hides all + * messages). + */ + if (i != 0) + PQsetNoticeProcessor(conns[i].conn, + isotesterNoticeProcessor, + (void *) &conns[i]); + else + PQsetNoticeProcessor(conns[i].conn, + blackholeNoticeProcessor, + NULL); + + /* + * Similarly, append the session name to application_name to make it + * easier to map spec file sessions to log output and + * pg_stat_activity. The reason to append instead of just setting the + * name is that we don't know the name of the test currently running. + */ + res = PQexecParams(conns[i].conn, + "SELECT set_config('application_name',\n" + " current_setting('application_name') || '/' || $1,\n" + " false)", + 1, NULL, + &sessionname, + NULL, NULL, 0); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, "setting of application name failed: %s", + PQerrorMessage(conns[i].conn)); + exit(1); + } + + /* Save each connection's backend PID for subsequent use. */ + conns[i].backend_pid = PQbackendPID(conns[i].conn); + conns[i].backend_pid_str = psprintf("%d", conns[i].backend_pid); + } + + /* + * Build the query we'll use to detect lock contention among sessions in + * the test specification. Most of the time, we could get away with + * simply checking whether a session is waiting for *any* lock: we don't + * exactly expect concurrent use of test tables. However, autovacuum will + * occasionally take AccessExclusiveLock to truncate a table, and we must + * ignore that transient wait. + */ + initPQExpBuffer(&wait_query); + appendPQExpBufferStr(&wait_query, + "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{"); + /* The spec syntax requires at least one session; assume that here. */ + appendPQExpBufferStr(&wait_query, conns[1].backend_pid_str); + for (i = 2; i < nconns; i++) + appendPQExpBuffer(&wait_query, ",%s", conns[i].backend_pid_str); + appendPQExpBufferStr(&wait_query, "}')"); + + res = PQprepare(conns[0].conn, PREP_WAITING, wait_query.data, 0, NULL); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "prepare of lock wait query failed: %s", + PQerrorMessage(conns[0].conn)); + exit(1); + } + PQclear(res); + termPQExpBuffer(&wait_query); + + /* + * Run the permutations specified in the spec, or all if none were + * explicitly specified. + */ + run_testspec(testspec); + + return 0; +} + +/* + * Validity-check the test spec and fill in cross-links between nodes. + */ +static void +check_testspec(TestSpec *testspec) +{ + int nallsteps; + Step **allsteps; + int i, + j, + k; + + /* Create a sorted lookup table of all steps. */ + nallsteps = 0; + for (i = 0; i < testspec->nsessions; i++) + nallsteps += testspec->sessions[i]->nsteps; + + allsteps = pg_malloc(nallsteps * sizeof(Step *)); + + k = 0; + for (i = 0; i < testspec->nsessions; i++) + { + for (j = 0; j < testspec->sessions[i]->nsteps; j++) + allsteps[k++] = testspec->sessions[i]->steps[j]; + } + + qsort(allsteps, nallsteps, sizeof(Step *), step_qsort_cmp); + + /* Verify that all step names are unique. */ + for (i = 1; i < nallsteps; i++) + { + if (strcmp(allsteps[i - 1]->name, + allsteps[i]->name) == 0) + { + fprintf(stderr, "duplicate step name: %s\n", + allsteps[i]->name); + exit(1); + } + } + + /* Set the session index fields in steps. */ + for (i = 0; i < testspec->nsessions; i++) + { + Session *session = testspec->sessions[i]; + + for (j = 0; j < session->nsteps; j++) + session->steps[j]->session = i; + } + + /* + * If we have manually-specified permutations, link PermutationSteps to + * Steps, and fill in blocker links. + */ + for (i = 0; i < testspec->npermutations; i++) + { + Permutation *p = testspec->permutations[i]; + + for (j = 0; j < p->nsteps; j++) + { + PermutationStep *pstep = p->steps[j]; + Step **this = (Step **) bsearch(pstep->name, + allsteps, + nallsteps, + sizeof(Step *), + step_bsearch_cmp); + + if (this == NULL) + { + fprintf(stderr, "undefined step \"%s\" specified in permutation\n", + pstep->name); + exit(1); + } + pstep->step = *this; + + /* Mark the step used, for check below */ + pstep->step->used = true; + } + + /* + * Identify any blocker steps. We search only the current + * permutation, since steps not used there couldn't be concurrent. + * Note that it's OK to reference later permutation steps, so this + * can't be combined with the previous loop. + */ + for (j = 0; j < p->nsteps; j++) + { + PermutationStep *pstep = p->steps[j]; + + for (k = 0; k < pstep->nblockers; k++) + { + PermutationStepBlocker *blocker = pstep->blockers[k]; + int n; + + if (blocker->blocktype == PSB_ONCE) + continue; /* nothing to link to */ + + blocker->step = NULL; + for (n = 0; n < p->nsteps; n++) + { + PermutationStep *otherp = p->steps[n]; + + if (strcmp(otherp->name, blocker->stepname) == 0) + { + blocker->step = otherp->step; + break; + } + } + if (blocker->step == NULL) + { + fprintf(stderr, "undefined blocking step \"%s\" referenced in permutation step \"%s\"\n", + blocker->stepname, pstep->name); + exit(1); + } + /* can't block on completion of step of own session */ + if (blocker->step->session == pstep->step->session) + { + fprintf(stderr, "permutation step \"%s\" cannot block on its own session\n", + pstep->name); + exit(1); + } + } + } + } + + /* + * If we have manually-specified permutations, verify that all steps have + * been used, warning about anything defined but not used. We can skip + * this when using automatically-generated permutations. + */ + if (testspec->permutations) + { + for (i = 0; i < nallsteps; i++) + { + if (!allsteps[i]->used) + fprintf(stderr, "unused step name: %s\n", allsteps[i]->name); + } + } + + free(allsteps); +} + +/* + * Run the permutations specified in the spec, or all if none were + * explicitly specified. + */ +static void +run_testspec(TestSpec *testspec) +{ + if (testspec->permutations) + run_named_permutations(testspec); + else + run_all_permutations(testspec); +} + +/* + * Run all permutations of the steps and sessions. + */ +static void +run_all_permutations(TestSpec *testspec) +{ + int nsteps; + int i; + PermutationStep *steps; + PermutationStep **stepptrs; + int *piles; + + /* Count the total number of steps in all sessions */ + nsteps = 0; + for (i = 0; i < testspec->nsessions; i++) + nsteps += testspec->sessions[i]->nsteps; + + /* Create PermutationStep workspace array */ + steps = (PermutationStep *) pg_malloc0(sizeof(PermutationStep) * nsteps); + stepptrs = (PermutationStep **) pg_malloc(sizeof(PermutationStep *) * nsteps); + for (i = 0; i < nsteps; i++) + stepptrs[i] = steps + i; + + /* + * To generate the permutations, we conceptually put the steps of each + * session on a pile. To generate a permutation, we pick steps from the + * piles until all piles are empty. By picking steps from piles in + * different order, we get different permutations. + * + * A pile is actually just an integer which tells how many steps we've + * already picked from this pile. + */ + piles = pg_malloc(sizeof(int) * testspec->nsessions); + for (i = 0; i < testspec->nsessions; i++) + piles[i] = 0; + + run_all_permutations_recurse(testspec, piles, 0, stepptrs); + + free(steps); + free(stepptrs); + free(piles); +} + +static void +run_all_permutations_recurse(TestSpec *testspec, int *piles, + int nsteps, PermutationStep **steps) +{ + int i; + bool found = false; + + for (i = 0; i < testspec->nsessions; i++) + { + /* If there's any more steps in this pile, pick it and recurse */ + if (piles[i] < testspec->sessions[i]->nsteps) + { + Step *newstep = testspec->sessions[i]->steps[piles[i]]; + + /* + * These automatically-generated PermutationSteps never have + * blocker conditions. So we need only fill these fields, relying + * on run_all_permutations() to have zeroed the rest: + */ + steps[nsteps]->name = newstep->name; + steps[nsteps]->step = newstep; + + piles[i]++; + + run_all_permutations_recurse(testspec, piles, nsteps + 1, steps); + + piles[i]--; + + found = true; + } + } + + /* If all the piles were empty, this permutation is completed. Run it */ + if (!found) + run_permutation(testspec, nsteps, steps); +} + +/* + * Run permutations given in the test spec + */ +static void +run_named_permutations(TestSpec *testspec) +{ + int i; + + for (i = 0; i < testspec->npermutations; i++) + { + Permutation *p = testspec->permutations[i]; + + run_permutation(testspec, p->nsteps, p->steps); + } +} + +static int +step_qsort_cmp(const void *a, const void *b) +{ + Step *stepa = *((Step **) a); + Step *stepb = *((Step **) b); + + return strcmp(stepa->name, stepb->name); +} + +static int +step_bsearch_cmp(const void *a, const void *b) +{ + char *stepname = (char *) a; + Step *step = *((Step **) b); + + return strcmp(stepname, step->name); +} + +/* + * Run one permutation + */ +static void +run_permutation(TestSpec *testspec, int nsteps, PermutationStep **steps) +{ + PGresult *res; + int i; + int nwaiting = 0; + PermutationStep **waiting; + + waiting = pg_malloc(sizeof(PermutationStep *) * testspec->nsessions); + + printf("\nstarting permutation:"); + for (i = 0; i < nsteps; i++) + printf(" %s", steps[i]->name); + printf("\n"); + + /* Perform setup */ + for (i = 0; i < testspec->nsetupsqls; i++) + { + res = PQexec(conns[0].conn, testspec->setupsqls[i]); + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + printResultSet(res); + } + else if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "setup failed: %s", PQerrorMessage(conns[0].conn)); + exit(1); + } + PQclear(res); + } + + /* Perform per-session setup */ + for (i = 0; i < testspec->nsessions; i++) + { + if (testspec->sessions[i]->setupsql) + { + res = PQexec(conns[i + 1].conn, testspec->sessions[i]->setupsql); + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + printResultSet(res); + } + else if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "setup of session %s failed: %s", + conns[i + 1].sessionname, + PQerrorMessage(conns[i + 1].conn)); + exit(1); + } + PQclear(res); + } + } + + /* Perform steps */ + for (i = 0; i < nsteps; i++) + { + PermutationStep *pstep = steps[i]; + Step *step = pstep->step; + IsoConnInfo *iconn = &conns[1 + step->session]; + PGconn *conn = iconn->conn; + bool mustwait; + int j; + + /* + * Check whether the session that needs to perform the next step is + * still blocked on an earlier step. If so, wait for it to finish. + */ + if (iconn->active_step != NULL) + { + struct timeval start_time; + + gettimeofday(&start_time, NULL); + + while (iconn->active_step != NULL) + { + PermutationStep *oldstep = iconn->active_step; + + /* + * Wait for oldstep. But even though we don't use + * STEP_NONBLOCK, it might not complete because of blocker + * conditions. + */ + if (!try_complete_step(testspec, oldstep, STEP_RETRY)) + { + /* Done, so remove oldstep from the waiting[] array. */ + int w; + + for (w = 0; w < nwaiting; w++) + { + if (oldstep == waiting[w]) + break; + } + if (w >= nwaiting) + abort(); /* can't happen */ + if (w + 1 < nwaiting) + memmove(&waiting[w], &waiting[w + 1], + (nwaiting - (w + 1)) * sizeof(PermutationStep *)); + nwaiting--; + } + + /* + * Check for other steps that have finished. We should do + * this if oldstep completed, as it might have unblocked + * something. On the other hand, if oldstep hasn't completed, + * we must poll all the active steps in hopes of unblocking + * oldstep. So either way, poll them. + */ + nwaiting = try_complete_steps(testspec, waiting, nwaiting, + STEP_NONBLOCK | STEP_RETRY); + + /* + * If the target session is still busy, apply a timeout to + * keep from hanging indefinitely, which could happen with + * incorrect blocker annotations. Use the same 2 * + * max_step_wait limit as try_complete_step does for deciding + * to die. (We don't bother with trying to cancel anything, + * since it's unclear what to cancel in this case.) + */ + if (iconn->active_step != NULL) + { + struct timeval current_time; + int64 td; + + gettimeofday(¤t_time, NULL); + td = (int64) current_time.tv_sec - (int64) start_time.tv_sec; + td *= USECS_PER_SEC; + td += (int64) current_time.tv_usec - (int64) start_time.tv_usec; + if (td > 2 * max_step_wait) + { + fprintf(stderr, "step %s timed out after %d seconds\n", + iconn->active_step->name, + (int) (td / USECS_PER_SEC)); + fprintf(stderr, "active steps are:"); + for (j = 1; j < nconns; j++) + { + IsoConnInfo *oconn = &conns[j]; + + if (oconn->active_step != NULL) + fprintf(stderr, " %s", + oconn->active_step->name); + } + fprintf(stderr, "\n"); + exit(1); + } + } + } + } + + /* Send the query for this step. */ + if (!PQsendQuery(conn, step->sql)) + { + fprintf(stdout, "failed to send query for step %s: %s\n", + step->name, PQerrorMessage(conn)); + exit(1); + } + + /* Remember we launched a step. */ + iconn->active_step = pstep; + + /* Remember target number of NOTICEs for any blocker conditions. */ + for (j = 0; j < pstep->nblockers; j++) + { + PermutationStepBlocker *blocker = pstep->blockers[j]; + + if (blocker->blocktype == PSB_NUM_NOTICES) + blocker->target_notices = blocker->num_notices + + conns[blocker->step->session + 1].total_notices; + } + + /* Try to complete this step without blocking. */ + mustwait = try_complete_step(testspec, pstep, STEP_NONBLOCK); + + /* Check for completion of any steps that were previously waiting. */ + nwaiting = try_complete_steps(testspec, waiting, nwaiting, + STEP_NONBLOCK | STEP_RETRY); + + /* If this step is waiting, add it to the array of waiters. */ + if (mustwait) + waiting[nwaiting++] = pstep; + } + + /* Wait for any remaining queries. */ + nwaiting = try_complete_steps(testspec, waiting, nwaiting, STEP_RETRY); + if (nwaiting != 0) + { + fprintf(stderr, "failed to complete permutation due to mutually-blocking steps\n"); + exit(1); + } + + /* Perform per-session teardown */ + for (i = 0; i < testspec->nsessions; i++) + { + if (testspec->sessions[i]->teardownsql) + { + res = PQexec(conns[i + 1].conn, testspec->sessions[i]->teardownsql); + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + printResultSet(res); + } + else if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "teardown of session %s failed: %s", + conns[i + 1].sessionname, + PQerrorMessage(conns[i + 1].conn)); + /* don't exit on teardown failure */ + } + PQclear(res); + } + } + + /* Perform teardown */ + if (testspec->teardownsql) + { + res = PQexec(conns[0].conn, testspec->teardownsql); + if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + printResultSet(res); + } + else if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, "teardown failed: %s", + PQerrorMessage(conns[0].conn)); + /* don't exit on teardown failure */ + } + PQclear(res); + } + + free(waiting); +} + +/* + * Check for completion of any waiting step(s). + * Remove completed ones from the waiting[] array, + * and return the new value of nwaiting. + * See try_complete_step for the meaning of the flags. + */ +static int +try_complete_steps(TestSpec *testspec, PermutationStep **waiting, + int nwaiting, int flags) +{ + int old_nwaiting; + bool have_blocker; + + do + { + int w = 0; + + /* Reset latch; we only care about notices received within loop. */ + any_new_notice = false; + + /* Likewise, these variables reset for each retry. */ + old_nwaiting = nwaiting; + have_blocker = false; + + /* Scan the array, try to complete steps. */ + while (w < nwaiting) + { + if (try_complete_step(testspec, waiting[w], flags)) + { + /* Still blocked, leave it alone. */ + if (waiting[w]->nblockers > 0) + have_blocker = true; + w++; + } + else + { + /* Done, remove it from array. */ + if (w + 1 < nwaiting) + memmove(&waiting[w], &waiting[w + 1], + (nwaiting - (w + 1)) * sizeof(PermutationStep *)); + nwaiting--; + } + } + + /* + * If any of the still-waiting steps have blocker conditions attached, + * it's possible that one of the steps we examined afterwards has + * released them (either by completing, or by sending a NOTICE). If + * any step completions or NOTICEs happened, repeat the loop until + * none occurs. Without this provision, completion timing could vary + * depending on the order in which the steps appear in the array. + */ + } while (have_blocker && (nwaiting < old_nwaiting || any_new_notice)); + return nwaiting; +} + +/* + * Our caller already sent the query associated with this step. Wait for it + * to either complete, or hit a blocking condition. + * + * When calling this function on behalf of a given step for a second or later + * time, pass the STEP_RETRY flag. Do not pass it on the first call. + * + * Returns true if the step was *not* completed, false if it was completed. + * Reasons for non-completion are (a) the STEP_NONBLOCK flag was specified + * and the query is waiting to acquire a lock, or (b) the step has an + * unsatisfied blocker condition. When STEP_NONBLOCK is given, we assume + * that any lock wait will persist until we have executed additional steps. + */ +static bool +try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags) +{ + Step *step = pstep->step; + IsoConnInfo *iconn = &conns[1 + step->session]; + PGconn *conn = iconn->conn; + fd_set read_set; + struct timeval start_time; + struct timeval timeout; + int sock = PQsocket(conn); + int ret; + PGresult *res; + PGnotify *notify; + bool canceled = false; + + /* + * If the step is annotated with (*), then on the first call, force it to + * wait. This is useful for ensuring consistent output when the step + * might or might not complete so fast that we don't observe it waiting. + */ + if (!(flags & STEP_RETRY)) + { + int i; + + for (i = 0; i < pstep->nblockers; i++) + { + PermutationStepBlocker *blocker = pstep->blockers[i]; + + if (blocker->blocktype == PSB_ONCE) + { + printf("step %s: %s <waiting ...>\n", + step->name, step->sql); + return true; + } + } + } + + if (sock < 0) + { + fprintf(stderr, "invalid socket: %s", PQerrorMessage(conn)); + exit(1); + } + + gettimeofday(&start_time, NULL); + FD_ZERO(&read_set); + + while (PQisBusy(conn)) + { + FD_SET(sock, &read_set); + timeout.tv_sec = 0; + timeout.tv_usec = 10000; /* Check for lock waits every 10ms. */ + + ret = select(sock + 1, &read_set, NULL, NULL, &timeout); + if (ret < 0) /* error in select() */ + { + if (errno == EINTR) + continue; + fprintf(stderr, "select failed: %s\n", strerror(errno)); + exit(1); + } + else if (ret == 0) /* select() timeout: check for lock wait */ + { + struct timeval current_time; + int64 td; + + /* If it's OK for the step to block, check whether it has. */ + if (flags & STEP_NONBLOCK) + { + bool waiting; + + res = PQexecPrepared(conns[0].conn, PREP_WAITING, 1, + &conns[step->session + 1].backend_pid_str, + NULL, NULL, 0); + if (PQresultStatus(res) != PGRES_TUPLES_OK || + PQntuples(res) != 1) + { + fprintf(stderr, "lock wait query failed: %s", + PQerrorMessage(conns[0].conn)); + exit(1); + } + waiting = ((PQgetvalue(res, 0, 0))[0] == 't'); + PQclear(res); + + if (waiting) /* waiting to acquire a lock */ + { + /* + * Since it takes time to perform the lock-check query, + * some data --- notably, NOTICE messages --- might have + * arrived since we looked. We must call PQconsumeInput + * and then PQisBusy to collect and process any such + * messages. In the (unlikely) case that PQisBusy then + * returns false, we might as well go examine the + * available result. + */ + if (!PQconsumeInput(conn)) + { + fprintf(stderr, "PQconsumeInput failed: %s\n", + PQerrorMessage(conn)); + exit(1); + } + if (!PQisBusy(conn)) + break; + + /* + * conn is still busy, so conclude that the step really is + * waiting. + */ + if (!(flags & STEP_RETRY)) + printf("step %s: %s <waiting ...>\n", + step->name, step->sql); + return true; + } + /* else, not waiting */ + } + + /* Figure out how long we've been waiting for this step. */ + gettimeofday(¤t_time, NULL); + td = (int64) current_time.tv_sec - (int64) start_time.tv_sec; + td *= USECS_PER_SEC; + td += (int64) current_time.tv_usec - (int64) start_time.tv_usec; + + /* + * After max_step_wait microseconds, try to cancel the query. + * + * If the user tries to test an invalid permutation, we don't want + * to hang forever, especially when this is running in the + * buildfarm. This will presumably lead to this permutation + * failing, but remaining permutations and tests should still be + * OK. + */ + if (td > max_step_wait && !canceled) + { + PGcancel *cancel = PQgetCancel(conn); + + if (cancel != NULL) + { + char buf[256]; + + if (PQcancel(cancel, buf, sizeof(buf))) + { + /* + * print to stdout not stderr, as this should appear + * in the test case's results + */ + printf("isolationtester: canceling step %s after %d seconds\n", + step->name, (int) (td / USECS_PER_SEC)); + canceled = true; + } + else + fprintf(stderr, "PQcancel failed: %s\n", buf); + PQfreeCancel(cancel); + } + } + + /* + * After twice max_step_wait, just give up and die. + * + * Since cleanup steps won't be run in this case, this may cause + * later tests to fail. That stinks, but it's better than waiting + * forever for the server to respond to the cancel. + */ + if (td > 2 * max_step_wait) + { + fprintf(stderr, "step %s timed out after %d seconds\n", + step->name, (int) (td / USECS_PER_SEC)); + exit(1); + } + } + else if (!PQconsumeInput(conn)) /* select(): data available */ + { + fprintf(stderr, "PQconsumeInput failed: %s\n", + PQerrorMessage(conn)); + exit(1); + } + } + + /* + * The step is done, but we won't report it as complete so long as there + * are blockers. + */ + if (step_has_blocker(pstep)) + { + if (!(flags & STEP_RETRY)) + printf("step %s: %s <waiting ...>\n", + step->name, step->sql); + return true; + } + + /* Otherwise, go ahead and complete it. */ + if (flags & STEP_RETRY) + printf("step %s: <... completed>\n", step->name); + else + printf("step %s: %s\n", step->name, step->sql); + + while ((res = PQgetResult(conn))) + { + switch (PQresultStatus(res)) + { + case PGRES_COMMAND_OK: + case PGRES_EMPTY_QUERY: + break; + case PGRES_TUPLES_OK: + printResultSet(res); + break; + case PGRES_FATAL_ERROR: + + /* + * Detail may contain XID values, so we want to just show + * primary. Beware however that libpq-generated error results + * may not contain subfields, only an old-style message. + */ + { + const char *sev = PQresultErrorField(res, + PG_DIAG_SEVERITY); + const char *msg = PQresultErrorField(res, + PG_DIAG_MESSAGE_PRIMARY); + + if (sev && msg) + printf("%s: %s\n", sev, msg); + else + printf("%s\n", PQresultErrorMessage(res)); + } + break; + default: + printf("unexpected result status: %s\n", + PQresStatus(PQresultStatus(res))); + } + PQclear(res); + } + + /* Report any available NOTIFY messages, too */ + PQconsumeInput(conn); + while ((notify = PQnotifies(conn)) != NULL) + { + /* Try to identify which session it came from */ + const char *sendername = NULL; + char pidstring[32]; + int i; + + for (i = 0; i < testspec->nsessions; i++) + { + if (notify->be_pid == conns[i + 1].backend_pid) + { + sendername = conns[i + 1].sessionname; + break; + } + } + if (sendername == NULL) + { + /* Doesn't seem to be any test session, so show the hard way */ + snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid); + sendername = pidstring; + } + printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n", + testspec->sessions[step->session]->name, + notify->relname, notify->extra, sendername); + PQfreemem(notify); + PQconsumeInput(conn); + } + + /* Connection is now idle. */ + iconn->active_step = NULL; + + return false; +} + +/* Detect whether a step has any unsatisfied blocker conditions */ +static bool +step_has_blocker(PermutationStep *pstep) +{ + int i; + + for (i = 0; i < pstep->nblockers; i++) + { + PermutationStepBlocker *blocker = pstep->blockers[i]; + IsoConnInfo *iconn; + + switch (blocker->blocktype) + { + case PSB_ONCE: + /* Ignore; try_complete_step handles this specially */ + break; + case PSB_OTHER_STEP: + /* Block if referenced step is active */ + iconn = &conns[1 + blocker->step->session]; + if (iconn->active_step && + iconn->active_step->step == blocker->step) + return true; + break; + case PSB_NUM_NOTICES: + /* Block if not enough notices received yet */ + iconn = &conns[1 + blocker->step->session]; + if (iconn->total_notices < blocker->target_notices) + return true; + break; + } + } + return false; +} + +static void +printResultSet(PGresult *res) +{ + PQprintOpt popt; + + memset(&popt, 0, sizeof(popt)); + popt.header = true; + popt.align = true; + popt.fieldSep = "|"; + PQprint(stdout, res, &popt); +} + +/* notice processor for regular user sessions */ +static void +isotesterNoticeProcessor(void *arg, const char *message) +{ + IsoConnInfo *myconn = (IsoConnInfo *) arg; + + /* Prefix the backend's message with the session name. */ + printf("%s: %s", myconn->sessionname, message); + /* Record notices, since we may need this to decide to unblock a step. */ + myconn->total_notices++; + any_new_notice = true; +} + +/* notice processor, hides the message */ +static void +blackholeNoticeProcessor(void *arg, const char *message) +{ + /* do nothing */ +} |