summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_proxy_session.c
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2/h2_proxy_session.c')
-rw-r--r--modules/http2/h2_proxy_session.c423
1 files changed, 292 insertions, 131 deletions
diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c
index 8389c7c..db22301 100644
--- a/modules/http2/h2_proxy_session.c
+++ b/modules/http2/h2_proxy_session.c
@@ -20,6 +20,7 @@
#include <mpm_common.h>
#include <httpd.h>
+#include <http_protocol.h>
#include <mod_proxy.h>
#include "mod_http2.h"
@@ -36,6 +37,7 @@ typedef struct h2_proxy_stream {
const char *url;
request_rec *r;
+ conn_rec *cfront;
h2_proxy_request *req;
const char *real_server_uri;
const char *p_server_uri;
@@ -45,6 +47,7 @@ typedef struct h2_proxy_stream {
unsigned int suspended : 1;
unsigned int waiting_on_100 : 1;
unsigned int waiting_on_ping : 1;
+ unsigned int headers_ended : 1;
uint32_t error_code;
apr_bucket_brigade *input;
@@ -61,7 +64,123 @@ static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev,
static void ping_arrived(h2_proxy_session *session);
static apr_status_t check_suspended(h2_proxy_session *session);
static void stream_resume(h2_proxy_stream *stream);
+static apr_status_t submit_trailers(h2_proxy_stream *stream);
+
+/*
+ * The H2_PING connection sub-state: a state independant of the H2_SESSION state
+ * of the connection:
+ * - H2_PING_ST_NONE: no interference with request handling, ProxyTimeout in effect.
+ * When entered, all suspended streams are unsuspended again.
+ * - H2_PING_ST_AWAIT_ANY: new requests are suspended, a possibly configured "ping"
+ * timeout is in effect. Any frame received transits to H2_PING_ST_NONE.
+ * - H2_PING_ST_AWAIT_PING: same as above, but only a PING frame transits
+ * to H2_PING_ST_NONE.
+ *
+ * An AWAIT state is entered on a new connection or when re-using a connection and
+ * the last frame received has been some time ago. The latter sends a PING frame
+ * and insists on an answer, the former is satisfied by any frame received from the
+ * backend.
+ *
+ * This works for new connections as there is always at least one SETTINGS frame
+ * that the backend sends. When re-using connection, we send a PING and insist on
+ * receiving one back, as there might be frames in our connection buffers from
+ * some time ago. Since some servers have protections against PING flooding, we
+ * only ever have one PING unanswered.
+ *
+ * Requests are suspended while in a PING state, as we do not want to send data
+ * before we can be reasonably sure that the connection is working (at least on
+ * the h2 protocol level). This also means that the session can do blocking reads
+ * when expecting PING answers.
+ */
+static void set_ping_timeout(h2_proxy_session *session)
+{
+ if (session->ping_timeout != -1 && session->save_timeout == -1) {
+ apr_socket_t *socket = NULL;
+ socket = ap_get_conn_socket(session->c);
+ if (socket) {
+ apr_socket_timeout_get(socket, &session->save_timeout);
+ apr_socket_timeout_set(socket, session->ping_timeout);
+ }
+ }
+}
+
+static void unset_ping_timeout(h2_proxy_session *session)
+{
+ if (session->save_timeout != -1) {
+ apr_socket_t *socket = NULL;
+
+ socket = ap_get_conn_socket(session->c);
+ if (socket) {
+ apr_socket_timeout_set(socket, session->save_timeout);
+ session->save_timeout = -1;
+ }
+ }
+}
+
+static void enter_ping_state(h2_proxy_session *session, h2_ping_state_t state)
+{
+ if (session->ping_state == state) return;
+ switch (session->ping_state) {
+ case H2_PING_ST_NONE:
+ /* leaving NONE, enforce timeout, send frame maybe */
+ if (H2_PING_ST_AWAIT_PING == state) {
+ unset_ping_timeout(session);
+ nghttp2_submit_ping(session->ngh2, 0, (const uint8_t *)"nevergonnagiveyouup");
+ }
+ set_ping_timeout(session);
+ session->ping_state = state;
+ break;
+ default:
+ /* no switching between the != NONE states */
+ if (H2_PING_ST_NONE == state) {
+ session->ping_state = state;
+ unset_ping_timeout(session);
+ ping_arrived(session);
+ }
+ break;
+ }
+}
+
+static void ping_new_session(h2_proxy_session *session, proxy_conn_rec *p_conn)
+{
+ session->save_timeout = -1;
+ session->ping_timeout = (p_conn->worker->s->ping_timeout_set?
+ p_conn->worker->s->ping_timeout : -1);
+ session->ping_state = H2_PING_ST_NONE;
+ enter_ping_state(session, H2_PING_ST_AWAIT_ANY);
+}
+
+static void ping_reuse_session(h2_proxy_session *session)
+{
+ if (H2_PING_ST_NONE == session->ping_state) {
+ apr_interval_time_t age = apr_time_now() - session->last_frame_received;
+ if (age > apr_time_from_sec(1)) {
+ enter_ping_state(session, H2_PING_ST_AWAIT_PING);
+ }
+ }
+}
+
+static void ping_ev_frame_received(h2_proxy_session *session, const nghttp2_frame *frame)
+{
+ session->last_frame_received = apr_time_now();
+ switch (session->ping_state) {
+ case H2_PING_ST_NONE:
+ /* nop */
+ break;
+ case H2_PING_ST_AWAIT_ANY:
+ enter_ping_state(session, H2_PING_ST_NONE);
+ break;
+ case H2_PING_ST_AWAIT_PING:
+ if (NGHTTP2_PING == frame->hd.type) {
+ enter_ping_state(session, H2_PING_ST_NONE);
+ }
+ /* we may receive many other frames while we are waiting for the
+ * PING answer. They may come all from our connection buffers and
+ * say nothing about the current state of the backend. */
+ break;
+ }
+}
static apr_status_t proxy_session_pre_close(void *theconn)
{
@@ -152,7 +271,8 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
session->id, buffer);
}
- session->last_frame_received = apr_time_now();
+ ping_ev_frame_received(session, frame);
+ /* Action for frame types: */
switch (frame->hd.type) {
case NGHTTP2_HEADERS:
stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
@@ -193,10 +313,6 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
stream_resume(stream);
break;
case NGHTTP2_PING:
- if (session->check_ping) {
- session->check_ping = 0;
- ping_arrived(session);
- }
break;
case NGHTTP2_PUSH_PROMISE:
break;
@@ -241,7 +357,8 @@ static int add_header(void *table, const char *n, const char *v)
return 1;
}
-static void process_proxy_header(h2_proxy_stream *stream, const char *n, const char *v)
+static void process_proxy_header(apr_table_t *headers, h2_proxy_stream *stream,
+ const char *n, const char *v)
{
static const struct {
const char *name;
@@ -262,20 +379,18 @@ static void process_proxy_header(h2_proxy_stream *stream, const char *n, const c
if (!dconf->preserve_host) {
for (i = 0; transform_hdrs[i].name; ++i) {
if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
- apr_table_add(r->headers_out, n,
- (*transform_hdrs[i].func)(r, dconf, v));
+ apr_table_add(headers, n, (*transform_hdrs[i].func)(r, dconf, v));
return;
}
}
if (!ap_cstr_casecmp("Link", n)) {
dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
- apr_table_add(r->headers_out, n,
- h2_proxy_link_reverse_map(r, dconf,
- stream->real_server_uri, stream->p_server_uri, v));
+ apr_table_add(headers, n, h2_proxy_link_reverse_map(r, dconf,
+ stream->real_server_uri, stream->p_server_uri, v));
return;
}
}
- apr_table_add(r->headers_out, n, v);
+ apr_table_add(headers, n, v);
}
static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
@@ -287,7 +402,7 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
char *s = apr_pstrndup(stream->r->pool, v, vlen);
apr_table_setn(stream->r->notes, "proxy-status", s);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
"h2_proxy_stream(%s-%d): got status %s",
stream->session->id, stream->id, s);
stream->r->status = (int)apr_atoi64(s);
@@ -299,17 +414,22 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
return APR_SUCCESS;
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
+ "h2_proxy_stream(%s-%d): on_header %s: %s",
+ stream->session->id, stream->id, n, v);
if (!h2_proxy_res_ignore_header(n, nlen)) {
char *hname, *hvalue;
+ apr_table_t *headers = (stream->headers_ended?
+ stream->r->trailers_out : stream->r->headers_out);
hname = apr_pstrndup(stream->pool, n, nlen);
h2_proxy_util_camel_case_header(hname, nlen);
hvalue = apr_pstrndup(stream->pool, v, vlen);
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
"h2_proxy_stream(%s-%d): got header %s: %s",
stream->session->id, stream->id, hname, hvalue);
- process_proxy_header(stream, hname, hvalue);
+ process_proxy_header(headers, stream, hname, hvalue);
}
return APR_SUCCESS;
}
@@ -328,6 +448,7 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
h2_proxy_session *session = stream->session;
request_rec *r = stream->r;
apr_pool_t *p = r->pool;
+ const char *buf;
/* Now, add in the cookies from the response to the ones already saved */
apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
@@ -337,6 +458,10 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
apr_table_unset(r->headers_out, "Set-Cookie");
r->headers_out = apr_table_overlay(p, r->headers_out, stream->saves);
}
+
+ if ((buf = apr_table_get(r->headers_out, "Content-Type"))) {
+ ap_set_content_type(r, apr_pstrdup(p, buf));
+ }
/* handle Via header in response */
if (session->conf->viaopt != via_off
@@ -374,6 +499,7 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream)
server_name, portstr)
);
}
+ if (r->status >= 200) stream->headers_ended = 1;
if (APLOGrtrace2(stream->r)) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
@@ -407,34 +533,27 @@ static int stream_response_data(nghttp2_session *ngh2, uint8_t flags,
h2_proxy_stream_end_headers_out(stream);
}
stream->data_received += len;
-
- b = apr_bucket_transient_create((const char*)data, len,
- stream->r->connection->bucket_alloc);
+ b = apr_bucket_transient_create((const char*)data, len,
+ stream->cfront->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->output, b);
/* always flush after a DATA frame, as we have no other indication
* of buffer use */
- b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
+ b = apr_bucket_flush_create(stream->cfront->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(stream->output, b);
-
+
status = ap_pass_brigade(stream->r->output_filters, stream->output);
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03359)
"h2_proxy_session(%s): stream=%d, response DATA %ld, %ld"
" total", session->id, stream_id, (long)len,
(long)stream->data_received);
if (status != APR_SUCCESS) {
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344)
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03344)
"h2_proxy_session(%s): passing output on stream %d",
session->id, stream->id);
nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
stream_id, NGHTTP2_STREAM_CLOSED);
return NGHTTP2_ERR_STREAM_CLOSING;
}
- if (stream->standalone) {
- nghttp2_session_consume(ngh2, stream_id, len);
- ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r,
- "h2_proxy_session(%s): stream %d, win_update %d bytes",
- session->id, stream_id, (int)len);
- }
return 0;
}
@@ -493,12 +612,12 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id,
stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
if (!stream) {
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03361)
- "h2_proxy_stream(%s): data_read, stream %d not found",
- stream->session->id, stream_id);
+ "h2_proxy_stream(NULL): data_read, stream %d not found",
+ stream_id);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
- if (stream->session->check_ping) {
+ if (stream->session->ping_state != H2_PING_ST_NONE) {
/* suspend until we hear from the other side */
stream->waiting_on_ping = 1;
status = APR_EAGAIN;
@@ -518,7 +637,7 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id,
}
if (status == APR_SUCCESS) {
- ssize_t readlen = 0;
+ size_t readlen = 0;
while (status == APR_SUCCESS
&& (readlen < length)
&& !APR_BRIGADE_EMPTY(stream->input)) {
@@ -537,7 +656,7 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id,
status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
if (status == APR_SUCCESS && blen > 0) {
- ssize_t copylen = H2MIN(length - readlen, blen);
+ size_t copylen = H2MIN(length - readlen, blen);
memcpy(buf, bdata, copylen);
buf += copylen;
readlen += copylen;
@@ -553,9 +672,14 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id,
stream->data_sent += readlen;
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03468)
"h2_proxy_stream(%d): request DATA %ld, %ld"
- " total, flags=%d",
- stream->id, (long)readlen, (long)stream->data_sent,
+ " total, flags=%d", stream->id, (long)readlen, (long)stream->data_sent,
(int)*data_flags);
+ if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && !apr_is_empty_table(stream->r->trailers_in)) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(10179)
+ "h2_proxy_stream(%d): submit trailers", stream->id);
+ *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
+ submit_trailers(stream);
+ }
return readlen;
}
else if (APR_STATUS_IS_EAGAIN(status)) {
@@ -575,7 +699,7 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id,
}
#ifdef H2_NG2_INVALID_HEADER_CB
-static int on_invalid_header_cb(nghttp2_session *ngh2,
+static int on_invalid_header_cb(nghttp2_session *ngh2,
const nghttp2_frame *frame,
const uint8_t *name, size_t namelen,
const uint8_t *value, size_t valuelen,
@@ -638,26 +762,22 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
#ifdef H2_NG2_INVALID_HEADER_CB
nghttp2_session_callbacks_set_on_invalid_header_callback(cbs, on_invalid_header_cb);
#endif
-
nghttp2_option_new(&option);
nghttp2_option_set_peer_max_concurrent_streams(option, 100);
- nghttp2_option_set_no_auto_window_update(option, 1);
+ nghttp2_option_set_no_auto_window_update(option, 0);
nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
nghttp2_option_del(option);
nghttp2_session_callbacks_del(cbs);
+ ping_new_session(session, p_conn);
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362)
"setup session for %s", p_conn->hostname);
}
else {
h2_proxy_session *session = p_conn->data;
- apr_interval_time_t age = apr_time_now() - session->last_frame_received;
- if (age > apr_time_from_sec(1)) {
- session->check_ping = 1;
- nghttp2_submit_ping(session->ngh2, 0, (const uint8_t *)"nevergonnagiveyouup");
- }
+ ping_reuse_session(session);
}
return p_conn->data;
}
@@ -698,7 +818,7 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url,
{
h2_proxy_stream *stream;
apr_uri_t puri;
- const char *authority, *scheme, *path;
+ const char *authority, *scheme, *path, *orig_host;
apr_status_t status;
proxy_dir_conf *dconf;
@@ -707,24 +827,29 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url,
stream->pool = r->pool;
stream->url = url;
stream->r = r;
+ stream->cfront = r->connection;
stream->standalone = standalone;
stream->session = session;
stream->state = H2_STREAM_ST_IDLE;
- stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc);
- stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc);
+ stream->input = apr_brigade_create(stream->pool, stream->cfront->bucket_alloc);
+ stream->output = apr_brigade_create(stream->pool, stream->cfront->bucket_alloc);
- stream->req = h2_proxy_req_create(1, stream->pool, 0);
+ stream->req = h2_proxy_req_create(1, stream->pool);
status = apr_uri_parse(stream->pool, url, &puri);
if (status != APR_SUCCESS)
return status;
scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
-
+ orig_host = apr_table_get(r->headers_in, "Host");
+ if (orig_host == NULL) {
+ orig_host = r->hostname;
+ }
+
dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
if (dconf->preserve_host) {
- authority = r->hostname;
+ authority = orig_host;
}
else {
authority = puri.hostname;
@@ -733,20 +858,27 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url,
/* port info missing and port is not default for scheme: append */
authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
}
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
+ "authority=%s from uri.hostname=%s and uri.port=%d",
+ authority, puri.hostname, puri.port);
}
-
+ /* See #235, we use only :authority when available and remove Host:
+ * since differing values are not acceptable, see RFC 9113 ch. 8.3.1 */
+ if (authority && strlen(authority)) {
+ apr_table_unset(r->headers_in, "Host");
+ }
+
/* we need this for mapping relative uris in headers ("Link") back
* to local uris */
stream->real_server_uri = apr_psprintf(stream->pool, "%s://%s", scheme, authority);
stream->p_server_uri = apr_psprintf(stream->pool, "%s://%s", puri.scheme, authority);
path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);
+
h2_proxy_req_make(stream->req, stream->pool, r->method, scheme,
authority, path, r->headers_in);
if (dconf->add_forwarded_headers) {
if (PROXYREQ_REVERSE == r->proxyreq) {
- const char *buf;
-
/* Add X-Forwarded-For: so that the upstream has a chance to
* determine, where the original request came from.
*/
@@ -756,8 +888,9 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url,
/* Add X-Forwarded-Host: so that upstream knows what the
* original request hostname was.
*/
- if ((buf = apr_table_get(r->headers_in, "Host"))) {
- apr_table_mergen(stream->req->headers, "X-Forwarded-Host", buf);
+ if (orig_host) {
+ apr_table_mergen(stream->req->headers, "X-Forwarded-Host",
+ orig_host);
}
/* Add X-Forwarded-Server: so that upstream knows what the
@@ -768,7 +901,7 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url,
r->server->server_hostname);
}
}
-
+
/* Tuck away all already existing cookies */
stream->saves = apr_table_make(r->pool, 2);
apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
@@ -811,7 +944,7 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st
rv = nghttp2_submit_request(session->ngh2, NULL,
hd->nv, hd->nvlen, pp, stream);
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03363)
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->cfront, APLOGNO(03363)
"h2_proxy_session(%s): submit %s%s -> %d",
session->id, stream->req->authority, stream->req->path,
rv);
@@ -826,12 +959,22 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st
return APR_EGENERAL;
}
+static apr_status_t submit_trailers(h2_proxy_stream *stream)
+{
+ h2_proxy_ngheader *hd;
+ int rv;
+
+ hd = h2_proxy_util_nghd_make(stream->pool, stream->r->trailers_in);
+ rv = nghttp2_submit_trailer(stream->session->ngh2, stream->id, hd->nv, hd->nvlen);
+ return rv == 0? APR_SUCCESS: APR_EGENERAL;
+}
+
static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
{
apr_status_t status = APR_SUCCESS;
apr_size_t readlen = 0;
ssize_t n;
-
+
while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
apr_bucket* b = APR_BRIGADE_FIRST(bb);
@@ -854,9 +997,10 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *
}
}
else {
- readlen += n;
- if (n < blen) {
- apr_bucket_split(b, n);
+ size_t rlen = (size_t)n;
+ readlen += rlen;
+ if (rlen < blen) {
+ apr_bucket_split(b, rlen);
}
}
}
@@ -882,7 +1026,7 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block,
apr_socket_t *socket = NULL;
apr_time_t save_timeout = -1;
- if (block) {
+ if (block && timeout > 0) {
socket = ap_get_conn_socket(session->c);
if (socket) {
apr_socket_timeout_get(socket, &save_timeout);
@@ -945,7 +1089,7 @@ apr_status_t h2_proxy_session_submit(h2_proxy_session *session,
static void stream_resume(h2_proxy_stream *stream)
{
h2_proxy_session *session = stream->session;
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
"h2_proxy_stream(%s-%d): resuming",
session->id, stream->id);
stream->suspended = 0;
@@ -954,6 +1098,14 @@ static void stream_resume(h2_proxy_stream *stream)
dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
}
+static int is_waiting_for_backend(h2_proxy_session *session)
+{
+ return ((session->ping_state != H2_PING_ST_NONE)
+ || ((session->suspended->nelts <= 0)
+ && !nghttp2_session_want_write(session->ngh2)
+ && nghttp2_session_want_read(session->ngh2)));
+}
+
static apr_status_t check_suspended(h2_proxy_session *session)
{
h2_proxy_stream *stream;
@@ -978,7 +1130,7 @@ static apr_status_t check_suspended(h2_proxy_session *session)
return APR_SUCCESS;
}
else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
- ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c,
+ ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, stream->cfront,
APLOGNO(03382) "h2_proxy_stream(%s-%d): check input",
session->id, stream_id);
stream_resume(stream);
@@ -1006,7 +1158,7 @@ static apr_status_t session_shutdown(h2_proxy_session *session, int reason,
if (!err && reason) {
err = nghttp2_strerror(reason);
}
- nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
+ nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
reason, (uint8_t*)err, err? strlen(err):0);
status = nghttp2_session_send(session->ngh2);
dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
@@ -1208,39 +1360,56 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id,
const char *msg)
{
h2_proxy_stream *stream;
-
+ apr_bucket *b;
+
stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
if (stream) {
- int touched = (stream->data_sent ||
- stream_id <= session->last_stream_id);
+ /* if the stream's connection is aborted, do not send anything
+ * more on it. */
apr_status_t status = (stream->error_code == 0)? APR_SUCCESS : APR_EINVAL;
- ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364)
- "h2_proxy_sesssion(%s): stream(%d) closed "
- "(touched=%d, error=%d)",
- session->id, stream_id, touched, stream->error_code);
-
- if (status != APR_SUCCESS) {
- stream->r->status = 500;
- }
- else if (!stream->data_received) {
- apr_bucket *b;
- /* if the response had no body, this is the time to flush
- * an empty brigade which will also write the resonse
- * headers */
- h2_proxy_stream_end_headers_out(stream);
- stream->data_received = 1;
- b = apr_bucket_flush_create(stream->r->connection->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(stream->output, b);
- b = apr_bucket_eos_create(stream->r->connection->bucket_alloc);
- APR_BRIGADE_INSERT_TAIL(stream->output, b);
- ap_pass_brigade(stream->r->output_filters, stream->output);
+ int touched = (stream->data_sent || stream->data_received ||
+ stream_id <= session->last_stream_id);
+ if (!stream->cfront->aborted) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->cfront, APLOGNO(03364)
+ "h2_proxy_sesssion(%s): stream(%d) closed "
+ "(touched=%d, error=%d)",
+ session->id, stream_id, touched, stream->error_code);
+
+ if (status != APR_SUCCESS) {
+ /* stream failed. If we have received (and forwarded) response
+ * data already, we need to append an error buckt to inform
+ * consumers.
+ * Otherwise, we have an early fail on the connection and may
+ * retry this request on a new one. In that case, keep the
+ * output virgin so that a new attempt can be made. */
+ if (stream->data_received) {
+ int http_status = ap_map_http_request_error(status, HTTP_BAD_REQUEST);
+ b = ap_bucket_error_create(http_status, NULL, stream->r->pool,
+ stream->cfront->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ b = apr_bucket_eos_create(stream->cfront->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ ap_pass_brigade(stream->r->output_filters, stream->output);
+ }
+ }
+ else if (!stream->data_received) {
+ /* if the response had no body, this is the time to flush
+ * an empty brigade which will also write the response headers */
+ h2_proxy_stream_end_headers_out(stream);
+ stream->data_received = 1;
+ b = apr_bucket_flush_create(stream->cfront->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ b = apr_bucket_eos_create(stream->cfront->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ ap_pass_brigade(stream->r->output_filters, stream->output);
+ }
}
-
+
stream->state = H2_STREAM_ST_CLOSED;
h2_proxy_ihash_remove(session->streams, stream_id);
h2_proxy_iq_remove(session->suspended, stream_id);
if (session->done) {
- session->done(session, stream->r, status, touched);
+ session->done(session, stream->r, status, touched, stream->error_code);
}
}
@@ -1408,7 +1577,22 @@ run_loop:
break;
case H2_PROXYS_ST_WAIT:
- if (check_suspended(session) == APR_EAGAIN) {
+ if (is_waiting_for_backend(session)) {
+ /* we can do a blocking read with the default timeout (as
+ * configured via ProxyTimeout in our socket. There is
+ * nothing we want to send or check until we get more data
+ * from the backend. */
+ status = h2_proxy_session_read(session, 1, 0);
+ if (status == APR_SUCCESS) {
+ have_read = 1;
+ dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
+ }
+ else {
+ dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
+ return status;
+ }
+ }
+ else if (check_suspended(session) == APR_EAGAIN) {
/* no stream has become resumed. Do a blocking read with
* ever increasing timeouts... */
if (session->wait_timeout < 25) {
@@ -1423,7 +1607,7 @@ run_loop:
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c,
APLOGNO(03365)
"h2_proxy_session(%s): WAIT read, timeout=%fms",
- session->id, (float)session->wait_timeout/1000.0);
+ session->id, session->wait_timeout/1000.0);
if (status == APR_SUCCESS) {
have_read = 1;
dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
@@ -1495,9 +1679,19 @@ static int done_iter(void *udata, void *val)
{
cleanup_iter_ctx *ctx = udata;
h2_proxy_stream *stream = val;
- int touched = (stream->data_sent ||
+ int touched = (stream->data_sent || stream->data_received ||
stream->id <= ctx->session->last_stream_id);
- ctx->done(ctx->session, stream->r, APR_ECONNABORTED, touched);
+ if (touched && stream->output) {
+ apr_bucket *b = ap_bucket_error_create(HTTP_BAD_GATEWAY, NULL,
+ stream->r->pool,
+ stream->cfront->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ b = apr_bucket_eos_create(stream->cfront->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(stream->output, b);
+ ap_pass_brigade(stream->r->output_filters, stream->output);
+ }
+ ctx->done(ctx->session, stream->r, APR_ECONNABORTED, touched,
+ stream->error_code);
return 1;
}
@@ -1516,6 +1710,12 @@ void h2_proxy_session_cleanup(h2_proxy_session *session,
}
}
+int h2_proxy_session_is_reusable(h2_proxy_session *session)
+{
+ return (session->state != H2_PROXYS_ST_DONE) &&
+ h2_proxy_ihash_empty(session->streams);
+}
+
static int ping_arrived_iter(void *udata, void *val)
{
h2_proxy_stream *stream = val;
@@ -1543,42 +1743,3 @@ typedef struct {
int updated;
} win_update_ctx;
-static int win_update_iter(void *udata, void *val)
-{
- win_update_ctx *ctx = udata;
- h2_proxy_stream *stream = val;
-
- if (stream->r && stream->r->connection == ctx->c) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c,
- "h2_proxy_session(%s-%d): win_update %ld bytes",
- ctx->session->id, (int)stream->id, (long)ctx->bytes);
- nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes);
- ctx->updated = 1;
- return 0;
- }
- return 1;
-}
-
-
-void h2_proxy_session_update_window(h2_proxy_session *session,
- conn_rec *c, apr_off_t bytes)
-{
- if (!h2_proxy_ihash_empty(session->streams)) {
- win_update_ctx ctx;
- ctx.session = session;
- ctx.c = c;
- ctx.bytes = bytes;
- ctx.updated = 0;
- h2_proxy_ihash_iter(session->streams, win_update_iter, &ctx);
-
- if (!ctx.updated) {
- /* could not find the stream any more, possibly closed, update
- * the connection window at least */
- ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c,
- "h2_proxy_session(%s): win_update conn %ld bytes",
- session->id, (long)bytes);
- nghttp2_session_consume_connection(session->ngh2, (size_t)bytes);
- }
- }
-}
-