summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2/h2_stream.c')
-rw-r--r--modules/http2/h2_stream.c136
1 files changed, 126 insertions, 10 deletions
diff --git a/modules/http2/h2_stream.c b/modules/http2/h2_stream.c
index cf6f798..ee87555 100644
--- a/modules/http2/h2_stream.c
+++ b/modules/http2/h2_stream.c
@@ -125,7 +125,7 @@ static int trans_on_event[][H2_SS_MAX] = {
{ S_XXX, S_ERR, S_ERR, S_CL_L, S_CLS, S_XXX, S_XXX, S_XXX, },/* EV_CLOSED_L*/
{ S_ERR, S_ERR, S_ERR, S_CL_R, S_ERR, S_CLS, S_NOP, S_NOP, },/* EV_CLOSED_R*/
{ S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_CLS, S_NOP, S_NOP, },/* EV_CANCELLED*/
-{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_XXX, },/* EV_EOS_SENT*/
+{ S_NOP, S_XXX, S_XXX, S_XXX, S_XXX, S_CLS, S_CLN, S_NOP, },/* EV_EOS_SENT*/
{ S_NOP, S_XXX, S_CLS, S_XXX, S_XXX, S_CLS, S_XXX, S_XXX, },/* EV_IN_ERROR*/
};
@@ -166,6 +166,7 @@ static int on_frame_recv(h2_stream_state_t state, int frame_type)
static int on_event(h2_stream* stream, h2_stream_event_t ev)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (stream->monitor && stream->monitor->on_event) {
stream->monitor->on_event(stream->monitor->ctx, stream, ev);
}
@@ -392,6 +393,7 @@ void h2_stream_dispatch(h2_stream *stream, h2_stream_event_t ev)
{
int new_state;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c1,
H2_STRM_MSG(stream, "dispatch event %d"), ev);
new_state = on_event(stream, ev);
@@ -425,6 +427,7 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags, size_
apr_status_t status = APR_SUCCESS;
int new_state, eos = 0;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
new_state = on_frame_send(stream->state, ftype);
if (new_state < 0) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
@@ -435,6 +438,12 @@ apr_status_t h2_stream_send_frame(h2_stream *stream, int ftype, int flags, size_
++stream->out_frames;
stream->out_frame_octets += frame_len;
+ if(stream->c2) {
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2);
+ if(conn_ctx)
+ conn_ctx->bytes_sent = stream->out_frame_octets;
+ }
+
switch (ftype) {
case NGHTTP2_DATA:
eos = (flags & NGHTTP2_FLAG_END_STREAM);
@@ -468,6 +477,7 @@ apr_status_t h2_stream_recv_frame(h2_stream *stream, int ftype, int flags, size_
apr_status_t status = APR_SUCCESS;
int new_state, eos = 0;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
new_state = on_frame_recv(stream->state, ftype);
if (new_state < 0) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
@@ -522,6 +532,7 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
h2_session *session = stream->session;
apr_status_t status = APR_SUCCESS;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
stream->in_data_frames++;
if (len > 0) {
if (APLOGctrace3(session->c1)) {
@@ -542,11 +553,38 @@ apr_status_t h2_stream_recv_DATA(h2_stream *stream, uint8_t flags,
return status;
}
+#ifdef AP_DEBUG
+static apr_status_t stream_pool_destroy(void *data)
+{
+ h2_stream *stream = data;
+ switch (stream->magic) {
+ case H2_STRM_MAGIC_OK:
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "was not destroyed explicitly"));
+ AP_DEBUG_ASSERT(0);
+ break;
+ case H2_STRM_MAGIC_SDEL:
+ /* stream has been explicitly destroyed, as it should */
+ H2_STRM_ASSIGN_MAGIC(stream, H2_STRM_MAGIC_PDEL);
+ break;
+ case H2_STRM_MAGIC_PDEL:
+ ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, stream->session->c1,
+ H2_STRM_MSG(stream, "already pool destroyed"));
+ AP_DEBUG_ASSERT(0);
+ break;
+ default:
+ AP_DEBUG_ASSERT(0);
+ }
+ return APR_SUCCESS;
+}
+#endif
+
h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
h2_stream_monitor *monitor, int initiated_on)
{
h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
-
+
+ H2_STRM_ASSIGN_MAGIC(stream, H2_STRM_MAGIC_OK);
stream->id = id;
stream->initiated_on = initiated_on;
stream->created = apr_time_now();
@@ -554,6 +592,12 @@ h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session,
stream->pool = pool;
stream->session = session;
stream->monitor = monitor;
+#ifdef AP_DEBUG
+ if (id) { /* stream 0 has special lifetime */
+ apr_pool_cleanup_register(pool, stream, stream_pool_destroy,
+ apr_pool_cleanup_null);
+ }
+#endif
#ifdef H2_NG2_LOCAL_WIN_SIZE
if (id) {
@@ -575,6 +619,7 @@ void h2_stream_cleanup(h2_stream *stream)
* end of the in/out notifications get closed.
*/
ap_assert(stream);
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (stream->out_buffer) {
apr_brigade_cleanup(stream->out_buffer);
}
@@ -583,13 +628,16 @@ void h2_stream_cleanup(h2_stream *stream)
void h2_stream_destroy(h2_stream *stream)
{
ap_assert(stream);
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c1,
H2_STRM_MSG(stream, "destroy"));
+ H2_STRM_ASSIGN_MAGIC(stream, H2_STRM_MAGIC_SDEL);
apr_pool_destroy(stream->pool);
}
void h2_stream_rst(h2_stream *stream, int error_code)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
stream->rst_error = error_code;
if (stream->c2) {
h2_c2_abort(stream->c2, stream->session->c1);
@@ -605,6 +653,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream,
h2_request *req;
apr_status_t status;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp == NULL);
if (stream->rst_error) {
@@ -626,6 +675,7 @@ apr_status_t h2_stream_set_request_rec(h2_stream *stream,
void h2_stream_set_request(h2_stream *stream, const h2_request *r)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
ap_assert(stream->request == NULL);
ap_assert(stream->rtmp == NULL);
stream->rtmp = h2_request_clone(stream->pool, r);
@@ -685,6 +735,7 @@ apr_status_t h2_stream_add_header(h2_stream *stream,
int error = 0, was_added = 0;
apr_status_t status = APR_SUCCESS;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (stream->response) {
return APR_EINVAL;
}
@@ -716,6 +767,9 @@ apr_status_t h2_stream_add_header(h2_stream *stream,
status = h2_request_add_header(stream->rtmp, stream->pool,
name, nlen, value, vlen,
session->s->limit_req_fieldsize, &was_added);
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, session->c1,
+ H2_STRM_MSG(stream, "add_header: '%.*s: %.*s"),
+ (int)nlen, name, (int)vlen, value);
if (was_added) ++stream->request_headers_added;
}
else if (H2_SS_OPEN == stream->state) {
@@ -759,6 +813,7 @@ apr_status_t h2_stream_add_header(h2_stream *stream,
cleanup:
if (error) {
+ ++stream->request_headers_failed;
set_error_response(stream, error);
return APR_EINVAL;
}
@@ -791,6 +846,7 @@ apr_status_t h2_stream_end_headers(h2_stream *stream, int eos, size_t raw_bytes)
int is_http_or_https;
h2_request *req = stream->rtmp;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
status = h2_request_end_headers(req, stream->pool, raw_bytes);
if (APR_SUCCESS != status || req->http_status != H2_HTTP_STATUS_UNSET) {
goto cleanup;
@@ -845,7 +901,26 @@ apr_status_t h2_stream_end_headers(h2_stream *stream, int eos, size_t raw_bytes)
* of CONNECT requests (see [RFC7230], Section 5.3)).
*/
if (!ap_cstr_casecmp(req->method, "CONNECT")) {
- if (req->scheme || req->path) {
+ if (req->protocol) {
+ if (!strcmp("websocket", req->protocol)) {
+ if (!req->scheme || !req->path) {
+ ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1,
+ H2_STRM_LOG(APLOGNO(10457), stream, "Request to websocket CONNECT "
+ "without :scheme or :path, sending 400 answer"));
+ set_error_response(stream, HTTP_BAD_REQUEST);
+ goto cleanup;
+ }
+ }
+ else {
+ /* do not know that protocol */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->session->c1, APLOGNO(10460)
+ "':protocol: %s' header present in %s request",
+ req->protocol, req->method);
+ set_error_response(stream, HTTP_NOT_IMPLEMENTED);
+ goto cleanup;
+ }
+ }
+ else if (req->scheme || req->path) {
ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, stream->session->c1,
H2_STRM_LOG(APLOGNO(10384), stream, "Request to CONNECT "
"with :scheme or :path specified, sending 400 answer"));
@@ -1039,6 +1114,7 @@ apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb,
{
apr_status_t rv = APR_SUCCESS;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (stream->rst_error) {
return APR_ECONNRESET;
}
@@ -1133,6 +1209,7 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
apr_array_header_t *pushes;
int i;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
pushes = h2_push_collect_update(stream, stream->request, response);
if (pushes && !apr_is_empty_array(pushes)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c1,
@@ -1152,6 +1229,7 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream, h2_headers *response)
apr_table_t *h2_stream_get_trailers(h2_stream *stream)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
return NULL;
}
@@ -1163,6 +1241,7 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream,
h2_headers *response)
#endif
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (response && stream->initiated_on) {
const char *ctype = apr_table_get(response->headers, "content-type");
if (ctype) {
@@ -1176,6 +1255,7 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream,
int h2_stream_is_ready(h2_stream *stream)
{
/* Have we sent a response or do we have the response in our buffer? */
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (stream->response) {
return 1;
}
@@ -1185,13 +1265,23 @@ int h2_stream_is_ready(h2_stream *stream)
return 0;
}
+int h2_stream_wants_send_data(h2_stream *stream)
+{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
+ return h2_stream_is_ready(stream) &&
+ ((stream->out_buffer && !APR_BRIGADE_EMPTY(stream->out_buffer)) ||
+ (stream->output && !h2_beam_empty(stream->output)));
+}
+
int h2_stream_is_at(const h2_stream *stream, h2_stream_state_t state)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
return stream->state == state;
}
int h2_stream_is_at_or_past(const h2_stream *stream, h2_stream_state_t state)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
switch (state) {
case H2_SS_IDLE:
return 1; /* by definition */
@@ -1214,6 +1304,7 @@ apr_status_t h2_stream_in_consumed(h2_stream *stream, apr_off_t amount)
{
h2_session *session = stream->session;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (amount > 0) {
apr_off_t consumed = amount;
@@ -1339,6 +1430,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
H2_SSSN_STRM_MSG(session, stream_id, "data_cb, stream not found"));
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (!stream->output || !stream->response || !stream->out_buffer) {
return NGHTTP2_ERR_DEFERRED;
}
@@ -1346,10 +1438,17 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
return NGHTTP2_ERR_DEFERRED;
}
if (h2_c1_io_needs_flush(&session->io)) {
- ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1,
- H2_SSSN_STRM_MSG(session, stream_id, "suspending on c1 out needs flush"));
- h2_stream_dispatch(stream, H2_SEV_OUT_C1_BLOCK);
- return NGHTTP2_ERR_DEFERRED;
+ rv = h2_c1_io_pass(&session->io);
+ if (APR_STATUS_IS_EAGAIN(rv)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1,
+ H2_SSSN_STRM_MSG(session, stream_id, "suspending on c1 out needs flush"));
+ h2_stream_dispatch(stream, H2_SEV_OUT_C1_BLOCK);
+ return NGHTTP2_ERR_DEFERRED;
+ }
+ else if (rv) {
+ h2_session_dispatch_event(session, H2_SESSION_EV_CONN_ERROR, rv, NULL);
+ return NGHTTP2_ERR_CALLBACK_FAILURE;
+ }
}
/* determine how much we'd like to send. We cannot send more than
@@ -1361,6 +1460,11 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
length = chunk_len;
}
}
+ /* We allow configurable max DATA frame length. */
+ if (stream->session->max_data_frame_len > 0
+ && length > stream->session->max_data_frame_len) {
+ length = stream->session->max_data_frame_len;
+ }
/* How much data do we have in our buffers that we can write?
* if not enough, receive more. */
@@ -1393,8 +1497,8 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
* it is all fine. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
H2_SSSN_STRM_MSG(session, stream_id, "rst stream"));
- h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
- return NGHTTP2_ERR_CALLBACK_FAILURE;
+ h2_stream_rst(stream, H2_ERR_STREAM_CLOSED);
+ return NGHTTP2_ERR_DEFERRED;
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, rv, c1,
H2_SSSN_STRM_MSG(session, stream_id,
@@ -1403,10 +1507,17 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s,
eos = 1;
rv = APR_SUCCESS;
}
+ else if (APR_ECONNRESET == rv || APR_ECONNABORTED == rv) {
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c1,
+ H2_STRM_LOG(APLOGNO(10471), stream, "data_cb, reading data"));
+ h2_stream_rst(stream, H2_ERR_STREAM_CLOSED);
+ return NGHTTP2_ERR_DEFERRED;
+ }
else {
ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c1,
H2_STRM_LOG(APLOGNO(02938), stream, "data_cb, reading data"));
- return NGHTTP2_ERR_CALLBACK_FAILURE;
+ h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
+ return NGHTTP2_ERR_DEFERRED;
}
}
@@ -1465,6 +1576,7 @@ static apr_status_t stream_do_response(h2_stream *stream)
#endif
nghttp2_data_provider provider, *pprovider = NULL;
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
ap_assert(!stream->response);
ap_assert(stream->out_buffer);
@@ -1562,6 +1674,8 @@ static apr_status_t stream_do_response(h2_stream *stream)
* denies it, submit resources to push */
const char *s = apr_table_get(resp->notes, H2_PUSH_MODE_NOTE);
if (!s || strcmp(s, "0")) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c1,
+ H2_STRM_MSG(stream, "submit pushes, note=%s"), s);
h2_stream_submit_pushes(stream, resp);
}
}
@@ -1653,6 +1767,7 @@ void h2_stream_on_output_change(h2_stream *stream)
/* stream->pout_recv_write signalled a change. Check what has happend, read
* from it and act on seeing a response/data. */
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
if (!stream->output) {
/* c2 has not assigned the output beam to the stream (yet). */
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c1,
@@ -1701,6 +1816,7 @@ void h2_stream_on_output_change(h2_stream *stream)
void h2_stream_on_input_change(h2_stream *stream)
{
+ H2_STRM_ASSERT_MAGIC(stream, H2_STRM_MAGIC_OK);
ap_assert(stream->input);
h2_beam_report_consumption(stream->input);
if (h2_stream_is_at(stream, H2_SS_CLOSED_L)