diff options
Diffstat (limited to '')
-rw-r--r-- | modules/http2/h2_stream.c | 136 |
1 files changed, 126 insertions, 10 deletions
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c index cf6f798..ee87555 100644 --- a/modules/http2/h2_stream.c +++ b/modules/http2/h2_stream.c @@ -125,7 +125,7 @@ static int trans_on_event[][H2_SS_MAX] = { { S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/ { S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/ { S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/ -{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/ +{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_NOP, },/* EV_EOS_SENT*/ { S_NOP, S_XXX, S_CLS, S_XXX, S_XXX, S_CLS, S_XXX, S_XXX, },/* EV_IN_ERROR*/ }; @@ -166,6 +166,7 @@ static int on_frame_recv(h2_stream_state_t state, int frame_type) static int on_event(h2_stream* stream, h2_stream_event_t ev) { + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); if (stream->monitor && stream->monitor->on_event) { stream->monitor->on_event(stream->monitor->ctx, stream, ev); } @@ -392,6 +393,7 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev) { int new_state; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1, H2_STRM_MSG(stream, "dispatch event %d"), ev); new_state = on_event(stream, ev); @@ -425,6 +427,7 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags, size_ apr_status_t status = APR_SUCCESS; int new_state, eos = 0; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); new_state = on_frame_send(stream->state, ftype); if (new_state < 0) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1, @@ -435,6 +438,12 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags, size_ ++stream->out_frames; stream->out_frame_octets += frame_len; + if(stream->c2) { + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2); + if(conn_ctx) + conn_ctx->bytes_sent = stream->out_frame_octets; + } + switch (ftype) { case NGHTTP2_DATA: eos = (flags & NGHTTP2_FLAG_END_STREAM); @@ -468,6 +477,7 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags, size_ apr_status_t status = APR_SUCCESS; int new_state, eos = 0; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); new_state = on_frame_recv(stream->state, ftype); if (new_state < 0) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1, @@ -522,6 +532,7 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, h2_session *session = stream->session; apr_status_t status = APR_SUCCESS; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); stream->in_data_frames++; if (len > 0) { if (APLOGctrace3(session->c1)) { @@ -542,11 +553,38 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags, return status; } +#ifdef AP_DEBUG +static apr_status_t stream_pool_destroy(void *data) +{ + h2_stream *stream = data; + switch (stream->magic) { + case H2_STRM_MAGIC_OK: + ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, stream->session->c1, + H2_STRM_MSG(stream, "was not destroyed explicitly")); + AP_DEBUG_ASSERT(0); + break; + case H2_STRM_MAGIC_SDEL: + /* stream has been explicitly destroyed, as it should */ + H2_STRM_ASSIGN_MAGIC(stream, H2_STRM_MAGIC_PDEL); + break; + case H2_STRM_MAGIC_PDEL: + ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, stream->session->c1, + H2_STRM_MSG(stream, "already pool destroyed")); + AP_DEBUG_ASSERT(0); + break; + default: + AP_DEBUG_ASSERT(0); + } + return APR_SUCCESS; +} +#endif + h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session, h2_stream_monitor *monitor, int initiated_on) { h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream)); - + + H2_STRM_ASSIGN_MAGIC(stream, H2_STRM_MAGIC_OK); stream->id = id; stream->initiated_on = initiated_on; stream->created = apr_time_now(); @@ -554,6 +592,12 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session, stream->pool = pool; stream->session = session; stream->monitor = monitor; +#ifdef AP_DEBUG + if (id) { /* stream 0 has special lifetime */ + apr_pool_cleanup_register(pool, stream, stream_pool_destroy, + apr_pool_cleanup_null); + } +#endif #ifdef H2_NG2_LOCAL_WIN_SIZE if (id) { @@ -575,6 +619,7 @@ void h2_stream_cleanup(h2_stream *stream) * end of the in/out notifications get closed. */ ap_assert(stream); + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); if (stream->out_buffer) { apr_brigade_cleanup(stream->out_buffer); } @@ -583,13 +628,16 @@ void h2_stream_cleanup(h2_stream *stream) void h2_stream_destroy(h2_stream *stream) { ap_assert(stream); + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c1, H2_STRM_MSG(stream, "destroy")); + H2_STRM_ASSIGN_MAGIC(stream, H2_STRM_MAGIC_SDEL); apr_pool_destroy(stream->pool); } void h2_stream_rst(h2_stream *stream, int error_code) { + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); stream->rst_error = error_code; if (stream->c2) { h2_c2_abort(stream->c2, stream->session->c1); @@ -605,6 +653,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream, h2_request *req; apr_status_t status; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); ap_assert(stream->request == NULL); ap_assert(stream->rtmp == NULL); if (stream->rst_error) { @@ -626,6 +675,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream, void h2_stream_set_request(h2_stream *stream, const h2_request *r) { + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); ap_assert(stream->request == NULL); ap_assert(stream->rtmp == NULL); stream->rtmp = h2_request_clone(stream->pool, r); @@ -685,6 +735,7 @@ apr_status_t h2_stream_add_header(h2_stream *stream, int error = 0, was_added = 0; apr_status_t status = APR_SUCCESS; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); if (stream->response) { return APR_EINVAL; } @@ -716,6 +767,9 @@ apr_status_t h2_stream_add_header(h2_stream *stream, status = h2_request_add_header(stream->rtmp, stream->pool, name, nlen, value, vlen, session->s->limit_req_fieldsize, &was_added); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c1, + H2_STRM_MSG(stream, "add_header: '%.*s: %.*s"), + (int)nlen, name, (int)vlen, value); if (was_added) ++stream->request_headers_added; } else if (H2_SS_OPEN == stream->state) { @@ -759,6 +813,7 @@ apr_status_t h2_stream_add_header(h2_stream *stream, cleanup: if (error) { + ++stream->request_headers_failed; set_error_response(stream, error); return APR_EINVAL; } @@ -791,6 +846,7 @@ apr_status_t h2_stream_end_headers(h2_stream *stream, int eos, size_t raw_bytes) int is_http_or_https; h2_request *req = stream->rtmp; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); status = h2_request_end_headers(req, stream->pool, raw_bytes); if (APR_SUCCESS != status || req->http_status != H2_HTTP_STATUS_UNSET) { goto cleanup; @@ -845,7 +901,26 @@ apr_status_t h2_stream_end_headers(h2_stream *stream, int eos, size_t raw_bytes) * of CONNECT requests (see [RFC7230], Section 5.3)). */ if (!ap_cstr_casecmp(req->method, "CONNECT")) { - if (req->scheme || req->path) { + if (req->protocol) { + if (!strcmp("websocket", req->protocol)) { + if (!req->scheme || !req->path) { + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1, + H2_STRM_LOG(APLOGNO(10457), stream, "Request to websocket CONNECT " + "without :scheme or :path, sending 400 answer")); + set_error_response(stream, HTTP_BAD_REQUEST); + goto cleanup; + } + } + else { + /* do not know that protocol */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c1, APLOGNO(10460) + "':protocol: %s' header present in %s request", + req->protocol, req->method); + set_error_response(stream, HTTP_NOT_IMPLEMENTED); + goto cleanup; + } + } + else if (req->scheme || req->path) { ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1, H2_STRM_LOG(APLOGNO(10384), stream, "Request to CONNECT " "with :scheme or :path specified, sending 400 answer")); @@ -1039,6 +1114,7 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, { apr_status_t rv = APR_SUCCESS; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); if (stream->rst_error) { return APR_ECONNRESET; } @@ -1133,6 +1209,7 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response) apr_array_header_t *pushes; int i; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); pushes = h2_push_collect_update(stream, stream->request, response); if (pushes && !apr_is_empty_array(pushes)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1, @@ -1152,6 +1229,7 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response) apr_table_t *h2_stream_get_trailers(h2_stream *stream) { + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); return NULL; } @@ -1163,6 +1241,7 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream, h2_headers *response) #endif { + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); if (response && stream->initiated_on) { const char *ctype = apr_table_get(response->headers, "content-type"); if (ctype) { @@ -1176,6 +1255,7 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream, int h2_stream_is_ready(h2_stream *stream) { /* Have we sent a response or do we have the response in our buffer? */ + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); if (stream->response) { return 1; } @@ -1185,13 +1265,23 @@ int h2_stream_is_ready(h2_stream *stream) return 0; } +int h2_stream_wants_send_data(h2_stream *stream) +{ + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); + return h2_stream_is_ready(stream) && + ((stream->out_buffer && !APR_BRIGADE_EMPTY(stream->out_buffer)) || + (stream->output && !h2_beam_empty(stream->output))); +} + int h2_stream_is_at(const h2_stream *stream, h2_stream_state_t state) { + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); return stream->state == state; } int h2_stream_is_at_or_past(const h2_stream *stream, h2_stream_state_t state) { + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); switch (state) { case H2_SS_IDLE: return 1; /* by definition */ @@ -1214,6 +1304,7 @@ apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount) { h2_session *session = stream->session; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); if (amount > 0) { apr_off_t consumed = amount; @@ -1339,6 +1430,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, H2_SSSN_STRM_MSG(session, stream_id, "data_cb, stream not found")); return NGHTTP2_ERR_CALLBACK_FAILURE; } + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); if (!stream->output || !stream->response || !stream->out_buffer) { return NGHTTP2_ERR_DEFERRED; } @@ -1346,10 +1438,17 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, return NGHTTP2_ERR_DEFERRED; } if (h2_c1_io_needs_flush(&session->io)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1, - H2_SSSN_STRM_MSG(session, stream_id, "suspending on c1 out needs flush")); - h2_stream_dispatch(stream, H2_SEV_OUT_C1_BLOCK); - return NGHTTP2_ERR_DEFERRED; + rv = h2_c1_io_pass(&session->io); + if (APR_STATUS_IS_EAGAIN(rv)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1, + H2_SSSN_STRM_MSG(session, stream_id, "suspending on c1 out needs flush")); + h2_stream_dispatch(stream, H2_SEV_OUT_C1_BLOCK); + return NGHTTP2_ERR_DEFERRED; + } + else if (rv) { + h2_session_dispatch_event(session, H2_SESSION_EV_CONN_ERROR, rv, NULL); + return NGHTTP2_ERR_CALLBACK_FAILURE; + } } /* determine how much we'd like to send. We cannot send more than @@ -1361,6 +1460,11 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, length = chunk_len; } } + /* We allow configurable max DATA frame length. */ + if (stream->session->max_data_frame_len > 0 + && length > stream->session->max_data_frame_len) { + length = stream->session->max_data_frame_len; + } /* How much data do we have in our buffers that we can write? * if not enough, receive more. */ @@ -1393,8 +1497,8 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, * it is all fine. */ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1, H2_SSSN_STRM_MSG(session, stream_id, "rst stream")); - h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); - return NGHTTP2_ERR_CALLBACK_FAILURE; + h2_stream_rst(stream, H2_ERR_STREAM_CLOSED); + return NGHTTP2_ERR_DEFERRED; } ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1, H2_SSSN_STRM_MSG(session, stream_id, @@ -1403,10 +1507,17 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, eos = 1; rv = APR_SUCCESS; } + else if (APR_ECONNRESET == rv || APR_ECONNABORTED == rv) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c1, + H2_STRM_LOG(APLOGNO(10471), stream, "data_cb, reading data")); + h2_stream_rst(stream, H2_ERR_STREAM_CLOSED); + return NGHTTP2_ERR_DEFERRED; + } else { ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1, H2_STRM_LOG(APLOGNO(02938), stream, "data_cb, reading data")); - return NGHTTP2_ERR_CALLBACK_FAILURE; + h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); + return NGHTTP2_ERR_DEFERRED; } } @@ -1465,6 +1576,7 @@ static apr_status_t stream_do_response(h2_stream *stream) #endif nghttp2_data_provider provider, *pprovider = NULL; + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); ap_assert(!stream->response); ap_assert(stream->out_buffer); @@ -1562,6 +1674,8 @@ static apr_status_t stream_do_response(h2_stream *stream) * denies it, submit resources to push */ const char *s = apr_table_get(resp->notes, H2_PUSH_MODE_NOTE); if (!s || strcmp(s, "0")) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1, + H2_STRM_MSG(stream, "submit pushes, note=%s"), s); h2_stream_submit_pushes(stream, resp); } } @@ -1653,6 +1767,7 @@ void h2_stream_on_output_change(h2_stream *stream) /* stream->pout_recv_write signalled a change. Check what has happend, read * from it and act on seeing a response/data. */ + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); if (!stream->output) { /* c2 has not assigned the output beam to the stream (yet). */ ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c1, @@ -1701,6 +1816,7 @@ void h2_stream_on_output_change(h2_stream *stream) void h2_stream_on_input_change(h2_stream *stream) { + H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK); ap_assert(stream->input); h2_beam_report_consumption(stream->input); if (h2_stream_is_at(stream, H2_SS_CLOSED_L) |