diff options
Diffstat (limited to '')
-rw-r--r-- | src/interfaces/libpq/fe-exec.c | 73 |
1 files changed, 51 insertions, 22 deletions
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 8989f71..c9450ec 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -846,6 +846,8 @@ pqSaveWriteError(PGconn *conn) * using whatever is in conn->errorMessage. In any case, clear the async * result storage, and update our notion of how much error text has been * returned to the application. + * + * Note that in no case (not even OOM) do we return NULL. */ PGresult * pqPrepareAsyncResult(PGconn *conn) @@ -2119,29 +2121,21 @@ PQgetResult(PGconn *conn) /* * We're about to return the NULL that terminates the round of - * results from the current query; prepare to send the results - * of the next query, if any, when we're called next. If there's - * no next element in the command queue, this gets us in IDLE - * state. + * results from the current query; prepare to send the results of + * the next query, if any, when we're called next. If there's no + * next element in the command queue, this gets us in IDLE state. */ pqPipelineProcessQueue(conn); res = NULL; /* query is complete */ break; case PGASYNC_READY: - - /* - * For any query type other than simple query protocol, we advance - * the command queue here. This is because for simple query - * protocol we can get the READY state multiple times before the - * command is actually complete, since the command string can - * contain many queries. In simple query protocol, the queue - * advance is done by fe-protocol3 when it receives ReadyForQuery. - */ - if (conn->cmd_queue_head && - conn->cmd_queue_head->queryclass != PGQUERY_SIMPLE) - pqCommandQueueAdvance(conn); res = pqPrepareAsyncResult(conn); + + /* Advance the queue as appropriate */ + pqCommandQueueAdvance(conn, false, + res->resultStatus == PGRES_PIPELINE_SYNC); + if (conn->pipelineStatus != PQ_PIPELINE_OFF) { /* @@ -2161,7 +2155,7 @@ PQgetResult(PGconn *conn) * (In other words: we don't return a NULL after a pipeline * sync.) */ - if (res && res->resultStatus == PGRES_PIPELINE_SYNC) + if (res->resultStatus == PGRES_PIPELINE_SYNC) pqPipelineProcessQueue(conn); } else @@ -3040,18 +3034,44 @@ PQexitPipelineMode(PGconn *conn) /* * pqCommandQueueAdvance - * Remove one query from the command queue, when we receive - * all results from the server that pertain to it. + * Remove one query from the command queue, if appropriate. + * + * If we have received all results corresponding to the head element + * in the command queue, remove it. + * + * In simple query protocol we must not advance the command queue until the + * ReadyForQuery message has been received. This is because in simple mode a + * command can have multiple queries, and we must process result for all of + * them before moving on to the next command. + * + * Another consideration is synchronization during error processing in + * extended query protocol: we refuse to advance the queue past a SYNC queue + * element, unless the result we've received is also a SYNC. In particular + * this protects us from advancing when an error is received at an + * inappropriate moment. */ void -pqCommandQueueAdvance(PGconn *conn) +pqCommandQueueAdvance(PGconn *conn, bool isReadyForQuery, bool gotSync) { PGcmdQueueEntry *prevquery; if (conn->cmd_queue_head == NULL) return; - /* delink from queue */ + /* + * If processing a query of simple query protocol, we only advance the + * queue when we receive the ReadyForQuery message for it. + */ + if (conn->cmd_queue_head->queryclass == PGQUERY_SIMPLE && !isReadyForQuery) + return; + + /* + * If we're waiting for a SYNC, don't advance the queue until we get one. + */ + if (conn->cmd_queue_head->queryclass == PGQUERY_SYNC && !gotSync) + return; + + /* delink element from queue */ prevquery = conn->cmd_queue_head; conn->cmd_queue_head = conn->cmd_queue_head->next; @@ -3059,7 +3079,7 @@ pqCommandQueueAdvance(PGconn *conn) if (conn->cmd_queue_head == NULL) conn->cmd_queue_tail = NULL; - /* and make it recyclable */ + /* and make the queue element recyclable */ prevquery->next = NULL; pqRecycleCmdQueueEntry(conn, prevquery); } @@ -3083,6 +3103,7 @@ pqPipelineProcessQueue(PGconn *conn) return; case PGASYNC_IDLE: + /* * If we're in IDLE mode and there's some command in the queue, * get us into PIPELINE_IDLE mode and process normally. Otherwise @@ -3271,6 +3292,14 @@ PQsendFlushRequest(PGconn *conn) return 0; } + /* + * Give the data a push (in pipeline mode, only if we're past the size + * threshold). In nonblock mode, don't complain if we're unable to send + * it all; PQgetResult() will do any additional flushing needed. + */ + if (pqPipelineFlush(conn) < 0) + return 0; + return 1; } |