diff options
Diffstat (limited to 'modules/http2/h2_proxy_session.c')
-rw-r--r-- | modules/http2/h2_proxy_session.c | 423 |
1 files changed, 292 insertions, 131 deletions
diff --git a/modules/http2/h2_proxy_session.c b/modules/http2/h2_proxy_session.c index 8389c7c..db22301 100644 --- a/modules/http2/h2_proxy_session.c +++ b/modules/http2/h2_proxy_session.c @@ -20,6 +20,7 @@ #include <mpm_common.h> #include <httpd.h> +#include <http_protocol.h> #include <mod_proxy.h> #include "mod_http2.h" @@ -36,6 +37,7 @@ typedef struct h2_proxy_stream { const char *url; request_rec *r; + conn_rec *cfront; h2_proxy_request *req; const char *real_server_uri; const char *p_server_uri; @@ -45,6 +47,7 @@ typedef struct h2_proxy_stream { unsigned int suspended : 1; unsigned int waiting_on_100 : 1; unsigned int waiting_on_ping : 1; + unsigned int headers_ended : 1; uint32_t error_code; apr_bucket_brigade *input; @@ -61,7 +64,123 @@ static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, static void ping_arrived(h2_proxy_session *session); static apr_status_t check_suspended(h2_proxy_session *session); static void stream_resume(h2_proxy_stream *stream); +static apr_status_t submit_trailers(h2_proxy_stream *stream); + +/* + * The H2_PING connection sub-state: a state independant of the H2_SESSION state + * of the connection: + * - H2_PING_ST_NONE: no interference with request handling, ProxyTimeout in effect. + * When entered, all suspended streams are unsuspended again. + * - H2_PING_ST_AWAIT_ANY: new requests are suspended, a possibly configured "ping" + * timeout is in effect. Any frame received transits to H2_PING_ST_NONE. + * - H2_PING_ST_AWAIT_PING: same as above, but only a PING frame transits + * to H2_PING_ST_NONE. + * + * An AWAIT state is entered on a new connection or when re-using a connection and + * the last frame received has been some time ago. The latter sends a PING frame + * and insists on an answer, the former is satisfied by any frame received from the + * backend. + * + * This works for new connections as there is always at least one SETTINGS frame + * that the backend sends. When re-using connection, we send a PING and insist on + * receiving one back, as there might be frames in our connection buffers from + * some time ago. Since some servers have protections against PING flooding, we + * only ever have one PING unanswered. + * + * Requests are suspended while in a PING state, as we do not want to send data + * before we can be reasonably sure that the connection is working (at least on + * the h2 protocol level). This also means that the session can do blocking reads + * when expecting PING answers. + */ +static void set_ping_timeout(h2_proxy_session *session) +{ + if (session->ping_timeout != -1 && session->save_timeout == -1) { + apr_socket_t *socket = NULL; + socket = ap_get_conn_socket(session->c); + if (socket) { + apr_socket_timeout_get(socket, &session->save_timeout); + apr_socket_timeout_set(socket, session->ping_timeout); + } + } +} + +static void unset_ping_timeout(h2_proxy_session *session) +{ + if (session->save_timeout != -1) { + apr_socket_t *socket = NULL; + + socket = ap_get_conn_socket(session->c); + if (socket) { + apr_socket_timeout_set(socket, session->save_timeout); + session->save_timeout = -1; + } + } +} + +static void enter_ping_state(h2_proxy_session *session, h2_ping_state_t state) +{ + if (session->ping_state == state) return; + switch (session->ping_state) { + case H2_PING_ST_NONE: + /* leaving NONE, enforce timeout, send frame maybe */ + if (H2_PING_ST_AWAIT_PING == state) { + unset_ping_timeout(session); + nghttp2_submit_ping(session->ngh2, 0, (const uint8_t *)"nevergonnagiveyouup"); + } + set_ping_timeout(session); + session->ping_state = state; + break; + default: + /* no switching between the != NONE states */ + if (H2_PING_ST_NONE == state) { + session->ping_state = state; + unset_ping_timeout(session); + ping_arrived(session); + } + break; + } +} + +static void ping_new_session(h2_proxy_session *session, proxy_conn_rec *p_conn) +{ + session->save_timeout = -1; + session->ping_timeout = (p_conn->worker->s->ping_timeout_set? + p_conn->worker->s->ping_timeout : -1); + session->ping_state = H2_PING_ST_NONE; + enter_ping_state(session, H2_PING_ST_AWAIT_ANY); +} + +static void ping_reuse_session(h2_proxy_session *session) +{ + if (H2_PING_ST_NONE == session->ping_state) { + apr_interval_time_t age = apr_time_now() - session->last_frame_received; + if (age > apr_time_from_sec(1)) { + enter_ping_state(session, H2_PING_ST_AWAIT_PING); + } + } +} + +static void ping_ev_frame_received(h2_proxy_session *session, const nghttp2_frame *frame) +{ + session->last_frame_received = apr_time_now(); + switch (session->ping_state) { + case H2_PING_ST_NONE: + /* nop */ + break; + case H2_PING_ST_AWAIT_ANY: + enter_ping_state(session, H2_PING_ST_NONE); + break; + case H2_PING_ST_AWAIT_PING: + if (NGHTTP2_PING == frame->hd.type) { + enter_ping_state(session, H2_PING_ST_NONE); + } + /* we may receive many other frames while we are waiting for the + * PING answer. They may come all from our connection buffers and + * say nothing about the current state of the backend. */ + break; + } +} static apr_status_t proxy_session_pre_close(void *theconn) { @@ -152,7 +271,8 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame, session->id, buffer); } - session->last_frame_received = apr_time_now(); + ping_ev_frame_received(session, frame); + /* Action for frame types: */ switch (frame->hd.type) { case NGHTTP2_HEADERS: stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id); @@ -193,10 +313,6 @@ static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame, stream_resume(stream); break; case NGHTTP2_PING: - if (session->check_ping) { - session->check_ping = 0; - ping_arrived(session); - } break; case NGHTTP2_PUSH_PROMISE: break; @@ -241,7 +357,8 @@ static int add_header(void *table, const char *n, const char *v) return 1; } -static void process_proxy_header(h2_proxy_stream *stream, const char *n, const char *v) +static void process_proxy_header(apr_table_t *headers, h2_proxy_stream *stream, + const char *n, const char *v) { static const struct { const char *name; @@ -262,20 +379,18 @@ static void process_proxy_header(h2_proxy_stream *stream, const char *n, const c if (!dconf->preserve_host) { for (i = 0; transform_hdrs[i].name; ++i) { if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) { - apr_table_add(r->headers_out, n, - (*transform_hdrs[i].func)(r, dconf, v)); + apr_table_add(headers, n, (*transform_hdrs[i].func)(r, dconf, v)); return; } } if (!ap_cstr_casecmp("Link", n)) { dconf = ap_get_module_config(r->per_dir_config, &proxy_module); - apr_table_add(r->headers_out, n, - h2_proxy_link_reverse_map(r, dconf, - stream->real_server_uri, stream->p_server_uri, v)); + apr_table_add(headers, n, h2_proxy_link_reverse_map(r, dconf, + stream->real_server_uri, stream->p_server_uri, v)); return; } } - apr_table_add(r->headers_out, n, v); + apr_table_add(headers, n, v); } static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream, @@ -287,7 +402,7 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream, char *s = apr_pstrndup(stream->r->pool, v, vlen); apr_table_setn(stream->r->notes, "proxy-status", s); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront, "h2_proxy_stream(%s-%d): got status %s", stream->session->id, stream->id, s); stream->r->status = (int)apr_atoi64(s); @@ -299,17 +414,22 @@ static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream, return APR_SUCCESS; } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront, + "h2_proxy_stream(%s-%d): on_header %s: %s", + stream->session->id, stream->id, n, v); if (!h2_proxy_res_ignore_header(n, nlen)) { char *hname, *hvalue; + apr_table_t *headers = (stream->headers_ended? + stream->r->trailers_out : stream->r->headers_out); hname = apr_pstrndup(stream->pool, n, nlen); h2_proxy_util_camel_case_header(hname, nlen); hvalue = apr_pstrndup(stream->pool, v, vlen); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront, "h2_proxy_stream(%s-%d): got header %s: %s", stream->session->id, stream->id, hname, hvalue); - process_proxy_header(stream, hname, hvalue); + process_proxy_header(headers, stream, hname, hvalue); } return APR_SUCCESS; } @@ -328,6 +448,7 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) h2_proxy_session *session = stream->session; request_rec *r = stream->r; apr_pool_t *p = r->pool; + const char *buf; /* Now, add in the cookies from the response to the ones already saved */ apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL); @@ -337,6 +458,10 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) apr_table_unset(r->headers_out, "Set-Cookie"); r->headers_out = apr_table_overlay(p, r->headers_out, stream->saves); } + + if ((buf = apr_table_get(r->headers_out, "Content-Type"))) { + ap_set_content_type(r, apr_pstrdup(p, buf)); + } /* handle Via header in response */ if (session->conf->viaopt != via_off @@ -374,6 +499,7 @@ static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) server_name, portstr) ); } + if (r->status >= 200) stream->headers_ended = 1; if (APLOGrtrace2(stream->r)) { ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, @@ -407,34 +533,27 @@ static int stream_response_data(nghttp2_session *ngh2, uint8_t flags, h2_proxy_stream_end_headers_out(stream); } stream->data_received += len; - - b = apr_bucket_transient_create((const char*)data, len, - stream->r->connection->bucket_alloc); + b = apr_bucket_transient_create((const char*)data, len, + stream->cfront->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->output, b); /* always flush after a DATA frame, as we have no other indication * of buffer use */ - b = apr_bucket_flush_create(stream->r->connection->bucket_alloc); + b = apr_bucket_flush_create(stream->cfront->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->output, b); - + status = ap_pass_brigade(stream->r->output_filters, stream->output); ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03359) "h2_proxy_session(%s): stream=%d, response DATA %ld, %ld" " total", session->id, stream_id, (long)len, (long)stream->data_received); if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03344) + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03344) "h2_proxy_session(%s): passing output on stream %d", session->id, stream->id); nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, stream_id, NGHTTP2_STREAM_CLOSED); return NGHTTP2_ERR_STREAM_CLOSING; } - if (stream->standalone) { - nghttp2_session_consume(ngh2, stream_id, len); - ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, - "h2_proxy_session(%s): stream %d, win_update %d bytes", - session->id, stream_id, (int)len); - } return 0; } @@ -493,12 +612,12 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id, stream = nghttp2_session_get_stream_user_data(ngh2, stream_id); if (!stream) { ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03361) - "h2_proxy_stream(%s): data_read, stream %d not found", - stream->session->id, stream_id); + "h2_proxy_stream(NULL): data_read, stream %d not found", + stream_id); return NGHTTP2_ERR_CALLBACK_FAILURE; } - if (stream->session->check_ping) { + if (stream->session->ping_state != H2_PING_ST_NONE) { /* suspend until we hear from the other side */ stream->waiting_on_ping = 1; status = APR_EAGAIN; @@ -518,7 +637,7 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id, } if (status == APR_SUCCESS) { - ssize_t readlen = 0; + size_t readlen = 0; while (status == APR_SUCCESS && (readlen < length) && !APR_BRIGADE_EMPTY(stream->input)) { @@ -537,7 +656,7 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id, status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ); if (status == APR_SUCCESS && blen > 0) { - ssize_t copylen = H2MIN(length - readlen, blen); + size_t copylen = H2MIN(length - readlen, blen); memcpy(buf, bdata, copylen); buf += copylen; readlen += copylen; @@ -553,9 +672,14 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id, stream->data_sent += readlen; ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03468) "h2_proxy_stream(%d): request DATA %ld, %ld" - " total, flags=%d", - stream->id, (long)readlen, (long)stream->data_sent, + " total, flags=%d", stream->id, (long)readlen, (long)stream->data_sent, (int)*data_flags); + if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && !apr_is_empty_table(stream->r->trailers_in)) { + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(10179) + "h2_proxy_stream(%d): submit trailers", stream->id); + *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM; + submit_trailers(stream); + } return readlen; } else if (APR_STATUS_IS_EAGAIN(status)) { @@ -575,7 +699,7 @@ static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id, } #ifdef H2_NG2_INVALID_HEADER_CB -static int on_invalid_header_cb(nghttp2_session *ngh2, +static int on_invalid_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, const uint8_t *name, size_t namelen, const uint8_t *value, size_t valuelen, @@ -638,26 +762,22 @@ h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn, #ifdef H2_NG2_INVALID_HEADER_CB nghttp2_session_callbacks_set_on_invalid_header_callback(cbs, on_invalid_header_cb); #endif - nghttp2_option_new(&option); nghttp2_option_set_peer_max_concurrent_streams(option, 100); - nghttp2_option_set_no_auto_window_update(option, 1); + nghttp2_option_set_no_auto_window_update(option, 0); nghttp2_session_client_new2(&session->ngh2, cbs, session, option); nghttp2_option_del(option); nghttp2_session_callbacks_del(cbs); + ping_new_session(session, p_conn); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362) "setup session for %s", p_conn->hostname); } else { h2_proxy_session *session = p_conn->data; - apr_interval_time_t age = apr_time_now() - session->last_frame_received; - if (age > apr_time_from_sec(1)) { - session->check_ping = 1; - nghttp2_submit_ping(session->ngh2, 0, (const uint8_t *)"nevergonnagiveyouup"); - } + ping_reuse_session(session); } return p_conn->data; } @@ -698,7 +818,7 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url, { h2_proxy_stream *stream; apr_uri_t puri; - const char *authority, *scheme, *path; + const char *authority, *scheme, *path, *orig_host; apr_status_t status; proxy_dir_conf *dconf; @@ -707,24 +827,29 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url, stream->pool = r->pool; stream->url = url; stream->r = r; + stream->cfront = r->connection; stream->standalone = standalone; stream->session = session; stream->state = H2_STREAM_ST_IDLE; - stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc); - stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc); + stream->input = apr_brigade_create(stream->pool, stream->cfront->bucket_alloc); + stream->output = apr_brigade_create(stream->pool, stream->cfront->bucket_alloc); - stream->req = h2_proxy_req_create(1, stream->pool, 0); + stream->req = h2_proxy_req_create(1, stream->pool); status = apr_uri_parse(stream->pool, url, &puri); if (status != APR_SUCCESS) return status; scheme = (strcmp(puri.scheme, "h2")? "http" : "https"); - + orig_host = apr_table_get(r->headers_in, "Host"); + if (orig_host == NULL) { + orig_host = r->hostname; + } + dconf = ap_get_module_config(r->per_dir_config, &proxy_module); if (dconf->preserve_host) { - authority = r->hostname; + authority = orig_host; } else { authority = puri.hostname; @@ -733,20 +858,27 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url, /* port info missing and port is not default for scheme: append */ authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port); } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront, + "authority=%s from uri.hostname=%s and uri.port=%d", + authority, puri.hostname, puri.port); } - + /* See #235, we use only :authority when available and remove Host: + * since differing values are not acceptable, see RFC 9113 ch. 8.3.1 */ + if (authority && strlen(authority)) { + apr_table_unset(r->headers_in, "Host"); + } + /* we need this for mapping relative uris in headers ("Link") back * to local uris */ stream->real_server_uri = apr_psprintf(stream->pool, "%s://%s", scheme, authority); stream->p_server_uri = apr_psprintf(stream->pool, "%s://%s", puri.scheme, authority); path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART); + h2_proxy_req_make(stream->req, stream->pool, r->method, scheme, authority, path, r->headers_in); if (dconf->add_forwarded_headers) { if (PROXYREQ_REVERSE == r->proxyreq) { - const char *buf; - /* Add X-Forwarded-For: so that the upstream has a chance to * determine, where the original request came from. */ @@ -756,8 +888,9 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url, /* Add X-Forwarded-Host: so that upstream knows what the * original request hostname was. */ - if ((buf = apr_table_get(r->headers_in, "Host"))) { - apr_table_mergen(stream->req->headers, "X-Forwarded-Host", buf); + if (orig_host) { + apr_table_mergen(stream->req->headers, "X-Forwarded-Host", + orig_host); } /* Add X-Forwarded-Server: so that upstream knows what the @@ -768,7 +901,7 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url, r->server->server_hostname); } } - + /* Tuck away all already existing cookies */ stream->saves = apr_table_make(r->pool, 2); apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL); @@ -811,7 +944,7 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st rv = nghttp2_submit_request(session->ngh2, NULL, hd->nv, hd->nvlen, pp, stream); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03363) + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->cfront, APLOGNO(03363) "h2_proxy_session(%s): submit %s%s -> %d", session->id, stream->req->authority, stream->req->path, rv); @@ -826,12 +959,22 @@ static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *st return APR_EGENERAL; } +static apr_status_t submit_trailers(h2_proxy_stream *stream) +{ + h2_proxy_ngheader *hd; + int rv; + + hd = h2_proxy_util_nghd_make(stream->pool, stream->r->trailers_in); + rv = nghttp2_submit_trailer(stream->session->ngh2, stream->id, hd->nv, hd->nvlen); + return rv == 0? APR_SUCCESS: APR_EGENERAL; +} + static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb) { apr_status_t status = APR_SUCCESS; apr_size_t readlen = 0; ssize_t n; - + while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) { apr_bucket* b = APR_BRIGADE_FIRST(bb); @@ -854,9 +997,10 @@ static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade * } } else { - readlen += n; - if (n < blen) { - apr_bucket_split(b, n); + size_t rlen = (size_t)n; + readlen += rlen; + if (rlen < blen) { + apr_bucket_split(b, rlen); } } } @@ -882,7 +1026,7 @@ static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, apr_socket_t *socket = NULL; apr_time_t save_timeout = -1; - if (block) { + if (block && timeout > 0) { socket = ap_get_conn_socket(session->c); if (socket) { apr_socket_timeout_get(socket, &save_timeout); @@ -945,7 +1089,7 @@ apr_status_t h2_proxy_session_submit(h2_proxy_session *session, static void stream_resume(h2_proxy_stream *stream) { h2_proxy_session *session = stream->session; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront, "h2_proxy_stream(%s-%d): resuming", session->id, stream->id); stream->suspended = 0; @@ -954,6 +1098,14 @@ static void stream_resume(h2_proxy_stream *stream) dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL); } +static int is_waiting_for_backend(h2_proxy_session *session) +{ + return ((session->ping_state != H2_PING_ST_NONE) + || ((session->suspended->nelts <= 0) + && !nghttp2_session_want_write(session->ngh2) + && nghttp2_session_want_read(session->ngh2))); +} + static apr_status_t check_suspended(h2_proxy_session *session) { h2_proxy_stream *stream; @@ -978,7 +1130,7 @@ static apr_status_t check_suspended(h2_proxy_session *session) return APR_SUCCESS; } else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, session->c, + ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, stream->cfront, APLOGNO(03382) "h2_proxy_stream(%s-%d): check input", session->id, stream_id); stream_resume(stream); @@ -1006,7 +1158,7 @@ static apr_status_t session_shutdown(h2_proxy_session *session, int reason, if (!err && reason) { err = nghttp2_strerror(reason); } - nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0, + nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0, reason, (uint8_t*)err, err? strlen(err):0); status = nghttp2_session_send(session->ngh2); dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err); @@ -1208,39 +1360,56 @@ static void ev_stream_done(h2_proxy_session *session, int stream_id, const char *msg) { h2_proxy_stream *stream; - + apr_bucket *b; + stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id); if (stream) { - int touched = (stream->data_sent || - stream_id <= session->last_stream_id); + /* if the stream's connection is aborted, do not send anything + * more on it. */ apr_status_t status = (stream->error_code == 0)? APR_SUCCESS : APR_EINVAL; - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03364) - "h2_proxy_sesssion(%s): stream(%d) closed " - "(touched=%d, error=%d)", - session->id, stream_id, touched, stream->error_code); - - if (status != APR_SUCCESS) { - stream->r->status = 500; - } - else if (!stream->data_received) { - apr_bucket *b; - /* if the response had no body, this is the time to flush - * an empty brigade which will also write the resonse - * headers */ - h2_proxy_stream_end_headers_out(stream); - stream->data_received = 1; - b = apr_bucket_flush_create(stream->r->connection->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(stream->output, b); - b = apr_bucket_eos_create(stream->r->connection->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(stream->output, b); - ap_pass_brigade(stream->r->output_filters, stream->output); + int touched = (stream->data_sent || stream->data_received || + stream_id <= session->last_stream_id); + if (!stream->cfront->aborted) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->cfront, APLOGNO(03364) + "h2_proxy_sesssion(%s): stream(%d) closed " + "(touched=%d, error=%d)", + session->id, stream_id, touched, stream->error_code); + + if (status != APR_SUCCESS) { + /* stream failed. If we have received (and forwarded) response + * data already, we need to append an error buckt to inform + * consumers. + * Otherwise, we have an early fail on the connection and may + * retry this request on a new one. In that case, keep the + * output virgin so that a new attempt can be made. */ + if (stream->data_received) { + int http_status = ap_map_http_request_error(status, HTTP_BAD_REQUEST); + b = ap_bucket_error_create(http_status, NULL, stream->r->pool, + stream->cfront->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(stream->output, b); + b = apr_bucket_eos_create(stream->cfront->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(stream->output, b); + ap_pass_brigade(stream->r->output_filters, stream->output); + } + } + else if (!stream->data_received) { + /* if the response had no body, this is the time to flush + * an empty brigade which will also write the response headers */ + h2_proxy_stream_end_headers_out(stream); + stream->data_received = 1; + b = apr_bucket_flush_create(stream->cfront->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(stream->output, b); + b = apr_bucket_eos_create(stream->cfront->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(stream->output, b); + ap_pass_brigade(stream->r->output_filters, stream->output); + } } - + stream->state = H2_STREAM_ST_CLOSED; h2_proxy_ihash_remove(session->streams, stream_id); h2_proxy_iq_remove(session->suspended, stream_id); if (session->done) { - session->done(session, stream->r, status, touched); + session->done(session, stream->r, status, touched, stream->error_code); } } @@ -1408,7 +1577,22 @@ run_loop: break; case H2_PROXYS_ST_WAIT: - if (check_suspended(session) == APR_EAGAIN) { + if (is_waiting_for_backend(session)) { + /* we can do a blocking read with the default timeout (as + * configured via ProxyTimeout in our socket. There is + * nothing we want to send or check until we get more data + * from the backend. */ + status = h2_proxy_session_read(session, 1, 0); + if (status == APR_SUCCESS) { + have_read = 1; + dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL); + } + else { + dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL); + return status; + } + } + else if (check_suspended(session) == APR_EAGAIN) { /* no stream has become resumed. Do a blocking read with * ever increasing timeouts... */ if (session->wait_timeout < 25) { @@ -1423,7 +1607,7 @@ run_loop: ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c, APLOGNO(03365) "h2_proxy_session(%s): WAIT read, timeout=%fms", - session->id, (float)session->wait_timeout/1000.0); + session->id, session->wait_timeout/1000.0); if (status == APR_SUCCESS) { have_read = 1; dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL); @@ -1495,9 +1679,19 @@ static int done_iter(void *udata, void *val) { cleanup_iter_ctx *ctx = udata; h2_proxy_stream *stream = val; - int touched = (stream->data_sent || + int touched = (stream->data_sent || stream->data_received || stream->id <= ctx->session->last_stream_id); - ctx->done(ctx->session, stream->r, APR_ECONNABORTED, touched); + if (touched && stream->output) { + apr_bucket *b = ap_bucket_error_create(HTTP_BAD_GATEWAY, NULL, + stream->r->pool, + stream->cfront->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(stream->output, b); + b = apr_bucket_eos_create(stream->cfront->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(stream->output, b); + ap_pass_brigade(stream->r->output_filters, stream->output); + } + ctx->done(ctx->session, stream->r, APR_ECONNABORTED, touched, + stream->error_code); return 1; } @@ -1516,6 +1710,12 @@ void h2_proxy_session_cleanup(h2_proxy_session *session, } } +int h2_proxy_session_is_reusable(h2_proxy_session *session) +{ + return (session->state != H2_PROXYS_ST_DONE) && + h2_proxy_ihash_empty(session->streams); +} + static int ping_arrived_iter(void *udata, void *val) { h2_proxy_stream *stream = val; @@ -1543,42 +1743,3 @@ typedef struct { int updated; } win_update_ctx; -static int win_update_iter(void *udata, void *val) -{ - win_update_ctx *ctx = udata; - h2_proxy_stream *stream = val; - - if (stream->r && stream->r->connection == ctx->c) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, ctx->session->c, - "h2_proxy_session(%s-%d): win_update %ld bytes", - ctx->session->id, (int)stream->id, (long)ctx->bytes); - nghttp2_session_consume(ctx->session->ngh2, stream->id, ctx->bytes); - ctx->updated = 1; - return 0; - } - return 1; -} - - -void h2_proxy_session_update_window(h2_proxy_session *session, - conn_rec *c, apr_off_t bytes) -{ - if (!h2_proxy_ihash_empty(session->streams)) { - win_update_ctx ctx; - ctx.session = session; - ctx.c = c; - ctx.bytes = bytes; - ctx.updated = 0; - h2_proxy_ihash_iter(session->streams, win_update_iter, &ctx); - - if (!ctx.updated) { - /* could not find the stream any more, possibly closed, update - * the connection window at least */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_proxy_session(%s): win_update conn %ld bytes", - session->id, (long)bytes); - nghttp2_session_consume_connection(session->ngh2, (size_t)bytes); - } - } -} - |