diff options
Diffstat (limited to 'modules/http2/h2_mplx.c')
-rw-r--r-- | modules/http2/h2_mplx.c | 1834 |
1 files changed, 908 insertions, 926 deletions
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 0fae117..2aeea42 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -26,7 +26,9 @@ #include <httpd.h> #include <http_core.h> +#include <http_connection.h> #include <http_log.h> +#include <http_protocol.h> #include <mpm_common.h> @@ -36,15 +38,14 @@ #include "h2_private.h" #include "h2_bucket_beam.h" #include "h2_config.h" -#include "h2_conn.h" -#include "h2_ctx.h" -#include "h2_h2.h" +#include "h2_c1.h" +#include "h2_conn_ctx.h" +#include "h2_protocol.h" #include "h2_mplx.h" -#include "h2_ngn_shed.h" #include "h2_request.h" #include "h2_stream.h" #include "h2_session.h" -#include "h2_task.h" +#include "h2_c2.h" #include "h2_workers.h" #include "h2_util.h" @@ -54,16 +55,40 @@ typedef struct { h2_mplx *m; h2_stream *stream; apr_time_t now; + apr_size_t count; } stream_iter_ctx; -apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) +static conn_rec *c2_prod_next(void *baton, int *phas_more); +static void c2_prod_done(void *baton, conn_rec *c2); +static void workers_shutdown(void *baton, int graceful); + +static void s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx); +static void m_be_annoyed(h2_mplx *m); + +static apr_status_t mplx_pollset_create(h2_mplx *m); +static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, + stream_ev_callback *on_stream_input, + stream_ev_callback *on_stream_output, + void *on_ctx); + +static apr_pool_t *pchild; + +/* APR callback invoked if allocation fails. */ +static int abort_on_oom(int retcode) { + ap_abort_on_oom(); + return retcode; /* unreachable, hopefully. */ +} + +apr_status_t h2_mplx_c1_child_init(apr_pool_t *pool, server_rec *s) +{ + pchild = pool; return APR_SUCCESS; } #define H2_MPLX_ENTER(m) \ - do { apr_status_t rv; if ((rv = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\ - return rv;\ + do { apr_status_t rv_lock; if ((rv_lock = apr_thread_mutex_lock(m->lock)) != APR_SUCCESS) {\ + return rv_lock;\ } } while(0) #define H2_MPLX_LEAVE(m) \ @@ -72,71 +97,150 @@ apr_status_t h2_mplx_child_init(apr_pool_t *pool, server_rec *s) #define H2_MPLX_ENTER_ALWAYS(m) \ apr_thread_mutex_lock(m->lock) -#define H2_MPLX_ENTER_MAYBE(m, lock) \ - if (lock) apr_thread_mutex_lock(m->lock) - -#define H2_MPLX_LEAVE_MAYBE(m, lock) \ - if (lock) apr_thread_mutex_unlock(m->lock) +#define H2_MPLX_ENTER_MAYBE(m, dolock) \ + if (dolock) apr_thread_mutex_lock(m->lock) -static void check_data_for(h2_mplx *m, h2_stream *stream, int lock); +#define H2_MPLX_LEAVE_MAYBE(m, dolock) \ + if (dolock) apr_thread_mutex_unlock(m->lock) -static void stream_output_consumed(void *ctx, - h2_bucket_beam *beam, apr_off_t length) +static void c1_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) { - h2_stream *stream = ctx; - h2_task *task = stream->task; - - if (length > 0 && task && task->assigned) { - h2_req_engine_out_consumed(task->assigned, task->c, length); - } + h2_stream_in_consumed(ctx, length); } -static void stream_input_ev(void *ctx, h2_bucket_beam *beam) +static int stream_is_running(h2_stream *stream) { - h2_stream *stream = ctx; - h2_mplx *m = stream->session->mplx; - apr_atomic_set32(&m->event_pending, 1); + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2); + return conn_ctx && apr_atomic_read32(&conn_ctx->started) != 0 + && apr_atomic_read32(&conn_ctx->done) == 0; } -static void stream_input_consumed(void *ctx, h2_bucket_beam *beam, apr_off_t length) +int h2_mplx_c1_stream_is_running(h2_mplx *m, h2_stream *stream) { - h2_stream_in_consumed(ctx, length); + int rv; + + H2_MPLX_ENTER(m); + rv = stream_is_running(stream); + H2_MPLX_LEAVE(m); + return rv; } -static void stream_joined(h2_mplx *m, h2_stream *stream) +static void c1c2_stream_joined(h2_mplx *m, h2_stream *stream) { - ap_assert(!stream->task || stream->task->worker_done); + ap_assert(!stream_is_running(stream)); h2_ihash_remove(m->shold, stream->id); - h2_ihash_add(m->spurge, stream); + APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream; } -static void stream_cleanup(h2_mplx *m, h2_stream *stream) +static void m_stream_cleanup(h2_mplx *m, h2_stream *stream) { - ap_assert(stream->state == H2_SS_CLEANUP); + h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(stream->c2); - if (stream->input) { - h2_beam_on_consumed(stream->input, NULL, NULL, NULL); - h2_beam_abort(stream->input); - } - if (stream->output) { - h2_beam_on_produced(stream->output, NULL, NULL); - h2_beam_leave(stream->output); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_STRM_MSG(stream, "cleanup, unsubscribing from beam events")); + if (c2_ctx) { + if (c2_ctx->beam_out) { + h2_beam_on_was_empty(c2_ctx->beam_out, NULL, NULL); + } + if (c2_ctx->beam_in) { + h2_beam_on_send(c2_ctx->beam_in, NULL, NULL); + h2_beam_on_received(c2_ctx->beam_in, NULL, NULL); + h2_beam_on_eagain(c2_ctx->beam_in, NULL, NULL); + h2_beam_on_consumed(c2_ctx->beam_in, NULL, NULL); + } } - - h2_stream_cleanup(stream); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_STRM_MSG(stream, "cleanup, removing from registries")); + ap_assert(stream->state == H2_SS_CLEANUP); + h2_stream_cleanup(stream); h2_ihash_remove(m->streams, stream->id); h2_iq_remove(m->q, stream->id); - h2_ififo_remove(m->readyq, stream->id); - h2_ihash_add(m->shold, stream); - - if (!stream->task || stream->task->worker_done) { - stream_joined(m, stream); + + if (c2_ctx) { + if (!stream_is_running(stream)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_STRM_MSG(stream, "cleanup, c2 is done, move to spurge")); + /* processing has finished */ + APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream; + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_STRM_MSG(stream, "cleanup, c2 is running, abort")); + /* c2 is still running */ + h2_c2_abort(stream->c2, m->c1); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_STRM_MSG(stream, "cleanup, c2 is done, move to shold")); + h2_ihash_add(m->shold, stream); + } } - else if (stream->task) { - stream->task->c->aborted = 1; - apr_thread_cond_broadcast(m->task_thawed); + else { + /* never started */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_STRM_MSG(stream, "cleanup, never started, move to spurge")); + APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream; + } +} + +static h2_c2_transit *c2_transit_create(h2_mplx *m) +{ + apr_allocator_t *allocator; + apr_pool_t *ptrans; + h2_c2_transit *transit; + apr_status_t rv; + + /* We create a pool with its own allocator to be used for + * processing a request. This is the only way to have the processing + * independent of its parent pool in the sense that it can work in + * another thread. + */ + + rv = apr_allocator_create(&allocator); + if (rv == APR_SUCCESS) { + apr_allocator_max_free_set(allocator, ap_max_mem_free); + rv = apr_pool_create_ex(&ptrans, m->pool, NULL, allocator); + } + if (rv != APR_SUCCESS) { + /* maybe the log goes through, maybe not. */ + ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, + APLOGNO(10004) "h2_mplx: create transit pool"); + ap_abort_on_oom(); + return NULL; /* should never be reached. */ + } + + apr_allocator_owner_set(allocator, ptrans); + apr_pool_abort_set(abort_on_oom, ptrans); + apr_pool_tag(ptrans, "h2_c2_transit"); + + transit = apr_pcalloc(ptrans, sizeof(*transit)); + transit->pool = ptrans; + transit->bucket_alloc = apr_bucket_alloc_create(ptrans); + return transit; +} + +static void c2_transit_destroy(h2_c2_transit *transit) +{ + apr_pool_destroy(transit->pool); +} + +static h2_c2_transit *c2_transit_get(h2_mplx *m) +{ + h2_c2_transit **ptransit = apr_array_pop(m->c2_transits); + if (ptransit) { + return *ptransit; + } + return c2_transit_create(m); +} + +static void c2_transit_recycle(h2_mplx *m, h2_c2_transit *transit) +{ + if (m->c2_transits->nelts >= APR_INT32_MAX || + (apr_uint32_t)m->c2_transits->nelts >= m->max_spare_transits) { + c2_transit_destroy(transit); + } + else { + APR_ARRAY_PUSH(m->c2_transits, h2_c2_transit*) = transit; } } @@ -151,208 +255,118 @@ static void stream_cleanup(h2_mplx *m, h2_stream *stream) * their HTTP/1 cousins, the separate allocator seems to work better * than protecting a shared h2_session one with an own lock. */ -h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, - const h2_config *conf, - h2_workers *workers) +h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0, + server_rec *s, apr_pool_t *parent, + h2_workers *workers) { + h2_conn_ctx_t *conn_ctx; apr_status_t status = APR_SUCCESS; apr_allocator_t *allocator; - apr_thread_mutex_t *mutex; - h2_mplx *m; - h2_ctx *ctx = h2_ctx_get(c, 0); - ap_assert(conf); + apr_thread_mutex_t *mutex = NULL; + h2_mplx *m = NULL; m = apr_pcalloc(parent, sizeof(h2_mplx)); - if (m) { - m->id = c->id; - m->c = c; - m->s = (ctx? h2_ctx_server_get(ctx) : NULL); - if (!m->s) { - m->s = c->base_server; - } - - /* We create a pool with its own allocator to be used for - * processing slave connections. This is the only way to have the - * processing independant of its parent pool in the sense that it - * can work in another thread. Also, the new allocator needs its own - * mutex to synchronize sub-pools. - */ - status = apr_allocator_create(&allocator); - if (status != APR_SUCCESS) { - return NULL; - } - apr_allocator_max_free_set(allocator, ap_max_mem_free); - apr_pool_create_ex(&m->pool, parent, NULL, allocator); - if (!m->pool) { - apr_allocator_destroy(allocator); - return NULL; - } - apr_pool_tag(m->pool, "h2_mplx"); - apr_allocator_owner_set(allocator, m->pool); - status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, - m->pool); - if (status != APR_SUCCESS) { - apr_pool_destroy(m->pool); - return NULL; - } - apr_allocator_mutex_set(allocator, mutex); - - status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT, - m->pool); - if (status != APR_SUCCESS) { - apr_pool_destroy(m->pool); - return NULL; - } - - status = apr_thread_cond_create(&m->task_thawed, m->pool); - if (status != APR_SUCCESS) { - apr_pool_destroy(m->pool); - return NULL; - } - - m->max_streams = h2_config_geti(conf, H2_CONF_MAX_STREAMS); - m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); - - m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id)); - m->sredo = h2_ihash_create(m->pool, offsetof(h2_stream,id)); - m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id)); - m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); - m->q = h2_iq_create(m->pool, m->max_streams); - - status = h2_ififo_set_create(&m->readyq, m->pool, m->max_streams); - if (status != APR_SUCCESS) { - apr_pool_destroy(m->pool); - return NULL; - } + m->stream0 = stream0; + m->c1 = stream0->c2; + m->s = s; + m->child_num = child_num; + m->id = id; + + /* We create a pool with its own allocator to be used for + * processing secondary connections. This is the only way to have the + * processing independent of its parent pool in the sense that it + * can work in another thread. Also, the new allocator needs its own + * mutex to synchronize sub-pools. + */ + status = apr_allocator_create(&allocator); + if (status != APR_SUCCESS) { + allocator = NULL; + goto failure; + } + + apr_allocator_max_free_set(allocator, ap_max_mem_free); + apr_pool_create_ex(&m->pool, parent, NULL, allocator); + if (!m->pool) goto failure; + + apr_pool_tag(m->pool, "h2_mplx"); + apr_allocator_owner_set(allocator, m->pool); + + status = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, + m->pool); + if (APR_SUCCESS != status) goto failure; + apr_allocator_mutex_set(allocator, mutex); + + status = apr_thread_mutex_create(&m->lock, APR_THREAD_MUTEX_DEFAULT, + m->pool); + if (APR_SUCCESS != status) goto failure; + + m->max_streams = h2_config_sgeti(s, H2_CONF_MAX_STREAMS); + m->stream_max_mem = h2_config_sgeti(s, H2_CONF_STREAM_MAX_MEM); + + m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->spurge = apr_array_make(m->pool, 10, sizeof(h2_stream*)); + m->q = h2_iq_create(m->pool, m->max_streams); + + m->workers = workers; + m->processing_max = H2MIN(h2_workers_get_max_workers(workers), m->max_streams); + m->processing_limit = 6; /* the original h1 max parallel connections */ + m->last_mood_change = apr_time_now(); + m->mood_update_interval = apr_time_from_msec(100); + + status = mplx_pollset_create(m); + if (APR_SUCCESS != status) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c1, APLOGNO(10308) + "nghttp2: could not create pollset"); + goto failure; + } + m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*)); + m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*)); + + m->streams_input_read = h2_iq_create(m->pool, 10); + m->streams_output_written = h2_iq_create(m->pool, 10); + status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT, + m->pool); + if (APR_SUCCESS != status) goto failure; + + conn_ctx = h2_conn_ctx_get(m->c1); + if (conn_ctx->pfd.reqevents) { + apr_pollset_add(m->pollset, &conn_ctx->pfd); + } + + m->max_spare_transits = 3; + m->c2_transits = apr_array_make(m->pool, (int)m->max_spare_transits, + sizeof(h2_c2_transit*)); + + m->producer = h2_workers_register(workers, m->pool, + apr_psprintf(m->pool, "h2-%u", + (unsigned int)m->id), + c2_prod_next, c2_prod_done, + workers_shutdown, m); + return m; - m->workers = workers; - m->max_active = workers->max_workers; - m->limit_active = 6; /* the original h1 max parallel connections */ - m->last_limit_change = m->last_idle_block = apr_time_now(); - m->limit_change_interval = apr_time_from_msec(100); - - m->spare_slaves = apr_array_make(m->pool, 10, sizeof(conn_rec*)); - - m->ngn_shed = h2_ngn_shed_create(m->pool, m->c, m->max_streams, - m->stream_max_mem); - h2_ngn_shed_set_ctx(m->ngn_shed , m); +failure: + if (m->pool) { + apr_pool_destroy(m->pool); } - return m; + else if (allocator) { + apr_allocator_destroy(allocator); + } + return NULL; } -int h2_mplx_shutdown(h2_mplx *m) +int h2_mplx_c1_shutdown(h2_mplx *m) { - int max_stream_started = 0; + int max_stream_id_started = 0; H2_MPLX_ENTER(m); - max_stream_started = m->max_stream_started; + max_stream_id_started = m->max_stream_id_started; /* Clear schedule queue, disabling existing streams from starting */ h2_iq_clear(m->q); H2_MPLX_LEAVE(m); - return max_stream_started; -} - -static int input_consumed_signal(h2_mplx *m, h2_stream *stream) -{ - if (stream->input) { - return h2_beam_report_consumption(stream->input); - } - return 0; -} - -static int report_consumption_iter(void *ctx, void *val) -{ - h2_stream *stream = val; - h2_mplx *m = ctx; - - input_consumed_signal(m, stream); - if (stream->state == H2_SS_CLOSED_L - && (!stream->task || stream->task->worker_done)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, - H2_STRM_LOG(APLOGNO(10026), stream, "remote close missing")); - nghttp2_submit_rst_stream(stream->session->ngh2, NGHTTP2_FLAG_NONE, - stream->id, NGHTTP2_NO_ERROR); - } - return 1; -} - -static int output_consumed_signal(h2_mplx *m, h2_task *task) -{ - if (task->output.beam) { - return h2_beam_report_consumption(task->output.beam); - } - return 0; -} - -static int stream_destroy_iter(void *ctx, void *val) -{ - h2_mplx *m = ctx; - h2_stream *stream = val; - - h2_ihash_remove(m->spurge, stream->id); - ap_assert(stream->state == H2_SS_CLEANUP); - - if (stream->input) { - /* Process outstanding events before destruction */ - input_consumed_signal(m, stream); - h2_beam_log(stream->input, m->c, APLOG_TRACE2, "stream_destroy"); - h2_beam_destroy(stream->input); - stream->input = NULL; - } - - if (stream->task) { - h2_task *task = stream->task; - conn_rec *slave; - int reuse_slave = 0; - - stream->task = NULL; - slave = task->c; - if (slave) { - /* On non-serialized requests, the IO logging has not accounted for any - * meta data send over the network: response headers and h2 frame headers. we - * counted this on the stream and need to add this now. - * This is supposed to happen before the EOR bucket triggers the - * logging of the transaction. *fingers crossed* */ - if (task->request && !task->request->serialize && h2_task_logio_add_bytes_out) { - apr_off_t unaccounted = stream->out_frame_octets - stream->out_data_octets; - if (unaccounted > 0) { - h2_task_logio_add_bytes_out(slave, unaccounted); - } - } - - if (m->s->keep_alive_max == 0 || slave->keepalives < m->s->keep_alive_max) { - reuse_slave = ((m->spare_slaves->nelts < (m->limit_active * 3 / 2)) - && !task->rst_error); - } - - if (reuse_slave && slave->keepalive == AP_CONN_KEEPALIVE) { - h2_beam_log(task->output.beam, m->c, APLOG_DEBUG, - APLOGNO(03385) "h2_task_destroy, reuse slave"); - h2_task_destroy(task); - APR_ARRAY_PUSH(m->spare_slaves, conn_rec*) = slave; - } - else { - h2_beam_log(task->output.beam, m->c, APLOG_TRACE1, - "h2_task_destroy, destroy slave"); - h2_slave_destroy(slave); - } - } - } - h2_stream_destroy(stream); - return 0; -} - -static void purge_streams(h2_mplx *m, int lock) -{ - if (!h2_ihash_empty(m->spurge)) { - H2_MPLX_ENTER_MAYBE(m, lock); - while (!h2_ihash_iter(m->spurge, stream_destroy_iter, m)) { - /* repeat until empty */ - } - H2_MPLX_LEAVE_MAYBE(m, lock); - } + return max_stream_id_started; } typedef struct { @@ -360,13 +374,13 @@ typedef struct { void *ctx; } stream_iter_ctx_t; -static int stream_iter_wrap(void *ctx, void *stream) +static int m_stream_iter_wrap(void *ctx, void *stream) { stream_iter_ctx_t *x = ctx; return x->cb(stream, x->ctx); } -apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) +apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) { stream_iter_ctx_t x; @@ -374,276 +388,260 @@ apr_status_t h2_mplx_stream_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) x.cb = cb; x.ctx = ctx; - h2_ihash_iter(m->streams, stream_iter_wrap, &x); + h2_ihash_iter(m->streams, m_stream_iter_wrap, &x); H2_MPLX_LEAVE(m); return APR_SUCCESS; } -static int report_stream_iter(void *ctx, void *val) { +typedef struct { + int stream_count; + int stream_want_send; +} stream_iter_aws_t; + +static int m_stream_want_send_data(void *ctx, void *stream) +{ + stream_iter_aws_t *x = ctx; + ++x->stream_count; + if (h2_stream_wants_send_data(stream)) + ++x->stream_want_send; + return 1; +} + +int h2_mplx_c1_all_streams_want_send_data(h2_mplx *m) +{ + stream_iter_aws_t x; + x.stream_count = 0; + x.stream_want_send = 0; + H2_MPLX_ENTER(m); + h2_ihash_iter(m->streams, m_stream_want_send_data, &x); + H2_MPLX_LEAVE(m); + return x.stream_count && (x.stream_count == x.stream_want_send); +} + +static int m_report_stream_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; - h2_task *task = stream->task; - if (APLOGctrace1(m->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"), - !!stream->task, stream->scheduled, h2_stream_is_ready(stream), - (long)h2_beam_get_buffered(stream->output)); - } - if (task) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(stream->c2); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, + H2_STRM_MSG(stream, "started=%d, scheduled=%d, ready=%d, out_buffer=%ld"), + !!stream->c2, stream->scheduled, h2_stream_is_ready(stream), + (long)(stream->output? h2_beam_get_buffered(stream->output) : -1)); + if (conn_ctx) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */ H2_STRM_MSG(stream, "->03198: %s %s %s" - "[started=%d/done=%d/frozen=%d]"), - task->request->method, task->request->authority, - task->request->path, task->worker_started, - task->worker_done, task->frozen); + "[started=%u/done=%u]"), + conn_ctx->request->method, conn_ctx->request->authority, + conn_ctx->request->path, + apr_atomic_read32(&conn_ctx->started), + apr_atomic_read32(&conn_ctx->done)); } else { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, /* NO APLOGNO */ - H2_STRM_MSG(stream, "->03198: no task")); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, /* NO APLOGNO */ + H2_STRM_MSG(stream, "->03198: not started")); } return 1; } -static int unexpected_stream_iter(void *ctx, void *val) { +static int m_unexpected_stream_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, /* NO APLOGNO */ H2_STRM_MSG(stream, "unexpected, started=%d, scheduled=%d, ready=%d"), - !!stream->task, stream->scheduled, h2_stream_is_ready(stream)); + !!stream->c2, stream->scheduled, h2_stream_is_ready(stream)); return 1; } -static int stream_cancel_iter(void *ctx, void *val) { +static int m_stream_cancel_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; - /* disabled input consumed reporting */ - if (stream->input) { - h2_beam_on_consumed(stream->input, NULL, NULL, NULL); - } /* take over event monitoring */ h2_stream_set_monitor(stream, NULL); /* Reset, should transit to CLOSED state */ h2_stream_rst(stream, H2_ERR_NO_ERROR); /* All connection data has been sent, simulate cleanup */ h2_stream_dispatch(stream, H2_SEV_EOS_SENT); - stream_cleanup(m, stream); + m_stream_cleanup(m, stream); return 0; } -void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) +static void c1_purge_streams(h2_mplx *m); + +void h2_mplx_c1_destroy(h2_mplx *m) { apr_status_t status; - int i, wait_secs = 60; + unsigned int i, wait_secs = 60; + int old_aborted; + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_MPLX_MSG(m, "start release")); /* How to shut down a h2 connection: - * 0. abort and tell the workers that no more tasks will come from us */ - m->aborted = 1; - h2_workers_unregister(m->workers, m); - + * 0. abort and tell the workers that no more work will come from us */ + m->shutdown = m->aborted = 1; + H2_MPLX_ENTER_ALWAYS(m); + /* While really terminating any c2 connections, treat the master + * connection as aborted. It's not as if we could send any more data + * at this point. */ + old_aborted = m->c1->aborted; + m->c1->aborted = 1; + /* How to shut down a h2 connection: * 1. cancel all streams still active */ - while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, + H2_MPLX_MSG(m, "release, %u/%u/%d streams (total/hold/purge), %d streams"), + h2_ihash_count(m->streams), + h2_ihash_count(m->shold), + m->spurge->nelts, m->processing_count); + while (!h2_ihash_iter(m->streams, m_stream_cancel_iter, m)) { /* until empty */ } - /* 2. terminate ngn_shed, no more streams - * should be scheduled or in the active set */ - h2_ngn_shed_abort(m->ngn_shed); + /* 2. no more streams should be scheduled or in the active set */ ap_assert(h2_ihash_empty(m->streams)); ap_assert(h2_iq_empty(m->q)); /* 3. while workers are busy on this connection, meaning they - * are processing tasks from this connection, wait on them finishing + * are processing streams from this connection, wait on them finishing * in order to wake us and let us check again. - * Eventually, this has to succeed. */ - m->join_wait = wait; - for (i = 0; h2_ihash_count(m->shold) > 0; ++i) { - status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); + * Eventually, this has to succeed. */ + if (!m->join_wait) { + apr_thread_cond_create(&m->join_wait, m->pool); + } + + for (i = 0; h2_ihash_count(m->shold) > 0; ++i) { + status = apr_thread_cond_timedwait(m->join_wait, m->lock, apr_time_from_sec(wait_secs)); if (APR_STATUS_IS_TIMEUP(status)) { /* This can happen if we have very long running requests * that do not time out on IO. */ - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03198) - "h2_mplx(%ld): waited %d sec for %d tasks", - m->id, i*wait_secs, (int)h2_ihash_count(m->shold)); - h2_ihash_iter(m->shold, report_stream_iter, m); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, APLOGNO(03198) + H2_MPLX_MSG(m, "waited %u sec for %u streams"), + i*wait_secs, h2_ihash_count(m->shold)); + h2_ihash_iter(m->shold, m_report_stream_iter, m); } } - ap_assert(m->tasks_active == 0); - m->join_wait = NULL; - - /* 4. close the h2_req_enginge shed */ - h2_ngn_shed_destroy(m->ngn_shed); - m->ngn_shed = NULL; - + + H2_MPLX_LEAVE(m); + h2_workers_join(m->workers, m->producer); + H2_MPLX_ENTER_ALWAYS(m); + /* 4. With all workers done, all streams should be in spurge */ + ap_assert(m->processing_count == 0); if (!h2_ihash_empty(m->shold)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03516) - "h2_mplx(%ld): unexpected %d streams in hold", - m->id, (int)h2_ihash_count(m->shold)); - h2_ihash_iter(m->shold, unexpected_stream_iter, m); + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, APLOGNO(03516) + H2_MPLX_MSG(m, "unexpected %u streams in hold"), + h2_ihash_count(m->shold)); + h2_ihash_iter(m->shold, m_unexpected_stream_iter, m); } - + + c1_purge_streams(m); + + m->c1->aborted = old_aborted; H2_MPLX_LEAVE(m); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): released", m->id); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, + H2_MPLX_MSG(m, "released")); } -apr_status_t h2_mplx_stream_cleanup(h2_mplx *m, h2_stream *stream) +apr_status_t h2_mplx_c1_stream_cleanup(h2_mplx *m, h2_stream *stream, + unsigned int *pstream_count) { H2_MPLX_ENTER(m); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, H2_STRM_MSG(stream, "cleanup")); - stream_cleanup(m, stream); - + m_stream_cleanup(m, stream); + *pstream_count = h2_ihash_count(m->streams); H2_MPLX_LEAVE(m); return APR_SUCCESS; } -h2_stream *h2_mplx_stream_get(h2_mplx *m, int id) +const h2_stream *h2_mplx_c2_stream_get(h2_mplx *m, int stream_id) { h2_stream *s = NULL; H2_MPLX_ENTER_ALWAYS(m); - - s = h2_ihash_get(m->streams, id); - + s = h2_ihash_get(m->streams, stream_id); H2_MPLX_LEAVE(m); + return s; } -static void output_produced(void *ctx, h2_bucket_beam *beam, apr_off_t bytes) -{ - h2_stream *stream = ctx; - h2_mplx *m = stream->session->mplx; - - check_data_for(m, stream, 1); -} -static apr_status_t out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) +static void c1_purge_streams(h2_mplx *m) { - apr_status_t status = APR_SUCCESS; - h2_stream *stream = h2_ihash_get(m->streams, stream_id); - - if (!stream || !stream->task || m->aborted) { - return APR_ECONNABORTED; - } - - ap_assert(stream->output == NULL); - stream->output = beam; - - if (APLOGctrace2(m->c)) { - h2_beam_log(beam, m->c, APLOG_TRACE2, "out_open"); - } - else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c, - "h2_mplx(%s): out open", stream->task->id); - } - - h2_beam_on_consumed(stream->output, NULL, stream_output_consumed, stream); - h2_beam_on_produced(stream->output, output_produced, stream); - if (stream->task->output.copy_files) { - h2_beam_on_file_beam(stream->output, h2_beam_no_files, NULL); - } - - /* we might see some file buckets in the output, see - * if we have enough handles reserved. */ - check_data_for(m, stream, 0); - return status; -} + h2_stream *stream; + int i; -apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_bucket_beam *beam) -{ - apr_status_t status; - - H2_MPLX_ENTER(m); + for (i = 0; i < m->spurge->nelts; ++i) { + stream = APR_ARRAY_IDX(m->spurge, i, h2_stream*); + ap_assert(stream->state == H2_SS_CLEANUP); - if (m->aborted) { - status = APR_ECONNABORTED; - } - else { - status = out_open(m, stream_id, beam); + if (stream->input) { + h2_beam_destroy(stream->input, m->c1); + stream->input = NULL; + } + if (stream->c2) { + conn_rec *c2 = stream->c2; + h2_conn_ctx_t *c2_ctx = h2_conn_ctx_get(c2); + h2_c2_transit *transit; + + stream->c2 = NULL; + ap_assert(c2_ctx); + transit = c2_ctx->transit; + h2_c2_destroy(c2); /* c2_ctx is gone as well */ + if (transit) { + c2_transit_recycle(m, transit); + } + } + h2_stream_destroy(stream); } - - H2_MPLX_LEAVE(m); - return status; + apr_array_clear(m->spurge); } -static apr_status_t out_close(h2_mplx *m, h2_task *task) +void h2_mplx_c1_going_keepalive(h2_mplx *m) { - apr_status_t status = APR_SUCCESS; - h2_stream *stream; - - if (!task) { - return APR_ECONNABORTED; - } - if (task->c) { - ++task->c->keepalives; - } - - stream = h2_ihash_get(m->streams, task->stream_id); - if (!stream) { - return APR_ECONNABORTED; + H2_MPLX_ENTER_ALWAYS(m); + if (m->spurge->nelts) { + c1_purge_streams(m); } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_mplx(%s): close", task->id); - status = h2_beam_close(task->output.beam); - h2_beam_log(task->output.beam, m->c, APLOG_TRACE2, "out_close"); - output_consumed_signal(m, task); - check_data_for(m, stream, 0); - return status; + H2_MPLX_LEAVE(m); } -apr_status_t h2_mplx_out_trywait(h2_mplx *m, apr_interval_time_t timeout, - apr_thread_cond_t *iowait) +apr_status_t h2_mplx_c1_poll(h2_mplx *m, apr_interval_time_t timeout, + stream_ev_callback *on_stream_input, + stream_ev_callback *on_stream_output, + void *on_ctx) { - apr_status_t status; - + apr_status_t rv; + H2_MPLX_ENTER(m); if (m->aborted) { - status = APR_ECONNABORTED; - } - else if (h2_mplx_has_master_events(m)) { - status = APR_SUCCESS; - } - else { - purge_streams(m, 0); - h2_ihash_iter(m->streams, report_consumption_iter, m); - m->added_output = iowait; - status = apr_thread_cond_timedwait(m->added_output, m->lock, timeout); - if (APLOGctrace2(m->c)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): trywait on data for %f ms)", - m->id, timeout/1000.0); - } - m->added_output = NULL; + rv = APR_ECONNABORTED; + goto cleanup; + } + /* Purge (destroy) streams outside of pollset processing. + * Streams that are registered in the pollset, will be removed + * when they are destroyed, but the pollset works on copies + * of these registrations. So, if we destroy streams while + * processing pollset events, we might access freed memory. + */ + if (m->spurge->nelts) { + c1_purge_streams(m); } + rv = mplx_pollset_poll(m, timeout, on_stream_input, on_stream_output, on_ctx); +cleanup: H2_MPLX_LEAVE(m); - return status; -} - -static void check_data_for(h2_mplx *m, h2_stream *stream, int lock) -{ - if (h2_ififo_push(m->readyq, stream->id) == APR_SUCCESS) { - apr_atomic_set32(&m->event_pending, 1); - H2_MPLX_ENTER_MAYBE(m, lock); - if (m->added_output) { - apr_thread_cond_signal(m->added_output); - } - H2_MPLX_LEAVE_MAYBE(m, lock); - } + return rv; } -apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) +apr_status_t h2_mplx_c1_reprioritize(h2_mplx *m, h2_stream_pri_cmp_fn *cmp, + h2_session *session) { apr_status_t status; @@ -653,9 +651,9 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) status = APR_ECONNABORTED; } else { - h2_iq_sort(m->q, cmp, ctx); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): reprioritize tasks", m->id); + h2_iq_sort(m->q, cmp, session); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, + H2_MPLX_MSG(m, "reprioritize streams")); status = APR_SUCCESS; } @@ -663,560 +661,435 @@ apr_status_t h2_mplx_reprioritize(h2_mplx *m, h2_stream_pri_cmp *cmp, void *ctx) return status; } -static void register_if_needed(h2_mplx *m) -{ - if (!m->aborted && !m->is_registered && !h2_iq_empty(m->q)) { - apr_status_t status = h2_workers_register(m->workers, m); - if (status == APR_SUCCESS) { - m->is_registered = 1; - } - else { - ap_log_cerror(APLOG_MARK, APLOG_ERR, status, m->c, APLOGNO(10021) - "h2_mplx(%ld): register at workers", m->id); - } - } -} - -apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream, - h2_stream_pri_cmp *cmp, void *ctx) +static apr_status_t c1_process_stream(h2_mplx *m, + h2_stream *stream, + h2_stream_pri_cmp_fn *cmp, + h2_session *session) { - apr_status_t status; - - H2_MPLX_ENTER(m); + apr_status_t rv = APR_SUCCESS; if (m->aborted) { - status = APR_ECONNABORTED; + rv = APR_ECONNABORTED; + goto cleanup; + } + if (!stream->request) { + rv = APR_EINVAL; + goto cleanup; + } + if (APLOGctrace1(m->c1)) { + const h2_request *r = stream->request; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, + H2_STRM_MSG(stream, "process %s%s%s %s%s%s%s"), + r->protocol? r->protocol : "", + r->protocol? " " : "", + r->method, r->scheme? r->scheme : "", + r->scheme? "://" : "", + r->authority, r->path? r->path: ""); + } + + stream->scheduled = 1; + h2_ihash_add(m->streams, stream); + if (h2_stream_is_ready(stream)) { + /* already have a response */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, + H2_STRM_MSG(stream, "process, ready already")); } else { - status = APR_SUCCESS; - h2_ihash_add(m->streams, stream); - if (h2_stream_is_ready(stream)) { - /* already have a response */ - check_data_for(m, stream, 0); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - H2_STRM_MSG(stream, "process, add to readyq")); - } - else { - h2_iq_add(m->q, stream->id, cmp, ctx); - register_if_needed(m); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - H2_STRM_MSG(stream, "process, added to q")); - } + /* last chance to set anything up before stream is processed + * by worker threads. */ + rv = h2_stream_prepare_processing(stream); + if (APR_SUCCESS != rv) goto cleanup; + h2_iq_add(m->q, stream->id, cmp, session); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, + H2_STRM_MSG(stream, "process, added to q")); } - H2_MPLX_LEAVE(m); - return status; +cleanup: + return rv; } -static h2_task *next_stream_task(h2_mplx *m) +void h2_mplx_c1_process(h2_mplx *m, + h2_iqueue *ready_to_process, + h2_stream_get_fn *get_stream, + h2_stream_pri_cmp_fn *stream_pri_cmp, + h2_session *session, + unsigned int *pstream_count) { - h2_stream *stream; + apr_status_t rv; int sid; - while (!m->aborted && (m->tasks_active < m->limit_active) - && (sid = h2_iq_shift(m->q)) > 0) { - - stream = h2_ihash_get(m->streams, sid); - if (stream) { - conn_rec *slave, **pslave; - pslave = (conn_rec **)apr_array_pop(m->spare_slaves); - if (pslave) { - slave = *pslave; - slave->aborted = 0; - } - else { - slave = h2_slave_create(m->c, stream->id, m->pool); - } - - if (!stream->task) { + H2_MPLX_ENTER_ALWAYS(m); - if (sid > m->max_stream_started) { - m->max_stream_started = sid; - } - if (stream->input) { - h2_beam_on_consumed(stream->input, stream_input_ev, - stream_input_consumed, stream); - } - - stream->task = h2_task_create(slave, stream->id, - stream->request, m, stream->input, - stream->session->s->timeout, - m->stream_max_mem); - if (!stream->task) { - ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_ENOMEM, slave, - H2_STRM_LOG(APLOGNO(02941), stream, - "create task")); - return NULL; - } - + while ((sid = h2_iq_shift(ready_to_process)) > 0) { + h2_stream *stream = get_stream(session, sid); + if (stream) { + ap_assert(!stream->scheduled); + rv = c1_process_stream(session->mplx, stream, stream_pri_cmp, session); + if (APR_SUCCESS != rv) { + h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR); } - - ++m->tasks_active; - return stream->task; - } - } - return NULL; -} - -apr_status_t h2_mplx_pop_task(h2_mplx *m, h2_task **ptask) -{ - apr_status_t rv = APR_EOF; - - *ptask = NULL; - ap_assert(m); - ap_assert(m->lock); - - if (APR_SUCCESS != (rv = apr_thread_mutex_lock(m->lock))) { - return rv; - } - - if (m->aborted) { - rv = APR_EOF; - } - else { - *ptask = next_stream_task(m); - rv = (*ptask != NULL && !h2_iq_empty(m->q))? APR_EAGAIN : APR_SUCCESS; - } - if (APR_EAGAIN != rv) { - m->is_registered = 0; /* h2_workers will discard this mplx */ - } - H2_MPLX_LEAVE(m); - return rv; -} - -static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) -{ - h2_stream *stream; - - if (task->frozen) { - /* this task was handed over to an engine for processing - * and the original worker has finished. That means the - * engine may start processing now. */ - h2_task_thaw(task); - apr_thread_cond_broadcast(m->task_thawed); - return; - } - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): task(%s) done", m->id, task->id); - out_close(m, task); - - if (ngn) { - apr_off_t bytes = 0; - h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ); - bytes += h2_beam_get_buffered(task->output.beam); - if (bytes > 0) { - /* we need to report consumed and current buffered output - * to the engine. The request will be streamed out or cancelled, - * no more data is coming from it and the engine should update - * its calculations before we destroy this information. */ - h2_req_engine_out_consumed(ngn, task->c, bytes); - } - } - - if (task->engine) { - if (!m->aborted && !task->c->aborted - && !h2_req_engine_is_shutdown(task->engine)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(10022) - "h2_mplx(%ld): task(%s) has not-shutdown " - "engine(%s)", m->id, task->id, - h2_req_engine_get_id(task->engine)); - } - h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); - } - - task->worker_done = 1; - task->done_at = apr_time_now(); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%s): request done, %f ms elapsed", task->id, - (task->done_at - task->started_at) / 1000.0); - - if (task->started_at > m->last_idle_block) { - /* this task finished without causing an 'idle block', e.g. - * a block by flow control. - */ - if (task->done_at- m->last_limit_change >= m->limit_change_interval - && m->limit_active < m->max_active) { - /* Well behaving stream, allow it more workers */ - m->limit_active = H2MIN(m->limit_active * 2, - m->max_active); - m->last_limit_change = task->done_at; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): increase worker limit to %d", - m->id, m->limit_active); - } - } - - stream = h2_ihash_get(m->streams, task->stream_id); - if (stream) { - /* stream not done yet. */ - if (!m->aborted && h2_ihash_get(m->sredo, stream->id)) { - /* reset and schedule again */ - h2_task_redo(task); - h2_ihash_remove(m->sredo, stream->id); - h2_iq_add(m->q, stream->id, NULL, NULL); } else { - /* stream not cleaned up, stay around */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - H2_STRM_MSG(stream, "task_done, stream open")); - if (stream->input) { - h2_beam_leave(stream->input); - } - - /* more data will not arrive, resume the stream */ - check_data_for(m, stream, 0); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, + H2_MPLX_MSG(m, "stream %d not found to process"), sid); } } - else if ((stream = h2_ihash_get(m->shold, task->stream_id)) != NULL) { - /* stream is done, was just waiting for this. */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - H2_STRM_MSG(stream, "task_done, in hold")); - if (stream->input) { - h2_beam_leave(stream->input); + if ((m->processing_count < m->processing_limit) && !h2_iq_empty(m->q)) { + H2_MPLX_LEAVE(m); + rv = h2_workers_activate(m->workers, m->producer); + H2_MPLX_ENTER_ALWAYS(m); + if (rv != APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10021) + H2_MPLX_MSG(m, "activate at workers")); } - stream_joined(m, stream); - } - else if ((stream = h2_ihash_get(m->spurge, task->stream_id)) != NULL) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge")); - ap_assert("stream should not be in spurge" == NULL); } - else { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03518) - "h2_mplx(%s): task_done, stream not found", - task->id); - ap_assert("stream should still be available" == NULL); - } -} + *pstream_count = h2_ihash_count(m->streams); -void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask) -{ - H2_MPLX_ENTER_ALWAYS(m); +#if APR_POOL_DEBUG + do { + apr_size_t mem_g, mem_m, mem_s, mem_c1; - task_done(m, task, NULL); - --m->tasks_active; - - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } - if (ptask) { - /* caller wants another task */ - *ptask = next_stream_task(m); - } - register_if_needed(m); + mem_g = pchild? apr_pool_num_bytes(pchild, 1) : 0; + mem_m = apr_pool_num_bytes(m->pool, 1); + mem_s = apr_pool_num_bytes(session->pool, 1); + mem_c1 = apr_pool_num_bytes(m->c1->pool, 1); + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, m->c1, + H2_MPLX_MSG(m, "child mem=%ld, mplx mem=%ld, session mem=%ld, c1=%ld"), + (long)mem_g, (long)mem_m, (long)mem_s, (long)mem_c1); + + } while (0); +#endif H2_MPLX_LEAVE(m); } -/******************************************************************************* - * h2_mplx DoS protection - ******************************************************************************/ +static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam) +{ + conn_rec *c = ctx; + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); -static int latest_repeatable_unsubmitted_iter(void *data, void *val) + (void)beam; + if (conn_ctx && conn_ctx->stream_id && conn_ctx->pipe_in[H2_PIPE_IN]) { + apr_file_putc(1, conn_ctx->pipe_in[H2_PIPE_IN]); + } +} + +static void add_stream_poll_event(h2_mplx *m, int stream_id, h2_iqueue *q) { - stream_iter_ctx *ctx = data; - h2_stream *stream = val; - - if (stream->task && !stream->task->worker_done - && h2_task_can_redo(stream->task) - && !h2_ihash_get(ctx->m->sredo, stream->id)) { - if (!h2_stream_is_ready(stream)) { - /* this task occupies a worker, the response has not been submitted - * yet, not been cancelled and it is a repeatable request - * -> it can be re-scheduled later */ - if (!ctx->stream - || (ctx->stream->task->started_at < stream->task->started_at)) { - /* we did not have one or this one was started later */ - ctx->stream = stream; - } - } + apr_thread_mutex_lock(m->poll_lock); + if (h2_iq_append(q, stream_id) && h2_iq_count(q) == 1) { + /* newly added first */ + apr_pollset_wakeup(m->pollset); } - return 1; + apr_thread_mutex_unlock(m->poll_lock); } -static h2_stream *get_latest_repeatable_unsubmitted_stream(h2_mplx *m) +static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam) { - stream_iter_ctx ctx; - ctx.m = m; - ctx.stream = NULL; - h2_ihash_iter(m->streams, latest_repeatable_unsubmitted_iter, &ctx); - return ctx.stream; + conn_rec *c = ctx; + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); + + if (conn_ctx && conn_ctx->stream_id) { + add_stream_poll_event(conn_ctx->mplx, conn_ctx->stream_id, + conn_ctx->mplx->streams_input_read); + } } -static int timed_out_busy_iter(void *data, void *val) +static void c2_beam_input_read_eagain(void *ctx, h2_bucket_beam *beam) { - stream_iter_ctx *ctx = data; - h2_stream *stream = val; - if (stream->task && !stream->task->worker_done - && (ctx->now - stream->task->started_at) > stream->task->timeout) { - /* timed out stream occupying a worker, found */ - ctx->stream = stream; - return 0; + conn_rec *c = ctx; + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); + /* installed in the input bucket beams when we use pipes. + * Drain the pipe just before the beam returns APR_EAGAIN. + * A clean state for allowing polling on the pipe to rest + * when the beam is empty */ + if (conn_ctx && conn_ctx->pipe_in[H2_PIPE_OUT]) { + h2_util_drain_pipe(conn_ctx->pipe_in[H2_PIPE_OUT]); } - return 1; } -static h2_stream *get_timed_out_busy_stream(h2_mplx *m) +static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam) { - stream_iter_ctx ctx; - ctx.m = m; - ctx.stream = NULL; - ctx.now = apr_time_now(); - h2_ihash_iter(m->streams, timed_out_busy_iter, &ctx); - return ctx.stream; + conn_rec *c = ctx; + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); + + if (conn_ctx && conn_ctx->stream_id) { + add_stream_poll_event(conn_ctx->mplx, conn_ctx->stream_id, + conn_ctx->mplx->streams_output_written); + } } -static apr_status_t unschedule_slow_tasks(h2_mplx *m) +static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream, h2_c2_transit *transit) { - h2_stream *stream; - int n; - - /* Try to get rid of streams that occupy workers. Look for safe requests - * that are repeatable. If none found, fail the connection. - */ - n = (m->tasks_active - m->limit_active - (int)h2_ihash_count(m->sredo)); - while (n > 0 && (stream = get_latest_repeatable_unsubmitted_stream(m))) { - h2_task_rst(stream->task, H2_ERR_CANCEL); - h2_ihash_add(m->sredo, stream); - --n; + h2_conn_ctx_t *conn_ctx; + apr_status_t rv = APR_SUCCESS; + const char *action = "init"; + + rv = h2_conn_ctx_init_for_c2(&conn_ctx, c2, m, stream, transit); + if (APR_SUCCESS != rv) goto cleanup; + + if (!conn_ctx->beam_out) { + action = "create output beam"; + rv = h2_beam_create(&conn_ctx->beam_out, c2, conn_ctx->req_pool, + stream->id, "output", 0, c2->base_server->timeout); + if (APR_SUCCESS != rv) goto cleanup; + + h2_beam_buffer_size_set(conn_ctx->beam_out, m->stream_max_mem); + h2_beam_on_was_empty(conn_ctx->beam_out, c2_beam_output_write_notify, c2); } - - if ((m->tasks_active - h2_ihash_count(m->sredo)) > m->limit_active) { - h2_stream *stream = get_timed_out_busy_stream(m); - if (stream) { - /* Too many busy workers, unable to cancel enough streams - * and with a busy, timed out stream, we tell the client - * to go away... */ - return APR_TIMEUP; - } + + memset(&conn_ctx->pipe_in, 0, sizeof(conn_ctx->pipe_in)); + if (stream->input) { + conn_ctx->beam_in = stream->input; + h2_beam_on_send(stream->input, c2_beam_input_write_notify, c2); + h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2); + h2_beam_on_consumed(stream->input, c1_input_consumed, stream); +#if H2_USE_PIPES + action = "create input write pipe"; + rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in[H2_PIPE_OUT], + &conn_ctx->pipe_in[H2_PIPE_IN], + APR_READ_BLOCK, + c2->pool, c2->pool); + if (APR_SUCCESS != rv) goto cleanup; +#endif + h2_beam_on_eagain(stream->input, c2_beam_input_read_eagain, c2); + if (!h2_beam_empty(stream->input)) + c2_beam_input_write_notify(c2, stream->input); + } + +cleanup: + stream->output = (APR_SUCCESS == rv)? conn_ctx->beam_out : NULL; + if (APR_SUCCESS != rv) { + ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c2, + H2_STRM_LOG(APLOGNO(10309), stream, + "error %s"), action); } - return APR_SUCCESS; + return rv; } -apr_status_t h2_mplx_idle(h2_mplx *m) +static conn_rec *s_next_c2(h2_mplx *m) { - apr_status_t status = APR_SUCCESS; - apr_time_t now; - apr_size_t scount; - - H2_MPLX_ENTER(m); + h2_stream *stream = NULL; + apr_status_t rv = APR_SUCCESS; + apr_uint32_t sid; + conn_rec *c2 = NULL; + h2_c2_transit *transit = NULL; - scount = h2_ihash_count(m->streams); - if (scount > 0) { - if (m->tasks_active) { - /* If we have streams in connection state 'IDLE', meaning - * all streams are ready to sent data out, but lack - * WINDOW_UPDATEs. - * - * This is ok, unless we have streams that still occupy - * h2 workers. As worker threads are a scarce resource, - * we need to take measures that we do not get DoSed. - * - * This is what we call an 'idle block'. Limit the amount - * of busy workers we allow for this connection until it - * well behaves. - */ - now = apr_time_now(); - m->last_idle_block = now; - if (m->limit_active > 2 - && now - m->last_limit_change >= m->limit_change_interval) { - if (m->limit_active > 16) { - m->limit_active = 16; - } - else if (m->limit_active > 8) { - m->limit_active = 8; - } - else if (m->limit_active > 4) { - m->limit_active = 4; - } - else if (m->limit_active > 2) { - m->limit_active = 2; - } - m->last_limit_change = now; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): decrease worker limit to %d", - m->id, m->limit_active); - } - - if (m->tasks_active > m->limit_active) { - status = unschedule_slow_tasks(m); - } - } - else if (!h2_iq_empty(m->q)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): idle, but %d streams to process", - m->id, (int)h2_iq_count(m->q)); - status = APR_EAGAIN; - } - else { - /* idle, have streams, but no tasks active. what are we waiting for? - * WINDOW_UPDATEs from client? */ - h2_stream *stream = NULL; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): idle, no tasks ongoing, %d streams", - m->id, (int)h2_ihash_count(m->streams)); - h2_ihash_shift(m->streams, (void**)&stream, 1); - if (stream) { - h2_ihash_add(m->streams, stream); - if (stream->output && !stream->out_checked) { - /* FIXME: this looks like a race between the session thinking - * it is idle and the EOF on a stream not being sent. - * Signal to caller to leave IDLE state. - */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - H2_STRM_MSG(stream, "output closed=%d, mplx idle" - ", out has %ld bytes buffered"), - h2_beam_is_closed(stream->output), - (long)h2_beam_get_buffered(stream->output)); - h2_ihash_add(m->streams, stream); - check_data_for(m, stream, 0); - stream->out_checked = 1; - status = APR_EAGAIN; - } - } + while (!m->aborted && !stream && (m->processing_count < m->processing_limit) + && (sid = h2_iq_shift(m->q)) > 0) { + stream = h2_ihash_get(m->streams, sid); + } + + if (!stream) { + if (m->processing_count >= m->processing_limit && !h2_iq_empty(m->q)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, + H2_MPLX_MSG(m, "delaying request processing. " + "Current limit is %d and %d workers are in use."), + m->processing_limit, m->processing_count); } + goto cleanup; } - register_if_needed(m); - H2_MPLX_LEAVE(m); - return status; -} + if (sid > m->max_stream_id_started) { + m->max_stream_id_started = sid; + } -/******************************************************************************* - * HTTP/2 request engines - ******************************************************************************/ + transit = c2_transit_get(m); +#if AP_HAS_RESPONSE_BUCKETS + c2 = ap_create_secondary_connection(transit->pool, m->c1, transit->bucket_alloc); +#else + c2 = h2_c2_create(m->c1, transit->pool, transit->bucket_alloc); +#endif + if (!c2) goto cleanup; + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c1, + H2_STRM_MSG(stream, "created new c2")); -typedef struct { - h2_mplx * m; - h2_req_engine *ngn; - int streams_updated; -} ngn_update_ctx; + rv = c2_setup_io(m, c2, stream, transit); + if (APR_SUCCESS != rv) goto cleanup; -static int ngn_update_window(void *ctx, void *val) -{ - ngn_update_ctx *uctx = ctx; - h2_stream *stream = val; - if (stream->task && stream->task->assigned == uctx->ngn - && output_consumed_signal(uctx->m, stream->task)) { - ++uctx->streams_updated; + stream->c2 = c2; + ++m->processing_count; + +cleanup: + if (APR_SUCCESS != rv && c2) { + h2_c2_destroy(c2); + c2 = NULL; } - return 1; + if (transit && !c2) { + c2_transit_recycle(m, transit); + } + return c2; } -static apr_status_t ngn_out_update_windows(h2_mplx *m, h2_req_engine *ngn) +static conn_rec *c2_prod_next(void *baton, int *phas_more) { - ngn_update_ctx ctx; - - ctx.m = m; - ctx.ngn = ngn; - ctx.streams_updated = 0; - h2_ihash_iter(m->streams, ngn_update_window, &ctx); - - return ctx.streams_updated? APR_SUCCESS : APR_EAGAIN; + h2_mplx *m = baton; + conn_rec *c = NULL; + + H2_MPLX_ENTER_ALWAYS(m); + if (!m->aborted) { + c = s_next_c2(m); + *phas_more = (c != NULL && !h2_iq_empty(m->q)); + } + H2_MPLX_LEAVE(m); + return c; } -apr_status_t h2_mplx_req_engine_push(const char *ngn_type, - request_rec *r, - http2_req_engine_init *einit) +static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx) { - apr_status_t status; - h2_mplx *m; - h2_task *task; h2_stream *stream; + + ap_assert(conn_ctx); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c2, + "h2_mplx(%s-%d): c2 done", conn_ctx->id, conn_ctx->stream_id); + + AP_DEBUG_ASSERT(apr_atomic_read32(&conn_ctx->done) == 0); + apr_atomic_set32(&conn_ctx->done, 1); + conn_ctx->done_at = apr_time_now(); + ++c2->keepalives; + /* From here on, the final handling of c2 is done by c1 processing. + * Which means we can give it c1's scoreboard handle for updates. */ + c2->sbh = m->c1->sbh; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2, + "h2_mplx(%s-%d): request done, %f ms elapsed", + conn_ctx->id, conn_ctx->stream_id, + (conn_ctx->done_at - conn_ctx->started_at) / 1000.0); - task = h2_ctx_rget_task(r); - if (!task) { - return APR_ECONNABORTED; + if (!conn_ctx->has_final_response) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2, + "h2_c2(%s-%d): processing finished without final response", + conn_ctx->id, conn_ctx->stream_id); + c2->aborted = 1; + if (conn_ctx->beam_out) + h2_beam_abort(conn_ctx->beam_out, c2); + } + else if (!conn_ctx->beam_out || !h2_beam_is_complete(conn_ctx->beam_out)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, conn_ctx->last_err, c2, + "h2_c2(%s-%d): processing finished with incomplete output", + conn_ctx->id, conn_ctx->stream_id); + c2->aborted = 1; + h2_beam_abort(conn_ctx->beam_out, c2); + } + else if (!c2->aborted) { + s_mplx_be_happy(m, c2, conn_ctx); } - m = task->mplx; - H2_MPLX_ENTER(m); - - stream = h2_ihash_get(m->streams, task->stream_id); + stream = h2_ihash_get(m->streams, conn_ctx->stream_id); if (stream) { - status = h2_ngn_shed_push_request(m->ngn_shed, ngn_type, r, einit); + /* stream not done yet. trigger a potential polling on the output + * since nothing more will happening here. */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2, + H2_STRM_MSG(stream, "c2_done, stream open")); + c2_beam_output_write_notify(c2, NULL); } - else { - status = APR_ECONNABORTED; + else if ((stream = h2_ihash_get(m->shold, conn_ctx->stream_id)) != NULL) { + /* stream is done, was just waiting for this. */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2, + H2_STRM_MSG(stream, "c2_done, in hold")); + c1c2_stream_joined(m, stream); } + else { + int i; + + for (i = 0; i < m->spurge->nelts; ++i) { + if (stream == APR_ARRAY_IDX(m->spurge, i, h2_stream*)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2, + H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge")); + ap_assert("stream should not be in spurge" == NULL); + return; + } + } - H2_MPLX_LEAVE(m); - return status; + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2, APLOGNO(03518) + "h2_mplx(%s-%d): c2_done, stream not found", + conn_ctx->id, conn_ctx->stream_id); + ap_assert("stream should still be available" == NULL); + } } -apr_status_t h2_mplx_req_engine_pull(h2_req_engine *ngn, - apr_read_type_e block, - int capacity, - request_rec **pr) -{ - h2_ngn_shed *shed = h2_ngn_shed_get_shed(ngn); - h2_mplx *m = h2_ngn_shed_get_ctx(shed); - apr_status_t status; - int want_shutdown; - - H2_MPLX_ENTER(m); +static void c2_prod_done(void *baton, conn_rec *c2) +{ + h2_mplx *m = baton; + h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c2); - want_shutdown = (block == APR_BLOCK_READ); + AP_DEBUG_ASSERT(conn_ctx); + H2_MPLX_ENTER_ALWAYS(m); - /* Take this opportunity to update output consummation - * for this engine */ - ngn_out_update_windows(m, ngn); - - if (want_shutdown && !h2_iq_empty(m->q)) { - /* For a blocking read, check first if requests are to be - * had and, if not, wait a short while before doing the - * blocking, and if unsuccessful, terminating read. - */ - status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); - if (APR_STATUS_IS_EAGAIN(status)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): start block engine pull", m->id); - apr_thread_cond_timedwait(m->task_thawed, m->lock, - apr_time_from_msec(20)); - status = h2_ngn_shed_pull_request(shed, ngn, capacity, 1, pr); - } - } - else { - status = h2_ngn_shed_pull_request(shed, ngn, capacity, - want_shutdown, pr); - } + --m->processing_count; + s_c2_done(m, c2, conn_ctx); + if (m->join_wait) apr_thread_cond_signal(m->join_wait); H2_MPLX_LEAVE(m); - return status; } - -void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn, - apr_status_t status) + +static void workers_shutdown(void *baton, int graceful) { - h2_task *task = h2_ctx_cget_task(r_conn); - - if (task) { - h2_mplx *m = task->mplx; - h2_stream *stream; + h2_mplx *m = baton; + + apr_thread_mutex_lock(m->poll_lock); + /* time to wakeup and assess what to do */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_MPLX_MSG(m, "workers shutdown, waking pollset")); + m->shutdown = 1; + if (!graceful) { + m->aborted = 1; + } + apr_pollset_wakeup(m->pollset); + apr_thread_mutex_unlock(m->poll_lock); +} - H2_MPLX_ENTER_ALWAYS(m); +/******************************************************************************* + * h2_mplx DoS protection + ******************************************************************************/ - stream = h2_ihash_get(m->streams, task->stream_id); - - ngn_out_update_windows(m, ngn); - h2_ngn_shed_done_task(m->ngn_shed, ngn, task); - - if (status != APR_SUCCESS && stream - && h2_task_can_redo(task) - && !h2_ihash_get(m->sredo, stream->id)) { - h2_ihash_add(m->sredo, stream); - } +static void s_mplx_be_happy(h2_mplx *m, conn_rec *c, h2_conn_ctx_t *conn_ctx) +{ + apr_time_t now; - if (task->engine) { - /* cannot report that as done until engine returns */ - } - else { - task_done(m, task, ngn); + if (m->processing_limit < m->processing_max + && conn_ctx->started_at > m->last_mood_change) { + --m->irritations_since; + if (m->processing_limit < m->processing_max + && ((now = apr_time_now()) - m->last_mood_change >= m->mood_update_interval + || m->irritations_since < -m->processing_limit)) { + m->processing_limit = H2MIN(m->processing_limit * 2, m->processing_max); + m->last_mood_change = now; + m->irritations_since = 0; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, + H2_MPLX_MSG(m, "mood update, increasing worker limit to %d"), + m->processing_limit); } + } +} - H2_MPLX_LEAVE(m); +static void m_be_annoyed(h2_mplx *m) +{ + apr_time_t now; + + if (m->processing_limit > 2) { + ++m->irritations_since; + if (((now = apr_time_now()) - m->last_mood_change >= m->mood_update_interval) + || (m->irritations_since >= m->processing_limit)) { + + if (m->processing_limit > 16) { + m->processing_limit = 16; + } + else if (m->processing_limit > 8) { + m->processing_limit = 8; + } + else if (m->processing_limit > 4) { + m->processing_limit = 4; + } + else if (m->processing_limit > 2) { + m->processing_limit = 2; + } + m->last_mood_change = now; + m->irritations_since = 0; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1, + H2_MPLX_MSG(m, "mood update, decreasing worker limit to %d"), + m->processing_limit); + } } } @@ -1224,59 +1097,168 @@ void h2_mplx_req_engine_done(h2_req_engine *ngn, conn_rec *r_conn, * mplx master events dispatching ******************************************************************************/ -int h2_mplx_has_master_events(h2_mplx *m) +static int reset_is_acceptable(h2_stream *stream) { - return apr_atomic_read32(&m->event_pending) > 0; + /* client may terminate a stream via H2 RST_STREAM message at any time. + * This is annyoing when we have committed resources (e.g. worker threads) + * to it, so our mood (e.g. willingness to commit resources on this + * connection in the future) goes down. + * + * This is a DoS protection. We do not want to make it too easy for + * a client to eat up server resources. + * + * However: there are cases where a RST_STREAM is the only way to end + * a request. This includes websockets and server-side-event streams (SSEs). + * The responses to such requests continue forever otherwise. + * + */ + if (!stream_is_running(stream)) return 1; + if (!(stream->id & 0x01)) return 1; /* stream initiated by us. acceptable. */ + if (!stream->response) return 0; /* no response headers produced yet. bad. */ + if (!stream->out_data_frames) return 0; /* no response body data sent yet. bad. */ + return 1; /* otherwise, be forgiving */ } -apr_status_t h2_mplx_dispatch_master_events(h2_mplx *m, - stream_ev_callback *on_resume, - void *on_ctx) +apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id, h2_stream *stream) { - h2_stream *stream; - int n, id; - - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld): dispatch events", m->id); - apr_atomic_set32(&m->event_pending, 0); + apr_status_t status = APR_SUCCESS; + int registered; - /* update input windows for streams */ - h2_ihash_iter(m->streams, report_consumption_iter, m); - purge_streams(m, 1); - - n = h2_ififo_count(m->readyq); - while (n > 0 - && (h2_ififo_try_pull(m->readyq, &id) == APR_SUCCESS)) { - --n; - stream = h2_ihash_get(m->streams, id); - if (stream) { - on_resume(on_ctx, stream); - } + H2_MPLX_ENTER_ALWAYS(m); + registered = (h2_ihash_get(m->streams, stream_id) != NULL); + if (!stream) { + /* a RST might arrive so late, we have already forgotten + * about it. Seems ok. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, + H2_MPLX_MSG(m, "RST on unknown stream %d"), stream_id); + AP_DEBUG_ASSERT(!registered); + } + else if (!registered) { + /* a RST on a stream that mplx has not been told about, but + * which the session knows. Very early and annoying. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, + H2_STRM_MSG(stream, "very early RST, drop")); + h2_stream_set_monitor(stream, NULL); + h2_stream_rst(stream, H2_ERR_STREAM_CLOSED); + h2_stream_dispatch(stream, H2_SEV_EOS_SENT); + m_stream_cleanup(m, stream); + m_be_annoyed(m); + } + else if (!reset_is_acceptable(stream)) { + m_be_annoyed(m); } - - return APR_SUCCESS; + H2_MPLX_LEAVE(m); + return status; } -apr_status_t h2_mplx_keep_active(h2_mplx *m, h2_stream *stream) +static apr_status_t mplx_pollset_create(h2_mplx *m) { - check_data_for(m, stream, 1); - return APR_SUCCESS; + /* stream0 output only */ + return apr_pollset_create(&m->pollset, 1, m->pool, + APR_POLLSET_WAKEABLE); } -int h2_mplx_awaits_data(h2_mplx *m) +static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout, + stream_ev_callback *on_stream_input, + stream_ev_callback *on_stream_output, + void *on_ctx) { - int waiting = 1; - - H2_MPLX_ENTER_ALWAYS(m); + apr_status_t rv; + const apr_pollfd_t *results, *pfd; + apr_int32_t nresults, i; + h2_conn_ctx_t *conn_ctx; + h2_stream *stream; - if (h2_ihash_empty(m->streams)) { - waiting = 0; - } - else if (!m->tasks_active && !h2_ififo_count(m->readyq) - && h2_iq_empty(m->q)) { - waiting = 0; - } + /* Make sure we are not called recursively. */ + ap_assert(!m->polling); + m->polling = 1; + do { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_MPLX_MSG(m, "enter polling timeout=%d"), + (int)apr_time_sec(timeout)); + + apr_array_clear(m->streams_ev_in); + apr_array_clear(m->streams_ev_out); + + do { + /* add streams we started processing in the meantime */ + apr_thread_mutex_lock(m->poll_lock); + if (!h2_iq_empty(m->streams_input_read) + || !h2_iq_empty(m->streams_output_written)) { + while ((i = h2_iq_shift(m->streams_input_read))) { + stream = h2_ihash_get(m->streams, i); + if (stream) { + APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream; + } + } + while ((i = h2_iq_shift(m->streams_output_written))) { + stream = h2_ihash_get(m->streams, i); + if (stream) { + APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream; + } + } + nresults = 0; + rv = APR_SUCCESS; + apr_thread_mutex_unlock(m->poll_lock); + break; + } + apr_thread_mutex_unlock(m->poll_lock); + + H2_MPLX_LEAVE(m); + rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results); + H2_MPLX_ENTER_ALWAYS(m); + if (APR_STATUS_IS_EINTR(rv) && m->shutdown) { + if (!m->aborted) { + rv = APR_SUCCESS; + } + goto cleanup; + } + } while (APR_STATUS_IS_EINTR(rv)); - H2_MPLX_LEAVE(m); - return waiting; + if (APR_SUCCESS != rv) { + if (APR_STATUS_IS_TIMEUP(rv)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1, + H2_MPLX_MSG(m, "polling timed out ")); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1, APLOGNO(10310) \ + H2_MPLX_MSG(m, "polling failed")); + } + goto cleanup; + } + + for (i = 0; i < nresults; i++) { + pfd = &results[i]; + conn_ctx = pfd->client_data; + + AP_DEBUG_ASSERT(conn_ctx); + if (conn_ctx->stream_id == 0) { + if (on_stream_input) { + APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0; + } + continue; + } + } + + if (on_stream_input && m->streams_ev_in->nelts) { + H2_MPLX_LEAVE(m); + for (i = 0; i < m->streams_ev_in->nelts; ++i) { + on_stream_input(on_ctx, APR_ARRAY_IDX(m->streams_ev_in, i, h2_stream*)); + } + H2_MPLX_ENTER_ALWAYS(m); + } + if (on_stream_output && m->streams_ev_out->nelts) { + H2_MPLX_LEAVE(m); + for (i = 0; i < m->streams_ev_out->nelts; ++i) { + on_stream_output(on_ctx, APR_ARRAY_IDX(m->streams_ev_out, i, h2_stream*)); + } + H2_MPLX_ENTER_ALWAYS(m); + } + break; + } while(1); + +cleanup: + m->polling = 0; + return rv; } + |