summaryrefslogtreecommitdiffstats
path: root/modules/http2/h2_mplx.c
diff options
context:
space:
mode:
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r--modules/http2/h2_mplx.c113
1 files changed, 93 insertions, 20 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index 99c47ea..2aeea42 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -146,6 +146,7 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
if (c2_ctx->beam_in) {
h2_beam_on_send(c2_ctx->beam_in, NULL, NULL);
h2_beam_on_received(c2_ctx->beam_in, NULL, NULL);
+ h2_beam_on_eagain(c2_ctx->beam_in, NULL, NULL);
h2_beam_on_consumed(c2_ctx->beam_in, NULL, NULL);
}
}
@@ -333,7 +334,6 @@ h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
apr_pollset_add(m->pollset, &conn_ctx->pfd);
}
- m->scratch_r = apr_pcalloc(m->pool, sizeof(*m->scratch_r));
m->max_spare_transits = 3;
m->c2_transits = apr_array_make(m->pool, (int)m->max_spare_transits,
sizeof(h2_c2_transit*));
@@ -394,6 +394,31 @@ apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx)
return APR_SUCCESS;
}
+typedef struct {
+ int stream_count;
+ int stream_want_send;
+} stream_iter_aws_t;
+
+static int m_stream_want_send_data(void *ctx, void *stream)
+{
+ stream_iter_aws_t *x = ctx;
+ ++x->stream_count;
+ if (h2_stream_wants_send_data(stream))
+ ++x->stream_want_send;
+ return 1;
+}
+
+int h2_mplx_c1_all_streams_want_send_data(h2_mplx *m)
+{
+ stream_iter_aws_t x;
+ x.stream_count = 0;
+ x.stream_want_send = 0;
+ H2_MPLX_ENTER(m);
+ h2_ihash_iter(m->streams, m_stream_want_send_data, &x);
+ H2_MPLX_LEAVE(m);
+ return x.stream_count && (x.stream_count == x.stream_want_send);
+}
+
static int m_report_stream_iter(void *ctx, void *val) {
h2_mplx *m = ctx;
h2_stream *stream = val;
@@ -441,6 +466,8 @@ static int m_stream_cancel_iter(void *ctx, void *val) {
return 0;
}
+static void c1_purge_streams(h2_mplx *m);
+
void h2_mplx_c1_destroy(h2_mplx *m)
{
apr_status_t status;
@@ -509,7 +536,9 @@ void h2_mplx_c1_destroy(h2_mplx *m)
h2_ihash_count(m->shold));
h2_ihash_iter(m->shold, m_unexpected_stream_iter, m);
}
-
+
+ c1_purge_streams(m);
+
m->c1->aborted = old_aborted;
H2_MPLX_LEAVE(m);
@@ -542,16 +571,6 @@ const h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id)
}
-static void c1_update_scoreboard(h2_mplx *m, h2_stream *stream)
-{
- if (stream->c2) {
- m->scratch_r->connection = stream->c2;
- m->scratch_r->bytes_sent = stream->out_frame_octets;
- ap_increment_counts(m->c1->sbh, m->scratch_r);
- m->scratch_r->connection = NULL;
- }
-}
-
static void c1_purge_streams(h2_mplx *m)
{
h2_stream *stream;
@@ -561,8 +580,6 @@ static void c1_purge_streams(h2_mplx *m)
stream = APR_ARRAY_IDX(m->spurge, i, h2_stream*);
ap_assert(stream->state == H2_SS_CLEANUP);
- c1_update_scoreboard(m, stream);
-
if (stream->input) {
h2_beam_destroy(stream->input, m->c1);
stream->input = NULL;
@@ -585,6 +602,15 @@ static void c1_purge_streams(h2_mplx *m)
apr_array_clear(m->spurge);
}
+void h2_mplx_c1_going_keepalive(h2_mplx *m)
+{
+ H2_MPLX_ENTER_ALWAYS(m);
+ if (m->spurge->nelts) {
+ c1_purge_streams(m);
+ }
+ H2_MPLX_LEAVE(m);
+}
+
apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout,
stream_ev_callback *on_stream_input,
stream_ev_callback *on_stream_output,
@@ -653,8 +679,12 @@ static apr_status_t c1_process_stream(h2_mplx *m,
if (APLOGctrace1(m->c1)) {
const h2_request *r = stream->request;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
- H2_STRM_MSG(stream, "process %s %s://%s%s"),
- r->method, r->scheme, r->authority, r->path);
+ H2_STRM_MSG(stream, "process %s%s%s %s%s%s%s"),
+ r->protocol? r->protocol : "",
+ r->protocol? " " : "",
+ r->method, r->scheme? r->scheme : "",
+ r->scheme? "://" : "",
+ r->authority, r->path? r->path: "");
}
stream->scheduled = 1;
@@ -765,6 +795,19 @@ static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
}
}
+static void c2_beam_input_read_eagain(void *ctx, h2_bucket_beam *beam)
+{
+ conn_rec *c = ctx;
+ h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
+ /* installed in the input bucket beams when we use pipes.
+ * Drain the pipe just before the beam returns APR_EAGAIN.
+ * A clean state for allowing polling on the pipe to rest
+ * when the beam is empty */
+ if (conn_ctx && conn_ctx->pipe_in[H2_PIPE_OUT]) {
+ h2_util_drain_pipe(conn_ctx->pipe_in[H2_PIPE_OUT]);
+ }
+}
+
static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
{
conn_rec *c = ctx;
@@ -809,6 +852,9 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_
c2->pool, c2->pool);
if (APR_SUCCESS != rv) goto cleanup;
#endif
+ h2_beam_on_eagain(stream->input, c2_beam_input_read_eagain, c2);
+ if (!h2_beam_empty(stream->input))
+ c2_beam_input_write_notify(c2, stream->input);
}
cleanup:
@@ -915,6 +961,15 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
"h2_c2(%s-%d): processing finished without final response",
conn_ctx->id, conn_ctx->stream_id);
c2->aborted = 1;
+ if (conn_ctx->beam_out)
+ h2_beam_abort(conn_ctx->beam_out, c2);
+ }
+ else if (!conn_ctx->beam_out || !h2_beam_is_complete(conn_ctx->beam_out)) {
+ ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2,
+ "h2_c2(%s-%d): processing finished with incomplete output",
+ conn_ctx->id, conn_ctx->stream_id);
+ c2->aborted = 1;
+ h2_beam_abort(conn_ctx->beam_out, c2);
}
else if (!c2->aborted) {
s_mplx_be_happy(m, c2, conn_ctx);
@@ -1064,14 +1119,32 @@ static int reset_is_acceptable(h2_stream *stream)
return 1; /* otherwise, be forgiving */
}
-apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id)
+apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id, h2_stream *stream)
{
- h2_stream *stream;
apr_status_t status = APR_SUCCESS;
+ int registered;
H2_MPLX_ENTER_ALWAYS(m);
- stream = h2_ihash_get(m->streams, stream_id);
- if (stream && !reset_is_acceptable(stream)) {
+ registered = (h2_ihash_get(m->streams, stream_id) != NULL);
+ if (!stream) {
+ /* a RST might arrive so late, we have already forgotten
+ * about it. Seems ok. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
+ H2_MPLX_MSG(m, "RST on unknown stream %d"), stream_id);
+ AP_DEBUG_ASSERT(!registered);
+ }
+ else if (!registered) {
+ /* a RST on a stream that mplx has not been told about, but
+ * which the session knows. Very early and annoying. */
+ ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
+ H2_STRM_MSG(stream, "very early RST, drop"));
+ h2_stream_set_monitor(stream, NULL);
+ h2_stream_rst(stream, H2_ERR_STREAM_CLOSED);
+ h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
+ m_stream_cleanup(m, stream);
+ m_be_annoyed(m);
+ }
+ else if (!reset_is_acceptable(stream)) {
m_be_annoyed(m);
}
H2_MPLX_LEAVE(m);