diff options
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r-- | modules/http2/h2_mplx.c | 113 |
1 files changed, 93 insertions, 20 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 99c47ea..2aeea42 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -146,6 +146,7 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream) if (c2_ctx->beam_in) { h2_beam_on_send(c2_ctx->beam_in, NULL, NULL); h2_beam_on_received(c2_ctx->beam_in, NULL, NULL); + h2_beam_on_eagain(c2_ctx->beam_in, NULL, NULL); h2_beam_on_consumed(c2_ctx->beam_in, NULL, NULL); } } @@ -333,7 +334,6 @@ h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0, apr_pollset_add(m->pollset, &conn_ctx->pfd); } - m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r)); m->max_spare_transits = 3; m->c2_transits = apr_array_make(m->pool, (int)m->max_spare_transits, sizeof(h2_c2_transit*)); @@ -394,6 +394,31 @@ apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) return APR_SUCCESS; } +typedef struct { + int stream_count; + int stream_want_send; +} stream_iter_aws_t; + +static int m_stream_want_send_data(void *ctx, void *stream) +{ + stream_iter_aws_t *x = ctx; + ++x->stream_count; + if (h2_stream_wants_send_data(stream)) + ++x->stream_want_send; + return 1; +} + +int h2_mplx_c1_all_streams_want_send_data(h2_mplx *m) +{ + stream_iter_aws_t x; + x.stream_count = 0; + x.stream_want_send = 0; + H2_MPLX_ENTER(m); + h2_ihash_iter(m->streams, m_stream_want_send_data, &x); + H2_MPLX_LEAVE(m); + return x.stream_count && (x.stream_count == x.stream_want_send); +} + static int m_report_stream_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; @@ -441,6 +466,8 @@ static int m_stream_cancel_iter(void *ctx, void *val) { return 0; } +static void c1_purge_streams(h2_mplx *m); + void h2_mplx_c1_destroy(h2_mplx *m) { apr_status_t status; @@ -509,7 +536,9 @@ void h2_mplx_c1_destroy(h2_mplx *m) h2_ihash_count(m->shold)); h2_ihash_iter(m->shold, m_unexpected_stream_iter, m); } - + + c1_purge_streams(m); + m->c1->aborted = old_aborted; H2_MPLX_LEAVE(m); @@ -542,16 +571,6 @@ const h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id) } -static void c1_update_scoreboard(h2_mplx *m, h2_stream *stream) -{ - if (stream->c2) { - m->scratch_r->connection = stream->c2; - m->scratch_r->bytes_sent = stream->out_frame_octets; - ap_increment_counts(m->c1->sbh, m->scratch_r); - m->scratch_r->connection = NULL; - } -} - static void c1_purge_streams(h2_mplx *m) { h2_stream *stream; @@ -561,8 +580,6 @@ static void c1_purge_streams(h2_mplx *m) stream = APR_ARRAY_IDX(m->spurge, i, h2_stream*); ap_assert(stream->state == H2_SS_CLEANUP); - c1_update_scoreboard(m, stream); - if (stream->input) { h2_beam_destroy(stream->input, m->c1); stream->input = NULL; @@ -585,6 +602,15 @@ static void c1_purge_streams(h2_mplx *m) apr_array_clear(m->spurge); } +void h2_mplx_c1_going_keepalive(h2_mplx *m) +{ + H2_MPLX_ENTER_ALWAYS(m); + if (m->spurge->nelts) { + c1_purge_streams(m); + } + H2_MPLX_LEAVE(m); +} + apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout, stream_ev_callback *on_stream_input, stream_ev_callback *on_stream_output, @@ -653,8 +679,12 @@ static apr_status_t c1_process_stream(h2_mplx *m, if (APLOGctrace1(m->c1)) { const h2_request *r = stream->request; ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, - H2_STRM_MSG(stream, "process %s %s://%s%s"), - r->method, r->scheme, r->authority, r->path); + H2_STRM_MSG(stream, "process %s%s%s %s%s%s%s"), + r->protocol? r->protocol : "", + r->protocol? " " : "", + r->method, r->scheme? r->scheme : "", + r->scheme? "://" : "", + r->authority, r->path? r->path: ""); } stream->scheduled = 1; @@ -765,6 +795,19 @@ static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam) } } +static void c2_beam_input_read_eagain(void *ctx, h2_bucket_beam *beam) +{ + conn_rec *c = ctx; + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); + /* installed in the input bucket beams when we use pipes. + * Drain the pipe just before the beam returns APR_EAGAIN. + * A clean state for allowing polling on the pipe to rest + * when the beam is empty */ + if (conn_ctx && conn_ctx->pipe_in[H2_PIPE_OUT]) { + h2_util_drain_pipe(conn_ctx->pipe_in[H2_PIPE_OUT]); + } +} + static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam) { conn_rec *c = ctx; @@ -809,6 +852,9 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_ c2->pool, c2->pool); if (APR_SUCCESS != rv) goto cleanup; #endif + h2_beam_on_eagain(stream->input, c2_beam_input_read_eagain, c2); + if (!h2_beam_empty(stream->input)) + c2_beam_input_write_notify(c2, stream->input); } cleanup: @@ -915,6 +961,15 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx) "h2_c2(%s-%d): processing finished without final response", conn_ctx->id, conn_ctx->stream_id); c2->aborted = 1; + if (conn_ctx->beam_out) + h2_beam_abort(conn_ctx->beam_out, c2); + } + else if (!conn_ctx->beam_out || !h2_beam_is_complete(conn_ctx->beam_out)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2, + "h2_c2(%s-%d): processing finished with incomplete output", + conn_ctx->id, conn_ctx->stream_id); + c2->aborted = 1; + h2_beam_abort(conn_ctx->beam_out, c2); } else if (!c2->aborted) { s_mplx_be_happy(m, c2, conn_ctx); @@ -1064,14 +1119,32 @@ static int reset_is_acceptable(h2_stream *stream) return 1; /* otherwise, be forgiving */ } -apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id) +apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id, h2_stream *stream) { - h2_stream *stream; apr_status_t status = APR_SUCCESS; + int registered; H2_MPLX_ENTER_ALWAYS(m); - stream = h2_ihash_get(m->streams, stream_id); - if (stream && !reset_is_acceptable(stream)) { + registered = (h2_ihash_get(m->streams, stream_id) != NULL); + if (!stream) { + /* a RST might arrive so late, we have already forgotten + * about it. Seems ok. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, + H2_MPLX_MSG(m, "RST on unknown stream %d"), stream_id); + AP_DEBUG_ASSERT(!registered); + } + else if (!registered) { + /* a RST on a stream that mplx has not been told about, but + * which the session knows. Very early and annoying. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, + H2_STRM_MSG(stream, "very early RST, drop")); + h2_stream_set_monitor(stream, NULL); + h2_stream_rst(stream, H2_ERR_STREAM_CLOSED); + h2_stream_dispatch(stream, H2_SEV_EOS_SENT); + m_stream_cleanup(m, stream); + m_be_annoyed(m); + } + else if (!reset_is_acceptable(stream)) { m_be_annoyed(m); } H2_MPLX_LEAVE(m); |